[PATCH v3 6/7] Add merger for tuple streams (C part)
Alexander Turenko
alexander.turenko at tarantool.org
Wed Apr 10 18:21:24 MSK 2019
Needed for #3276.
---
src/box/CMakeLists.txt | 1 +
src/box/merger.c | 464 +++++++++++++++++++++++++++++++++++++++
src/box/merger.h | 180 +++++++++++++++
test/unit/CMakeLists.txt | 3 +
test/unit/merger.result | 71 ++++++
test/unit/merger.test.c | 301 +++++++++++++++++++++++++
6 files changed, 1020 insertions(+)
create mode 100644 src/box/merger.c
create mode 100644 src/box/merger.h
create mode 100644 test/unit/merger.result
create mode 100644 test/unit/merger.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 7fbbc7803..d1251c326 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -121,6 +121,7 @@ add_library(box STATIC
execute.c
wal.c
call.c
+ merger.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..83f628758
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <limits.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "trivia/util.h" /* unlikely() */
+#include "diag.h" /* diag_set() */
+#include "say.h" /* panic() */
+#include "box/tuple.h" /* box_tuple_unref() */
+#include "box/tuple_format.h" /* box_tuple_format_*(),
+ tuple_format_id() */
+#include "box/key_def.h" /* box_key_def_*(),
+ box_tuple_compare() */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+/*
+ * Upper bounds for the reference counters. The counters are
+ * plain `int`s, so incrementing one past INT_MAX would be a
+ * signed overflow; the ref functions panic() at this bound
+ * instead. INT_MAX is declared in <limits.h>.
+ */
+enum { MERGER_SOURCE_REF_MAX = INT_MAX };
+enum { MERGER_CONTEXT_REF_MAX = INT_MAX };
+
+/**
+ * Take one more reference to @a source.
+ *
+ * The counter is a plain int: instead of letting it wrap, the
+ * process is stopped with panic() once the limit is reached.
+ */
+void
+merger_source_ref(struct merger_source *source)
+{
+	const int refs = source->refs;
+	if (unlikely(refs >= MERGER_SOURCE_REF_MAX))
+		panic("Merger source reference counter overflow");
+	source->refs = refs + 1;
+}
+
+/**
+ * Drop one reference to @a source. Dropping the last one
+ * frees the source through its delete() virtual method.
+ */
+void
+merger_source_unref(struct merger_source *source)
+{
+	assert(source->refs > 0);
+	source->refs--;
+	if (source->refs != 0)
+		return;
+	source->vtab->delete(source);
+}
+
+/**
+ * Initialize the base part of a merger source: bind its
+ * virtual methods table and start with zero references.
+ */
+void
+merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
+{
+	source->refs = 0;
+	source->vtab = vtab;
+}
+
+/**
+ * Release what a merger context owns (its key_def copy and its
+ * tuple format reference), then the context memory itself.
+ */
+void
+merger_context_delete(struct merger_context *ctx)
+{
+	struct key_def *key_def = ctx->key_def;
+	box_tuple_format_t *format = ctx->format;
+
+	box_key_def_delete(key_def);
+	box_tuple_format_unref(format);
+	free(ctx);
+}
+
+/**
+ * Take one more reference to @a ctx; panic() rather than let
+ * the int counter overflow.
+ */
+void
+merger_context_ref(struct merger_context *ctx)
+{
+	bool overflow = ctx->refs >= MERGER_CONTEXT_REF_MAX;
+	if (unlikely(overflow))
+		panic("Merger context reference counter overflow");
+	ctx->refs += 1;
+}
+
+/**
+ * Drop one reference to @a ctx; the last one deletes the
+ * context with merger_context_delete().
+ */
+void
+merger_context_unref(struct merger_context *ctx)
+{
+	assert(ctx->refs > 0);
+	if (--ctx->refs != 0)
+		return;
+	merger_context_delete(ctx);
+}
+
+/**
+ * Allocate a merger context with zero references.
+ *
+ * The context owns a private copy of @a key_def and a tuple
+ * format built from that copy. At an error a diag is set,
+ * everything acquired so far is released and NULL is returned.
+ */
+struct merger_context *
+merger_context_new(const struct key_def *key_def)
+{
+	const size_t ctx_size = sizeof(struct merger_context);
+	struct merger_context *ctx = (struct merger_context *) malloc(ctx_size);
+	if (ctx == NULL) {
+		diag_set(OutOfMemory, ctx_size, "malloc", "merger_context");
+		return NULL;
+	}
+
+	/*
+	 * Own a copy of the key_def: the caller's one can be
+	 * collected before the context ends its life (say, by the
+	 * LuaJIT GC if the key_def comes from Lua).
+	 */
+	ctx->key_def = key_def_dup(key_def);
+	if (ctx->key_def == NULL)
+		goto err_free_ctx;
+
+	ctx->format = box_tuple_format_new(&ctx->key_def, 1);
+	if (ctx->format == NULL)
+		goto err_delete_key_def;
+
+	ctx->refs = 0;
+	return ctx;
+
+err_delete_key_def:
+	box_key_def_delete(ctx->key_def);
+err_free_ctx:
+	free(ctx);
+	return NULL;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Holds a source to fetch next tuples and a last fetched tuple to
+ * compare the node against other nodes.
+ *
+ * The main reason why this structure is separated from a merger
+ * source is that a heap node cannot be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows encapsulating all heap
+ * related logic inside this compilation unit, without any trails
+ * in externally visible structures.
+ */
+struct merger_heap_node {
+	/* A source of tuples (the node holds a reference to it). */
+	struct merger_source *source;
+	/*
+	 * A last fetched (refcounted) tuple to compare against
+	 * other nodes. NULL before the first fetch and after the
+	 * source is exhausted.
+	 */
+	struct tuple *tuple;
+	/* An anchor to make the structure a merger heap node. */
+	struct heap_node heap_node_anchor;
+};
+
+/*
+ * Instantiate the salad/heap.h template for merger heap nodes:
+ * the generated functions get the merger_heap_ prefix, order
+ * nodes with merger_source_less() and locate the embedded
+ * anchor through the heap_node_anchor member. The comparator
+ * is only forward-declared here; it is defined below.
+ */
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		   const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merger_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr heap_node_anchor
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, an immutable context, parameters of a merge
+ * process and utility fields.
+ */
+struct merger {
+	/* A merger is a source. */
+	struct merger_source base;
+	/*
+	 * Whether a merge process started.
+	 *
+	 * The merger postpones charging of heap nodes (fetching
+	 * the first tuple of every source) until a first output
+	 * tuple is acquired.
+	 */
+	bool started;
+	/* A merger context (the merger holds a reference to it). */
+	struct merger_context *ctx;
+	/*
+	 * A heap of sources (of nodes that contain a source, to
+	 * be exact). Only nodes with a fetched tuple are kept in
+	 * the heap; exhausted sources are removed from it.
+	 */
+	heap_t heap;
+	/* An array of heap nodes, one per source. */
+	uint32_t nodes_count;
+	struct merger_heap_node *nodes;
+	/* Ascending (false) / descending (true) order. */
+	bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Strict "less" predicate for the merger heap: the node whose
+ * tuple must be emitted first is the smallest one.
+ *
+ * A node without a tuple sorts after any node with a tuple, and
+ * two such nodes are equivalent. Tuples are compared with the
+ * context key_def; the reverse mode inverts the order.
+ *
+ * The predicate is irreflexive in both modes: equal tuples are
+ * never "less" than each other. Before this fix the reverse mode
+ * used `cmp >= 0`, which reported equal tuples as less both ways.
+ */
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		   const struct merger_heap_node *right)
+{
+	/* Covers "both are NULL" as well: equivalent, not less. */
+	if (left->tuple == NULL)
+		return false;
+	if (right->tuple == NULL)
+		return true;
+	struct merger *merger = container_of(heap, struct merger, heap);
+	int cmp = box_tuple_compare(left->tuple, right->tuple,
+				    merger->ctx->key_def);
+	return merger->reverse ? cmp > 0 : cmp < 0;
+}
+
+/**
+ * Estimate how many bytes the heap will request at its next
+ * grow; used only to report the size in an OutOfMemory diag.
+ *
+ * Mirrors HEAP(reserve)() in lib/salad/heap.h: an empty heap
+ * reserves HEAP_INITIAL_CAPACITY slots, otherwise it grows by
+ * its current capacity.
+ */
+static size_t
+heap_next_grow_size(const heap_t *heap)
+{
+	heap_off_t extra_slots = HEAP_INITIAL_CAPACITY;
+	if (heap->capacity != 0)
+		extra_slots = heap->capacity;
+	return extra_slots * sizeof(struct heap_node *);
+}
+
+/**
+ * Initialize a new merger heap node.
+ */
+static void
+merger_heap_node_new(struct merger_heap_node *node,
+ struct merger_source *source)
+{
+ node->source = source;
+ merger_source_ref(node->source);
+ node->tuple = NULL;
+ heap_node_create(&node->heap_node_anchor);
+}
+
+/**
+ * Free a merger heap node.
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+ merger_source_unref(node->source);
+ if (node->tuple != NULL)
+ box_tuple_unref(node->tuple);
+}
+
+/**
+ * Prime @a node: fetch the first tuple of its source (in the
+ * context format), store it in node->tuple and insert the node
+ * into merger->heap.
+ *
+ * A source that yields no tuple is skipped: the node is left
+ * outside the heap and 0 is returned.
+ *
+ * Return 0 at success, -1 at an error (a diag is set).
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	struct merger_source *source = node->source;
+	struct tuple *first = NULL;
+	int rc = source->vtab->next(source, merger->ctx->format, &first);
+	if (rc != 0)
+		return -1;
+	if (first == NULL)
+		return 0;
+
+	node->tuple = first;
+	if (merger_heap_insert(&merger->heap, node) == 0)
+		return 0;
+
+	diag_set(OutOfMemory, heap_next_grow_size(&merger->heap),
+		 "malloc", "merger->heap");
+	return -1;
+}
+
+/* Virtual methods declarations */
+
+/* Free a merger; installed as merger_vtab.delete. */
+static void
+merger_delete(struct merger_source *base);
+/* Emit the next merged tuple; installed as merger_vtab.next. */
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+	    struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Allocate a merger with no sources, in ascending order and
+ * with zero references. The merger takes a reference to @a ctx.
+ *
+ * Return NULL and set a diag at an error.
+ */
+struct merger_source *
+merger_new(struct merger_context *ctx)
+{
+	static struct merger_source_vtab merger_vtab = {
+		.delete = merger_delete,
+		.next = merger_next,
+	};
+
+	const size_t merger_size = sizeof(struct merger);
+	struct merger *merger = (struct merger *) malloc(merger_size);
+	if (merger == NULL) {
+		diag_set(OutOfMemory, merger_size, "malloc", "merger");
+		return NULL;
+	}
+
+	merger_source_new(&merger->base, &merger_vtab);
+	merger_context_ref(ctx);
+	merger->ctx = ctx;
+	merger->started = false;
+	merger->reverse = false;
+	merger->nodes_count = 0;
+	merger->nodes = NULL;
+	merger_heap_create(&merger->heap);
+
+	return &merger->base;
+}
+
+/**
+ * Attach @a sources to a merger. A heap node is created for
+ * each source and takes a reference to it.
+ *
+ * May be called at most once per merger (see the asserts), and
+ * before the first next() call, because sources are primed only
+ * when the merge process starts. An empty set of sources is
+ * valid and gives an empty merger.
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+		   uint32_t sources_count)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/* Ensure we don't leak old nodes. */
+	assert(merger->nodes_count == 0);
+	assert(merger->nodes == NULL);
+
+	/*
+	 * An allocation of zero bytes may legitimately return
+	 * NULL; that must not be reported as out of memory.
+	 */
+	if (sources_count == 0)
+		return 0;
+
+	/* calloc() checks the count * size product for overflow. */
+	struct merger_heap_node *nodes = (struct merger_heap_node *) calloc(
+		sources_count, sizeof(struct merger_heap_node));
+	if (nodes == NULL) {
+		diag_set(OutOfMemory,
+			 (size_t) sources_count * sizeof(struct merger_heap_node),
+			 "calloc", "merger heap nodes");
+		return -1;
+	}
+
+	for (uint32_t i = 0; i < sources_count; ++i)
+		merger_heap_node_new(&nodes[i], sources[i]);
+
+	merger->nodes_count = sources_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+/**
+ * Choose the output order of a merger: ascending when
+ * @a reverse is false, descending when it is true.
+ */
+void
+merger_set_reverse(struct merger_source *base, bool reverse)
+{
+	struct merger *self = container_of(base, struct merger, base);
+	self->reverse = reverse;
+}
+
+/* Virtual methods */
+
+/**
+ * Free a merger (the delete() virtual method). Release the
+ * context reference, the heap storage, every heap node (its
+ * source reference and fetched tuple), the nodes array and the
+ * merger itself.
+ */
+static void
+merger_delete(struct merger_source *base)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	merger_context_unref(merger->ctx);
+	merger_heap_destroy(&merger->heap);
+
+	for (uint32_t i = 0; i < merger->nodes_count; ++i)
+		merger_heap_node_delete(&merger->nodes[i]);
+
+	/* free(NULL) is a no-op, so no guard is needed. */
+	free(merger->nodes);
+	free(merger);
+}
+
+/**
+ * Emit the next tuple of the merged stream (the next() virtual
+ * method).
+ *
+ * The first call asks every source for its first tuple and puts
+ * the non-empty ones into the heap. Each call then emits the
+ * heap top and asks its source for a replacement.
+ *
+ * @a format is the format the caller wants the output in; NULL
+ * means "any", and the context format is used then.
+ *
+ * At success store a referenced tuple in *out, or NULL when all
+ * sources are exhausted, and return 0. At an error set a diag
+ * and return -1. Every node left in the heap still owns a valid
+ * referenced tuple, so the merger can be deleted safely.
+ */
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+	    struct tuple **out)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/*
+	 * Fetch a first tuple for each source and add all heap
+	 * nodes to a merger heap.
+	 *
+	 * NOTE(review): if priming fails midway, the nodes added
+	 * so far stay in the heap and `started` stays false. A
+	 * retried next() would prime them again, so delete the
+	 * merger after such an error instead of retrying.
+	 */
+	if (!merger->started) {
+		for (uint32_t i = 0; i < merger->nodes_count; ++i) {
+			struct merger_heap_node *node = &merger->nodes[i];
+			if (merger_add_heap_node(merger, node) != 0)
+				return -1;
+		}
+		merger->started = true;
+	}
+
+	/* Get a next tuple. */
+	struct merger_heap_node *node = merger_heap_top(&merger->heap);
+	if (node == NULL) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *tuple = node->tuple;
+	assert(tuple != NULL);
+
+	/*
+	 * The tuples are stored in merger->ctx->format for fast
+	 * comparisons, but we should return tuples in a requested
+	 * format. The converted copy gets its own reference.
+	 * node->tuple keeps its reference until a replacement is
+	 * fetched, so an error below leaves the node (which is
+	 * still in the heap) valid.
+	 */
+	uint32_t id_stored = tuple_format_id(merger->ctx->format);
+	assert(tuple->format_id == id_stored);
+	if (format == NULL)
+		format = merger->ctx->format;
+	uint32_t id_requested = tuple_format_id(format);
+	bool is_converted = id_stored != id_requested;
+	if (is_converted) {
+		const char *tuple_beg = tuple_data(tuple);
+		const char *tuple_end = tuple_beg + tuple->bsize;
+		tuple = box_tuple_new(format, tuple_beg, tuple_end);
+		if (tuple == NULL)
+			return -1;
+		box_tuple_ref(tuple);
+	}
+
+	/*
+	 * Fetch a replacement into a local variable. This way a
+	 * failed next() cannot clobber node->tuple.
+	 */
+	struct merger_source *source = node->source;
+	struct tuple *next_tuple = NULL;
+	if (source->vtab->next(source, merger->ctx->format,
+			       &next_tuple) != 0) {
+		/* The converted copy will not be returned. */
+		if (is_converted)
+			box_tuple_unref(tuple);
+		return -1;
+	}
+
+	/*
+	 * If a copy was made, the stored tuple is not returned,
+	 * so drop the node's reference to it. Otherwise that
+	 * reference is handed over to the caller through *out.
+	 */
+	if (is_converted)
+		box_tuple_unref(node->tuple);
+	node->tuple = next_tuple;
+
+	/* Update a heap. */
+	if (node->tuple == NULL)
+		merger_heap_delete(&merger->heap, node);
+	else
+		merger_heap_update(&merger->heap, node);
+
+	*out = tuple;
+	return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..2323dd7d7
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,180 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+/*
+ * Types from box/ that this header only uses through pointers,
+ * so forward declarations are enough here.
+ */
+struct tuple;
+struct key_def;
+struct tuple_format;
+typedef struct tuple_format box_tuple_format_t;
+
+/* Defined below. */
+struct merger_source;
+struct merger_context;
+
+/**
+ * Virtual methods of a merger source.
+ *
+ * NOTE(review): `delete` is a C++ keyword. Although this header
+ * has an extern "C" block for C++ users, the member declaration
+ * below will not compile as C++. Consider renaming it (e.g. to
+ * `destroy`) together with all implementations.
+ */
+struct merger_source_vtab {
+	/**
+	 * Free a merger source.
+	 *
+	 * Don't call it directly, use merger_source_unref()
+	 * instead.
+	 */
+	void (*delete)(struct merger_source *base);
+	/**
+	 * Get a next tuple (refcounted) from a source and store
+	 * it in *out.
+	 *
+	 * When format is NULL it means that it does not matter
+	 * for a caller in which format a tuple will be.
+	 *
+	 * Return 0 when successfully fetched a tuple or NULL
+	 * (NULL means the source is exhausted). In case of an
+	 * error set a diag and return -1.
+	 */
+	int (*next)(struct merger_source *base, box_tuple_format_t *format,
+		    struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merger source.
+ *
+ * Concrete sources embed it as a member and get back to their
+ * own structure with container_of() in their virtual methods.
+ *
+ * The structure does not hold any resources.
+ */
+struct merger_source {
+	/* Source-specific methods. */
+	struct merger_source_vtab *vtab;
+	/*
+	 * Reference counter. merger_source_unref() deletes the
+	 * source when it drops to zero.
+	 */
+	int refs;
+};
+
+/**
+ * Holds immutable parameters of a merger.
+ */
+struct merger_context {
+	/* Key definition to compare tuples (a private copy). */
+	struct key_def *key_def;
+	/*
+	 * Tuple format built from key_def. A merger asks its
+	 * sources for tuples in this format for fast comparisons.
+	 */
+	box_tuple_format_t *format;
+	/* Reference counter. */
+	int refs;
+};
+
+/* }}} */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+/**
+ * Increment a merger source reference counter.
+ *
+ * Panics when the counter would overflow.
+ */
+void
+merger_source_ref(struct merger_source *source);
+
+/**
+ * Decrement a merger source reference counter. When it has
+ * reached zero, free the source (call delete() virtual method).
+ */
+void
+merger_source_unref(struct merger_source *source);
+
+/**
+ * Initialize a base merger source structure: bind @a vtab and
+ * set the reference counter to zero.
+ */
+void
+merger_source_new(struct merger_source *source,
+		  struct merger_source_vtab *vtab);
+
+/**
+ * Free a merger context.
+ *
+ * Usually called by merger_context_unref(). Call it directly
+ * only for a context that was never referenced.
+ */
+void
+merger_context_delete(struct merger_context *ctx);
+
+/**
+ * Increment a merger context reference counter.
+ *
+ * Panics when the counter would overflow.
+ */
+void
+merger_context_ref(struct merger_context *ctx);
+
+/**
+ * Decrement a merger context reference counter. When it has
+ * reached zero, free the context.
+ */
+void
+merger_context_unref(struct merger_context *ctx);
+
+/**
+ * Create a new merger context.
+ *
+ * The context keeps its own copy of @a key_def, so the caller
+ * may delete its key_def right after this call.
+ *
+ * A returned merger context is NOT reference counted (its
+ * counter is zero): the caller is expected to reference it.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_context *
+merger_context_new(const struct key_def *key_def);
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger w/o sources.
+ *
+ * The merger takes a reference to @a ctx. The returned source
+ * has a zero reference counter and an ascending order.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_source *
+merger_new(struct merger_context *ctx);
+
+/**
+ * Set sources for a merger.
+ *
+ * The merger takes a reference to each source. Call it at most
+ * once per merger, before fetching the first tuple.
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+		   uint32_t sources_count);
+
+/**
+ * Set reverse flag for a merger: false gives ascending order,
+ * true gives descending order.
+ */
+void
+merger_set_reverse(struct merger_source *base, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 64739ab09..afdcbebf6 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
target_link_libraries(swim_proto.test unit swim)
+
+# Unit test for the tuple stream merger (src/box/merger.c).
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..11399e0bc
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+ *** test_basic ***
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (any format): tuple != NULL
+ ok 2 - array source next() (any format): skip tuple format id check
+ ok 3 - array source next() (any format): check tuple size
+ ok 4 - array source next() (any format): check tuple data
+ ok 5 - array source next() (any format): tuple != NULL
+ ok 6 - array source next() (any format): skip tuple format id check
+ ok 7 - array source next() (any format): check tuple size
+ ok 8 - array source next() (any format): check tuple data
+ ok 9 - array source is empty (any format)
+ *** test_array_source: done ***
+ok 1 - subtests
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (user's format): tuple != NULL
+ ok 2 - array source next() (user's format): check tuple format id
+ ok 3 - array source next() (user's format): check tuple size
+ ok 4 - array source next() (user's format): check tuple data
+ ok 5 - array source next() (user's format): tuple != NULL
+ ok 6 - array source next() (user's format): check tuple format id
+ ok 7 - array source next() (user's format): check tuple size
+ ok 8 - array source next() (user's format): check tuple data
+ ok 9 - array source is empty (user's format)
+ *** test_array_source: done ***
+ok 2 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (any format): tuple != NULL
+ ok 2 - merger next() (any format): skip tuple format id check
+ ok 3 - merger next() (any format): check tuple size
+ ok 4 - merger next() (any format): check tuple data
+ ok 5 - merger next() (any format): tuple != NULL
+ ok 6 - merger next() (any format): skip tuple format id check
+ ok 7 - merger next() (any format): check tuple size
+ ok 8 - merger next() (any format): check tuple data
+ ok 9 - merger next() (any format): tuple != NULL
+ ok 10 - merger next() (any format): skip tuple format id check
+ ok 11 - merger next() (any format): check tuple size
+ ok 12 - merger next() (any format): check tuple data
+ ok 13 - merger next() (any format): tuple != NULL
+ ok 14 - merger next() (any format): skip tuple format id check
+ ok 15 - merger next() (any format): check tuple size
+ ok 16 - merger next() (any format): check tuple data
+ ok 17 - merger is empty (any format)
+ *** test_merger: done ***
+ok 3 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (user's format): tuple != NULL
+ ok 2 - merger next() (user's format): check tuple format id
+ ok 3 - merger next() (user's format): check tuple size
+ ok 4 - merger next() (user's format): check tuple data
+ ok 5 - merger next() (user's format): tuple != NULL
+ ok 6 - merger next() (user's format): check tuple format id
+ ok 7 - merger next() (user's format): check tuple size
+ ok 8 - merger next() (user's format): check tuple data
+ ok 9 - merger next() (user's format): tuple != NULL
+ ok 10 - merger next() (user's format): check tuple format id
+ ok 11 - merger next() (user's format): check tuple size
+ ok 12 - merger next() (user's format): check tuple data
+ ok 13 - merger next() (user's format): tuple != NULL
+ ok 14 - merger next() (user's format): check tuple format id
+ ok 15 - merger next() (user's format): check tuple size
+ ok 16 - merger next() (user's format): check tuple data
+ ok 17 - merger is empty (user's format)
+ *** test_merger: done ***
+ok 4 - subtests
+ *** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..0a25d8f04
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,301 @@
+#include "unit.h" /* plan, header, footer, is, ok */
+#include "memory.h" /* memory_init() */
+#include "fiber.h" /* fiber_init() */
+#include "box/tuple.h" /* tuple_init(), box_tuple_*(),
+ tuple_*() */
+#include "box/tuple_format.h" /* box_tuple_format_default(),
+ tuple_format_id() */
+#include "box/key_def.h" /* key_def_new(),
+ key_def_delete() */
+#include "box/merger.h" /* merger_*() */
+
+/* {{{ Array merger source */
+
+/**
+ * Test merger source that yields tuples from a fixed array.
+ */
+struct merger_source_array {
+	/* Base merger source. */
+	struct merger_source base;
+	/* Number of tuples in the array. */
+	uint32_t tuples_count;
+	/* Referenced tuples in the default format. */
+	struct tuple **tuples;
+	/* Index of the next tuple to return. */
+	uint32_t cur;
+};
+
+/* Virtual methods declarations */
+
+/* Free an array source together with its tuples. */
+static void
+merger_source_array_delete(struct merger_source *base);
+/* Return the next tuple of the array, or NULL at the end. */
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+			 struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create an array source of two single-field tuples in the
+ * default format: {2}, {4} when @a even is true and {1}, {3}
+ * otherwise. The source starts with zero references.
+ */
+static struct merger_source *
+merger_source_array_new(bool even)
+{
+	static struct merger_source_vtab merger_source_array_vtab = {
+		.delete = merger_source_array_delete,
+		.next = merger_source_array_next,
+	};
+
+	struct merger_source_array *source =
+		(struct merger_source_array *) malloc(
+			sizeof(struct merger_source_array));
+	assert(source != NULL);
+
+	merger_source_new(&source->base, &merger_source_array_vtab);
+
+	/* Each tuple is a msgpack fixarray holding one fixint. */
+	const uint32_t tuple_size = 2;
+	const uint32_t tuples_count = 2;
+	/* {1}, {3} */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	/* {2}, {4} */
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const char **data = even ? data_even : data_odd;
+	source->tuples = (struct tuple **) malloc(
+		tuples_count * sizeof(struct tuple *));
+	assert(source->tuples != NULL);
+	box_tuple_format_t *format = box_tuple_format_default();
+	for (uint32_t i = 0; i < tuples_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		struct tuple *tuple = box_tuple_new(format, data[i], end);
+		/* Don't hand a NULL tuple to box_tuple_ref(). */
+		assert(tuple != NULL);
+		box_tuple_ref(tuple);
+		source->tuples[i] = tuple;
+	}
+	source->tuples_count = tuples_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+/**
+ * Drop the references to all tuples of an array source, then
+ * free the tuples array and the source itself.
+ */
+static void
+merger_source_array_delete(struct merger_source *base)
+{
+	struct merger_source_array *source =
+		container_of(base, struct merger_source_array, base);
+
+	uint32_t idx = 0;
+	while (idx < source->tuples_count)
+		box_tuple_unref(source->tuples[idx++]);
+
+	free(source->tuples);
+	free(source);
+}
+
+/**
+ * Store the next array tuple, with a new reference, in *out and
+ * advance the cursor. The tuple is converted when @a format
+ * differs from the default one; a NULL @a format means "any".
+ * Once the array is exhausted, store NULL. Always return 0.
+ */
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+			 struct tuple **out)
+{
+	struct merger_source_array *source =
+		container_of(base, struct merger_source_array, base);
+
+	if (source->cur == source->tuples_count) {
+		*out = NULL;
+		return 0;
+	}
+
+	struct tuple *result = source->tuples[source->cur];
+
+	box_tuple_format_t *default_format = box_tuple_format_default();
+	uint32_t default_id = tuple_format_id(default_format);
+	assert(result->format_id == default_id);
+	box_tuple_format_t *wanted = format != NULL ? format : default_format;
+	if (tuple_format_id(wanted) != default_id) {
+		const char *beg = tuple_data(result);
+		const char *end = beg + result->bsize;
+		result = box_tuple_new(wanted, beg, end);
+		assert(result != NULL);
+	}
+
+	box_tuple_ref(result);
+	source->cur++;
+	*out = result;
+	return 0;
+}
+
+/* }}} */
+
+/* Key part: the first field, unsigned, ascending, not nullable. */
+static struct key_part_def key_part_unsigned = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_UNSIGNED,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+/* Key part: the first field, integer, ascending, not nullable. */
+static struct key_part_def key_part_integer = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_INTEGER,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+/** Return the smaller of two unsigned 32-bit values. */
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	if (b < a)
+		return b;
+	return a;
+}
+
+/**
+ * Emit four TAP checks for @a tuple, labelled with @a case_name:
+ * - the tuple is not NULL;
+ * - its format is @a format (reported as skipped when @a format
+ *   is NULL);
+ * - its msgpack size is @a exp_data_len;
+ * - its msgpack data matches @a exp_data.
+ *
+ * A NULL @a tuple is not dereferenced: the remaining checks are
+ * reported as failures, so the TAP plan stays consistent.
+ */
+void
+check_tuple(struct tuple *tuple, box_tuple_format_t *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (tuple == NULL) {
+		ok(false, "%s: check tuple format id", case_name);
+		ok(false, "%s: check tuple size", case_name);
+		ok(false, "%s: check tuple data", case_name);
+		return;
+	}
+
+	if (format == NULL) {
+		ok(true, "%s: skip tuple format id check", case_name);
+	} else {
+		is(tuple->format_id, tuple_format_id(format),
+		   "%s: check tuple format id", case_name);
+	}
+
+	uint32_t size;
+	const char *data = tuple_data_range(tuple, &size);
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	/*
+	 * msgpack is binary and may contain zero bytes, so use
+	 * memcmp() instead of strncmp(). On a size mismatch
+	 * (already reported above) only the common prefix is
+	 * compared.
+	 */
+	ok(memcmp(data, exp_data, min_u32(size, exp_data_len)) == 0,
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Check array source itself (just in case). The odd source must
+ * yield {1}, {3} in @a format (any format when NULL) and then
+ * report exhaustion.
+ */
+int
+test_array_source(box_tuple_format_t *format)
+{
+	plan(9);
+	header();
+
+	/* {1}, {3} */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuples_count = 2;
+	static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+
+	struct merger_source *source = merger_source_array_new(false);
+	assert(source != NULL);
+	merger_source_ref(source);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"array source next() (any format)" :
+		"array source next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuples_count; ++i) {
+		int rc = source->vtab->next(source, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		/* A missing tuple is already reported as a failure. */
+		if (tuple != NULL)
+			box_tuple_unref(tuple);
+	}
+	int rc = source->vtab->next(source, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merger_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Merge two array sources, {1}, {3} and {2}, {4}, in ascending
+ * order. Check that the merger yields {1}, {2}, {3}, {4} in
+ * @a format (any format when NULL) and then reports exhaustion.
+ */
+int
+test_merger(box_tuple_format_t *format)
+{
+	plan(17);
+	header();
+
+	/* {1}, {2}, {3}, {4} */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuples_count = 4;
+	static const char *exp_tuples_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+
+	const uint32_t sources_count = 2;
+	struct merger_source *sources[] = {
+		merger_source_array_new(false),
+		merger_source_array_new(true),
+	};
+	merger_source_ref(sources[0]);
+	merger_source_ref(sources[1]);
+
+	/* Catch setup failures here rather than as a crash later. */
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	assert(key_def != NULL);
+	struct merger_context *ctx = merger_context_new(key_def);
+	assert(ctx != NULL);
+	merger_context_ref(ctx);
+	/* The context keeps its own copy of the key_def. */
+	key_def_delete(key_def);
+	struct merger_source *merger = merger_new(ctx);
+	assert(merger != NULL);
+	int rc = merger_set_sources(merger, sources, sources_count);
+	assert(rc == 0);
+	merger_set_reverse(merger, false);
+	merger_source_ref(merger);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuples_count; ++i) {
+		rc = merger->vtab->next(merger, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		/* A missing tuple is already reported as a failure. */
+		if (tuple != NULL)
+			box_tuple_unref(tuple);
+	}
+	rc = merger->vtab->next(merger, format, &tuple);
+	assert(rc == 0);
+	(void) rc;
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	merger_source_unref(merger);
+	merger_context_unref(ctx);
+	merger_source_unref(sources[0]);
+	merger_source_unref(sources[1]);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Run the array source and merger tests twice: once with "any"
+ * output format (NULL) and once with a user format built from an
+ * integer key part on the first field.
+ */
+int
+test_basic(void)
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	assert(key_def != NULL);
+	box_tuple_format_t *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	box_tuple_format_unref(format);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Initialize the subsystems the tests rely on (memory, fibers,
+ * tuples), run the tests and free them in reverse order. The
+ * exit status is the result of test_basic().
+ */
+int
+main(void)
+{
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	tuple_init(NULL);
+
+	int rc = test_basic();
+
+	tuple_free();
+	fiber_free();
+	memory_free();
+
+	return rc;
+}
--
2.20.1
More information about the Tarantool-patches
mailing list