[PATCH v3 6/7] Add merger for tuples streams (C part)

Alexander Turenko alexander.turenko at tarantool.org
Wed May 8 01:14:20 MSK 2019


Thanks!

I'll send the 4th version of the patchset.

WBR, Alexander Turenko.

On Tue, Apr 30, 2019 at 06:34:15PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:24PM +0300, Alexander Turenko wrote:
> > diff --git a/src/box/merger.c b/src/box/merger.c
> > new file mode 100644
> > index 000000000..83f628758
> > --- /dev/null
> > +++ b/src/box/merger.c
> > @@ -0,0 +1,464 @@
> > +/*
> > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> > + *
> > + * Redistribution and use in source and binary forms, with or
> > + * without modification, are permitted provided that the following
> > + * conditions are met:
> > + *
> > + * 1. Redistributions of source code must retain the above
> > + *    copyright notice, this list of conditions and the
> > + *    following disclaimer.
> > + *
> > + * 2. Redistributions in binary form must reproduce the above
> > + *    copyright notice, this list of conditions and the following
> > + *    disclaimer in the documentation and/or other materials
> > + *    provided with the distribution.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> > + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> > + * SUCH DAMAGE.
> > + */
> > +
> > +#include "box/merger.h"
> > +
> > +#include <assert.h>
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +#include <stdlib.h>
> > +
> > +#define HEAP_FORWARD_DECLARATION
> > +#include "salad/heap.h"
> > +
> > +#include "trivia/util.h"      /* unlikely() */
> > +#include "diag.h"             /* diag_set() */
> > +#include "say.h"              /* panic() */
> > +#include "box/tuple.h"        /* box_tuple_unref() */
> > +#include "box/tuple_format.h" /* box_tuple_format_*(),
> > +				 tuple_format_id() */
> > +#include "box/key_def.h"      /* box_key_def_*(),
> > +				 box_tuple_compare() */
> > +
> > +/* {{{ Create, delete, ref, unref a source and a context */
> > +
> > +enum { MERGER_SOURCE_REF_MAX = INT_MAX };
> > +enum { MERGER_CONTEXT_REF_MAX = INT_MAX };
> > +
> > +void
> > +merger_source_ref(struct merger_source *source)
> 
> I tend to agree with Kostja that merger_source sounds sort of awkward,
> like it's a source of mergers (cf. tuple_source - source of tuples).

Okay. Renamed.

> Similarly, merger_context => merge_context. BTW what about including
> 'reverse' in merge_context?

The idea was to cache one context and use it to process both, say, EQ
and REQ requests.

Anyway, merger_context is gone.

> 
> > +{
> > +	if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX))
> > +		panic("Merger source reference counter overflow");
> > +	++source->refs;
> > +}
> > +
> > +void
> > +merger_source_unref(struct merger_source *source)
> > +{
> > +	assert(source->refs - 1 >= 0);
> > +	if (--source->refs == 0)
> > +		source->vtab->delete(source);
> > +}
> > +
> > +void
> > +merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
> 
> We typically call a constructor function _create: merger_source_create.

Changed. Also changed merger_heap_node_new() to
merger_heap_node_create().

> 
> > +{
> > +	source->vtab = vtab;
> > +	source->refs = 0;
> > +}
> > +
> > +void
> > +merger_context_delete(struct merger_context *ctx)
> > +{
> > +	box_key_def_delete(ctx->key_def);
> > +	box_tuple_format_unref(ctx->format);
> 
> I suggest avoid using box_ methods in the code - after all those are
> intended for external modules. Creating a key_def with key_def_dup and
> freeing it with box_key_def_delete looks especially weird. Let's use
> the underlying functions directly, shall we?

I eliminated all box_* types and functions where it could be done
easily. The only one I keep using is box_tuple_format_new(), because it
is a convenient helper to create a format from a key_def.

> 
> > +	free(ctx);
> > +}
> > +
> > +void
> > +merger_context_ref(struct merger_context *ctx)
> > +{
> > +	if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX))
> 
> Wow, I wouldn't care, but okay.

Removed, because I made merge_source_ref/unref static inline and don't
want to include "trivia/util.h" (for unlikely()) and "say.h" (for
panic()) in merger.h.

> 
> > +		panic("Merger context reference counter overflow");
> > +	++ctx->refs;
> > +}
> > +
> > +void
> > +merger_context_unref(struct merger_context *ctx)
> > +{
> > +	assert(ctx->refs - 1 >= 0);
> > +	if (--ctx->refs == 0)
> > +		merger_context_delete(ctx);
> > +}
> > +
> > +struct merger_context *
> > +merger_context_new(const struct key_def *key_def)
> > +{
> > +	struct merger_context *ctx = (struct merger_context *) malloc(
> 
> Useless type conversion. There are a few more places like that.

Removed casts in all such places.

> 
> > +		sizeof(struct merger_context));
> > +	if (ctx == NULL) {
> > +		diag_set(OutOfMemory, sizeof(struct merger_context), "malloc",
> > +			 "merger_context");
> > +		return NULL;
> > +	}
> > +
> > +	/*
> > +	 * We need to copy key_def, because a key_def from the
> > +	 * parameter can be collected before merger_context end
> > +	 * of life (say, by LuaJIT GC if the key_def comes from
> > +	 * Lua).
> > +	 */
> > +	ctx->key_def = key_def_dup(key_def);
> > +	if (ctx->key_def == NULL) {
> > +		free(ctx);
> > +		return NULL;
> > +	}
> > +
> > +	ctx->format = box_tuple_format_new(&ctx->key_def, 1);
> > +	if (ctx->format == NULL) {
> > +		box_key_def_delete(ctx->key_def);
> > +		free(ctx);
> > +		return NULL;
> > +	}
> > +
> > +	ctx->refs = 0;
> > +
> > +	return ctx;
> > +}
> > +
> > +/* }}} */
> > +
> > +/* {{{ Merger */
> > +
> > +/**
> > + * Holds a source to fetch next tuples and a last fetched tuple to
> > + * compare the node against other nodes.
> > + *
> > + * The main reason why this structure is separated from a merger
> > + * source is that a heap node can not be a member of several
> > + * heaps.
> > + *
> > + * The second reason is that it allows to incapsulate all heap
> 
> encapsulate :)

Yep :)

> 
> > + * related logic inside this compilation unit, without any trails
> 
> 'trails' sounds weird to me in this context. I'd say 'traces'.

Okay.

> 
> > + * in externally visible structures.
> 
> Not sure that these two reasons are ironclad:
> 
>  - It's pointless to add the same source to two mergers (or to the same
>    merger twice). I think we'd better simply proscribe it. There's a
>    handy heap_node_is_stray helper that could be used to detect if a
>    source is already used by a merger.
> 
>  - merger_heap_node is pretty lightweight: it stores only pointers
>    and struct heap_node so all we need is include salad/heap.h into
>    merger.h, which seems to be okay.
> 
> At the same time, not introducing a separate anchor struct would reduce
> the amount of code you have to add and hence make the code a bit easier
> to follow IMO.

We discussed it verbally with Vladimir, and he said that he doesn't
insist on this.

I found it very misleading to have a next tuple for a merger in a
source. It is easier to work with the code (say, to check whether a
tuple is (un)referenced correctly) when a merger doesn't hold its state
in a source.

> 
> > + */
> > +struct merger_heap_node {
> > +	/* A source of tuples. */
> > +	struct merger_source *source;
> > +	/*
> > +	 * A last fetched (refcounted) tuple to compare against
> > +	 * other nodes.
> > +	 */
> > +	struct tuple *tuple;
> > +	/* An anchor to make the structure a merger heap node. */
> > +	struct heap_node heap_node_anchor;
> 
> We typically call anchors after the accommodating container, in_merger
> in this case.

Okay.

> 
> > +};
> > +
> > +static bool
> > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> > +		   const struct merger_heap_node *right);
> > +#define HEAP_NAME merger_heap
> > +#define HEAP_LESS merger_source_less
> > +#define heap_value_t struct merger_heap_node
> > +#define heap_value_attr heap_node_anchor
> > +#include "salad/heap.h"
> > +#undef HEAP_NAME
> > +#undef HEAP_LESS
> > +#undef heap_value_t
> > +#undef heap_value_attr
> > +
> > +/**
> > + * Holds a heap, an immutable context, parameters of a merge
> > + * process and utility fields.
> > + */
> > +struct merger {
> > +	/* A merger is a source. */
> > +	struct merger_source base;
> > +	/*
> > +	 * Whether a merge process started.
> > +	 *
> > +	 * The merger postpones charging of heap nodes until a
> > +	 * first output tuple is acquired.
> > +	 */
> > +	bool started;
> > +	/* A merger context. */
> > +	struct merger_context *ctx;
> > +	/*
> > +	 * A heap of sources (of nodes that contains a source to
> > +	 * be exact).
> > +	 */
> > +	heap_t heap;
> > +	/* An array of heap nodes. */
> > +	uint32_t nodes_count;
> 
> node_count :)

Changed all such names.

> 
> > +	struct merger_heap_node *nodes;
> > +	/* Ascending (false) / descending (true) order. */
> > +	bool reverse;
> > +};
> > +
> > +/* Helpers */
> > +
> > +/**
> > + * Data comparing function to construct a heap of sources.
> > + */
> > +static bool
> > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> > +		   const struct merger_heap_node *right)
> > +{
> > +	if (left->tuple == NULL && right->tuple == NULL)
> > +		return false;
> > +	if (left->tuple == NULL)
> > +		return false;
> > +	if (right->tuple == NULL)
> > +		return true;
> 
> How can this happen? I thought we would remove a source that ended
> iteration (returned NULL), no?

You are right. Replaced with asserts.
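
A minimal sketch of the comparator with asserts, assuming plain ints in
place of tuples and an explicit 'reverse' argument in place of the
container_of() lookup. The names heap_node_sketch and node_less are
illustrative only and are not taken from the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-in for a heap node: an int replaces a tuple. */
struct heap_node_sketch {
	const int *item;
};

/*
 * Exhausted sources are never placed in the heap, so both items must
 * be non-NULL here; asserts document that invariant.
 */
static bool
node_less(const struct heap_node_sketch *left,
	  const struct heap_node_sketch *right, bool reverse)
{
	assert(left->item != NULL);
	assert(right->item != NULL);
	int cmp = (*left->item > *right->item) - (*left->item < *right->item);
	/* Same ordering expression as in the quoted patch. */
	return reverse ? cmp >= 0 : cmp < 0;
}
```

With this shape a NULL item becomes a programming error caught in debug
builds rather than a silently handled case.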

> 
> > +	struct merger *merger = container_of(heap, struct merger, heap);
> > +	int cmp = box_tuple_compare(left->tuple, right->tuple,
> > +				    merger->ctx->key_def);
> > +	return merger->reverse ? cmp >= 0 : cmp < 0;
> > +}
> > +
> > +/**
> > + * How much more memory the heap will reserve at the next grow.
> > + *
> > + * See HEAP(reserve)() function in lib/salad/heap.h.
> > + */
> > +static size_t
> > +heap_next_grow_size(const heap_t *heap)
> 
> Please don't introduce this function so intricately depending on the
> heap implementation solely for error reporting - simply pass 0 to
> diag_set(OutOfMemory) - it's perfectly fine, we do that all the time.

Didn't know that it is okay to pass 0 here. Fixed.

> 
> > +{
> > +	heap_off_t heap_capacity_diff =	heap->capacity == 0 ?
> > +		HEAP_INITIAL_CAPACITY : heap->capacity;
> > +	return heap_capacity_diff * sizeof(struct heap_node *);
> > +}
> > +
> > +/**
> > + * Initialize a new merger heap node.
> > + */
> > +static void
> > +merger_heap_node_new(struct merger_heap_node *node,
> > +		     struct merger_source *source)
> > +{
> > +	node->source = source;
> > +	merger_source_ref(node->source);
> > +	node->tuple = NULL;
> > +	heap_node_create(&node->heap_node_anchor);
> > +}
> > +
> > +/**
> > + * Free a merger heap node.
> > + */
> > +static void
> > +merger_heap_node_delete(struct merger_heap_node *node)
> > +{
> > +	merger_source_unref(node->source);
> > +	if (node->tuple != NULL)
> > +		box_tuple_unref(node->tuple);
> > +}
> > +
> > +/**
> > + * The helper to add a new heap node to a merger heap.
> > + *
> > + * Return -1 at an error and set a diag.
> > + *
> > + * Otherwise store a next tuple in node->tuple, add the node to
> > + * merger->heap and return 0.
> > + */
> > +static int
> > +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
> > +{
> > +	struct tuple *tuple = NULL;
> > +
> > +	/* Acquire a next tuple. */
> > +	struct merger_source *source = node->source;
> > +	if (source->vtab->next(source, merger->ctx->format, &tuple) != 0)
> > +		return -1;
> > +
> > +	/* Don't add an empty source to a heap. */
> > +	if (tuple == NULL)
> > +		return 0;
> > +
> > +	node->tuple = tuple;
> > +
> > +	/* Add a node to a heap. */
> > +	if (merger_heap_insert(&merger->heap, node)) {
> 
> According to our coding conventions, != 0 should be checked explicitly.

Fixed.

> 
> > +		diag_set(OutOfMemory, heap_next_grow_size(&merger->heap),
> > +			 "malloc", "merger->heap");
> > +		return -1;
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> > +/* Virtual methods declarations */
> > +
> > +static void
> > +merger_delete(struct merger_source *base);
> > +static int
> > +merger_next(struct merger_source *base, box_tuple_format_t *format,
> > +	    struct tuple **out);
> > +
> > +/* Non-virtual methods */
> > +
> > +struct merger_source *
> > +merger_new(struct merger_context *ctx)
> > +{
> > +	static struct merger_source_vtab merger_vtab = {
> > +		.delete = merger_delete,
> > +		.next = merger_next,
> > +	};
> > +
> > +	struct merger *merger = (struct merger *) malloc(sizeof(struct merger));
> > +	if (merger == NULL) {
> > +		diag_set(OutOfMemory, sizeof(struct merger), "malloc",
> > +			 "merger");
> > +		return NULL;
> > +	}
> > +
> > +	merger_source_new(&merger->base, &merger_vtab);
> > +
> > +	merger->started = false;
> > +	merger->ctx = ctx;
> > +	merger_context_ref(merger->ctx);
> > +	merger_heap_create(&merger->heap);
> > +	merger->nodes_count = 0;
> > +	merger->nodes = NULL;
> > +	merger->reverse = false;
> > +
> > +	return &merger->base;
> > +}
> > +
> > +int
> > +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> > +		   uint32_t sources_count)
> > +{
> > +	struct merger *merger = container_of(base, struct merger, base);
> > +
> > +	/* Ensure we don't leak old nodes. */
> > +	assert(merger->nodes_count == 0);
> > +	assert(merger->nodes == NULL);
> > +
> > +	const size_t nodes_size =
> > +		sources_count * sizeof(struct merger_heap_node);
> > +	struct merger_heap_node *nodes = (struct merger_heap_node *) malloc(
> > +		nodes_size);
> > +	if (nodes == NULL) {
> > +		diag_set(OutOfMemory, nodes_size, "malloc",
> > +			 "merger heap nodes");
> > +		return -1;
> > +	}
> > +
> > +	for (uint32_t i = 0; i < sources_count; ++i)
> > +		merger_heap_node_new(&nodes[i], sources[i]);
> > +
> > +	merger->nodes_count = sources_count;
> > +	merger->nodes = nodes;
> > +	return 0;
> > +}
> > +
> > +void
> > +merger_set_reverse(struct merger_source *base, bool reverse)
> > +{
> > +	struct merger *merger = container_of(base, struct merger, base);
> > +
> > +	merger->reverse = reverse;
> > +}
> > +
> > +/* Virtual methods */
> > +
> > +static void
> > +merger_delete(struct merger_source *base)
> > +{
> > +	struct merger *merger = container_of(base, struct merger, base);
> > +
> > +	merger_context_unref(merger->ctx);
> > +	merger_heap_destroy(&merger->heap);
> > +
> > +	for (uint32_t i = 0; i < merger->nodes_count; ++i)
> > +		merger_heap_node_delete(&merger->nodes[i]);
> > +
> > +	if (merger->nodes != NULL)
> > +		free(merger->nodes);
> > +
> > +	free(merger);
> > +}
> > +
> > +static int
> > +merger_next(struct merger_source *base, box_tuple_format_t *format,
> > +	    struct tuple **out)
> > +{
> > +	struct merger *merger = container_of(base, struct merger, base);
> > +
> > +	/*
> > +	 * Fetch a first tuple for each source and add all heap
> > +	 * nodes to a merger heap.
> > +	 */
> > +	if (!merger->started) {
> > +		for (uint32_t i = 0; i < merger->nodes_count; ++i) {
> > +			struct merger_heap_node *node = &merger->nodes[i];
> > +			if (merger_add_heap_node(merger, node) != 0)
> > +				return -1;
> > +		}
> > +		merger->started = true;
> > +	}
> > +
> > +	/* Get a next tuple. */
> > +	struct merger_heap_node *node = merger_heap_top(&merger->heap);
> > +	if (node == NULL) {
> > +		*out = NULL;
> > +		return 0;
> > +	}
> > +	struct tuple *tuple = node->tuple;
> > +	assert(tuple != NULL);
> > +
> > +	/*
> > +	 * The tuples are stored in merger->ctx->format for
> > +	 * fast comparisons, but we should return tuples in a
> > +	 * requested format.
> > +	 */
> > +	uint32_t id_stored = tuple_format_id(merger->ctx->format);
> > +	assert(tuple->format_id == id_stored);
> > +	if (format == NULL)
> > +		format = merger->ctx->format;
> > +	uint32_t id_requested = tuple_format_id(format);
> > +	if (id_stored != id_requested) {
> 
> Tuples returned by a source must be sorted. This means that you can't
> pass a merger to another merger using a different key def. That is,
> this code is somewhat pointless. And it looks weird to me.
>
> I have a suspicion that all those relocations are going to cost us more
> than tuple comparisons themselves. Maybe we shouldn't relocate tuples
> at all, just validate and forward them as they are? After all, we are
> heading toward having offsets of all formatted fields so if we are given
> a tuple (e.g. from space.select) it's likely to have all fields indexed
> already.

Runtime tuples use smalloc(), which should be cheaper than malloc(),
but your point looks important after the merge context removal. I also
agree that we should not recreate tuples from a local space. So I
rewrote it with tuple_validate() in merger_next() and in the table and
tuple sources.

> 
> > +		const char *tuple_beg = tuple_data(tuple);
> > +		const char *tuple_end = tuple_beg + tuple->bsize;
> > +		tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > +		if (tuple == NULL)
> > +			return -1;
> > +		box_tuple_ref(tuple);
> > +		/*
> > +		 * The node->tuple pointer will be rewritten below
> > +		 * and in this branch it will not be returned. So
> > +		 * we unreference it.
> > +		 */
> > +		box_tuple_unref(node->tuple);
> > +	}
> > +
> > +	/*
> > +	 * Note: An old node->tuple pointer will be written to
> > +	 * *out as refcounted tuple or is already unreferenced
> > +	 * above, so we don't unreference it here.
> > +	 */
> > +	struct merger_source *source = node->source;
> > +	if (source->vtab->next(source, merger->ctx->format, &node->tuple) != 0)
> > +		return -1;
> > +
> > +	/* Update a heap. */
> > +	if (node->tuple == NULL)
> > +		merger_heap_delete(&merger->heap, node);
> > +	else
> > +		merger_heap_update(&merger->heap, node);
> > +
> > +	*out = tuple;
> > +	return 0;
> > +}
> > +
> > +/* }}} */
> > diff --git a/src/box/merger.h b/src/box/merger.h
> > new file mode 100644
> > index 000000000..2323dd7d7
> > --- /dev/null
> > +++ b/src/box/merger.h
> > @@ -0,0 +1,180 @@
> > +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
> > +#define TARANTOOL_BOX_MERGER_H_INCLUDED
> > +/*
> > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> > + *
> > + * Redistribution and use in source and binary forms, with or
> > + * without modification, are permitted provided that the following
> > + * conditions are met:
> > + *
> > + * 1. Redistributions of source code must retain the above
> > + *    copyright notice, this list of conditions and the
> > + *    following disclaimer.
> > + *
> > + * 2. Redistributions in binary form must reproduce the above
> > + *    copyright notice, this list of conditions and the following
> > + *    disclaimer in the documentation and/or other materials
> > + *    provided with the distribution.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
> > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> > + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> > + * SUCH DAMAGE.
> > + */
> > +
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +
> > +#if defined(__cplusplus)
> > +extern "C" {
> > +#endif /* defined(__cplusplus) */
> > +
> > +/* {{{ Structures */
> > +
> > +struct tuple;
> > +struct key_def;
> > +struct tuple_format;
> > +typedef struct tuple_format box_tuple_format_t;
> > +
> > +struct merger_source;
> > +struct merger_context;
> > +
> > +struct merger_source_vtab {
> > +	/**
> > +	 * Free a merger source.
> > +	 *
> > +	 * Don't call it directly, use merger_source_unref()
> > +	 * instead.
> > +	 */
> > +	void (*delete)(struct merger_source *base);
> 
> 'delete' is a reserved word in C++
> 
> IMO better avoid using it in a header file - it may be included in a C++
> file one day.

Changed to 'destroy'. Also changed names of implementations of this
virtual method:

* luaL_merge_source_*_delete() -> luaL_merge_source_*_destroy();
* merge_source_array_delete() -> merge_source_array_destroy() (in the
  unit test).

> 
> > +	/**
> > +	 * Get a next tuple (refcounted) from a source.
> > +	 *
> > +	 * When format is NULL it means that it does not matter
> > +	 * for a caller in which format a tuple will be.
> > +	 *
> > +	 * Return 0 when successfully fetched a tuple or NULL. In
> > +	 * case of an error set a diag and return -1.
> > +	 */
> > +	int (*next)(struct merger_source *base, box_tuple_format_t *format,
> > +		    struct tuple **out);
> 
> Let's please add inline merge_source_next wrapper function.
> It'd look neater than calling src->vtab->next directly.

Added.

Also made all base source functions (create, ref, unref, next) static
inline.
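
A rough sketch of the wrapper idea, assuming ints in place of tuples
and omitting the format argument and the reference counter. The
int_array_source type and its functions are hypothetical and exist only
to exercise the wrapper; this is not the code of the next patchset
version:

```c
#include <assert.h>
#include <stddef.h>

struct merge_source;

struct merge_source_vtab {
	/* 'destroy' instead of 'delete', so a C++ file can include this. */
	void (*destroy)(struct merge_source *base);
	/* Store a next item in *out, or NULL at the end. Return 0 or -1. */
	int (*next)(struct merge_source *base, const int **out);
};

struct merge_source {
	const struct merge_source_vtab *vtab;
};

static inline void
merge_source_create(struct merge_source *source,
		    const struct merge_source_vtab *vtab)
{
	source->vtab = vtab;
}

/* Wrapper: callers do not touch source->vtab directly. */
static inline int
merge_source_next(struct merge_source *source, const int **out)
{
	return source->vtab->next(source, out);
}

/* Hypothetical source that iterates over a borrowed array. */
struct int_array_source {
	struct merge_source base; /* must stay the first member */
	const int *items;
	size_t count;
	size_t cur;
};

static void
int_array_source_destroy(struct merge_source *base)
{
	(void)base; /* the array is borrowed, nothing to free */
}

static int
int_array_source_next(struct merge_source *base, const int **out)
{
	assert(out != NULL);
	/* Valid because 'base' is the first member of the struct. */
	struct int_array_source *source = (struct int_array_source *)base;
	if (source->cur < source->count)
		*out = &source->items[source->cur++];
	else
		*out = NULL;
	return 0;
}

static const struct merge_source_vtab int_array_source_vtab = {
	.destroy = int_array_source_destroy,
	.next = int_array_source_next,
};

static void
int_array_source_init(struct int_array_source *source, const int *items,
		      size_t count)
{
	merge_source_create(&source->base, &int_array_source_vtab);
	source->items = items;
	source->count = count;
	source->cur = 0;
}
```

A caller then writes merge_source_next(src, &out) rather than
src->vtab->next(src, &out).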

> Also, 'format' looks more like a parameter of a source to me, not an
> argument of this function. What about moving 'format' to merge_source
> struct?

It is a possible way to design this API. In this case we'll need to
allow a user to set a format for a source in Lua too. We discussed this
a bit with Vladimir. I proposed to add a key_def argument for our Lua
sources, but Vladimir declined this, saying that a source does not
perform tuple comparisons.

> 
> > +};
> > +
> > +/**
> > + * Base (abstract) structure to represent a merger source.
> > + *
> > + * The structure does not hold any resources.
> > + */
> > +struct merger_source {
> > +	/* Source-specific methods. */
> > +	struct merger_source_vtab *vtab;
> 
> Should be const.

Fixed.

> 
> > +	/* Reference counter. */
> > +	int refs;
> > +};
> > +
> > +/**
> > + * Holds immutable parameters of a merger.
> > + */
> > +struct merger_context {
> > +	struct key_def *key_def;
> > +	box_tuple_format_t *format;
> > +	/* Reference counter. */
> > +	int refs;
> > +};
> > +
> > +/* }}} */
> > +
> > +/* {{{ Create, delete, ref, unref a source and a context */
> > +
> > +/**
> > + * Increment a merger source reference counter.
> > + */
> > +void
> > +merger_source_ref(struct merger_source *source);
> > +
> > +/**
> > + * Decrement a merger source reference counter. When it has
> > + * reached zero, free the source (call delete() virtual method).
> > + */
> > +void
> > +merger_source_unref(struct merger_source *source);
> > +
> > +/**
> > + * Initialize a base merger source structure.
> > + */
> > +void
> > +merger_source_new(struct merger_source *source,
> > +		  struct merger_source_vtab *vtab);
> > +
> > +/**
> > + * Free a merger context.
> > + */
> > +void
> > +merger_context_delete(struct merger_context *ctx);
> 
> Why make it public? AFAICS you only need merger_context_unref outside.

Traces of some old code. Anyway, merger_context is gone.

> 
> > +
> > +/**
> > + * Increment a merger context reference counter.
> > + */
> > +void
> > +merger_context_ref(struct merger_context *ctx);
> > +
> > +/**
> > + * Decrement a merger context reference counter. When it has
> > + * reached zero, free the context.
> > + */
> > +void
> > +merger_context_unref(struct merger_context *ctx);
> > +
> > +/**
> > + * Create a new merger context.
> > + *
> > + * A returned merger context is NOT reference counted.
> 
> It is reference counted. It just has the reference counter set to 0.
> Anyway, why not set it to 1 right away - the caller will do that in
> any case, no?

merger_context is gone, but this is applicable to merge_source too.
Sounds sensible to me. I changed merge_source_create() to set refs == 1
and adjusted the ref / unref calls accordingly.
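
A sketch of this refcounting convention, assuming a trimmed-down
merge_source with only a destroy() method. The dummy_source type and
the destroy_count counter are hypothetical and serve only to make the
behaviour observable:

```c
#include <assert.h>
#include <stdlib.h>

struct merge_source;

struct merge_source_vtab {
	void (*destroy)(struct merge_source *base);
};

struct merge_source {
	const struct merge_source_vtab *vtab;
	int refs;
};

/* The creator owns the first reference right away. */
static inline void
merge_source_create(struct merge_source *source,
		    const struct merge_source_vtab *vtab)
{
	source->vtab = vtab;
	source->refs = 1;
}

static inline void
merge_source_ref(struct merge_source *source)
{
	++source->refs;
}

static inline void
merge_source_unref(struct merge_source *source)
{
	assert(source->refs > 0);
	if (--source->refs == 0)
		source->vtab->destroy(source);
}

/* Hypothetical source that only counts destroy() calls. */
static int destroy_count = 0;

struct dummy_source {
	struct merge_source base; /* must stay the first member */
};

static void
dummy_source_destroy(struct merge_source *base)
{
	++destroy_count;
	free((struct dummy_source *)base);
}

static const struct merge_source_vtab dummy_source_vtab = {
	.destroy = dummy_source_destroy,
};

static struct merge_source *
dummy_source_new(void)
{
	struct dummy_source *source = malloc(sizeof(*source));
	if (source == NULL)
		return NULL;
	merge_source_create(&source->base, &dummy_source_vtab);
	return &source->base;
}
```

With refs starting at 1, a caller that creates a source and hands it to
a merger unrefs it once after the merger has taken its own reference,
instead of doing ref right after create.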

> 
> > + *
> > + * Return NULL and set a diag in case of an error.
> > + */
> > +struct merger_context *
> > +merger_context_new(const struct key_def *key_def);
> > +
> > +/* }}} */
> > +
> > +/* {{{ Merger */
> > +
> > +/**
> > + * Create a new merger w/o sources.
> > + *
> > + * Return NULL and set a diag in case of an error.
> > + */
> > +struct merger_source *
> > +merger_new(struct merger_context *ctx);
> > +
> > +/**
> > + * Set sources for a merger.
> > + *
> > + * Return 0 at success. Return -1 at an error and set a diag.
> > + */
> > +int
> > +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> > +		   uint32_t sources_count);
> 
> source_count :)

Changed all such names.

> 
> > +
> > +/**
> > + * Set reverse flag for a merger.
> > + */
> > +void
> > +merger_set_reverse(struct merger_source *base, bool reverse);
> 
> I'd rather pass all the arguments directly to the constructor, the way
> you do it in Lua, otherwise unsettling questions might arise: what
> happens if I call set_sources or set_reverse after using the merger?
> can I use a merger without calling those functions?

Good points. Done.

> 
> > +
> > +/* }}} */
> > +
> > +#if defined(__cplusplus)
> > +} /* extern "C" */
> > +#endif /* defined(__cplusplus) */
> > +
> > +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
> > diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
> > new file mode 100644
> > index 000000000..0a25d8f04
> > --- /dev/null
> > +++ b/test/unit/merger.test.c
> > @@ -0,0 +1,301 @@
> > +#include "unit.h"              /* plan, header, footer, is, ok */
> > +#include "memory.h"            /* memory_init() */
> > +#include "fiber.h"             /* fiber_init() */
> > +#include "box/tuple.h"         /* tuple_init(), box_tuple_*(),
> > +				  tuple_*() */
> > +#include "box/tuple_format.h"  /* box_tuple_format_default(),
> > +				  tuple_format_id() */
> > +#include "box/key_def.h"       /* key_def_new(),
> > +				  key_def_delete() */
> > +#include "box/merger.h"        /* merger_*() */
> > +
> > +/* {{{ Array merger source */
> > +
> > +struct merger_source_array {
> > +	struct merger_source base;
> > +	uint32_t tuples_count;
> > +	struct tuple **tuples;
> > +	uint32_t cur;
> > +};
> > +
> > +/* Virtual methods declarations */
> > +
> > +static void
> > +merger_source_array_delete(struct merger_source *base);
> > +static int
> > +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
> > +			 struct tuple **out);
> > +
> > +/* Non-virtual methods */
> > +
> > +static struct merger_source *
> > +merger_source_array_new(bool even)
> > +{
> > +	static struct merger_source_vtab merger_source_array_vtab = {
> > +		.delete = merger_source_array_delete,
> > +		.next = merger_source_array_next,
> > +	};
> > +
> > +	struct merger_source_array *source =
> > +		(struct merger_source_array *) malloc(
> > +		sizeof(struct merger_source_array));
> > +	assert(source != NULL);
> > +
> > +	merger_source_new(&source->base, &merger_source_array_vtab);
> > +
> > +	uint32_t tuple_size = 2;
> > +	const uint32_t tuples_count = 2;
> > +	/* {1}, {3} */
> > +	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
> 
> You might want to use mp_format here.

Good idea, but here arrays of constant strings look shorter and so I
prefer to leave it as is. Comments should clarify what the data mean.

Changed arrays in comments from Lua-style '{1}' to JSON-style '[1]'.

> 
> > +	/* {2}, {4} */
> > +	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
> > +	const char **data = even ? data_even : data_odd;
> > +	source->tuples = (struct tuple **) malloc(
> > +		tuples_count * sizeof(struct tuple *));
> > +	assert(source->tuples != NULL);
> > +	box_tuple_format_t *format = box_tuple_format_default();
> > +	for (uint32_t i = 0; i < tuples_count; ++i) {
> > +		const char *end = data[i] + tuple_size;
> > +		source->tuples[i] = box_tuple_new(format, data[i], end);
> > +		box_tuple_ref(source->tuples[i]);
> > +	}
> > +	source->tuples_count = tuples_count;
> > +	source->cur = 0;
> > +
> > +	return &source->base;
> > +}


