From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Alexander Turenko Subject: [PATCH v3 6/7] Add merger for tuples streams (C part) Date: Wed, 10 Apr 2019 18:21:24 +0300 Message-Id: <963ad528ad35199943931150956c1d5e2c374c40.1554906327.git.alexander.turenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: Vladimir Davydov Cc: Alexander Turenko , tarantool-patches@freelists.org List-ID: Needed for #3276. --- src/box/CMakeLists.txt | 1 + src/box/merger.c | 464 +++++++++++++++++++++++++++++++++++++++ src/box/merger.h | 180 +++++++++++++++ test/unit/CMakeLists.txt | 3 + test/unit/merger.result | 71 ++++++ test/unit/merger.test.c | 301 +++++++++++++++++++++++++ 6 files changed, 1020 insertions(+) create mode 100644 src/box/merger.c create mode 100644 src/box/merger.h create mode 100644 test/unit/merger.result create mode 100644 test/unit/merger.test.c diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 7fbbc7803..d1251c326 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -121,6 +121,7 @@ add_library(box STATIC execute.c wal.c call.c + merger.c ${lua_sources} lua/init.c lua/call.c diff --git a/src/box/merger.c b/src/box/merger.c new file mode 100644 index 000000000..83f628758 --- /dev/null +++ b/src/box/merger.c @@ -0,0 +1,464 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "box/merger.h" + +#include +#include +#include +#include + +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" + +#include "trivia/util.h" /* unlikely() */ +#include "diag.h" /* diag_set() */ +#include "say.h" /* panic() */ +#include "box/tuple.h" /* box_tuple_unref() */ +#include "box/tuple_format.h" /* box_tuple_format_*(), + tuple_format_id() */ +#include "box/key_def.h" /* box_key_def_*(), + box_tuple_compare() */ + +/* {{{ Create, delete, ref, unref a source and a context */ + +enum { MERGER_SOURCE_REF_MAX = INT_MAX }; +enum { MERGER_CONTEXT_REF_MAX = INT_MAX }; + +void +merger_source_ref(struct merger_source *source) +{ + if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX)) + panic("Merger source reference counter overflow"); + ++source->refs; +} + +void +merger_source_unref(struct merger_source *source) +{ + assert(source->refs - 1 >= 0); + if (--source->refs == 0) + source->vtab->delete(source); +} + +void +merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab) +{ + source->vtab = vtab; + source->refs = 0; +} + +void +merger_context_delete(struct merger_context *ctx) +{ + box_key_def_delete(ctx->key_def); + box_tuple_format_unref(ctx->format); + free(ctx); +} + +void 
+merger_context_ref(struct merger_context *ctx) +{ + if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX)) + panic("Merger context reference counter overflow"); + ++ctx->refs; +} + +void +merger_context_unref(struct merger_context *ctx) +{ + assert(ctx->refs - 1 >= 0); + if (--ctx->refs == 0) + merger_context_delete(ctx); +} + +struct merger_context * +merger_context_new(const struct key_def *key_def) +{ + struct merger_context *ctx = (struct merger_context *) malloc( + sizeof(struct merger_context)); + if (ctx == NULL) { + diag_set(OutOfMemory, sizeof(struct merger_context), "malloc", + "merger_context"); + return NULL; + } + + /* + * We need to copy key_def, because a key_def from the + * parameter can be collected before merger_context end + * of life (say, by LuaJIT GC if the key_def comes from + * Lua). + */ + ctx->key_def = key_def_dup(key_def); + if (ctx->key_def == NULL) { + free(ctx); + return NULL; + } + + ctx->format = box_tuple_format_new(&ctx->key_def, 1); + if (ctx->format == NULL) { + box_key_def_delete(ctx->key_def); + free(ctx); + return NULL; + } + + ctx->refs = 0; + + return ctx; +} + +/* }}} */ + +/* {{{ Merger */ + +/** + * Holds a source to fetch next tuples and a last fetched tuple to + * compare the node against other nodes. + * + * The main reason why this structure is separated from a merger + * source is that a heap node cannot be a member of several + * heaps. + * + * The second reason is that it allows us to encapsulate all heap + * related logic inside this compilation unit, without any traces + * in externally visible structures. + */ +struct merger_heap_node { + /* A source of tuples. */ + struct merger_source *source; + /* + * A last fetched (refcounted) tuple to compare against + * other nodes. + */ + struct tuple *tuple; + /* An anchor to make the structure a merger heap node. 
*/ + struct heap_node heap_node_anchor; +}; + +static bool +merger_source_less(const heap_t *heap, const struct merger_heap_node *left, + const struct merger_heap_node *right); +#define HEAP_NAME merger_heap +#define HEAP_LESS merger_source_less +#define heap_value_t struct merger_heap_node +#define heap_value_attr heap_node_anchor +#include "salad/heap.h" +#undef HEAP_NAME +#undef HEAP_LESS +#undef heap_value_t +#undef heap_value_attr + +/** + * Holds a heap, an immutable context, parameters of a merge + * process and utility fields. + */ +struct merger { + /* A merger is a source. */ + struct merger_source base; + /* + * Whether a merge process started. + * + * The merger postpones charging of heap nodes until a + * first output tuple is acquired. + */ + bool started; + /* A merger context. */ + struct merger_context *ctx; + /* + * A heap of sources (of nodes that contains a source to + * be exact). + */ + heap_t heap; + /* An array of heap nodes. */ + uint32_t nodes_count; + struct merger_heap_node *nodes; + /* Ascending (false) / descending (true) order. */ + bool reverse; +}; + +/* Helpers */ + +/** + * Data comparing function to construct a heap of sources. + */ +static bool +merger_source_less(const heap_t *heap, const struct merger_heap_node *left, + const struct merger_heap_node *right) +{ + if (left->tuple == NULL && right->tuple == NULL) + return false; + if (left->tuple == NULL) + return false; + if (right->tuple == NULL) + return true; + struct merger *merger = container_of(heap, struct merger, heap); + int cmp = box_tuple_compare(left->tuple, right->tuple, + merger->ctx->key_def); + return merger->reverse ? cmp >= 0 : cmp < 0; +} + +/** + * How much more memory the heap will reserve at the next grow. + * + * See HEAP(reserve)() function in lib/salad/heap.h. + */ +static size_t +heap_next_grow_size(const heap_t *heap) +{ + heap_off_t heap_capacity_diff = heap->capacity == 0 ? 
+ HEAP_INITIAL_CAPACITY : heap->capacity; + return heap_capacity_diff * sizeof(struct heap_node *); +} + +/** + * Initialize a new merger heap node. + */ +static void +merger_heap_node_new(struct merger_heap_node *node, + struct merger_source *source) +{ + node->source = source; + merger_source_ref(node->source); + node->tuple = NULL; + heap_node_create(&node->heap_node_anchor); +} + +/** + * Free a merger heap node. + */ +static void +merger_heap_node_delete(struct merger_heap_node *node) +{ + merger_source_unref(node->source); + if (node->tuple != NULL) + box_tuple_unref(node->tuple); +} + +/** + * The helper to add a new heap node to a merger heap. + * + * Return -1 at an error and set a diag. + * + * Otherwise store a next tuple in node->tuple, add the node to + * merger->heap and return 0. + */ +static int +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node) +{ + struct tuple *tuple = NULL; + + /* Acquire a next tuple. */ + struct merger_source *source = node->source; + if (source->vtab->next(source, merger->ctx->format, &tuple) != 0) + return -1; + + /* Don't add an empty source to a heap. */ + if (tuple == NULL) + return 0; + + node->tuple = tuple; + + /* Add a node to a heap. 
*/ + if (merger_heap_insert(&merger->heap, node)) { + diag_set(OutOfMemory, heap_next_grow_size(&merger->heap), + "malloc", "merger->heap"); + return -1; + } + + return 0; +} + +/* Virtual methods declarations */ + +static void +merger_delete(struct merger_source *base); +static int +merger_next(struct merger_source *base, box_tuple_format_t *format, + struct tuple **out); + +/* Non-virtual methods */ + +struct merger_source * +merger_new(struct merger_context *ctx) +{ + static struct merger_source_vtab merger_vtab = { + .delete = merger_delete, + .next = merger_next, + }; + + struct merger *merger = (struct merger *) malloc(sizeof(struct merger)); + if (merger == NULL) { + diag_set(OutOfMemory, sizeof(struct merger), "malloc", + "merger"); + return NULL; + } + + merger_source_new(&merger->base, &merger_vtab); + + merger->started = false; + merger->ctx = ctx; + merger_context_ref(merger->ctx); + merger_heap_create(&merger->heap); + merger->nodes_count = 0; + merger->nodes = NULL; + merger->reverse = false; + + return &merger->base; +} + +int +merger_set_sources(struct merger_source *base, struct merger_source **sources, + uint32_t sources_count) +{ + struct merger *merger = container_of(base, struct merger, base); + + /* Ensure we don't leak old nodes. 
*/ + assert(merger->nodes_count == 0); + assert(merger->nodes == NULL); + + const size_t nodes_size = + sources_count * sizeof(struct merger_heap_node); + struct merger_heap_node *nodes = (struct merger_heap_node *) malloc( + nodes_size); + if (nodes == NULL) { + diag_set(OutOfMemory, nodes_size, "malloc", + "merger heap nodes"); + return -1; + } + + for (uint32_t i = 0; i < sources_count; ++i) + merger_heap_node_new(&nodes[i], sources[i]); + + merger->nodes_count = sources_count; + merger->nodes = nodes; + return 0; +} + +void +merger_set_reverse(struct merger_source *base, bool reverse) +{ + struct merger *merger = container_of(base, struct merger, base); + + merger->reverse = reverse; +} + +/* Virtual methods */ + +static void +merger_delete(struct merger_source *base) +{ + struct merger *merger = container_of(base, struct merger, base); + + merger_context_unref(merger->ctx); + merger_heap_destroy(&merger->heap); + + for (uint32_t i = 0; i < merger->nodes_count; ++i) + merger_heap_node_delete(&merger->nodes[i]); + + if (merger->nodes != NULL) + free(merger->nodes); + + free(merger); +} + +static int +merger_next(struct merger_source *base, box_tuple_format_t *format, + struct tuple **out) +{ + struct merger *merger = container_of(base, struct merger, base); + + /* + * Fetch a first tuple for each source and add all heap + * nodes to a merger heap. + */ + if (!merger->started) { + for (uint32_t i = 0; i < merger->nodes_count; ++i) { + struct merger_heap_node *node = &merger->nodes[i]; + if (merger_add_heap_node(merger, node) != 0) + return -1; + } + merger->started = true; + } + + /* Get a next tuple. */ + struct merger_heap_node *node = merger_heap_top(&merger->heap); + if (node == NULL) { + *out = NULL; + return 0; + } + struct tuple *tuple = node->tuple; + assert(tuple != NULL); + + /* + * The tuples are stored in merger->ctx->format for + * fast comparisons, but we should return tuples in a + * requested format. 
+ */ + uint32_t id_stored = tuple_format_id(merger->ctx->format); + assert(tuple->format_id == id_stored); + if (format == NULL) + format = merger->ctx->format; + uint32_t id_requested = tuple_format_id(format); + if (id_stored != id_requested) { + const char *tuple_beg = tuple_data(tuple); + const char *tuple_end = tuple_beg + tuple->bsize; + tuple = box_tuple_new(format, tuple_beg, tuple_end); + if (tuple == NULL) + return -1; + box_tuple_ref(tuple); + /* + * The node->tuple pointer will be rewritten below + * and in this branch it will not be returned. So + * we unreference it. + */ + box_tuple_unref(node->tuple); + } + + /* + * Note: An old node->tuple pointer will be written to + * *out as refcounted tuple or is already unreferenced + * above, so we don't unreference it here. + */ + struct merger_source *source = node->source; + if (source->vtab->next(source, merger->ctx->format, &node->tuple) != 0) + return -1; + + /* Update a heap. */ + if (node->tuple == NULL) + merger_heap_delete(&merger->heap, node); + else + merger_heap_update(&merger->heap, node); + + *out = tuple; + return 0; +} + +/* }}} */ diff --git a/src/box/merger.h b/src/box/merger.h new file mode 100644 index 000000000..2323dd7d7 --- /dev/null +++ b/src/box/merger.h @@ -0,0 +1,180 @@ +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED +#define TARANTOOL_BOX_MERGER_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <stdbool.h> +#include <stdint.h> + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/* {{{ Structures */ + +struct tuple; +struct key_def; +struct tuple_format; +typedef struct tuple_format box_tuple_format_t; + +struct merger_source; +struct merger_context; + +struct merger_source_vtab { + /** + * Free a merger source. + * + * Don't call it directly, use merger_source_unref() + * instead. + */ + void (*delete)(struct merger_source *base); + /** + * Get a next tuple (refcounted) from a source. + * + * When format is NULL it means that it does not matter + * for a caller in which format a tuple will be. + * + * Return 0 when successfully fetched a tuple or NULL. In + * case of an error set a diag and return -1. + */ + int (*next)(struct merger_source *base, box_tuple_format_t *format, + struct tuple **out); +}; + +/** + * Base (abstract) structure to represent a merger source. + * + * The structure does not hold any resources. + */ +struct merger_source { + /* Source-specific methods. */ + struct merger_source_vtab *vtab; + /* Reference counter. */ + int refs; +}; + +/** + * Holds immutable parameters of a merger. 
+ */ +struct merger_context { + struct key_def *key_def; + box_tuple_format_t *format; + /* Reference counter. */ + int refs; +}; + +/* }}} */ + +/* {{{ Create, delete, ref, unref a source and a context */ + +/** + * Increment a merger source reference counter. + */ +void +merger_source_ref(struct merger_source *source); + +/** + * Decrement a merger source reference counter. When it has + * reached zero, free the source (call delete() virtual method). + */ +void +merger_source_unref(struct merger_source *source); + +/** + * Initialize a base merger source structure. + */ +void +merger_source_new(struct merger_source *source, + struct merger_source_vtab *vtab); + +/** + * Free a merger context. + */ +void +merger_context_delete(struct merger_context *ctx); + +/** + * Increment a merger context reference counter. + */ +void +merger_context_ref(struct merger_context *ctx); + +/** + * Decrement a merger context reference counter. When it has + * reached zero, free the context. + */ +void +merger_context_unref(struct merger_context *ctx); + +/** + * Create a new merger context. + * + * A returned merger context is NOT reference counted. + * + * Return NULL and set a diag in case of an error. + */ +struct merger_context * +merger_context_new(const struct key_def *key_def); + +/* }}} */ + +/* {{{ Merger */ + +/** + * Create a new merger w/o sources. + * + * Return NULL and set a diag in case of an error. + */ +struct merger_source * +merger_new(struct merger_context *ctx); + +/** + * Set sources for a merger. + * + * Return 0 at success. Return -1 at an error and set a diag. + */ +int +merger_set_sources(struct merger_source *base, struct merger_source **sources, + uint32_t sources_count); + +/** + * Set reverse flag for a merger. 
+ */ +void +merger_set_reverse(struct merger_source *base, bool reverse); + +/* }}} */ + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */ diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 64739ab09..afdcbebf6 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim) add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c) target_link_libraries(swim_proto.test unit swim) + +add_executable(merger.test merger.test.c) +target_link_libraries(merger.test unit core box) diff --git a/test/unit/merger.result b/test/unit/merger.result new file mode 100644 index 000000000..11399e0bc --- /dev/null +++ b/test/unit/merger.result @@ -0,0 +1,71 @@ +1..4 + *** test_basic *** + 1..9 + *** test_array_source *** + ok 1 - array source next() (any format): tuple != NULL + ok 2 - array source next() (any format): skip tuple format id check + ok 3 - array source next() (any format): check tuple size + ok 4 - array source next() (any format): check tuple data + ok 5 - array source next() (any format): tuple != NULL + ok 6 - array source next() (any format): skip tuple format id check + ok 7 - array source next() (any format): check tuple size + ok 8 - array source next() (any format): check tuple data + ok 9 - array source is empty (any format) + *** test_array_source: done *** +ok 1 - subtests + 1..9 + *** test_array_source *** + ok 1 - array source next() (user's format): tuple != NULL + ok 2 - array source next() (user's format): check tuple format id + ok 3 - array source next() (user's format): check tuple size + ok 4 - array source next() (user's format): check tuple data + ok 5 - array source next() (user's format): tuple != NULL + ok 6 - array source next() (user's format): check tuple format id + ok 7 - array source next() (user's 
format): check tuple size + ok 8 - array source next() (user's format): check tuple data + ok 9 - array source is empty (user's format) + *** test_array_source: done *** +ok 2 - subtests + 1..17 + *** test_merger *** + ok 1 - merger next() (any format): tuple != NULL + ok 2 - merger next() (any format): skip tuple format id check + ok 3 - merger next() (any format): check tuple size + ok 4 - merger next() (any format): check tuple data + ok 5 - merger next() (any format): tuple != NULL + ok 6 - merger next() (any format): skip tuple format id check + ok 7 - merger next() (any format): check tuple size + ok 8 - merger next() (any format): check tuple data + ok 9 - merger next() (any format): tuple != NULL + ok 10 - merger next() (any format): skip tuple format id check + ok 11 - merger next() (any format): check tuple size + ok 12 - merger next() (any format): check tuple data + ok 13 - merger next() (any format): tuple != NULL + ok 14 - merger next() (any format): skip tuple format id check + ok 15 - merger next() (any format): check tuple size + ok 16 - merger next() (any format): check tuple data + ok 17 - merger is empty (any format) + *** test_merger: done *** +ok 3 - subtests + 1..17 + *** test_merger *** + ok 1 - merger next() (user's format): tuple != NULL + ok 2 - merger next() (user's format): check tuple format id + ok 3 - merger next() (user's format): check tuple size + ok 4 - merger next() (user's format): check tuple data + ok 5 - merger next() (user's format): tuple != NULL + ok 6 - merger next() (user's format): check tuple format id + ok 7 - merger next() (user's format): check tuple size + ok 8 - merger next() (user's format): check tuple data + ok 9 - merger next() (user's format): tuple != NULL + ok 10 - merger next() (user's format): check tuple format id + ok 11 - merger next() (user's format): check tuple size + ok 12 - merger next() (user's format): check tuple data + ok 13 - merger next() (user's format): tuple != NULL + ok 14 - merger 
next() (user's format): check tuple format id + ok 15 - merger next() (user's format): check tuple size + ok 16 - merger next() (user's format): check tuple data + ok 17 - merger is empty (user's format) + *** test_merger: done *** +ok 4 - subtests + *** test_basic: done *** diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c new file mode 100644 index 000000000..0a25d8f04 --- /dev/null +++ b/test/unit/merger.test.c @@ -0,0 +1,301 @@ +#include "unit.h" /* plan, header, footer, is, ok */ +#include "memory.h" /* memory_init() */ +#include "fiber.h" /* fiber_init() */ +#include "box/tuple.h" /* tuple_init(), box_tuple_*(), + tuple_*() */ +#include "box/tuple_format.h" /* box_tuple_format_default(), + tuple_format_id() */ +#include "box/key_def.h" /* key_def_new(), + key_def_delete() */ +#include "box/merger.h" /* merger_*() */ + +/* {{{ Array merger source */ + +struct merger_source_array { + struct merger_source base; + uint32_t tuples_count; + struct tuple **tuples; + uint32_t cur; +}; + +/* Virtual methods declarations */ + +static void +merger_source_array_delete(struct merger_source *base); +static int +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format, + struct tuple **out); + +/* Non-virtual methods */ + +static struct merger_source * +merger_source_array_new(bool even) +{ + static struct merger_source_vtab merger_source_array_vtab = { + .delete = merger_source_array_delete, + .next = merger_source_array_next, + }; + + struct merger_source_array *source = + (struct merger_source_array *) malloc( + sizeof(struct merger_source_array)); + assert(source != NULL); + + merger_source_new(&source->base, &merger_source_array_vtab); + + uint32_t tuple_size = 2; + const uint32_t tuples_count = 2; + /* {1}, {3} */ + static const char *data_odd[] = {"\x91\x01", "\x91\x03"}; + /* {2}, {4} */ + static const char *data_even[] = {"\x91\x02", "\x91\x04"}; + const char **data = even ? 
data_even : data_odd; + source->tuples = (struct tuple **) malloc( + tuples_count * sizeof(struct tuple *)); + assert(source->tuples != NULL); + box_tuple_format_t *format = box_tuple_format_default(); + for (uint32_t i = 0; i < tuples_count; ++i) { + const char *end = data[i] + tuple_size; + source->tuples[i] = box_tuple_new(format, data[i], end); + box_tuple_ref(source->tuples[i]); + } + source->tuples_count = tuples_count; + source->cur = 0; + + return &source->base; +} + +/* Virtual methods */ + +static void +merger_source_array_delete(struct merger_source *base) +{ + struct merger_source_array *source = container_of(base, + struct merger_source_array, base); + + for (uint32_t i = 0; i < source->tuples_count; ++i) + box_tuple_unref(source->tuples[i]); + + free(source->tuples); + free(source); +} + +static int +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format, + struct tuple **out) +{ + struct merger_source_array *source = container_of(base, + struct merger_source_array, base); + + if (source->cur == source->tuples_count) { + *out = NULL; + return 0; + } + + struct tuple *tuple = source->tuples[source->cur]; + + box_tuple_format_t *default_format = box_tuple_format_default(); + uint32_t id_stored = tuple_format_id(default_format); + assert(tuple->format_id == id_stored); + if (format == NULL) + format = default_format; + uint32_t id_requested = tuple_format_id(format); + if (id_stored != id_requested) { + const char *tuple_beg = tuple_data(tuple); + const char *tuple_end = tuple_beg + tuple->bsize; + tuple = box_tuple_new(format, tuple_beg, tuple_end); + assert(tuple != NULL); + } + + assert(tuple != NULL); + box_tuple_ref(tuple); + *out = tuple; + ++source->cur; + return 0; +} + +/* }}} */ + +static struct key_part_def key_part_unsigned = { + .fieldno = 0, + .type = FIELD_TYPE_UNSIGNED, + .coll_id = COLL_NONE, + .is_nullable = false, + .nullable_action = ON_CONFLICT_ACTION_DEFAULT, + .sort_order = SORT_ORDER_ASC, + .path = NULL, 
+}; + +static struct key_part_def key_part_integer = { + .fieldno = 0, + .type = FIELD_TYPE_INTEGER, + .coll_id = COLL_NONE, + .is_nullable = false, + .nullable_action = ON_CONFLICT_ACTION_DEFAULT, + .sort_order = SORT_ORDER_ASC, + .path = NULL, +}; + +uint32_t +min_u32(uint32_t a, uint32_t b) +{ + return a < b ? a : b; +} + +void +check_tuple(struct tuple *tuple, box_tuple_format_t *format, + const char *exp_data, uint32_t exp_data_len, const char *case_name) +{ + uint32_t size; + const char *data = tuple_data_range(tuple, &size); + + ok(tuple != NULL, "%s: tuple != NULL", case_name); + if (format == NULL) { + ok(true, "%s: skip tuple format id check", case_name); + } else { + is(tuple->format_id, tuple_format_id(format), + "%s: check tuple format id", case_name); + } + is(size, exp_data_len, "%s: check tuple size", case_name); + ok(!strncmp(data, exp_data, min_u32(size, exp_data_len)), + "%s: check tuple data", case_name); +} + +/** + * Check array source itself (just in case). + */ +int +test_array_source(box_tuple_format_t *format) +{ + plan(9); + header(); + + /* {1}, {3} */ + const uint32_t exp_tuple_size = 2; + const uint32_t exp_tuples_count = 2; + static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"}; + + struct merger_source *source = merger_source_array_new(false); + assert(source != NULL); + merger_source_ref(source); + + struct tuple *tuple = NULL; + const char *msg = format == NULL ? + "array source next() (any format)" : + "array source next() (user's format)"; + for (uint32_t i = 0; i < exp_tuples_count; ++i) { + int rc = source->vtab->next(source, format, &tuple); + (void) rc; + assert(rc == 0); + check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size, + msg); + box_tuple_unref(tuple); + } + int rc = source->vtab->next(source, format, &tuple); + (void) rc; + assert(rc == 0); + is(tuple, NULL, format == NULL ? 
+ "array source is empty (any format)" : + "array source is empty (user's format)"); + + merger_source_unref(source); + + footer(); + return check_plan(); +} + +int +test_merger(box_tuple_format_t *format) +{ + plan(17); + header(); + + /* {1}, {2}, {3}, {4} */ + const uint32_t exp_tuple_size = 2; + const uint32_t exp_tuples_count = 4; + static const char *exp_tuples_data[] = { + "\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04", + }; + + const uint32_t sources_count = 2; + struct merger_source *sources[] = { + merger_source_array_new(false), + merger_source_array_new(true), + }; + merger_source_ref(sources[0]); + merger_source_ref(sources[1]); + + struct key_def *key_def = key_def_new(&key_part_unsigned, 1); + struct merger_context *ctx = merger_context_new(key_def); + merger_context_ref(ctx); + key_def_delete(key_def); + struct merger_source *merger = merger_new(ctx); + merger_set_sources(merger, sources, sources_count); + merger_set_reverse(merger, false); + merger_source_ref(merger); + + struct tuple *tuple = NULL; + const char *msg = format == NULL ? + "merger next() (any format)" : + "merger next() (user's format)"; + for (uint32_t i = 0; i < exp_tuples_count; ++i) { + int rc = merger->vtab->next(merger, format, &tuple); + (void) rc; + assert(rc == 0); + check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size, + msg); + box_tuple_unref(tuple); + } + int rc = merger->vtab->next(merger, format, &tuple); + (void) rc; + assert(rc == 0); + is(tuple, NULL, format == NULL ? 
+ "merger is empty (any format)" : + "merger is empty (user's format)"); + + merger_source_unref(merger); + merger_context_unref(ctx); + merger_source_unref(sources[0]); + merger_source_unref(sources[1]); + + footer(); + return check_plan(); +} + +int +test_basic() +{ + plan(4); + header(); + + struct key_def *key_def = key_def_new(&key_part_integer, 1); + box_tuple_format_t *format = box_tuple_format_new(&key_def, 1); + assert(format != NULL); + + test_array_source(NULL); + test_array_source(format); + test_merger(NULL); + test_merger(format); + + key_def_delete(key_def); + box_tuple_format_unref(format); + + footer(); + return check_plan(); +} + +int +main() +{ + memory_init(); + fiber_init(fiber_c_invoke); + tuple_init(NULL); + + int rc = test_basic(); + + tuple_free(); + fiber_free(); + memory_free(); + + return rc; +} -- 2.20.1