From: Alexander Turenko <alexander.turenko@tarantool.org>
To: Vladimir Davydov <vdavydov.dev@gmail.com>
Cc: Alexander Turenko <alexander.turenko@tarantool.org>,
	tarantool-patches@freelists.org
Subject: [PATCH v4 3/4] Add merger for tuples streams (C part)
Date: Wed,  8 May 2019 01:30:47 +0300	[thread overview]
Message-ID: <fb384879110f6bd827aae18ceea3ac131f0c3476.1557267501.git.alexander.turenko@tarantool.org> (raw)
In-Reply-To: <cover.1557267501.git.alexander.turenko@tarantool.org>
Needed for #3276.
---
 src/box/CMakeLists.txt   |   1 +
 src/box/merger.c         | 355 +++++++++++++++++++++++++++++++++++++++
 src/box/merger.h         | 150 +++++++++++++++++
 test/unit/CMakeLists.txt |   3 +
 test/unit/merger.result  |  71 ++++++++
 test/unit/merger.test.c  | 285 +++++++++++++++++++++++++++++++
 6 files changed, 865 insertions(+)
 create mode 100644 src/box/merger.c
 create mode 100644 src/box/merger.h
 create mode 100644 test/unit/merger.result
 create mode 100644 test/unit/merger.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 2be0d1e35..ce328fb95 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -122,6 +122,7 @@ add_library(box STATIC
     execute.c
     wal.c
     call.c
+    merger.c
     ${lua_sources}
     lua/init.c
     lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..744bba469
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "diag.h"             /* diag_set() */
+#include "box/tuple.h"        /* tuple_ref(), tuple_unref(),
+				 tuple_validate() */
+#include "box/tuple_format.h" /* box_tuple_format_new(),
+				 tuple_format_*() */
+#include "box/key_def.h"      /* key_def_*(),
+				 tuple_compare() */
+
+/* {{{ Merger */
+
+/**
+ * Holds a source to fetch next tuples and a last fetched tuple to
+ * compare the node against other nodes.
+ *
+ * The main reason why this structure is separated from a merge
+ * source is that a heap node can not be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows to encapsulate all heap
+ * related logic inside this compilation unit, without any traces
+ * in externally visible structures.
+ */
+struct merger_heap_node {
+	/* A source of tuples. */
+	struct merge_source *source;
+	/*
+	 * A last fetched (refcounted) tuple to compare against
+	 * other nodes.
+	 */
+	struct tuple *tuple;
+	/* An anchor to make the structure a merger heap node. */
+	struct heap_node in_merger;
+};
+
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		  const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merge_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr in_merger
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, parameters of a merge process and utility fields.
+ */
+struct merger {
+	/* A merger is a source. */
+	struct merge_source base;
+	/*
+	 * Whether a merge process started.
+	 *
+	 * The merger postpones charging of heap nodes until a
+	 * first output tuple is acquired.
+	 */
+	bool started;
+	/* A key_def to compare tuples. */
+	struct key_def *key_def;
+	/* A format to acquire compatible tuples from sources. */
+	struct tuple_format *format;
+	/*
+	 * A heap of sources (of nodes that contains a source to
+	 * be exact).
+	 */
+	heap_t heap;
+	/* An array of heap nodes. */
+	uint32_t node_count;
+	struct merger_heap_node *nodes;
+	/* Ascending (false) / descending (true) order. */
+	bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Data comparing function to construct a heap of sources.
+ */
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		  const struct merger_heap_node *right)
+{
+	assert(left->tuple != NULL);
+	assert(right->tuple != NULL);
+	struct merger *merger = container_of(heap, struct merger, heap);
+	int cmp = tuple_compare(left->tuple, right->tuple, merger->key_def);
+	return merger->reverse ? cmp >= 0 : cmp < 0;
+}
+
+/**
+ * Initialize a new merger heap node.
+ */
+static void
+merger_heap_node_create(struct merger_heap_node *node,
+			struct merge_source *source)
+{
+	node->source = source;
+	merge_source_ref(node->source);
+	node->tuple = NULL;
+	heap_node_create(&node->in_merger);
+}
+
+/**
+ * Free a merger heap node.
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+	merge_source_unref(node->source);
+	if (node->tuple != NULL)
+		tuple_unref(node->tuple);
+}
+
+/**
+ * The helper to add a new heap node to a merger heap.
+ *
+ * Return -1 at an error and set a diag.
+ *
+ * Otherwise store a next tuple in node->tuple, add the node to
+ * merger->heap and return 0.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	struct tuple *tuple = NULL;
+
+	/* Acquire a next tuple. */
+	struct merge_source *source = node->source;
+	if (merge_source_next(source, merger->format, &tuple) != 0)
+		return -1;
+
+	/* Don't add an empty source to a heap. */
+	if (tuple == NULL)
+		return 0;
+
+	node->tuple = tuple;
+
+	/* Add a node to a heap. */
+	if (merger_heap_insert(&merger->heap, node) != 0) {
+		diag_set(OutOfMemory, 0, "malloc", "merger->heap");
+		return -1;
+	}
+
+	return 0;
+}
+
+/* Virtual methods declarations */
+
+static void
+merger_delete(struct merge_source *base);
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+	    struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Set sources for a merger.
+ *
+ * It is the helper for merger_new().
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+static int
+merger_set_sources(struct merger *merger, struct merge_source **sources,
+		   uint32_t source_count)
+{
+	const size_t nodes_size = sizeof(struct merger_heap_node) *
+		source_count;
+	struct merger_heap_node *nodes = malloc(nodes_size);
+	if (nodes == NULL) {
+		diag_set(OutOfMemory, nodes_size, "malloc",
+			 "merger heap nodes");
+		return -1;
+	}
+
+	for (uint32_t i = 0; i < source_count; ++i)
+		merger_heap_node_create(&nodes[i], sources[i]);
+
+	merger->node_count = source_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+	   uint32_t source_count, bool reverse)
+{
+	static struct merge_source_vtab merger_vtab = {
+		.destroy = merger_delete,
+		.next = merger_next,
+	};
+
+	struct merger *merger = malloc(sizeof(struct merger));
+	if (merger == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger), "malloc",
+			 "merger");
+		return NULL;
+	}
+
+	/*
+	 * We need to copy the key_def because it can be collected
+	 * before a merge process ends (say, by LuaJIT GC if the
+	 * key_def comes from Lua).
+	 */
+	key_def = key_def_dup(key_def);
+	if (key_def == NULL) {
+		free(merger);
+		return NULL;
+	}
+
+	struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+	if (format == NULL) {
+		key_def_delete(key_def);
+		free(merger);
+		return NULL;
+	}
+
+	merge_source_create(&merger->base, &merger_vtab);
+	merger->started = false;
+	merger->key_def = key_def;
+	merger->format = format;
+	merger_heap_create(&merger->heap);
+	merger->node_count = 0;
+	merger->nodes = NULL;
+	merger->reverse = reverse;
+
+	if (merger_set_sources(merger, sources, source_count) != 0) {
+		key_def_delete(merger->key_def);
+		tuple_format_unref(merger->format);
+		merger_heap_destroy(&merger->heap);
+		free(merger);
+		return NULL;
+	}
+
+	return &merger->base;
+}
+
+/* Virtual methods */
+
+static void
+merger_delete(struct merge_source *base)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	key_def_delete(merger->key_def);
+	tuple_format_unref(merger->format);
+	merger_heap_destroy(&merger->heap);
+
+	for (uint32_t i = 0; i < merger->node_count; ++i)
+		merger_heap_node_delete(&merger->nodes[i]);
+
+	if (merger->nodes != NULL)
+		free(merger->nodes);
+
+	free(merger);
+}
+
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+	    struct tuple **out)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/*
+	 * Fetch a first tuple for each source and add all heap
+	 * nodes to a merger heap.
+	 */
+	if (!merger->started) {
+		for (uint32_t i = 0; i < merger->node_count; ++i) {
+			struct merger_heap_node *node = &merger->nodes[i];
+			if (merger_add_heap_node(merger, node) != 0)
+				return -1;
+		}
+		merger->started = true;
+	}
+
+	/* Get a next tuple. */
+	struct merger_heap_node *node = merger_heap_top(&merger->heap);
+	if (node == NULL) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *tuple = node->tuple;
+	assert(tuple != NULL);
+
+	/* Validate the tuple. */
+	if (format != NULL && tuple_validate(format, tuple) != 0)
+		return -1;
+
+	/*
+	 * Note: An old node->tuple pointer will be written to
+	 * *out as refcounted tuple, so we don't unreference it
+	 * here.
+	 */
+	struct merge_source *source = node->source;
+	if (merge_source_next(source, merger->format, &node->tuple) != 0)
+		return -1;
+
+	/* Update a heap. */
+	if (node->tuple == NULL)
+		merger_heap_delete(&merger->heap, node);
+	else
+		merger_heap_update(&merger->heap, node);
+
+	*out = tuple;
+	return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..90ae175a9
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,150 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+
+struct merge_source;
+
+struct merge_source_vtab {
+	/**
+	 * Free a merge source.
+	 *
+	 * Don't call it directly, use merge_source_unref()
+	 * instead.
+	 */
+	void (*destroy)(struct merge_source *base);
+	/**
+	 * Get a next tuple (refcounted) from a source.
+	 *
+	 * When format is not NULL the resulting tuple will be in
+	 * a compatible format.
+	 *
+	 * When format is NULL it means that it does not matter
+	 * for a caller in which format a tuple will be.
+	 *
+	 * Return 0 when successfully fetched a tuple or NULL. In
+	 * case of an error set a diag and return -1.
+	 */
+	int (*next)(struct merge_source *base, struct tuple_format *format,
+		    struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merge source.
+ *
+ * The structure does not hold any resources.
+ */
+struct merge_source {
+	/* Source-specific methods. */
+	const struct merge_source_vtab *vtab;
+	/* Reference counter. */
+	int refs;
+};
+
+/* }}} */
+
+/* {{{ Base merge source functions */
+
+/**
+ * Increment a merge source reference counter.
+ */
+static inline void
+merge_source_ref(struct merge_source *source)
+{
+	++source->refs;
+}
+
+/**
+ * Decrement a merge source reference counter. When it has
+ * reached zero, free the source (call destroy() virtual method).
+ */
+static inline void
+merge_source_unref(struct merge_source *source)
+{
+	assert(source->refs - 1 >= 0);
+	if (--source->refs == 0)
+		source->vtab->destroy(source);
+}
+
+/**
+ * @see merge_source_vtab
+ */
+static inline int
+merge_source_next(struct merge_source *source, struct tuple_format *format,
+		  struct tuple **out)
+{
+	return source->vtab->next(source, format, out);
+}
+
+/**
+ * Initialize a base merge source structure.
+ */
+static inline void
+merge_source_create(struct merge_source *source, struct merge_source_vtab *vtab)
+{
+	source->vtab = vtab;
+	source->refs = 1;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+	   uint32_t source_count, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 2c8340800..07dcd6cf2 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
 add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
                swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
 target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..766ebce63
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+	*** test_basic ***
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (any format): tuple != NULL
+    ok 2 - array source next() (any format): skip tuple validation
+    ok 3 - array source next() (any format): check tuple size
+    ok 4 - array source next() (any format): check tuple data
+    ok 5 - array source next() (any format): tuple != NULL
+    ok 6 - array source next() (any format): skip tuple validation
+    ok 7 - array source next() (any format): check tuple size
+    ok 8 - array source next() (any format): check tuple data
+    ok 9 - array source is empty (any format)
+	*** test_array_source: done ***
+ok 1 - subtests
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (user's format): tuple != NULL
+    ok 2 - array source next() (user's format): validate tuple
+    ok 3 - array source next() (user's format): check tuple size
+    ok 4 - array source next() (user's format): check tuple data
+    ok 5 - array source next() (user's format): tuple != NULL
+    ok 6 - array source next() (user's format): validate tuple
+    ok 7 - array source next() (user's format): check tuple size
+    ok 8 - array source next() (user's format): check tuple data
+    ok 9 - array source is empty (user's format)
+	*** test_array_source: done ***
+ok 2 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (any format): tuple != NULL
+    ok 2 - merger next() (any format): skip tuple validation
+    ok 3 - merger next() (any format): check tuple size
+    ok 4 - merger next() (any format): check tuple data
+    ok 5 - merger next() (any format): tuple != NULL
+    ok 6 - merger next() (any format): skip tuple validation
+    ok 7 - merger next() (any format): check tuple size
+    ok 8 - merger next() (any format): check tuple data
+    ok 9 - merger next() (any format): tuple != NULL
+    ok 10 - merger next() (any format): skip tuple validation
+    ok 11 - merger next() (any format): check tuple size
+    ok 12 - merger next() (any format): check tuple data
+    ok 13 - merger next() (any format): tuple != NULL
+    ok 14 - merger next() (any format): skip tuple validation
+    ok 15 - merger next() (any format): check tuple size
+    ok 16 - merger next() (any format): check tuple data
+    ok 17 - merger is empty (any format)
+	*** test_merger: done ***
+ok 3 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (user's format): tuple != NULL
+    ok 2 - merger next() (user's format): validate tuple
+    ok 3 - merger next() (user's format): check tuple size
+    ok 4 - merger next() (user's format): check tuple data
+    ok 5 - merger next() (user's format): tuple != NULL
+    ok 6 - merger next() (user's format): validate tuple
+    ok 7 - merger next() (user's format): check tuple size
+    ok 8 - merger next() (user's format): check tuple data
+    ok 9 - merger next() (user's format): tuple != NULL
+    ok 10 - merger next() (user's format): validate tuple
+    ok 11 - merger next() (user's format): check tuple size
+    ok 12 - merger next() (user's format): check tuple data
+    ok 13 - merger next() (user's format): tuple != NULL
+    ok 14 - merger next() (user's format): validate tuple
+    ok 15 - merger next() (user's format): check tuple size
+    ok 16 - merger next() (user's format): check tuple data
+    ok 17 - merger is empty (user's format)
+	*** test_merger: done ***
+ok 4 - subtests
+	*** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..b4a989a20
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,285 @@
+#include "unit.h"              /* plan, header, footer, is, ok */
+#include "memory.h"            /* memory_init() */
+#include "fiber.h"             /* fiber_init() */
+#include "box/tuple.h"         /* tuple_init(), tuple_*(),
+				  tuple_validate() */
+#include "box/tuple_format.h"  /* tuple_format_*,
+				  box_tuple_format_new() */
+#include "box/key_def.h"       /* key_def_new(),
+				  key_def_delete() */
+#include "box/merger.h"        /* merger_*() */
+
+/* {{{ Array merge source */
+
+struct merge_source_array {
+	struct merge_source base;
+	uint32_t tuple_count;
+	struct tuple **tuples;
+	uint32_t cur;
+};
+
+/* Virtual methods declarations */
+
+static void
+merge_source_array_destroy(struct merge_source *base);
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+			struct tuple **out);
+
+/* Non-virtual methods */
+
+static struct merge_source *
+merge_source_array_new(bool even)
+{
+	static struct merge_source_vtab merge_source_array_vtab = {
+		.destroy = merge_source_array_destroy,
+		.next = merge_source_array_next,
+	};
+
+	struct merge_source_array *source = malloc(
+		sizeof(struct merge_source_array));
+	assert(source != NULL);
+
+	merge_source_create(&source->base, &merge_source_array_vtab);
+
+	uint32_t tuple_size = 2;
+	const uint32_t tuple_count = 2;
+	/* [1], [3] */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	/* [2], [4] */
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const char **data = even ? data_even : data_odd;
+	source->tuples = malloc(sizeof(struct tuple *) * tuple_count);
+	assert(source->tuples != NULL);
+	struct tuple_format *format = tuple_format_runtime;
+	for (uint32_t i = 0; i < tuple_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		source->tuples[i] = tuple_new(format, data[i], end);
+		tuple_ref(source->tuples[i]);
+	}
+	source->tuple_count = tuple_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+static void
+merge_source_array_destroy(struct merge_source *base)
+{
+	struct merge_source_array *source = container_of(base,
+		struct merge_source_array, base);
+
+	for (uint32_t i = 0; i < source->tuple_count; ++i)
+		tuple_unref(source->tuples[i]);
+
+	free(source->tuples);
+	free(source);
+}
+
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+			struct tuple **out)
+{
+	struct merge_source_array *source = container_of(base,
+		struct merge_source_array, base);
+
+	if (source->cur == source->tuple_count) {
+		*out = NULL;
+		return 0;
+	}
+
+	struct tuple *tuple = source->tuples[source->cur];
+	assert(tuple != NULL);
+
+	/*
+	 * Note: The source still stores the tuple (and will
+	 * unreference it during destroy). Here we should give a
+	 * referenced tuple (so a caller should unreference it on
+	 * its side).
+	 */
+	tuple_ref(tuple);
+
+	*out = tuple;
+	++source->cur;
+	return 0;
+}
+
+/* }}} */
+
+static struct key_part_def key_part_unsigned = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_UNSIGNED,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+static struct key_part_def key_part_integer = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_INTEGER,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	return a < b ? a : b;
+}
+
+void
+check_tuple(struct tuple *tuple, struct tuple_format *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	uint32_t size;
+	const char *data = tuple_data_range(tuple, &size);
+
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (format == NULL) {
+		ok(true, "%s: skip tuple validation", case_name);
+	} else {
+		int rc = tuple_validate(format, tuple);
+		is(rc, 0, "%s: validate tuple", case_name);
+	}
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	ok(!strncmp(data, exp_data, min_u32(size, exp_data_len)),
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Check array source itself (just in case).
+ */
+int
+test_array_source(struct tuple_format *format)
+{
+	plan(9);
+	header();
+
+	/* [1], [3] */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuple_count = 2;
+	static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+
+	struct merge_source *source = merge_source_array_new(false);
+	assert(source != NULL);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"array source next() (any format)" :
+		"array source next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+		int rc = merge_source_next(source, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		tuple_unref(tuple);
+	}
+	int rc = merge_source_next(source, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merge_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+int
+test_merger(struct tuple_format *format)
+{
+	plan(17);
+	header();
+
+	/* [1], [2], [3], [4] */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuple_count = 4;
+	static const char *exp_tuples_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+
+	const uint32_t source_count = 2;
+	struct merge_source *sources[] = {
+		merge_source_array_new(false),
+		merge_source_array_new(true),
+	};
+
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 false);
+	key_def_delete(key_def);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+		int rc = merge_source_next(merger, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		tuple_unref(tuple);
+	}
+	int rc = merge_source_next(merger, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	merge_source_unref(merger);
+	merge_source_unref(sources[0]);
+	merge_source_unref(sources[1]);
+
+	footer();
+	return check_plan();
+}
+
+int
+test_basic()
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	tuple_format_unref(format);
+
+	footer();
+	return check_plan();
+}
+
+int
+main()
+{
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	tuple_init(NULL);
+
+	int rc = test_basic();
+
+	tuple_free();
+	fiber_free();
+	memory_free();
+
+	return rc;
+}
-- 
2.21.0
next prev parent reply	other threads:[~2019-05-07 22:30 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
2019-05-13 13:52   ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
2019-05-13 13:52   ` Vladimir Davydov
2019-05-07 22:30 ` Alexander Turenko [this message]
2019-05-13 14:49   ` [PATCH v4 3/4] Add merger for tuples streams (C part) Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
2019-05-13 14:49   ` Vladimir Davydov
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox
  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):
  git send-email \
    --in-reply-to=fb384879110f6bd827aae18ceea3ac131f0c3476.1557267501.git.alexander.turenko@tarantool.org \
    --to=alexander.turenko@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH v4 3/4] Add merger for tuples streams (C part)' \
    /path/to/YOUR_REPLY
  https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox