From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Alexander Turenko Subject: [PATCH v4 3/4] Add merger for tuples streams (C part) Date: Wed, 8 May 2019 01:30:47 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: Vladimir Davydov Cc: Alexander Turenko , tarantool-patches@freelists.org List-ID: Needed for #3276. --- src/box/CMakeLists.txt | 1 + src/box/merger.c | 355 +++++++++++++++++++++++++++++++++++++++ src/box/merger.h | 150 +++++++++++++++++ test/unit/CMakeLists.txt | 3 + test/unit/merger.result | 71 ++++++++ test/unit/merger.test.c | 285 +++++++++++++++++++++++++++++++ 6 files changed, 865 insertions(+) create mode 100644 src/box/merger.c create mode 100644 src/box/merger.h create mode 100644 test/unit/merger.result create mode 100644 test/unit/merger.test.c diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 2be0d1e35..ce328fb95 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -122,6 +122,7 @@ add_library(box STATIC execute.c wal.c call.c + merger.c ${lua_sources} lua/init.c lua/call.c diff --git a/src/box/merger.c b/src/box/merger.c new file mode 100644 index 000000000..744bba469 --- /dev/null +++ b/src/box/merger.c @@ -0,0 +1,355 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "box/merger.h" + +#include +#include +#include +#include + +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" + +#include "diag.h" /* diag_set() */ +#include "box/tuple.h" /* tuple_ref(), tuple_unref(), + tuple_validate() */ +#include "box/tuple_format.h" /* box_tuple_format_new(), + tuple_format_*() */ +#include "box/key_def.h" /* key_def_*(), + tuple_compare() */ + +/* {{{ Merger */ + +/** + * Holds a source to fetch next tuples and a last fetched tuple to + * compare the node against other nodes. + * + * The main reason why this structure is separated from a merge + * source is that a heap node can not be a member of several + * heaps. + * + * The second reason is that it allows to encapsulate all heap + * related logic inside this compilation unit, without any traces + * in externally visible structures. + */ +struct merger_heap_node { + /* A source of tuples. */ + struct merge_source *source; + /* + * A last fetched (refcounted) tuple to compare against + * other nodes. + */ + struct tuple *tuple; + /* An anchor to make the structure a merger heap node. */ + struct heap_node in_merger; +}; + +static bool +merge_source_less(const heap_t *heap, const struct merger_heap_node *left, + const struct merger_heap_node *right); +#define HEAP_NAME merger_heap +#define HEAP_LESS merge_source_less +#define heap_value_t struct merger_heap_node +#define heap_value_attr in_merger +#include "salad/heap.h" +#undef HEAP_NAME +#undef HEAP_LESS +#undef heap_value_t +#undef heap_value_attr + +/** + * Holds a heap, parameters of a merge process and utility fields. + */ +struct merger { + /* A merger is a source. */ + struct merge_source base; + /* + * Whether a merge process started. + * + * The merger postpones charging of heap nodes until a + * first output tuple is acquired. + */ + bool started; + /* A key_def to compare tuples. */ + struct key_def *key_def; + /* A format to acquire compatible tuples from sources. */ + struct tuple_format *format; + /* + * A heap of sources (of nodes that contains a source to + * be exact). + */ + heap_t heap; + /* An array of heap nodes. */ + uint32_t node_count; + struct merger_heap_node *nodes; + /* Ascending (false) / descending (true) order. */ + bool reverse; +}; + +/* Helpers */ + +/** + * Data comparing function to construct a heap of sources. + */ +static bool +merge_source_less(const heap_t *heap, const struct merger_heap_node *left, + const struct merger_heap_node *right) +{ + assert(left->tuple != NULL); + assert(right->tuple != NULL); + struct merger *merger = container_of(heap, struct merger, heap); + int cmp = tuple_compare(left->tuple, right->tuple, merger->key_def); + return merger->reverse ? cmp >= 0 : cmp < 0; +} + +/** + * Initialize a new merger heap node. + */ +static void +merger_heap_node_create(struct merger_heap_node *node, + struct merge_source *source) +{ + node->source = source; + merge_source_ref(node->source); + node->tuple = NULL; + heap_node_create(&node->in_merger); +} + +/** + * Free a merger heap node. + */ +static void +merger_heap_node_delete(struct merger_heap_node *node) +{ + merge_source_unref(node->source); + if (node->tuple != NULL) + tuple_unref(node->tuple); +} + +/** + * The helper to add a new heap node to a merger heap. + * + * Return -1 at an error and set a diag. + * + * Otherwise store a next tuple in node->tuple, add the node to + * merger->heap and return 0. + */ +static int +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node) +{ + struct tuple *tuple = NULL; + + /* Acquire a next tuple. */ + struct merge_source *source = node->source; + if (merge_source_next(source, merger->format, &tuple) != 0) + return -1; + + /* Don't add an empty source to a heap. */ + if (tuple == NULL) + return 0; + + node->tuple = tuple; + + /* Add a node to a heap. */ + if (merger_heap_insert(&merger->heap, node) != 0) { + diag_set(OutOfMemory, 0, "malloc", "merger->heap"); + return -1; + } + + return 0; +} + +/* Virtual methods declarations */ + +static void +merger_delete(struct merge_source *base); +static int +merger_next(struct merge_source *base, struct tuple_format *format, + struct tuple **out); + +/* Non-virtual methods */ + +/** + * Set sources for a merger. + * + * It is the helper for merger_new(). + * + * Return 0 at success. Return -1 at an error and set a diag. + */ +static int +merger_set_sources(struct merger *merger, struct merge_source **sources, + uint32_t source_count) +{ + const size_t nodes_size = sizeof(struct merger_heap_node) * + source_count; + struct merger_heap_node *nodes = malloc(nodes_size); + if (nodes == NULL) { + diag_set(OutOfMemory, nodes_size, "malloc", + "merger heap nodes"); + return -1; + } + + for (uint32_t i = 0; i < source_count; ++i) + merger_heap_node_create(&nodes[i], sources[i]); + + merger->node_count = source_count; + merger->nodes = nodes; + return 0; +} + + +struct merge_source * +merger_new(struct key_def *key_def, struct merge_source **sources, + uint32_t source_count, bool reverse) +{ + static struct merge_source_vtab merger_vtab = { + .destroy = merger_delete, + .next = merger_next, + }; + + struct merger *merger = malloc(sizeof(struct merger)); + if (merger == NULL) { + diag_set(OutOfMemory, sizeof(struct merger), "malloc", + "merger"); + return NULL; + } + + /* + * We need to copy the key_def because it can be collected + * before a merge process ends (say, by LuaJIT GC if the + * key_def comes from Lua). + */ + key_def = key_def_dup(key_def); + if (key_def == NULL) { + free(merger); + return NULL; + } + + struct tuple_format *format = box_tuple_format_new(&key_def, 1); + if (format == NULL) { + key_def_delete(key_def); + free(merger); + return NULL; + } + + merge_source_create(&merger->base, &merger_vtab); + merger->started = false; + merger->key_def = key_def; + merger->format = format; + merger_heap_create(&merger->heap); + merger->node_count = 0; + merger->nodes = NULL; + merger->reverse = reverse; + + if (merger_set_sources(merger, sources, source_count) != 0) { + key_def_delete(merger->key_def); + tuple_format_unref(merger->format); + merger_heap_destroy(&merger->heap); + free(merger); + return NULL; + } + + return &merger->base; +} + +/* Virtual methods */ + +static void +merger_delete(struct merge_source *base) +{ + struct merger *merger = container_of(base, struct merger, base); + + key_def_delete(merger->key_def); + tuple_format_unref(merger->format); + merger_heap_destroy(&merger->heap); + + for (uint32_t i = 0; i < merger->node_count; ++i) + merger_heap_node_delete(&merger->nodes[i]); + + if (merger->nodes != NULL) + free(merger->nodes); + + free(merger); +} + +static int +merger_next(struct merge_source *base, struct tuple_format *format, + struct tuple **out) +{ + struct merger *merger = container_of(base, struct merger, base); + + /* + * Fetch a first tuple for each source and add all heap + * nodes to a merger heap. + */ + if (!merger->started) { + for (uint32_t i = 0; i < merger->node_count; ++i) { + struct merger_heap_node *node = &merger->nodes[i]; + if (merger_add_heap_node(merger, node) != 0) + return -1; + } + merger->started = true; + } + + /* Get a next tuple. */ + struct merger_heap_node *node = merger_heap_top(&merger->heap); + if (node == NULL) { + *out = NULL; + return 0; + } + struct tuple *tuple = node->tuple; + assert(tuple != NULL); + + /* Validate the tuple. */ + if (format != NULL && tuple_validate(format, tuple) != 0) + return -1; + + /* + * Note: An old node->tuple pointer will be written to + * *out as refcounted tuple, so we don't unreference it + * here. + */ + struct merge_source *source = node->source; + if (merge_source_next(source, merger->format, &node->tuple) != 0) + return -1; + + /* Update a heap. */ + if (node->tuple == NULL) + merger_heap_delete(&merger->heap, node); + else + merger_heap_update(&merger->heap, node); + + *out = tuple; + return 0; +} + +/* }}} */ diff --git a/src/box/merger.h b/src/box/merger.h new file mode 100644 index 000000000..90ae175a9 --- /dev/null +++ b/src/box/merger.h @@ -0,0 +1,150 @@ +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED +#define TARANTOOL_BOX_MERGER_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include +#include + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/* {{{ Structures */ + +struct tuple; +struct key_def; +struct tuple_format; + +struct merge_source; + +struct merge_source_vtab { + /** + * Free a merge source. + * + * Don't call it directly, use merge_source_unref() + * instead. + */ + void (*destroy)(struct merge_source *base); + /** + * Get a next tuple (refcounted) from a source. + * + * When format is not NULL the resulting tuple will be in + * a compatible format. + * + * When format is NULL it means that it does not matter + * for a caller in which format a tuple will be. + * + * Return 0 when successfully fetched a tuple or NULL. In + * case of an error set a diag and return -1. + */ + int (*next)(struct merge_source *base, struct tuple_format *format, + struct tuple **out); +}; + +/** + * Base (abstract) structure to represent a merge source. + * + * The structure does not hold any resources. + */ +struct merge_source { + /* Source-specific methods. */ + const struct merge_source_vtab *vtab; + /* Reference counter. */ + int refs; +}; + +/* }}} */ + +/* {{{ Base merge source functions */ + +/** + * Increment a merge source reference counter. + */ +static inline void +merge_source_ref(struct merge_source *source) +{ + ++source->refs; +} + +/** + * Decrement a merge source reference counter. When it has + * reached zero, free the source (call destroy() virtual method). + */ +static inline void +merge_source_unref(struct merge_source *source) +{ + assert(source->refs - 1 >= 0); + if (--source->refs == 0) + source->vtab->destroy(source); +} + +/** + * @see merge_source_vtab + */ +static inline int +merge_source_next(struct merge_source *source, struct tuple_format *format, + struct tuple **out) +{ + return source->vtab->next(source, format, out); +} + +/** + * Initialize a base merge source structure. + */ +static inline void +merge_source_create(struct merge_source *source, struct merge_source_vtab *vtab) +{ + source->vtab = vtab; + source->refs = 1; +} + +/* }}} */ + +/* {{{ Merger */ + +/** + * Create a new merger. + * + * Return NULL and set a diag in case of an error. + */ +struct merge_source * +merger_new(struct key_def *key_def, struct merge_source **sources, + uint32_t source_count, bool reverse); + +/* }}} */ + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */ diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 2c8340800..07dcd6cf2 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim) add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c) target_link_libraries(swim_proto.test unit swim) + +add_executable(merger.test merger.test.c) +target_link_libraries(merger.test unit core box) diff --git a/test/unit/merger.result b/test/unit/merger.result new file mode 100644 index 000000000..766ebce63 --- /dev/null +++ b/test/unit/merger.result @@ -0,0 +1,71 @@ +1..4 + *** test_basic *** + 1..9 + *** test_array_source *** + ok 1 - array source next() (any format): tuple != NULL + ok 2 - array source next() (any format): skip tuple validation + ok 3 - array source next() (any format): check tuple size + ok 4 - array source next() (any format): check tuple data + ok 5 - array source next() (any format): tuple != NULL + ok 6 - array source next() (any format): skip tuple validation + ok 7 - array source next() (any format): check tuple size + ok 8 - array source next() (any format): check tuple data + ok 9 - array source is empty (any format) + *** test_array_source: done *** +ok 1 - subtests + 1..9 + *** test_array_source *** + ok 1 - array source next() (user's format): tuple != NULL + ok 2 - array source next() (user's format): validate tuple + ok 3 - array source next() (user's format): check tuple size + ok 4 - array source next() (user's format): check tuple data + ok 5 - array source next() (user's format): tuple != NULL + ok 6 - array source next() (user's format): validate tuple + ok 7 - array source next() (user's format): check tuple size + ok 8 - array source next() (user's format): check tuple data + ok 9 - array source is empty (user's format) + *** test_array_source: done *** +ok 2 - subtests + 1..17 + *** test_merger *** + ok 1 - merger next() (any format): tuple != NULL + ok 2 - merger next() (any format): skip tuple validation + ok 3 - merger next() (any format): check tuple size + ok 4 - merger next() (any format): check tuple data + ok 5 - merger next() (any format): tuple != NULL + ok 6 - merger next() (any format): skip tuple validation + ok 7 - merger next() (any format): check tuple size + ok 8 - merger next() (any format): check tuple data + ok 9 - merger next() (any format): tuple != NULL + ok 10 - merger next() (any format): skip tuple validation + ok 11 - merger next() (any format): check tuple size + ok 12 - merger next() (any format): check tuple data + ok 13 - merger next() (any format): tuple != NULL + ok 14 - merger next() (any format): skip tuple validation + ok 15 - merger next() (any format): check tuple size + ok 16 - merger next() (any format): check tuple data + ok 17 - merger is empty (any format) + *** test_merger: done *** +ok 3 - subtests + 1..17 + *** test_merger *** + ok 1 - merger next() (user's format): tuple != NULL + ok 2 - merger next() (user's format): validate tuple + ok 3 - merger next() (user's format): check tuple size + ok 4 - merger next() (user's format): check tuple data + ok 5 - merger next() (user's format): tuple != NULL + ok 6 - merger next() (user's format): validate tuple + ok 7 - merger next() (user's format): check tuple size + ok 8 - merger next() (user's format): check tuple data + ok 9 - merger next() (user's format): tuple != NULL + ok 10 - merger next() (user's format): validate tuple + ok 11 - merger next() (user's format): check tuple size + ok 12 - merger next() (user's format): check tuple data + ok 13 - merger next() (user's format): tuple != NULL + ok 14 - merger next() (user's format): validate tuple + ok 15 - merger next() (user's format): check tuple size + ok 16 - merger next() (user's format): check tuple data + ok 17 - merger is empty (user's format) + *** test_merger: done *** +ok 4 - subtests + *** test_basic: done *** diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c new file mode 100644 index 000000000..b4a989a20 --- /dev/null +++ b/test/unit/merger.test.c @@ -0,0 +1,285 @@ +#include "unit.h" /* plan, header, footer, is, ok */ +#include "memory.h" /* memory_init() */ +#include "fiber.h" /* fiber_init() */ +#include "box/tuple.h" /* tuple_init(), tuple_*(), + tuple_validate() */ +#include "box/tuple_format.h" /* tuple_format_*, + box_tuple_format_new() */ +#include "box/key_def.h" /* key_def_new(), + key_def_delete() */ +#include "box/merger.h" /* merger_*() */ + +/* {{{ Array merge source */ + +struct merge_source_array { + struct merge_source base; + uint32_t tuple_count; + struct tuple **tuples; + uint32_t cur; +}; + +/* Virtual methods declarations */ + +static void +merge_source_array_destroy(struct merge_source *base); +static int +merge_source_array_next(struct merge_source *base, struct tuple_format *format, + struct tuple **out); + +/* Non-virtual methods */ + +static struct merge_source * +merge_source_array_new(bool even) +{ + static struct merge_source_vtab merge_source_array_vtab = { + .destroy = merge_source_array_destroy, + .next = merge_source_array_next, + }; + + struct merge_source_array *source = malloc( + sizeof(struct merge_source_array)); + assert(source != NULL); + + merge_source_create(&source->base, &merge_source_array_vtab); + + uint32_t tuple_size = 2; + const uint32_t tuple_count = 2; + /* [1], [3] */ + static const char *data_odd[] = {"\x91\x01", "\x91\x03"}; + /* [2], [4] */ + static const char *data_even[] = {"\x91\x02", "\x91\x04"}; + const char **data = even ? data_even : data_odd; + source->tuples = malloc(sizeof(struct tuple *) * tuple_count); + assert(source->tuples != NULL); + struct tuple_format *format = tuple_format_runtime; + for (uint32_t i = 0; i < tuple_count; ++i) { + const char *end = data[i] + tuple_size; + source->tuples[i] = tuple_new(format, data[i], end); + tuple_ref(source->tuples[i]); + } + source->tuple_count = tuple_count; + source->cur = 0; + + return &source->base; +} + +/* Virtual methods */ + +static void +merge_source_array_destroy(struct merge_source *base) +{ + struct merge_source_array *source = container_of(base, + struct merge_source_array, base); + + for (uint32_t i = 0; i < source->tuple_count; ++i) + tuple_unref(source->tuples[i]); + + free(source->tuples); + free(source); +} + +static int +merge_source_array_next(struct merge_source *base, struct tuple_format *format, + struct tuple **out) +{ + struct merge_source_array *source = container_of(base, + struct merge_source_array, base); + + if (source->cur == source->tuple_count) { + *out = NULL; + return 0; + } + + struct tuple *tuple = source->tuples[source->cur]; + assert(tuple != NULL); + + /* + * Note: The source still stores the tuple (and will + * unreference it during destroy). Here we should give a + * referenced tuple (so a caller should unreference it on + * its side). + */ + tuple_ref(tuple); + + *out = tuple; + ++source->cur; + return 0; +} + +/* }}} */ + +static struct key_part_def key_part_unsigned = { + .fieldno = 0, + .type = FIELD_TYPE_UNSIGNED, + .coll_id = COLL_NONE, + .is_nullable = false, + .nullable_action = ON_CONFLICT_ACTION_DEFAULT, + .sort_order = SORT_ORDER_ASC, + .path = NULL, +}; + +static struct key_part_def key_part_integer = { + .fieldno = 0, + .type = FIELD_TYPE_INTEGER, + .coll_id = COLL_NONE, + .is_nullable = false, + .nullable_action = ON_CONFLICT_ACTION_DEFAULT, + .sort_order = SORT_ORDER_ASC, + .path = NULL, +}; + +uint32_t +min_u32(uint32_t a, uint32_t b) +{ + return a < b ? a : b; +} + +void +check_tuple(struct tuple *tuple, struct tuple_format *format, + const char *exp_data, uint32_t exp_data_len, const char *case_name) +{ + uint32_t size; + const char *data = tuple_data_range(tuple, &size); + + ok(tuple != NULL, "%s: tuple != NULL", case_name); + if (format == NULL) { + ok(true, "%s: skip tuple validation", case_name); + } else { + int rc = tuple_validate(format, tuple); + is(rc, 0, "%s: validate tuple", case_name); + } + is(size, exp_data_len, "%s: check tuple size", case_name); + ok(!strncmp(data, exp_data, min_u32(size, exp_data_len)), + "%s: check tuple data", case_name); +} + +/** + * Check array source itself (just in case). + */ +int +test_array_source(struct tuple_format *format) +{ + plan(9); + header(); + + /* [1], [3] */ + const uint32_t exp_tuple_size = 2; + const uint32_t exp_tuple_count = 2; + static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"}; + + struct merge_source *source = merge_source_array_new(false); + assert(source != NULL); + + struct tuple *tuple = NULL; + const char *msg = format == NULL ? + "array source next() (any format)" : + "array source next() (user's format)"; + for (uint32_t i = 0; i < exp_tuple_count; ++i) { + int rc = merge_source_next(source, format, &tuple); + (void) rc; + assert(rc == 0); + check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size, + msg); + tuple_unref(tuple); + } + int rc = merge_source_next(source, format, &tuple); + (void) rc; + assert(rc == 0); + is(tuple, NULL, format == NULL ? + "array source is empty (any format)" : + "array source is empty (user's format)"); + + merge_source_unref(source); + + footer(); + return check_plan(); +} + +int +test_merger(struct tuple_format *format) +{ + plan(17); + header(); + + /* [1], [2], [3], [4] */ + const uint32_t exp_tuple_size = 2; + const uint32_t exp_tuple_count = 4; + static const char *exp_tuples_data[] = { + "\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04", + }; + + const uint32_t source_count = 2; + struct merge_source *sources[] = { + merge_source_array_new(false), + merge_source_array_new(true), + }; + + struct key_def *key_def = key_def_new(&key_part_unsigned, 1); + struct merge_source *merger = merger_new(key_def, sources, source_count, + false); + key_def_delete(key_def); + + struct tuple *tuple = NULL; + const char *msg = format == NULL ? + "merger next() (any format)" : + "merger next() (user's format)"; + for (uint32_t i = 0; i < exp_tuple_count; ++i) { + int rc = merge_source_next(merger, format, &tuple); + (void) rc; + assert(rc == 0); + check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size, + msg); + tuple_unref(tuple); + } + int rc = merge_source_next(merger, format, &tuple); + (void) rc; + assert(rc == 0); + is(tuple, NULL, format == NULL ? + "merger is empty (any format)" : + "merger is empty (user's format)"); + + merge_source_unref(merger); + merge_source_unref(sources[0]); + merge_source_unref(sources[1]); + + footer(); + return check_plan(); +} + +int +test_basic() +{ + plan(4); + header(); + + struct key_def *key_def = key_def_new(&key_part_integer, 1); + struct tuple_format *format = box_tuple_format_new(&key_def, 1); + assert(format != NULL); + + test_array_source(NULL); + test_array_source(format); + test_merger(NULL); + test_merger(format); + + key_def_delete(key_def); + tuple_format_unref(format); + + footer(); + return check_plan(); +} + +int +main() +{ + memory_init(); + fiber_init(fiber_c_invoke); + tuple_init(NULL); + + int rc = test_basic(); + + tuple_free(); + fiber_free(); + memory_free(); + + return rc; +} -- 2.21.0