[PATCH v4 3/4] Add merger for tuples streams (C part)
Alexander Turenko
alexander.turenko at tarantool.org
Wed May 8 01:30:47 MSK 2019
Needed for #3276.
---
src/box/CMakeLists.txt | 1 +
src/box/merger.c | 355 +++++++++++++++++++++++++++++++++++++++
src/box/merger.h | 150 +++++++++++++++++
test/unit/CMakeLists.txt | 3 +
test/unit/merger.result | 71 ++++++++
test/unit/merger.test.c | 285 +++++++++++++++++++++++++++++++
6 files changed, 865 insertions(+)
create mode 100644 src/box/merger.c
create mode 100644 src/box/merger.h
create mode 100644 test/unit/merger.result
create mode 100644 test/unit/merger.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 2be0d1e35..ce328fb95 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -122,6 +122,7 @@ add_library(box STATIC
execute.c
wal.c
call.c
+ merger.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..744bba469
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "diag.h" /* diag_set() */
+#include "box/tuple.h" /* tuple_ref(), tuple_unref(),
+ tuple_validate() */
+#include "box/tuple_format.h" /* box_tuple_format_new(),
+ tuple_format_*() */
+#include "box/key_def.h" /* key_def_*(),
+ tuple_compare() */
+
+/* {{{ Merger */
+
+/**
+ * A heap entry of a merger: pairs a merge source with the tuple
+ * most recently fetched from it. That tuple is what
+ * merge_source_less() compares when the heap orders its nodes.
+ *
+ * The structure is kept apart from struct merge_source for two
+ * reasons. First, a heap node can not be a member of several
+ * heaps. Second, it keeps all heap related logic inside this
+ * compilation unit, without any traces in externally visible
+ * structures.
+ */
+struct merger_heap_node {
+ /*
+ * A source of tuples. Referenced in
+ * merger_heap_node_create() and released in
+ * merger_heap_node_delete().
+ */
+ struct merge_source *source;
+ /*
+ * The last fetched (refcounted) tuple to compare against
+ * other nodes, or NULL when the node holds no tuple.
+ * Unreferenced in merger_heap_node_delete() when it is
+ * not NULL.
+ */
+ struct tuple *tuple;
+ /* An anchor to make the structure a merger heap node. */
+ struct heap_node in_merger;
+};
+
+/*
+ * Instantiate the heap template from salad/heap.h for
+ * struct merger_heap_node. The heap functions used in this file
+ * carry the merger_heap_ prefix given by HEAP_NAME, and nodes are
+ * ordered by merge_source_less(). The comparator is only declared
+ * here: its body needs struct merger, which is defined after the
+ * heap type.
+ */
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+ const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merge_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr in_merger
+#include "salad/heap.h"
+/* Drop the template parameters once the heap is instantiated. */
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, parameters of a merge process and utility fields.
+ *
+ * A merger is itself a merge source: it is created by
+ * merger_new() and handed out as a pointer to its base member.
+ */
+struct merger {
+ /* A merger is a source. */
+ struct merge_source base;
+ /*
+ * Whether a merge process started.
+ *
+ * The merger postpones charging of heap nodes until a
+ * first output tuple is acquired, see merger_next().
+ */
+ bool started;
+ /*
+ * A key_def to compare tuples. It is a private copy made
+ * in merger_new() and deleted in merger_delete().
+ */
+ struct key_def *key_def;
+ /*
+ * A format to acquire compatible tuples from sources. It
+ * is created from key_def in merger_new() and passed to
+ * next() of each source.
+ */
+ struct tuple_format *format;
+ /*
+ * A heap of sources (of nodes that contains a source to
+ * be exact).
+ */
+ heap_t heap;
+ /* A count of elements in the nodes array. */
+ uint32_t node_count;
+ /*
+ * An array of heap nodes, one per source. NULL until
+ * merger_set_sources() allocates it.
+ */
+ struct merger_heap_node *nodes;
+ /* Ascending (false) / descending (true) order. */
+ bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Ordering predicate of the merger heap.
+ *
+ * Both nodes must hold a tuple. The tuples are compared using the
+ * key_def of the merger that embeds the given heap. In ascending
+ * mode the result is `cmp < 0`; in descending mode it is
+ * `cmp >= 0`, so equal tuples count as "less" there.
+ */
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		  const struct merger_heap_node *right)
+{
+	assert(left->tuple != NULL);
+	assert(right->tuple != NULL);
+	/* The heap is a member of its merger: step back to the owner. */
+	struct merger *owner = container_of(heap, struct merger, heap);
+	int order = tuple_compare(left->tuple, right->tuple, owner->key_def);
+	if (owner->reverse)
+		return order >= 0;
+	return order < 0;
+}
+
+/**
+ * Set up a heap node for the given source.
+ *
+ * Takes a reference on the source. The node starts without a
+ * tuple.
+ */
+static void
+merger_heap_node_create(struct merger_heap_node *node,
+			struct merge_source *source)
+{
+	merge_source_ref(source);
+	node->source = source;
+	node->tuple = NULL;
+	heap_node_create(&node->in_merger);
+}
+
+/**
+ * Release what a heap node holds: the reference on its source
+ * and, when present, the reference on its last fetched tuple.
+ *
+ * The memory of the node itself is not freed here.
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+	merge_source_unref(node->source);
+	if (node->tuple == NULL)
+		return;
+	tuple_unref(node->tuple);
+}
+
+/**
+ * Charge a heap node: fetch a tuple from its source and put the
+ * node into the merger heap.
+ *
+ * A source that gives no tuple is left out of the heap and the
+ * call still succeeds.
+ *
+ * Return 0 at success. Return -1 at an error (the source failed
+ * or the heap insertion failed) and set a diag.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	/* Ask the node's source for its next tuple. */
+	struct tuple *fetched = NULL;
+	int rc = merge_source_next(node->source, merger->format, &fetched);
+	if (rc != 0)
+		return -1;
+
+	/* Don't add an empty source to a heap. */
+	if (fetched == NULL)
+		return 0;
+
+	/* The node now owns the reference given by the source. */
+	node->tuple = fetched;
+	if (merger_heap_insert(&merger->heap, node) == 0)
+		return 0;
+
+	diag_set(OutOfMemory, 0, "malloc", "merger->heap");
+	return -1;
+}
+
+/*
+ * Virtual methods declarations.
+ *
+ * They are declared ahead of merger_new(), which stores them in
+ * the merger vtab; the definitions follow it.
+ */
+
+static void
+merger_delete(struct merge_source *base);
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Set sources for a merger.
+ *
+ * It is the helper for merger_new(). Allocates one heap node per
+ * source and takes a reference on every source. Tuples are not
+ * fetched here.
+ *
+ * A merger without sources is valid. No array is allocated for
+ * it (merger->nodes stays NULL), because malloc(0) is allowed to
+ * return NULL and that must not be reported as out of memory.
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+static int
+merger_set_sources(struct merger *merger, struct merge_source **sources,
+		   uint32_t source_count)
+{
+	struct merger_heap_node *nodes = NULL;
+
+	if (source_count > 0) {
+		const size_t nodes_size = sizeof(*nodes) * source_count;
+		nodes = malloc(nodes_size);
+		if (nodes == NULL) {
+			diag_set(OutOfMemory, nodes_size, "malloc",
+				 "merger heap nodes");
+			return -1;
+		}
+	}
+
+	for (uint32_t i = 0; i < source_count; ++i)
+		merger_heap_node_create(&nodes[i], sources[i]);
+
+	merger->node_count = source_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+
+/**
+ * Create a merger over the given sources.
+ *
+ * The key_def is duplicated and a tuple format is built from the
+ * copy; both belong to the merger. Every source gets one more
+ * reference. Nothing is fetched from the sources here.
+ *
+ * Return the merger as a merge source, or NULL with a diag set
+ * when an allocation or a helper fails.
+ */
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+	   uint32_t source_count, bool reverse)
+{
+	static struct merge_source_vtab merger_vtab = {
+		.destroy = merger_delete,
+		.next = merger_next,
+	};
+
+	struct merger *self = malloc(sizeof(*self));
+	if (self == NULL) {
+		diag_set(OutOfMemory, sizeof(*self), "malloc",
+			 "merger");
+		return NULL;
+	}
+
+	/*
+	 * Work on a private copy: the caller's key_def can be
+	 * collected before a merge process ends (say, by LuaJIT
+	 * GC if the key_def comes from Lua).
+	 */
+	struct key_def *own_key_def = key_def_dup(key_def);
+	if (own_key_def == NULL) {
+		free(self);
+		return NULL;
+	}
+
+	struct tuple_format *own_format =
+		box_tuple_format_new(&own_key_def, 1);
+	if (own_format == NULL) {
+		key_def_delete(own_key_def);
+		free(self);
+		return NULL;
+	}
+
+	merge_source_create(&self->base, &merger_vtab);
+	self->started = false;
+	self->key_def = own_key_def;
+	self->format = own_format;
+	merger_heap_create(&self->heap);
+	self->node_count = 0;
+	self->nodes = NULL;
+	self->reverse = reverse;
+
+	if (merger_set_sources(self, sources, source_count) == 0)
+		return &self->base;
+
+	/* No nodes were attached: undo the steps above. */
+	key_def_delete(self->key_def);
+	tuple_format_unref(self->format);
+	merger_heap_destroy(&self->heap);
+	free(self);
+	return NULL;
+}
+
+/* Virtual methods */
+
+/**
+ * The destroy() virtual method of a merger.
+ *
+ * Releases the key_def copy, the format and the heap, then every
+ * heap node (dropping the references on sources and on held
+ * tuples), the node array and the merger itself.
+ */
+static void
+merger_delete(struct merge_source *base)
+{
+	struct merger *self = container_of(base, struct merger, base);
+
+	key_def_delete(self->key_def);
+	tuple_format_unref(self->format);
+	merger_heap_destroy(&self->heap);
+
+	uint32_t count = self->node_count;
+	for (uint32_t idx = 0; idx < count; ++idx)
+		merger_heap_node_delete(&self->nodes[idx]);
+
+	/* free(NULL) is a no-op, so a merger without nodes is fine. */
+	free(self->nodes);
+
+	free(self);
+}
+
+/**
+ * The next() virtual method of a merger: give the next tuple in
+ * the merge order.
+ *
+ * On the first call every source is asked for its first tuple
+ * and the non-empty ones are put into the heap. Each call then
+ * takes the tuple of the top heap node, validates it against the
+ * given format (when it is not NULL), refills the node from its
+ * source and fixes the heap.
+ *
+ * Return 0 and store the tuple in *out; NULL is stored when the
+ * heap is empty, i.e. all sources are exhausted. Return -1 at an
+ * error; *out is not written in this case.
+ */
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out)
+{
+ struct merger *merger = container_of(base, struct merger, base);
+
+ /*
+ * Fetch a first tuple for each source and add all heap
+ * nodes to a merger heap.
+ *
+ * NOTE(review): 'started' stays false when charging fails
+ * midway, so a repeated call charges every node again,
+ * including those already inserted into the heap.
+ * Presumably callers do not retry after an error; verify.
+ */
+ if (!merger->started) {
+ for (uint32_t i = 0; i < merger->node_count; ++i) {
+ struct merger_heap_node *node = &merger->nodes[i];
+ if (merger_add_heap_node(merger, node) != 0)
+ return -1;
+ }
+ merger->started = true;
+ }
+
+ /* Get a next tuple. */
+ struct merger_heap_node *node = merger_heap_top(&merger->heap);
+ if (node == NULL) {
+ *out = NULL;
+ return 0;
+ }
+ struct tuple *tuple = node->tuple;
+ assert(tuple != NULL);
+
+ /*
+ * Validate the tuple. The node is left untouched on a
+ * failure: nothing was fetched or moved yet.
+ */
+ if (format != NULL && tuple_validate(format, tuple) != 0)
+ return -1;
+
+ /*
+ * Note: An old node->tuple pointer will be written to
+ * *out as refcounted tuple, so we don't unreference it
+ * here.
+ *
+ * NOTE(review): this assumes a failing source leaves
+ * node->tuple as it was; confirm against the sources.
+ */
+ struct merge_source *source = node->source;
+ if (merge_source_next(source, merger->format, &node->tuple) != 0)
+ return -1;
+
+ /*
+ * Update a heap: an exhausted source leaves the heap, any
+ * other node is repositioned according to its new tuple.
+ */
+ if (node->tuple == NULL)
+ merger_heap_delete(&merger->heap, node);
+ else
+ merger_heap_update(&merger->heap, node);
+
+ *out = tuple;
+ return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..90ae175a9
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,150 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+
+struct merge_source;
+
+/**
+ * Virtual methods of a merge source.
+ *
+ * Callers are expected to go through merge_source_unref() and
+ * merge_source_next() rather than use the table directly.
+ */
+struct merge_source_vtab {
+ /**
+ * Free a merge source.
+ *
+ * Don't call it directly, use merge_source_unref()
+ * instead.
+ */
+ void (*destroy)(struct merge_source *base);
+ /**
+ * Get a next tuple (refcounted) from a source.
+ *
+ * When format is not NULL the resulting tuple will be in
+ * a compatible format.
+ *
+ * When format is NULL it means that it does not matter
+ * for a caller in which format a tuple will be.
+ *
+ * Return 0 when successfully fetched a tuple or NULL
+ * (NULL is stored in *out when the source has no more
+ * tuples). In case of an error set a diag and return -1.
+ *
+ * The caller owns the reference on the returned tuple
+ * and should unreference it on its side.
+ */
+ int (*next)(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merge source.
+ *
+ * The structure does not hold any resources. A concrete source
+ * embeds it as a member and sets it up with
+ * merge_source_create().
+ */
+struct merge_source {
+ /* Source-specific methods. */
+ const struct merge_source_vtab *vtab;
+ /*
+ * Reference counter. Starts at 1 in merge_source_create();
+ * the source is destroyed when merge_source_unref() brings
+ * it to zero.
+ */
+ int refs;
+};
+
+/* }}} */
+
+/* {{{ Base merge source functions */
+
+/**
+ * Take one more reference on a merge source.
+ *
+ * Pair every call with merge_source_unref().
+ */
+static inline void
+merge_source_ref(struct merge_source *source)
+{
+	source->refs += 1;
+}
+
+/**
+ * Drop one reference on a merge source. The source is freed
+ * through its destroy() virtual method once the last reference
+ * is gone.
+ */
+static inline void
+merge_source_unref(struct merge_source *source)
+{
+	assert(source->refs - 1 >= 0);
+	source->refs -= 1;
+	if (source->refs != 0)
+		return;
+	source->vtab->destroy(source);
+}
+
+/**
+ * Fetch a next tuple from a source by calling its next() virtual
+ * method.
+ *
+ * @see merge_source_vtab
+ */
+static inline int
+merge_source_next(struct merge_source *source, struct tuple_format *format,
+		  struct tuple **out)
+{
+	const struct merge_source_vtab *methods = source->vtab;
+	return methods->next(source, format, out);
+}
+
+/**
+ * Initialize a base merge source structure.
+ *
+ * Stores the table of virtual methods and sets the reference
+ * counter to 1: the creator owns the initial reference and gives
+ * it up with merge_source_unref().
+ *
+ * The table is only read through the source, so it is accepted
+ * as a pointer to const. Callers passing a non-const table keep
+ * working.
+ */
+static inline void
+merge_source_create(struct merge_source *source,
+		    const struct merge_source_vtab *vtab)
+{
+	source->vtab = vtab;
+	source->refs = 1;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger: a merge source that gives tuples of the
+ * passed sources ordered according to a key_def.
+ *
+ * The key_def is copied, so a caller may delete its own one
+ * right after the call. The merger takes its own reference on
+ * each of the sources; a caller keeps the references it holds.
+ *
+ * The reverse parameter selects ascending (false) or descending
+ * (true) order.
+ *
+ * Release the result with merge_source_unref().
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+ uint32_t source_count, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 2c8340800..07dcd6cf2 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..766ebce63
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+ *** test_basic ***
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (any format): tuple != NULL
+ ok 2 - array source next() (any format): skip tuple validation
+ ok 3 - array source next() (any format): check tuple size
+ ok 4 - array source next() (any format): check tuple data
+ ok 5 - array source next() (any format): tuple != NULL
+ ok 6 - array source next() (any format): skip tuple validation
+ ok 7 - array source next() (any format): check tuple size
+ ok 8 - array source next() (any format): check tuple data
+ ok 9 - array source is empty (any format)
+ *** test_array_source: done ***
+ok 1 - subtests
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (user's format): tuple != NULL
+ ok 2 - array source next() (user's format): validate tuple
+ ok 3 - array source next() (user's format): check tuple size
+ ok 4 - array source next() (user's format): check tuple data
+ ok 5 - array source next() (user's format): tuple != NULL
+ ok 6 - array source next() (user's format): validate tuple
+ ok 7 - array source next() (user's format): check tuple size
+ ok 8 - array source next() (user's format): check tuple data
+ ok 9 - array source is empty (user's format)
+ *** test_array_source: done ***
+ok 2 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (any format): tuple != NULL
+ ok 2 - merger next() (any format): skip tuple validation
+ ok 3 - merger next() (any format): check tuple size
+ ok 4 - merger next() (any format): check tuple data
+ ok 5 - merger next() (any format): tuple != NULL
+ ok 6 - merger next() (any format): skip tuple validation
+ ok 7 - merger next() (any format): check tuple size
+ ok 8 - merger next() (any format): check tuple data
+ ok 9 - merger next() (any format): tuple != NULL
+ ok 10 - merger next() (any format): skip tuple validation
+ ok 11 - merger next() (any format): check tuple size
+ ok 12 - merger next() (any format): check tuple data
+ ok 13 - merger next() (any format): tuple != NULL
+ ok 14 - merger next() (any format): skip tuple validation
+ ok 15 - merger next() (any format): check tuple size
+ ok 16 - merger next() (any format): check tuple data
+ ok 17 - merger is empty (any format)
+ *** test_merger: done ***
+ok 3 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (user's format): tuple != NULL
+ ok 2 - merger next() (user's format): validate tuple
+ ok 3 - merger next() (user's format): check tuple size
+ ok 4 - merger next() (user's format): check tuple data
+ ok 5 - merger next() (user's format): tuple != NULL
+ ok 6 - merger next() (user's format): validate tuple
+ ok 7 - merger next() (user's format): check tuple size
+ ok 8 - merger next() (user's format): check tuple data
+ ok 9 - merger next() (user's format): tuple != NULL
+ ok 10 - merger next() (user's format): validate tuple
+ ok 11 - merger next() (user's format): check tuple size
+ ok 12 - merger next() (user's format): check tuple data
+ ok 13 - merger next() (user's format): tuple != NULL
+ ok 14 - merger next() (user's format): validate tuple
+ ok 15 - merger next() (user's format): check tuple size
+ ok 16 - merger next() (user's format): check tuple data
+ ok 17 - merger is empty (user's format)
+ *** test_merger: done ***
+ok 4 - subtests
+ *** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..b4a989a20
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,285 @@
+#include <string.h> /* memcmp() */
+
+#include "unit.h" /* plan, header, footer, is, ok */
+#include "memory.h" /* memory_init() */
+#include "fiber.h" /* fiber_init() */
+#include "box/tuple.h" /* tuple_init(), tuple_*(),
+ tuple_validate() */
+#include "box/tuple_format.h" /* tuple_format_*,
+ box_tuple_format_new() */
+#include "box/key_def.h" /* key_def_new(),
+ key_def_delete() */
+#include "box/merger.h" /* merger_*() */
+
+/* {{{ Array merge source */
+
+/*
+ * A merge source for tests: gives tuples from a fixed array, one
+ * per next() call.
+ */
+struct merge_source_array {
+ /* A base merge source; holds the vtab and the refcounter. */
+ struct merge_source base;
+ /* A count of elements in the tuples array. */
+ uint32_t tuple_count;
+ /* An array of tuples referenced by the source. */
+ struct tuple **tuples;
+ /* An index of a tuple to give on a next next() call. */
+ uint32_t cur;
+};
+
+/*
+ * Virtual methods declarations.
+ *
+ * Declared ahead of merge_source_array_new(), which stores them
+ * in the vtab of the array source.
+ */
+
+static void
+merge_source_array_destroy(struct merge_source *base);
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create an array source holding two one-field tuples: [2], [4]
+ * when even is true, [1], [3] otherwise.
+ *
+ * The tuples are created in the runtime format and referenced by
+ * the source until it is destroyed. Every allocation, including
+ * the tuples themselves, is asserted to succeed: tuple_ref() must
+ * not be handed a tuple that was not created.
+ */
+static struct merge_source *
+merge_source_array_new(bool even)
+{
+	static struct merge_source_vtab merge_source_array_vtab = {
+		.destroy = merge_source_array_destroy,
+		.next = merge_source_array_next,
+	};
+
+	struct merge_source_array *source = malloc(sizeof(*source));
+	assert(source != NULL);
+
+	merge_source_create(&source->base, &merge_source_array_vtab);
+
+	const uint32_t tuple_size = 2;
+	const uint32_t tuple_count = 2;
+	/* [1], [3] */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	/* [2], [4] */
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const char **data = even ? data_even : data_odd;
+	source->tuples = malloc(sizeof(*source->tuples) * tuple_count);
+	assert(source->tuples != NULL);
+	struct tuple_format *format = tuple_format_runtime;
+	for (uint32_t i = 0; i < tuple_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		struct tuple *tuple = tuple_new(format, data[i], end);
+		/* Check before tuple_ref() touches the tuple. */
+		assert(tuple != NULL);
+		tuple_ref(tuple);
+		source->tuples[i] = tuple;
+	}
+	source->tuple_count = tuple_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+/**
+ * The destroy() virtual method of an array source: drop the
+ * references on all stored tuples, then free the array and the
+ * source.
+ */
+static void
+merge_source_array_destroy(struct merge_source *base)
+{
+	struct merge_source_array *self =
+		container_of(base, struct merge_source_array, base);
+
+	uint32_t count = self->tuple_count;
+	for (uint32_t idx = 0; idx < count; ++idx)
+		tuple_unref(self->tuples[idx]);
+
+	free(self->tuples);
+	free(self);
+}
+
+/**
+ * The next() virtual method of an array source.
+ *
+ * Stores the current tuple in *out with one more reference and
+ * steps forward, or stores NULL when all tuples were given. The
+ * format argument is not used. Always returns 0.
+ */
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+			struct tuple **out)
+{
+	struct merge_source_array *self =
+		container_of(base, struct merge_source_array, base);
+
+	struct tuple *next = NULL;
+	if (self->cur != self->tuple_count) {
+		next = self->tuples[self->cur];
+		assert(next != NULL);
+
+		/*
+		 * The source keeps its own reference (dropped in
+		 * destroy), so the one handed to the caller is
+		 * taken here; the caller unreferences it on its
+		 * side.
+		 */
+		tuple_ref(next);
+		++self->cur;
+	}
+
+	*out = next;
+	return 0;
+}
+
+/* }}} */
+
+/*
+ * A key part over the field 0 of the unsigned type. It is used in
+ * test_merger() to build the key_def a merger compares tuples
+ * with.
+ */
+static struct key_part_def key_part_unsigned = {
+ .fieldno = 0,
+ .type = FIELD_TYPE_UNSIGNED,
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+/*
+ * A key part over the field 0 of the integer type. It is used in
+ * test_basic() to build the user's format that fetched tuples
+ * are validated against.
+ */
+static struct key_part_def key_part_integer = {
+ .fieldno = 0,
+ .type = FIELD_TYPE_INTEGER,
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+/* Return the smaller of two unsigned 32-bit values. */
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	if (b <= a)
+		return b;
+	return a;
+}
+
+/**
+ * Run the checks shared by all test cases on a fetched tuple: it
+ * exists, it passes validation against the format (reported as
+ * skipped when the format is NULL), its size and its data match
+ * the expected ones.
+ *
+ * Exactly four test points are emitted in any case, even for a
+ * NULL tuple, so plans of callers stay balanced. A NULL tuple is
+ * reported as failed checks rather than dereferenced.
+ *
+ * Tuple data is raw bytes, so it is compared with memcmp(): a
+ * string comparison would stop at the first zero byte.
+ */
+void
+check_tuple(struct tuple *tuple, struct tuple_format *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	uint32_t size = 0;
+	const char *data = NULL;
+	/* Read the data only when there is a tuple to read from. */
+	if (tuple != NULL)
+		data = tuple_data_range(tuple, &size);
+
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (format == NULL) {
+		ok(true, "%s: skip tuple validation", case_name);
+	} else {
+		int rc = tuple != NULL ? tuple_validate(format, tuple) : -1;
+		is(rc, 0, "%s: validate tuple", case_name);
+	}
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	ok(data != NULL &&
+	   memcmp(data, exp_data, min_u32(size, exp_data_len)) == 0,
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Sanity check of the array source on its own: the odd source
+ * must give [1], [3] and then report that it is empty.
+ *
+ * With a non-NULL format each tuple is also validated against it.
+ */
+int
+test_array_source(struct tuple_format *format)
+{
+	plan(9);
+	header();
+
+	/* [1], [3] */
+	static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+	const uint32_t exp_tuple_count = 2;
+	const uint32_t exp_tuple_size = 2;
+
+	const char *msg = format == NULL ?
+		"array source next() (any format)" :
+		"array source next() (user's format)";
+
+	struct merge_source *source = merge_source_array_new(false);
+	assert(source != NULL);
+
+	struct tuple *tuple = NULL;
+	int rc;
+	for (uint32_t pos = 0; pos < exp_tuple_count; ++pos) {
+		rc = merge_source_next(source, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[pos],
+			    exp_tuple_size, msg);
+		tuple_unref(tuple);
+	}
+
+	/* One more fetch: the source must be exhausted now. */
+	rc = merge_source_next(source, format, &tuple);
+	assert(rc == 0);
+	(void) rc;
+	is(tuple, NULL, format == NULL ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merge_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Merge two array sources ([1], [3] and [2], [4]) in ascending
+ * order and check that the merger gives [1], [2], [3], [4] and
+ * then reports that it is empty.
+ *
+ * With a non-NULL format each tuple is also validated against it.
+ *
+ * Every object the test goes on to dereference (the sources, the
+ * key_def, the merger) is asserted to be created.
+ */
+int
+test_merger(struct tuple_format *format)
+{
+	plan(17);
+	header();
+
+	/* [1], [2], [3], [4] */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuple_count = 4;
+	static const char *exp_tuples_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+
+	struct merge_source *sources[] = {
+		merge_source_array_new(false),
+		merge_source_array_new(true),
+	};
+	/* Derive the count from the array so the two can't diverge. */
+	const uint32_t source_count = sizeof(sources) / sizeof(sources[0]);
+	for (uint32_t i = 0; i < source_count; ++i)
+		assert(sources[i] != NULL);
+
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	assert(key_def != NULL);
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 false);
+	assert(merger != NULL);
+	/* The merger works on its own copy of the key_def. */
+	key_def_delete(key_def);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+		int rc = merge_source_next(merger, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		tuple_unref(tuple);
+	}
+	int rc = merge_source_next(merger, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	/*
+	 * The merger holds its own references on the sources, so
+	 * the references taken at creation are dropped separately.
+	 */
+	merge_source_unref(merger);
+	for (uint32_t i = 0; i < source_count; ++i)
+		merge_source_unref(sources[i]);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Run the array source and the merger tests twice each: without
+ * a format and with a user's format built over an integer key
+ * part.
+ *
+ * Return the result of check_plan() for the four subtests.
+ */
+int
+test_basic(void)
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	/* The format below is built from the key_def. */
+	assert(key_def != NULL);
+	struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	tuple_format_unref(format);
+
+	footer();
+	return check_plan();
+}
+
+/*
+ * Entry point of the test: bring up the memory, fiber and tuple
+ * subsystems, run the suite and free the subsystems in the
+ * reverse order. The exit code is the result of test_basic().
+ */
+int
+main()
+{
+ memory_init();
+ fiber_init(fiber_c_invoke);
+ tuple_init(NULL);
+
+ int rc = test_basic();
+
+ /* Tear down in the order opposite to the initialization. */
+ tuple_free();
+ fiber_free();
+ memory_free();
+
+ return rc;
+}
--
2.21.0
More information about the Tarantool-patches
mailing list