[PATCH v3 6/7] Add merger for tuples streams (C part)

Alexander Turenko alexander.turenko at tarantool.org
Wed Apr 10 18:21:24 MSK 2019


Needed for #3276.
---
 src/box/CMakeLists.txt   |   1 +
 src/box/merger.c         | 464 +++++++++++++++++++++++++++++++++++++++
 src/box/merger.h         | 180 +++++++++++++++
 test/unit/CMakeLists.txt |   3 +
 test/unit/merger.result  |  71 ++++++
 test/unit/merger.test.c  | 301 +++++++++++++++++++++++++
 6 files changed, 1020 insertions(+)
 create mode 100644 src/box/merger.c
 create mode 100644 src/box/merger.h
 create mode 100644 test/unit/merger.result
 create mode 100644 test/unit/merger.test.c

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 7fbbc7803..d1251c326 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -121,6 +121,7 @@ add_library(box STATIC
     execute.c
     wal.c
     call.c
+    merger.c
     ${lua_sources}
     lua/init.c
     lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..83f628758
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "trivia/util.h"      /* unlikely() */
+#include "diag.h"             /* diag_set() */
+#include "say.h"              /* panic() */
+#include "box/tuple.h"        /* box_tuple_unref() */
+#include "box/tuple_format.h" /* box_tuple_format_*(),
+				 tuple_format_id() */
+#include "box/key_def.h"      /* box_key_def_*(),
+				 box_tuple_compare() */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+enum { MERGER_SOURCE_REF_MAX = INT_MAX };  /* Cap for merger_source.refs. */
+enum { MERGER_CONTEXT_REF_MAX = INT_MAX }; /* Cap for merger_context.refs. */
+
+void
+merger_source_ref(struct merger_source *source)
+{
+	if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX))
+		panic("Merger source reference counter overflow");
+	source->refs += 1; /* Safe: the counter is below the maximum. */
+}
+
+void
+merger_source_unref(struct merger_source *source)
+{
+	assert(source->refs > 0); /* Each unref pairs with a ref. */
+	if (--source->refs == 0)
+		source->vtab->delete(source); /* The last one is gone. */
+}
+
+void
+merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
+{
+	source->refs = 0; /* Nobody holds the source yet. */
+	source->vtab = vtab;
+}
+
+void
+merger_context_delete(struct merger_context *ctx)
+{
+	box_key_def_delete(ctx->key_def);    /* The context's own copy. */
+	box_tuple_format_unref(ctx->format); /* Drop the hold on the format. */
+	free(ctx);
+}
+
+void
+merger_context_ref(struct merger_context *ctx)
+{
+	if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX))
+		panic("Merger context reference counter overflow");
+	ctx->refs += 1; /* Safe: the counter is below the maximum. */
+}
+
+void
+merger_context_unref(struct merger_context *ctx)
+{
+	assert(ctx->refs > 0); /* Each unref pairs with a ref. */
+	if (--ctx->refs == 0)
+		merger_context_delete(ctx); /* The last one is gone. */
+}
+
+struct merger_context *
+merger_context_new(const struct key_def *key_def)
+{
+	struct merger_context *ctx = malloc(sizeof(*ctx));
+	if (ctx == NULL) {
+		diag_set(OutOfMemory, sizeof(*ctx), "malloc",
+			 "merger_context");
+		return NULL;
+	}
+
+	/*
+	 * Keep a private copy of the key_def: the one passed in
+	 * may be destroyed while the context is still alive
+	 * (for example, by LuaJIT GC when the key_def comes from
+	 * Lua).
+	 */
+	ctx->key_def = key_def_dup(key_def);
+	if (ctx->key_def == NULL)
+		goto err_free_ctx;
+
+	ctx->format = box_tuple_format_new(&ctx->key_def, 1);
+	if (ctx->format == NULL)
+		goto err_delete_key_def;
+
+	/* Not referenced yet, see merger_context_ref(). */
+	ctx->refs = 0;
+	return ctx;
+
+err_delete_key_def:
+	box_key_def_delete(ctx->key_def);
+err_free_ctx:
+	free(ctx);
+	return NULL;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Pairs a source with the last tuple fetched from it; the tuple
+ * is what the heap compares to order nodes against each other.
+ *
+ * The main reason why this structure is separate from a merger
+ * source is that a heap node cannot be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows encapsulating all heap
+ * related logic inside this compilation unit, without any traces
+ * in externally visible structures.
+ */
+struct merger_heap_node {
+	/* A source of tuples; referenced while the node exists. */
+	struct merger_source *source;
+	/*
+	 * The last fetched (refcounted) tuple to compare against
+	 * other nodes, NULL when there is none.
+	 */
+	struct tuple *tuple;
+	/* An anchor to make the structure a merger heap node. */
+	struct heap_node heap_node_anchor;
+};
+
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		   const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap                /* merger_heap_*() functions. */
+#define HEAP_LESS merger_source_less         /* Ordering predicate. */
+#define heap_value_t struct merger_heap_node /* What the heap stores. */
+#define heap_value_attr heap_node_anchor     /* Anchor inside a value. */
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, an immutable context, parameters of a merge
+ * process and utility fields.
+ */
+struct merger {
+	/* A merger is a source itself: see merger_new(). */
+	struct merger_source base;
+	/*
+	 * Whether a merge process started.
+	 *
+	 * The merger postpones filling the heap with nodes until
+	 * the first output tuple is requested.
+	 */
+	bool started;
+	/* A merger context (referenced by the merger). */
+	struct merger_context *ctx;
+	/*
+	 * A heap of nodes; a node is in the heap while its
+	 * source has a tuple to give.
+	 */
+	heap_t heap;
+	/* An array of heap nodes, one per source. */
+	uint32_t nodes_count;
+	struct merger_heap_node *nodes;
+	/* Ascending (false) / descending (true) order. */
+	bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Heap ordering predicate: whether left must be above right.
+ */
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		   const struct merger_heap_node *right)
+{
+	/* A node without a tuple never precedes another node. */
+	if (left->tuple == NULL)
+		return false;
+	/* Any fetched tuple precedes a node without a tuple. */
+	if (right->tuple == NULL)
+		return true;
+	struct merger *merger = container_of(heap, struct merger, heap);
+	int cmp = box_tuple_compare(left->tuple, right->tuple,
+				    merger->ctx->key_def);
+	return merger->reverse ? cmp >= 0 : cmp < 0;
+}
+
+/**
+ * How much more memory the heap will reserve at the next grow.
+ * See HEAP(reserve)() function in lib/salad/heap.h.
+ */
+static size_t
+heap_next_grow_size(const heap_t *heap)
+{
+	heap_off_t grow_by = heap->capacity;
+	if (grow_by == 0)
+		grow_by = HEAP_INITIAL_CAPACITY;
+	return grow_by * sizeof(struct heap_node *);
+}
+
+/**
+ * Initialize a heap node for a source and reference the source.
+ */
+static void
+merger_heap_node_new(struct merger_heap_node *node,
+		     struct merger_source *source)
+{
+	heap_node_create(&node->heap_node_anchor);
+	node->tuple = NULL; /* Nothing is fetched yet. */
+	merger_source_ref(source);
+	node->source = source;
+}
+
+/** Release the source and the tuple held by a merger heap node. */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+	merger_source_unref(node->source);
+	if (node->tuple == NULL)
+		return;
+	/* The node holds a reference to its last fetched tuple. */
+	box_tuple_unref(node->tuple);
+}
+
+/**
+ * Fetch a first tuple from the node's source and put the node
+ * into the merger heap.
+ *
+ * Return 0 at success. An exhausted source is not an error: its
+ * node is just left out of the heap and node->tuple is untouched.
+ * Return -1 at an error and set a diag.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	struct merger_source *source = node->source;
+	struct tuple *first = NULL;
+
+	/* Ask the source for a tuple in the merger's format. */
+	if (source->vtab->next(source, merger->ctx->format, &first) != 0)
+		return -1;
+
+	if (first != NULL) {
+		/* The node takes over the reference to the tuple. */
+		node->tuple = first;
+		/* A failed insert is reported as out of memory. */
+		if (merger_heap_insert(&merger->heap, node) != 0) {
+			diag_set(OutOfMemory,
+				 heap_next_grow_size(&merger->heap),
+				 "malloc", "merger->heap");
+			return -1;
+		}
+	}
+
+	/* Either the node is in the heap or its source is empty. */
+	return 0;
+}
+
+/* Forward declarations of the merger's virtual methods. */
+
+static void
+merger_delete(struct merger_source *base);
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+	    struct tuple **out);
+
+/* Non-virtual methods */
+
+struct merger_source *
+merger_new(struct merger_context *ctx)
+{
+	static struct merger_source_vtab merger_vtab = {
+		.delete = merger_delete,
+		.next = merger_next,
+	};
+
+	struct merger *merger = malloc(sizeof(*merger));
+	if (merger == NULL) {
+		diag_set(OutOfMemory, sizeof(*merger), "malloc", "merger");
+		return NULL;
+	}
+
+	/* The base part: virtual methods and a zero ref counter. */
+	merger_source_new(&merger->base, &merger_vtab);
+	/* The merger holds its own reference to the context. */
+	merger_context_ref(ctx);
+	merger->ctx = ctx;
+	/* No sources so far, they come with merger_set_sources(). */
+	merger->nodes = NULL;
+	merger->nodes_count = 0;
+	merger_heap_create(&merger->heap);
+	merger->started = false;
+	merger->reverse = false;
+	return &merger->base;
+}
+
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+		   uint32_t sources_count)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/* Ensure we don't leak old nodes. */
+	assert(merger->nodes_count == 0);
+	assert(merger->nodes == NULL);
+
+	/* malloc(0) may return NULL, but no sources is not an error. */
+	if (sources_count == 0)
+		return 0;
+	size_t nodes_size = sources_count * sizeof(struct merger_heap_node);
+	struct merger_heap_node *nodes = malloc(nodes_size);
+	if (nodes == NULL) {
+		diag_set(OutOfMemory, nodes_size, "malloc",
+			 "merger heap nodes");
+		return -1;
+	}
+	/* Each node takes its own reference to a source. */
+	for (uint32_t i = 0; i < sources_count; ++i)
+		merger_heap_node_new(&nodes[i], sources[i]);
+	merger->nodes_count = sources_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+void
+merger_set_reverse(struct merger_source *base, bool reverse)
+{
+	/* The flag is consulted by the heap comparator. */
+	struct merger *self = container_of(base, struct merger, base);
+	self->reverse = reverse;
+}
+
+/* Virtual methods */
+
+static void
+merger_delete(struct merger_source *base)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	merger_context_unref(merger->ctx);
+	merger_heap_destroy(&merger->heap);
+
+	/* Drop source and tuple references held by the nodes. */
+	for (uint32_t i = 0; i < merger->nodes_count; ++i)
+		merger_heap_node_delete(&merger->nodes[i]);
+
+	/* free(NULL) is a no-op, so no guard is needed. */
+	free(merger->nodes);
+	free(merger);
+}
+
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+	    struct tuple **out)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/*
+	 * Fetch a first tuple for each source and add all heap
+	 * nodes to a merger heap.
+	 */
+	if (!merger->started) {
+		for (uint32_t i = 0; i < merger->nodes_count; ++i) {
+			struct merger_heap_node *node = &merger->nodes[i];
+			if (merger_add_heap_node(merger, node) != 0)
+				return -1;
+		}
+		merger->started = true;
+	}
+
+	/* Get a next tuple; an empty heap means the end of data. */
+	struct merger_heap_node *node = merger_heap_top(&merger->heap);
+	if (node == NULL) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *tuple = node->tuple;
+	assert(tuple != NULL);
+
+	/*
+	 * The tuples are stored in merger->ctx->format for
+	 * fast comparisons, but we should return tuples in a
+	 * requested format.
+	 */
+	uint32_t id_stored = tuple_format_id(merger->ctx->format);
+	assert(tuple->format_id == id_stored);
+	if (format == NULL)
+		format = merger->ctx->format;
+	bool is_copy = id_stored != tuple_format_id(format);
+	if (is_copy) {
+		const char *tuple_beg = tuple_data(tuple);
+		const char *tuple_end = tuple_beg + tuple->bsize;
+		tuple = box_tuple_new(format, tuple_beg, tuple_end);
+		if (tuple == NULL)
+			return -1;
+		box_tuple_ref(tuple);
+	}
+
+	/*
+	 * Fetch a successor into a local variable first: when the
+	 * source fails the node must keep its valid (referenced)
+	 * tuple and the copy made above must not leak.
+	 */
+	struct merger_source *source = node->source;
+	struct tuple *successor = NULL;
+	if (source->vtab->next(source, merger->ctx->format, &successor) != 0) {
+		if (is_copy)
+			box_tuple_unref(tuple);
+		return -1;
+	}
+	/* A copy is returned: drop the node's reference to the original. */
+	if (is_copy)
+		box_tuple_unref(node->tuple);
+	node->tuple = successor;
+	/* Update a heap. */
+	if (node->tuple == NULL)
+		merger_heap_delete(&merger->heap, node);
+	else
+		merger_heap_update(&merger->heap, node);
+	*out = tuple;
+	return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..2323dd7d7
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,180 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+typedef struct tuple_format box_tuple_format_t;
+
+struct merger_source;
+struct merger_context;
+
+struct merger_source_vtab {
+	/**
+	 * Free a source; call merger_source_unref() instead.
+	 *
+	 * NOTE(review): `delete` is a C++ keyword, so the header
+	 * is not includable from C++ despite its extern "C".
+	 */
+	void (*delete)(struct merger_source *base);
+	/**
+	 * Get a next tuple (refcounted) from a source into *out.
+	 *
+	 * A NULL format means that a caller does not care in
+	 * which format a tuple will be.
+	 *
+	 * Return 0 and a tuple, or 0 and NULL when a source is
+	 * exhausted. On an error set a diag and return -1.
+	 */
+	int (*next)(struct merger_source *base, box_tuple_format_t *format,
+		    struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merger source; it
+ * is meant to be embedded into a concrete source structure.
+ * The structure itself does not hold any resources.
+ */
+struct merger_source {
+	/* Source-specific methods: delete() and next(). */
+	struct merger_source_vtab *vtab;
+	/* Reference counter, zero after merger_source_new(). */
+	int refs;
+};
+
+/**
+ * Holds immutable parameters of a merger; reference counted.
+ */
+struct merger_context {
+	struct key_def *key_def;    /* Own copy; how to compare tuples. */
+	box_tuple_format_t *format; /* Format sources are asked to use. */
+	/* Reference counter, zero after merger_context_new(). */
+	int refs;
+};
+
+/* }}} */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+/**
+ * Increment a merger source reference counter; panic at overflow.
+ */
+void
+merger_source_ref(struct merger_source *source);
+
+/**
+ * Decrement a merger source reference counter. When it has
+ * reached zero, free the source (call delete() virtual method).
+ */
+void
+merger_source_unref(struct merger_source *source);
+
+/**
+ * Initialize a base merger source: set vtab and zero the counter.
+ */
+void
+merger_source_new(struct merger_source *source,
+		  struct merger_source_vtab *vtab);
+
+/**
+ * Free a merger context. Prefer merger_context_unref().
+ */
+void
+merger_context_delete(struct merger_context *ctx);
+
+/**
+ * Increment a merger context reference counter; panic at overflow.
+ */
+void
+merger_context_ref(struct merger_context *ctx);
+
+/**
+ * Decrement a merger context reference counter. When it has
+ * reached zero, free the context.
+ */
+void
+merger_context_unref(struct merger_context *ctx);
+
+/**
+ * Create a new merger context; the key_def is copied.
+ *
+ * The returned context has a zero reference counter: call
+ * merger_context_ref() to hold it.
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_context *
+merger_context_new(const struct key_def *key_def);
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger w/o sources; the context gets referenced.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_source *
+merger_new(struct merger_context *ctx);
+
+/**
+ * Set sources for a merger (once); each source gets referenced.
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+		   uint32_t sources_count);
+
+/**
+ * Set reverse flag for a merger: true means descending order.
+ */
+void
+merger_set_reverse(struct merger_source *base, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 64739ab09..afdcbebf6 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
 add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
                swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
 target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..11399e0bc
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+	*** test_basic ***
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (any format): tuple != NULL
+    ok 2 - array source next() (any format): skip tuple format id check
+    ok 3 - array source next() (any format): check tuple size
+    ok 4 - array source next() (any format): check tuple data
+    ok 5 - array source next() (any format): tuple != NULL
+    ok 6 - array source next() (any format): skip tuple format id check
+    ok 7 - array source next() (any format): check tuple size
+    ok 8 - array source next() (any format): check tuple data
+    ok 9 - array source is empty (any format)
+	*** test_array_source: done ***
+ok 1 - subtests
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (user's format): tuple != NULL
+    ok 2 - array source next() (user's format): check tuple format id
+    ok 3 - array source next() (user's format): check tuple size
+    ok 4 - array source next() (user's format): check tuple data
+    ok 5 - array source next() (user's format): tuple != NULL
+    ok 6 - array source next() (user's format): check tuple format id
+    ok 7 - array source next() (user's format): check tuple size
+    ok 8 - array source next() (user's format): check tuple data
+    ok 9 - array source is empty (user's format)
+	*** test_array_source: done ***
+ok 2 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (any format): tuple != NULL
+    ok 2 - merger next() (any format): skip tuple format id check
+    ok 3 - merger next() (any format): check tuple size
+    ok 4 - merger next() (any format): check tuple data
+    ok 5 - merger next() (any format): tuple != NULL
+    ok 6 - merger next() (any format): skip tuple format id check
+    ok 7 - merger next() (any format): check tuple size
+    ok 8 - merger next() (any format): check tuple data
+    ok 9 - merger next() (any format): tuple != NULL
+    ok 10 - merger next() (any format): skip tuple format id check
+    ok 11 - merger next() (any format): check tuple size
+    ok 12 - merger next() (any format): check tuple data
+    ok 13 - merger next() (any format): tuple != NULL
+    ok 14 - merger next() (any format): skip tuple format id check
+    ok 15 - merger next() (any format): check tuple size
+    ok 16 - merger next() (any format): check tuple data
+    ok 17 - merger is empty (any format)
+	*** test_merger: done ***
+ok 3 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (user's format): tuple != NULL
+    ok 2 - merger next() (user's format): check tuple format id
+    ok 3 - merger next() (user's format): check tuple size
+    ok 4 - merger next() (user's format): check tuple data
+    ok 5 - merger next() (user's format): tuple != NULL
+    ok 6 - merger next() (user's format): check tuple format id
+    ok 7 - merger next() (user's format): check tuple size
+    ok 8 - merger next() (user's format): check tuple data
+    ok 9 - merger next() (user's format): tuple != NULL
+    ok 10 - merger next() (user's format): check tuple format id
+    ok 11 - merger next() (user's format): check tuple size
+    ok 12 - merger next() (user's format): check tuple data
+    ok 13 - merger next() (user's format): tuple != NULL
+    ok 14 - merger next() (user's format): check tuple format id
+    ok 15 - merger next() (user's format): check tuple size
+    ok 16 - merger next() (user's format): check tuple data
+    ok 17 - merger is empty (user's format)
+	*** test_merger: done ***
+ok 4 - subtests
+	*** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..0a25d8f04
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,301 @@
+#include "unit.h"              /* plan, header, footer, is, ok */
+#include "memory.h"            /* memory_init() */
+#include "fiber.h"             /* fiber_init() */
+#include "box/tuple.h"         /* tuple_init(), box_tuple_*(),
+				  tuple_*() */
+#include "box/tuple_format.h"  /* box_tuple_format_default(),
+				  tuple_format_id() */
+#include "box/key_def.h"       /* key_def_new(),
+				  key_def_delete() */
+#include "box/merger.h"        /* merger_*() */
+
+/* {{{ Array merger source */
+
+struct merger_source_array {
+	struct merger_source base; /* Embedded base, see container_of(). */
+	uint32_t tuples_count;     /* Number of items in tuples. */
+	struct tuple **tuples;     /* Referenced tuples to yield in order. */
+	uint32_t cur;              /* Index of a next tuple to yield. */
+};
+
+/* Virtual methods declarations */
+
+static void
+merger_source_array_delete(struct merger_source *base);
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+			 struct tuple **out);
+
+/* Non-virtual methods */
+
+static struct merger_source *
+merger_source_array_new(bool even)
+{
+	static struct merger_source_vtab merger_source_array_vtab = {
+		.delete = merger_source_array_delete,
+		.next = merger_source_array_next,
+	};
+	/* Msgpack: {1}, {3} for an odd source, {2}, {4} for an even one. */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const uint32_t tuple_size = 2;
+	const uint32_t tuples_count = 2;
+	const char **data = even ? data_even : data_odd;
+
+	struct merger_source_array *source = malloc(sizeof(*source));
+	assert(source != NULL);
+	merger_source_new(&source->base, &merger_source_array_vtab);
+
+	source->tuples = malloc(tuples_count * sizeof(*source->tuples));
+	assert(source->tuples != NULL);
+
+	/* The source keeps one reference to each of its tuples. */
+	box_tuple_format_t *format = box_tuple_format_default();
+	for (uint32_t i = 0; i < tuples_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		struct tuple *tuple = box_tuple_new(format, data[i], end);
+		/* Never reference a tuple that failed to be created. */
+		assert(tuple != NULL);
+		box_tuple_ref(tuple);
+		source->tuples[i] = tuple;
+	}
+	source->tuples_count = tuples_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+static void
+merger_source_array_delete(struct merger_source *base)
+{
+	struct merger_source_array *self = container_of(base,
+		struct merger_source_array, base);
+	/* Give up the references taken when the source was created. */
+	struct tuple **end = self->tuples + self->tuples_count;
+	for (struct tuple **it = self->tuples; it != end; ++it)
+		box_tuple_unref(*it);
+	free(self->tuples);
+	free(self);
+}
+
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+			 struct tuple **out)
+{
+	struct merger_source_array *self = container_of(base,
+		struct merger_source_array, base);
+
+	/* All tuples are given out: report the end of the source. */
+	if (self->cur == self->tuples_count) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *result = self->tuples[self->cur];
+	/* The source keeps its tuples in the default format. */
+	box_tuple_format_t *stored_format = box_tuple_format_default();
+	uint32_t stored_id = tuple_format_id(stored_format);
+	assert(result->format_id == stored_id);
+	if (format == NULL)
+		format = stored_format;
+
+	/* Make a copy when another format is requested. */
+	if (tuple_format_id(format) != stored_id) {
+		const char *data = tuple_data(result);
+		const char *data_end = data + result->bsize;
+		result = box_tuple_new(format, data, data_end);
+		assert(result != NULL);
+	}
+	assert(result != NULL);
+	box_tuple_ref(result);
+	*out = result;
+	++self->cur;
+	return 0;
+}
+
+/* }}} */
+
+static struct key_part_def key_part_unsigned = { /* Key of test_merger(). */
+	.fieldno = 0,
+	.type = FIELD_TYPE_UNSIGNED,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+static struct key_part_def key_part_integer = { /* For a user's format. */
+	.fieldno = 0,
+	.type = FIELD_TYPE_INTEGER,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	return b < a ? b : a; /* Equal values give the same result. */
+}
+
+void
+check_tuple(struct tuple *tuple, box_tuple_format_t *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	/* A NULL tuple fails the checks; data is binary, thus memcmp(). */
+	uint32_t size = 0;
+	const char *data = tuple ? tuple_data_range(tuple, &size) : NULL;
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (format == NULL) {
+		ok(true, "%s: skip tuple format id check", case_name);
+	} else {
+		ok(data && tuple->format_id == tuple_format_id(format),
+		   "%s: check tuple format id", case_name);
+	}
+	ok(data && size == exp_data_len, "%s: check tuple size", case_name);
+	ok(data && !memcmp(data, exp_data, min_u32(size, exp_data_len)),
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Sanity check of the array source on its own, without a merger.
+ */
+int
+test_array_source(box_tuple_format_t *format)
+{
+	plan(9);
+	header();
+
+	/* The odd source yields {1} and {3}, two bytes each. */
+	static const char *expected[] = {"\x91\x01", "\x91\x03"};
+	const uint32_t expected_count = 2;
+	const uint32_t expected_size = 2;
+	const char *name = format != NULL ?
+		"array source next() (user's format)" :
+		"array source next() (any format)";
+
+	struct merger_source *source = merger_source_array_new(false);
+	assert(source != NULL);
+	merger_source_ref(source);
+
+	int rc;
+	struct tuple *tuple = NULL;
+	for (uint32_t i = 0; i < expected_count; ++i) {
+		rc = source->vtab->next(source, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, expected[i], expected_size, name);
+		box_tuple_unref(tuple);
+	}
+	/* One more call reports the end of the source with NULL. */
+	rc = source->vtab->next(source, format, &tuple);
+	assert(rc == 0);
+	(void) rc;
+	is(tuple, NULL, format == NULL ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merger_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+int
+test_merger(box_tuple_format_t *format)
+{
+	plan(17);
+	header();
+
+	/* Two sorted sources must interleave into {1}, {2}, {3}, {4}. */
+	const uint32_t exp_size = 2;
+	const uint32_t exp_count = 4;
+	static const char *exp_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+	const uint32_t sources_count = 2;
+	struct merger_source *sources[] = {
+		merger_source_array_new(false),
+		merger_source_array_new(true),
+	};
+	merger_source_ref(sources[0]);
+	merger_source_ref(sources[1]);
+
+	/* Check every result: fail here, not at a NULL dereference. */
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	assert(key_def != NULL);
+	struct merger_context *ctx = merger_context_new(key_def);
+	assert(ctx != NULL);
+	merger_context_ref(ctx);
+	key_def_delete(key_def);
+	struct merger_source *merger = merger_new(ctx);
+	assert(merger != NULL);
+	merger_source_ref(merger);
+	int rc = merger_set_sources(merger, sources, sources_count);
+	assert(rc == 0);
+	merger_set_reverse(merger, false);
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_count; ++i) {
+		rc = merger->vtab->next(merger, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_data[i], exp_size, msg);
+		box_tuple_unref(tuple);
+	}
+	rc = merger->vtab->next(merger, format, &tuple);
+	assert(rc == 0);
+	(void) rc;
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	merger_source_unref(merger);
+	merger_context_unref(ctx);
+	merger_source_unref(sources[0]);
+	merger_source_unref(sources[1]);
+	footer();
+	return check_plan();
+}
+
+int
+test_basic(void)
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	assert(key_def != NULL);
+	box_tuple_format_t *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+	/* A NULL format stands for "any format", see next() in vtab. */
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	box_tuple_format_unref(format);
+	footer();
+	return check_plan();
+}
+
+int
+main(void)
+{
+	/* Bring up memory, fibers and tuples before running the tests. */
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	tuple_init(NULL);
+	int rc = test_basic();
+
+	/* Tear everything down in the reverse order. */
+	tuple_free();
+	fiber_free();
+	memory_free();
+	return rc;
+}
-- 
2.20.1




More information about the Tarantool-patches mailing list