From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Wed, 8 May 2019 01:14:20 +0300 From: Alexander Turenko Subject: Re: [PATCH v3 6/7] Add merger for tuples streams (C part) Message-ID: <20190507221417.6d3xvc4y5kzpxz4z@tkn_work_nb> References: <963ad528ad35199943931150956c1d5e2c374c40.1554906327.git.alexander.turenko@tarantool.org> <20190430153415.beufsnt2txjhkkhc@esperanza> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Disposition: inline In-Reply-To: <20190430153415.beufsnt2txjhkkhc@esperanza> To: Vladimir Davydov Cc: tarantool-patches@freelists.org List-ID: Thanks! I'll send 4th versions of the patchset. WBR, Alexander Turenko. On Tue, Apr 30, 2019 at 06:34:15PM +0300, Vladimir Davydov wrote: > On Wed, Apr 10, 2019 at 06:21:24PM +0300, Alexander Turenko wrote: > > diff --git a/src/box/merger.c b/src/box/merger.c > > new file mode 100644 > > index 000000000..83f628758 > > --- /dev/null > > +++ b/src/box/merger.c > > @@ -0,0 +1,464 @@ > > +/* > > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. > > + * > > + * Redistribution and use in source and binary forms, with or > > + * without modification, are permitted provided that the following > > + * conditions are met: > > + * > > + * 1. Redistributions of source code must retain the above > > + * copyright notice, this list of conditions and the > > + * following disclaimer. > > + * > > + * 2. Redistributions in binary form must reproduce the above > > + * copyright notice, this list of conditions and the following > > + * disclaimer in the documentation and/or other materials > > + * provided with the distribution. > > + * > > + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND > > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL > > + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > > + * SUCH DAMAGE. > > + */ > > + > > +#include "box/merger.h" > > + > > +#include > > +#include > > +#include > > +#include > > + > > +#define HEAP_FORWARD_DECLARATION > > +#include "salad/heap.h" > > + > > +#include "trivia/util.h" /* unlikely() */ > > +#include "diag.h" /* diag_set() */ > > +#include "say.h" /* panic() */ > > +#include "box/tuple.h" /* box_tuple_unref() */ > > +#include "box/tuple_format.h" /* box_tuple_format_*(), > > + tuple_format_id() */ > > +#include "box/key_def.h" /* box_key_def_*(), > > + box_tuple_compare() */ > > + > > +/* {{{ Create, delete, ref, unref a source and a context */ > > + > > +enum { MERGER_SOURCE_REF_MAX = INT_MAX }; > > +enum { MERGER_CONTEXT_REF_MAX = INT_MAX }; > > + > > +void > > +merger_source_ref(struct merger_source *source) > > I tend to agree with Kostja that merger_source sounds sort of awkward, > like it's a source of mergers (cf. tuple_source - source of tuples). Okay. Renamed. > Similarly, merger_context => merge_context. BTW what about including > 'reverse' in merge_context? The idea was to cache one context and use it to process, say, EQ and REQ requests both. Anyway, merger_context is gone. 
> > > +{ > > + if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX)) > > + panic("Merger source reference counter overflow"); > > + ++source->refs; > > +} > > + > > +void > > +merger_source_unref(struct merger_source *source) > > +{ > > + assert(source->refs - 1 >= 0); > > + if (--source->refs == 0) > > + source->vtab->delete(source); > > +} > > + > > +void > > +merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab) > > We typically call a constructor function _create: merger_source_create. Changed. Also changed merger_heap_node_new() to merger_heap_node_create(). > > > +{ > > + source->vtab = vtab; > > + source->refs = 0; > > +} > > + > > +void > > +merger_context_delete(struct merger_context *ctx) > > +{ > > + box_key_def_delete(ctx->key_def); > > + box_tuple_format_unref(ctx->format); > > I suggest avoid using box_ methods in the code - after all those are > intended for external modules. Creating a key_def with key_def_dup and > freeing it with box_key_def_delete looks especially weird. Let's use > the underlying functions directly, shall we? I have eliminated all box_* types and functions where it can be easily done. The only function I still use is box_tuple_format_new(), because it is a convenient helper to create a format from a key_def. > > > + free(ctx); > > +} > > + > > +void > > +merger_context_ref(struct merger_context *ctx) > > +{ > > + if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX)) > > Wow, I wouldn't care, but okay. Removed, because I made merge_source_ref/unref static inline and don't want to include "trivia/util.h" (for unlikely()) and "say.h" (for panic()) in merger.h. 
> > > + panic("Merger context reference counter overflow"); > > + ++ctx->refs; > > +} > > + > > +void > > +merger_context_unref(struct merger_context *ctx) > > +{ > > + assert(ctx->refs - 1 >= 0); > > + if (--ctx->refs == 0) > > + merger_context_delete(ctx); > > +} > > + > > +struct merger_context * > > +merger_context_new(const struct key_def *key_def) > > +{ > > + struct merger_context *ctx = (struct merger_context *) malloc( > > Useless type conversion. There are a few more places like that. Removed casts in all such places. > > > + sizeof(struct merger_context)); > > + if (ctx == NULL) { > > + diag_set(OutOfMemory, sizeof(struct merger_context), "malloc", > > + "merger_context"); > > + return NULL; > > + } > > + > > + /* > > + * We need to copy key_def, because a key_def from the > > + * parameter can be collected before merger_context end > > + * of life (say, by LuaJIT GC if the key_def comes from > > + * Lua). > > + */ > > + ctx->key_def = key_def_dup(key_def); > > + if (ctx->key_def == NULL) { > > + free(ctx); > > + return NULL; > > + } > > + > > + ctx->format = box_tuple_format_new(&ctx->key_def, 1); > > + if (ctx->format == NULL) { > > + box_key_def_delete(ctx->key_def); > > + free(ctx); > > + return NULL; > > + } > > + > > + ctx->refs = 0; > > + > > + return ctx; > > +} > > + > > +/* }}} */ > > + > > +/* {{{ Merger */ > > + > > +/** > > + * Holds a source to fetch next tuples and a last fetched tuple to > > + * compare the node against other nodes. > > + * > > + * The main reason why this structure is separated from a merger > > + * source is that a heap node can not be a member of several > > + * heaps. > > + * > > + * The second reason is that it allows to incapsulate all heap > > encapsulate :) Yep :) > > > + * related logic inside this compilation unit, without any trails > > 'trails' sounds weird to me in this context. I'd say 'traces'. Okay. > > > + * in externally visible structures. 
> > Not sure that these two reasons are ironclad: > > - It's pointless to add the same source to two mergers (or to the same > merger twice). I think we'd better simply proscribe it. There's a > handy heap_node_is_stray helper that could be used to detect if a > source is already used by a merger. > > - merger_heap_node is pretty lightweight: it stores only pointers > and struct heap_node so all we need is include salad/heap.h into > merger.h, which seems to be okay. > > At the same time, not introducing a separate anchor struct would reduce > the amount of code you have to add and hence make the code a bit easier > to follow IMO. We discussed it verbally with Vladimir and he said that he doesn't insist on this. I found it very misleading to have a next tuple for a merger in a source. It is easier to work with the code (say, check whether a tuple is (un)referenced in the right way) when a merger doesn't hold its state in a source. > > > + */ > > +struct merger_heap_node { > > + /* A source of tuples. */ > > + struct merger_source *source; > > + /* > > + * A last fetched (refcounted) tuple to compare against > > + * other nodes. > > + */ > > + struct tuple *tuple; > > + /* An anchor to make the structure a merger heap node. */ > > + struct heap_node heap_node_anchor; > > We typically call anchors after the accommodating container, in_merger > in this case. Okay. > > > +}; > > + > > +static bool > > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left, > > + const struct merger_heap_node *right); > > +#define HEAP_NAME merger_heap > > +#define HEAP_LESS merger_source_less > > +#define heap_value_t struct merger_heap_node > > +#define heap_value_attr heap_node_anchor > > +#include "salad/heap.h" > > +#undef HEAP_NAME > > +#undef HEAP_LESS > > +#undef heap_value_t > > +#undef heap_value_attr > > + > > +/** > > + * Holds a heap, an immutable context, parameters of a merge > > + * process and utility fields. 
> > + */ > > +struct merger { > > + /* A merger is a source. */ > > + struct merger_source base; > > + /* > > + * Whether a merge process started. > > + * > > + * The merger postpones charging of heap nodes until a > > + * first output tuple is acquired. > > + */ > > + bool started; > > + /* A merger context. */ > > + struct merger_context *ctx; > > + /* > > + * A heap of sources (of nodes that contains a source to > > + * be exact). > > + */ > > + heap_t heap; > > + /* An array of heap nodes. */ > > + uint32_t nodes_count; > > node_count :) Changed all such names. > > > + struct merger_heap_node *nodes; > > + /* Ascending (false) / descending (true) order. */ > > + bool reverse; > > +}; > > + > > +/* Helpers */ > > + > > +/** > > + * Data comparing function to construct a heap of sources. > > + */ > > +static bool > > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left, > > + const struct merger_heap_node *right) > > +{ > > + if (left->tuple == NULL && right->tuple == NULL) > > + return false; > > + if (left->tuple == NULL) > > + return false; > > + if (right->tuple == NULL) > > + return true; > > How can this happen? I thought we would remove a source that ended > iteration (returned NULL), no? You are right. Replaced with asserts. > > > + struct merger *merger = container_of(heap, struct merger, heap); > > + int cmp = box_tuple_compare(left->tuple, right->tuple, > > + merger->ctx->key_def); > > + return merger->reverse ? cmp >= 0 : cmp < 0; > > +} > > + > > +/** > > + * How much more memory the heap will reserve at the next grow. > > + * > > + * See HEAP(reserve)() function in lib/salad/heap.h. > > + */ > > +static size_t > > +heap_next_grow_size(const heap_t *heap) > > Please don't introduce this function so intricately depending on the > heap implementation solely for error reporting - simply pass 0 to > diag_set(OutOfMemory) - it's perfectly fine, we do that all the time. Didn't know that it is okay to pass 0 here. Fixed. 
> > > +{ > > + heap_off_t heap_capacity_diff = heap->capacity == 0 ? > > + HEAP_INITIAL_CAPACITY : heap->capacity; > > + return heap_capacity_diff * sizeof(struct heap_node *); > > +} > > + > > +/** > > + * Initialize a new merger heap node. > > + */ > > +static void > > +merger_heap_node_new(struct merger_heap_node *node, > > + struct merger_source *source) > > +{ > > + node->source = source; > > + merger_source_ref(node->source); > > + node->tuple = NULL; > > + heap_node_create(&node->heap_node_anchor); > > +} > > + > > +/** > > + * Free a merger heap node. > > + */ > > +static void > > +merger_heap_node_delete(struct merger_heap_node *node) > > +{ > > + merger_source_unref(node->source); > > + if (node->tuple != NULL) > > + box_tuple_unref(node->tuple); > > +} > > + > > +/** > > + * The helper to add a new heap node to a merger heap. > > + * > > + * Return -1 at an error and set a diag. > > + * > > + * Otherwise store a next tuple in node->tuple, add the node to > > + * merger->heap and return 0. > > + */ > > +static int > > +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node) > > +{ > > + struct tuple *tuple = NULL; > > + > > + /* Acquire a next tuple. */ > > + struct merger_source *source = node->source; > > + if (source->vtab->next(source, merger->ctx->format, &tuple) != 0) > > + return -1; > > + > > + /* Don't add an empty source to a heap. */ > > + if (tuple == NULL) > > + return 0; > > + > > + node->tuple = tuple; > > + > > + /* Add a node to a heap. */ > > + if (merger_heap_insert(&merger->heap, node)) { > > According to our coding conventions, != 0 should be checked explicitly. Fixed. 
> > > + diag_set(OutOfMemory, heap_next_grow_size(&merger->heap), > > + "malloc", "merger->heap"); > > + return -1; > > + } > > + > > + return 0; > > +} > > + > > +/* Virtual methods declarations */ > > + > > +static void > > +merger_delete(struct merger_source *base); > > +static int > > +merger_next(struct merger_source *base, box_tuple_format_t *format, > > + struct tuple **out); > > + > > +/* Non-virtual methods */ > > + > > +struct merger_source * > > +merger_new(struct merger_context *ctx) > > +{ > > + static struct merger_source_vtab merger_vtab = { > > + .delete = merger_delete, > > + .next = merger_next, > > + }; > > + > > + struct merger *merger = (struct merger *) malloc(sizeof(struct merger)); > > + if (merger == NULL) { > > + diag_set(OutOfMemory, sizeof(struct merger), "malloc", > > + "merger"); > > + return NULL; > > + } > > + > > + merger_source_new(&merger->base, &merger_vtab); > > + > > + merger->started = false; > > + merger->ctx = ctx; > > + merger_context_ref(merger->ctx); > > + merger_heap_create(&merger->heap); > > + merger->nodes_count = 0; > > + merger->nodes = NULL; > > + merger->reverse = false; > > + > > + return &merger->base; > > +} > > + > > +int > > +merger_set_sources(struct merger_source *base, struct merger_source **sources, > > + uint32_t sources_count) > > +{ > > + struct merger *merger = container_of(base, struct merger, base); > > + > > + /* Ensure we don't leak old nodes. 
*/ > > + assert(merger->nodes_count == 0); > > + assert(merger->nodes == NULL); > > + > > + const size_t nodes_size = > > + sources_count * sizeof(struct merger_heap_node); > > + struct merger_heap_node *nodes = (struct merger_heap_node *) malloc( > > + nodes_size); > > + if (nodes == NULL) { > > + diag_set(OutOfMemory, nodes_size, "malloc", > > + "merger heap nodes"); > > + return -1; > > + } > > + > > + for (uint32_t i = 0; i < sources_count; ++i) > > + merger_heap_node_new(&nodes[i], sources[i]); > > + > > + merger->nodes_count = sources_count; > > + merger->nodes = nodes; > > + return 0; > > +} > > + > > +void > > +merger_set_reverse(struct merger_source *base, bool reverse) > > +{ > > + struct merger *merger = container_of(base, struct merger, base); > > + > > + merger->reverse = reverse; > > +} > > + > > +/* Virtual methods */ > > + > > +static void > > +merger_delete(struct merger_source *base) > > +{ > > + struct merger *merger = container_of(base, struct merger, base); > > + > > + merger_context_unref(merger->ctx); > > + merger_heap_destroy(&merger->heap); > > + > > + for (uint32_t i = 0; i < merger->nodes_count; ++i) > > + merger_heap_node_delete(&merger->nodes[i]); > > + > > + if (merger->nodes != NULL) > > + free(merger->nodes); > > + > > + free(merger); > > +} > > + > > +static int > > +merger_next(struct merger_source *base, box_tuple_format_t *format, > > + struct tuple **out) > > +{ > > + struct merger *merger = container_of(base, struct merger, base); > > + > > + /* > > + * Fetch a first tuple for each source and add all heap > > + * nodes to a merger heap. > > + */ > > + if (!merger->started) { > > + for (uint32_t i = 0; i < merger->nodes_count; ++i) { > > + struct merger_heap_node *node = &merger->nodes[i]; > > + if (merger_add_heap_node(merger, node) != 0) > > + return -1; > > + } > > + merger->started = true; > > + } > > + > > + /* Get a next tuple. 
*/ > > + struct merger_heap_node *node = merger_heap_top(&merger->heap); > > + if (node == NULL) { > > + *out = NULL; > > + return 0; > > + } > > + struct tuple *tuple = node->tuple; > > + assert(tuple != NULL); > > + > > + /* > > + * The tuples are stored in merger->ctx->format for > > + * fast comparisons, but we should return tuples in a > > + * requested format. > > + */ > > + uint32_t id_stored = tuple_format_id(merger->ctx->format); > > + assert(tuple->format_id == id_stored); > > + if (format == NULL) > > + format = merger->ctx->format; > > + uint32_t id_requested = tuple_format_id(format); > > + if (id_stored != id_requested) { > > Tuples returned by a source must be sorted. This means that you can't > pass a merger to another merger using a different key def. That is, > this code is somewhat pointless. And it looks weird to me. > > I have a suspicion that all those relocations are going to cost us more > than tuple comparisons themselves. May be, we shouldn't relocate tuples > at all, just validate and forward them as they are? After all, we are > heading toward having offsets of all formatted fields so if we are given > a tuple (e.g. from space.select) it's likely to have all fields indexed > already. Runtime tuples use smalloc(), which should be cheaper than malloc(), but your point looks important after merge context removal. Also I agree that we should not recreate tuples from a local space. So I rewrote it with tuple_validate() in merger_next() and in table and tuple sources. > > > + const char *tuple_beg = tuple_data(tuple); > > + const char *tuple_end = tuple_beg + tuple->bsize; > > + tuple = box_tuple_new(format, tuple_beg, tuple_end); > > + if (tuple == NULL) > > + return -1; > > + box_tuple_ref(tuple); > > + /* > > + * The node->tuple pointer will be rewritten below > > + * and in this branch it will not be returned. So > > + * we unreference it. 
> > + */ > > + box_tuple_unref(node->tuple); > > + } > > + > > + /* > > + * Note: An old node->tuple pointer will be written to > > + * *out as refcounted tuple or is already unreferenced > > + * above, so we don't unreference it here. > > + */ > > + struct merger_source *source = node->source; > > + if (source->vtab->next(source, merger->ctx->format, &node->tuple) != 0) > > + return -1; > > + > > + /* Update a heap. */ > > + if (node->tuple == NULL) > > + merger_heap_delete(&merger->heap, node); > > + else > > + merger_heap_update(&merger->heap, node); > > + > > + *out = tuple; > > + return 0; > > +} > > + > > +/* }}} */ > > diff --git a/src/box/merger.h b/src/box/merger.h > > new file mode 100644 > > index 000000000..2323dd7d7 > > --- /dev/null > > +++ b/src/box/merger.h > > @@ -0,0 +1,180 @@ > > +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED > > +#define TARANTOOL_BOX_MERGER_H_INCLUDED > > +/* > > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. > > + * > > + * Redistribution and use in source and binary forms, with or > > + * without modification, are permitted provided that the following > > + * conditions are met: > > + * > > + * 1. Redistributions of source code must retain the above > > + * copyright notice, this list of conditions and the > > + * following disclaimer. > > + * > > + * 2. Redistributions in binary form must reproduce the above > > + * copyright notice, this list of conditions and the following > > + * disclaimer in the documentation and/or other materials > > + * provided with the distribution. > > + * > > + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND > > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > > + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL > > + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > > + * SUCH DAMAGE. > > + */ > > + > > +#include > > +#include > > + > > +#if defined(__cplusplus) > > +extern "C" { > > +#endif /* defined(__cplusplus) */ > > + > > +/* {{{ Structures */ > > + > > +struct tuple; > > +struct key_def; > > +struct tuple_format; > > +typedef struct tuple_format box_tuple_format_t; > > + > > +struct merger_source; > > +struct merger_context; > > + > > +struct merger_source_vtab { > > + /** > > + * Free a merger source. > > + * > > + * Don't call it directly, use merger_source_unref() > > + * instead. > > + */ > > + void (*delete)(struct merger_source *base); > > 'delete' is a reserved word in C++ > > IMO better avoid using it in a header file - it may be included in a C++ > file one day. Changed to 'destroy'. Also changed names of implementations of this virtual method: * luaL_merge_source_*_delete() -> luaL_merge_source_*_destroy(); * merge_source_array_delete() -> merge_source_array_destroy() (in the unit test). > > > + /** > > + * Get a next tuple (refcounted) from a source. > > + * > > + * When format is NULL it means that it does not matter > > + * for a caller in which format a tuple will be. > > + * > > + * Return 0 when successfully fetched a tuple or NULL. In > > + * case of an error set a diag and return -1. > > + */ > > + int (*next)(struct merger_source *base, box_tuple_format_t *format, > > + struct tuple **out); > > Let's please add inline merge_source_next wrapper function. 
> It'd look neater than calling src->vtab->next directly. Added. Also made all base source functions (create, ref, unref, next) static inline. > Also, 'format' looks more like a parameter of a source to me, not an > argument of this function. What about moving 'format' to merge_source > struct? It is the possible way to design this API. We'll need to allow a user to set a format for a source in Lua too in this case. We discussed this a bit with Vladimir. I have proposed to add a key_def argument for our Lua sources, but Vladimir declines this saying that a source does not perform tuple comparisons. > > > +}; > > + > > +/** > > + * Base (abstract) structure to represent a merger source. > > + * > > + * The structure does not hold any resources. > > + */ > > +struct merger_source { > > + /* Source-specific methods. */ > > + struct merger_source_vtab *vtab; > > Should be const. Fixed. > > > + /* Reference counter. */ > > + int refs; > > +}; > > + > > +/** > > + * Holds immutable parameters of a merger. > > + */ > > +struct merger_context { > > + struct key_def *key_def; > > + box_tuple_format_t *format; > > + /* Reference counter. */ > > + int refs; > > +}; > > + > > +/* }}} */ > > + > > +/* {{{ Create, delete, ref, unref a source and a context */ > > + > > +/** > > + * Increment a merger source reference counter. > > + */ > > +void > > +merger_source_ref(struct merger_source *source); > > + > > +/** > > + * Decrement a merger source reference counter. When it has > > + * reached zero, free the source (call delete() virtual method). > > + */ > > +void > > +merger_source_unref(struct merger_source *source); > > + > > +/** > > + * Initialize a base merger source structure. > > + */ > > +void > > +merger_source_new(struct merger_source *source, > > + struct merger_source_vtab *vtab); > > + > > +/** > > + * Free a merger context. > > + */ > > +void > > +merger_context_delete(struct merger_context *ctx); > > Why make it public? 
AFAICS you only need merger_context_unref outside. Traces of some old code. Anyway, merger_context is gone. > > > + > > +/** > > + * Increment a merger context reference counter. > > + */ > > +void > > +merger_context_ref(struct merger_context *ctx); > > + > > +/** > > + * Decrement a merger context reference counter. When it has > > + * reached zero, free the context. > > + */ > > +void > > +merger_context_unref(struct merger_context *ctx); > > + > > +/** > > + * Create a new merger context. > > + * > > + * A returned merger context is NOT reference counted. > > It is reference counted. It just has the reference counter set to 0. > Anyway, why not set it to 1 right away - the caller will do that in > any case, no? merger_context is gone, but this is applicable to merge_source too. Sounds sensible to me. I have changed merge_source_create() to set refs to 1 and have changed ref / unref calls appropriately. > > > + * > > + * Return NULL and set a diag in case of an error. > > + */ > > +struct merger_context * > > +merger_context_new(const struct key_def *key_def); > > + > > +/* }}} */ > > + > > +/* {{{ Merger */ > > + > > +/** > > + * Create a new merger w/o sources. > > + * > > + * Return NULL and set a diag in case of an error. > > + */ > > +struct merger_source * > > +merger_new(struct merger_context *ctx); > > + > > +/** > > + * Set sources for a merger. > > + * > > + * Return 0 at success. Return -1 at an error and set a diag. > > + */ > > +int > > +merger_set_sources(struct merger_source *base, struct merger_source **sources, > > + uint32_t sources_count); > > source_count :) Changed all such names. > > > + > > +/** > > + * Set reverse flag for a merger. > > + */ > > +void > > +merger_set_reverse(struct merger_source *base, bool reverse); > > I'd rather pass all the arguments directly to the constructor, the way > you do it in Lua, otherwise unsettling questions might arise: what > happens if I call set_sources or set_reverse after using the merger? 
> can I use a merger without calling those functions? Good points. Done. > > > + > > +/* }}} */ > > + > > +#if defined(__cplusplus) > > +} /* extern "C" */ > > +#endif /* defined(__cplusplus) */ > > + > > +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */ > > diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c > > new file mode 100644 > > index 000000000..0a25d8f04 > > --- /dev/null > > +++ b/test/unit/merger.test.c > > @@ -0,0 +1,301 @@ > > +#include "unit.h" /* plan, header, footer, is, ok */ > > +#include "memory.h" /* memory_init() */ > > +#include "fiber.h" /* fiber_init() */ > > +#include "box/tuple.h" /* tuple_init(), box_tuple_*(), > > + tuple_*() */ > > +#include "box/tuple_format.h" /* box_tuple_format_default(), > > + tuple_format_id() */ > > +#include "box/key_def.h" /* key_def_new(), > > + key_def_delete() */ > > +#include "box/merger.h" /* merger_*() */ > > + > > +/* {{{ Array merger source */ > > + > > +struct merger_source_array { > > + struct merger_source base; > > + uint32_t tuples_count; > > + struct tuple **tuples; > > + uint32_t cur; > > +}; > > + > > +/* Virtual methods declarations */ > > + > > +static void > > +merger_source_array_delete(struct merger_source *base); > > +static int > > +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format, > > + struct tuple **out); > > + > > +/* Non-virtual methods */ > > + > > +static struct merger_source * > > +merger_source_array_new(bool even) > > +{ > > + static struct merger_source_vtab merger_source_array_vtab = { > > + .delete = merger_source_array_delete, > > + .next = merger_source_array_next, > > + }; > > + > > + struct merger_source_array *source = > > + (struct merger_source_array *) malloc( > > + sizeof(struct merger_source_array)); > > + assert(source != NULL); > > + > > + merger_source_new(&source->base, &merger_source_array_vtab); > > + > > + uint32_t tuple_size = 2; > > + const uint32_t tuples_count = 2; > > + /* {1}, {3} */ > > + static const char 
*data_odd[] = {"\x91\x01", "\x91\x03"}; > > You might want to use mp_format here. Good idea, but here arrays of constant strings look shorter and so I prefer to leave it as is. Comments should clarify what the data means. Changed arrays in comments from Lua-style '{1}' to JSON-style '[1]'. > > > + /* {2}, {4} */ > > + static const char *data_even[] = {"\x91\x02", "\x91\x04"}; > > + const char **data = even ? data_even : data_odd; > > + source->tuples = (struct tuple **) malloc( > > + tuples_count * sizeof(struct tuple *)); > > + assert(source->tuples != NULL); > > + box_tuple_format_t *format = box_tuple_format_default(); > > + for (uint32_t i = 0; i < tuples_count; ++i) { > > + const char *end = data[i] + tuple_size; > > + source->tuples[i] = box_tuple_new(format, data[i], end); > > + box_tuple_ref(source->tuples[i]); > > + } > > + source->tuples_count = tuples_count; > > + source->cur = 0; > > + > > + return &source->base; > > +}