[PATCH 3/3] Add merger for tuple streams

Alexander Turenko alexander.turenko at tarantool.org
Thu Jan 10 00:36:14 MSK 2019


I have updated most of the aspects you pointed out.

The patchset now looks like this:

* Add luaL_iscallable with support of cdata metatype
* Add functions to ease using Lua iterators from C
* lua: add luaT_newtuple()
* lua: add luaT_new_key_def()
* net.box: add helpers to decode msgpack headers
* Add merger for tuple streams

I'll resend it as v2. It is meant more to close this iteration cleanly
than to be reviewed, because I am still researching the chunked data
transfer API.

The Totktonada/gh-3276-on-board-merger-1.10 branch is stale for now.
I'll update it if that is requested explicitly.
The Totktonada/gh-3276-on-board-merger branch was updated.

I updated the graphql usage of the merger (it is still not merged,
pending stabilization of the merger API):

https://github.com/tarantool/graphql/commit/a442f6451d407c5833abef5b14303662cfd16a62
https://github.com/tarantool/graphql/commit/738501641a43e7602567469109ab0032b3c78896

WBR, Alexander Turenko.

On Wed, Dec 26, 2018 at 11:11:10PM +0300, Vladimir Davydov wrote:
> On Sun, Dec 16, 2018 at 11:17:26PM +0300, Alexander Turenko wrote:
> > Fixes #3276.
> 
> DocBot request is missing. I guess you could paste that nice piece of
> documentation you wrote in the source file right in the commit message.
> 

Thanks. Done.

> > ---
> >  src/CMakeLists.txt           |    2 +
> >  src/lua/init.c               |    5 +
> >  src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
> >  src/lua/merger.h             |   39 +
> >  src/lua/merger.lua           |   19 +
> 
> Merger depends on tuple, key_def, and tuple_format, which are box
> objects, hence it should be defined in src/box.
> 

Moved to src/box/lua/merger.[ch].

> My main concern about the merger code added by this patch is that the
> merger logic and its Lua wrapper are mixed together in the same file.
> This makes the code difficult for understanding. It'd be great if you
> could isolate the merger logic in src/box/merger.[hc] and leave only the
> Lua wrapper code in src/box/lua/merger.[hc]. The merger could then be
> submitted in a separate patch with an appropriate unit test - this would
> also ease the review process.

The merger itself will be very tiny, because most of the logic
interoperates with Lua, but the code should look more structured this
way, so I agree.

Vladimir and I discussed how to split the code. He dislikes the idea of
storing the basic fields in struct merger_state in box/merger.[ch] and
extending this structure with fetch_source_ref in box/lua/merger.[ch].

We also discussed that fetch_source has a source.idx parameter and a
user has to map the source index to net.box connections or some other
kind of 'upstream source'. Vladimir doesn't think it is a good API.

We also discussed whether it is possible to define a source as an object
with methods like next/fetch or next/next_batch. It is unclear how that
should work with a buffer. I'll investigate the possibilities and get
back to this in a following email.

So this is not done.

> 
> A few comments regarding the API and the code are below.
> 
> >  test/app-tap/merger.test.lua |  693 ++++++++++++++
> >  test/app-tap/suite.ini       |    1 +
> >  7 files changed, 2402 insertions(+)
> >  create mode 100644 src/lua/merger.c
> >  create mode 100644 src/lua/merger.h
> >  create mode 100644 src/lua/merger.lua
> >  create mode 100755 test/app-tap/merger.test.lua
> 
> > diff --git a/src/lua/merger.c b/src/lua/merger.c
> > +/**
> > + * API and basic usage
> > + * -------------------
> > + *
> > + * The following example demonstrates API of the module:
> > + *
> > + * ```
> > + * local net_box = require('net.box')
> > + * local buffer = require('buffer')
> > + * local merger = require('merger')
> > + *
> > + * -- The format of key_parts parameter is the same as
> > + * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
> > + * -- net.box connection).
> > + * local key_parts = {
> > + *     {
> > + *         fieldno = <number>,
> > + *         type = <string>,
> > + *         [ is_nullable = <boolean>, ]
> > + *         [ collation_id = <number>, ]
> > + *         [ collation = <string>, ]
> > + *     },
> > + *     ...
> > + * }
> > + *
> > + * -- Create the merger instance.
> > + * local merger_inst = merger.new(key_parts)
> 
> IMO it's not a merger instance, it's rather a merger context.
> 
> What about
> 
>   local ctx = merger.context.new(key_parts)
>   merger.pairs(ctx, {src1, src2, ...})
> 
> ?
> 
> Note, ctx is a mere argument here, not a class object.
> 

I agree: key_def + format feels more like a context than an instance.
And writing '_inst' postfixes in user code was a bit distracting.

Done.

> > + * Decoding / encoding buffers
> > + * ---------------------------
> > + *
> > + * A select response has the following structure:
> > + * `{[48] = {tuples}}`, while a call response is
> > + * `{[48] = {{tuples}}}` (because it should support multiple
> > + * return values). A user should specify how merger will
> > + * operate on buffers, so merger has `decode` (how to read buffer
> > + * sources) and `encode` (how to write to a resulting buffer)
> > + * options. These options accept the following values:
> > + *
> > + * Option value       | Buffer structure
> > + * ------------------ | ----------------
> > + * 'raw'              | tuples
> > + * 'select' (default) | {[48] = {tuples}}
> > + * 'call'             | {[48] = {{tuples}}}
> > + * 'chain'            | {[48] = {{{tuples, ...}}}}
> 
> I don't think we should make merger dependent on iproto. I understand
> that it must be able to take an ibuf for performance considerations, but
> I think that the buffer must always be formatted as a msgpack array of
> tuples, without any extra headers. Headers should be removed either by
> the caller (with msgpack Lua lib) or by net.box itself.
> 

I understand your concern and agree with the arguments, but from a user
perspective it is convenient to just say {decode = 'call'}. The new way
to decode should be convenient too, I think.

Added two functions (see the commit message for the documentation and
examples).

net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
    -> new_rpos
    -> nil, err_msg
msgpack.check_array(buf.rpos, buf.wpos - buf.rpos[, arr_len])
    -> new_rpos, arr_len
    -> nil, err_msg

I think this API has several advantages that need to be taken into
consideration if we want to change it:

* It is simple for a user and doesn't bloat examples, because it allows
  checking and skipping iproto_data/array headers in 1-3 lines of code
  (see the example in the docbot comment).
* It follows the msgpack Lua module convention and receives buf.rpos as
  input (however, it does not allow a string argument).
* It checks for out-of-buffer errors, so it will not crash in case of,
  say, an empty buffer.
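
To illustrate the last point, here is a standalone sketch of such
bounds-checked header skipping. It is not the code from the patch: the
sketch_* names are made up for this example, msgpuck is not used, and
only the shortest encodings of the {[48] = ...} header are accepted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { SKETCH_IPROTO_DATA = 0x30 }; /* 48, the key in {[48] = ...} */

/*
 * Decode a msgpack array header located at [pos, end).
 * Returns the position right after the header and stores the
 * array length in *len_p. Returns NULL when the buffer is empty,
 * truncated or does not start with an array.
 */
static const char *
sketch_check_array(const char *pos, const char *end, uint32_t *len_p)
{
	assert(len_p != NULL);
	if (pos == NULL || pos >= end)
		return NULL;
	const unsigned char *p = (const unsigned char *) pos;
	size_t avail = (size_t) (end - pos);
	if ((p[0] & 0xf0) == 0x90) {		/* fixarray */
		*len_p = p[0] & 0x0f;
		return pos + 1;
	}
	if (p[0] == 0xdc && avail >= 3) {	/* array 16 */
		*len_p = ((uint32_t) p[1] << 8) | p[2];
		return pos + 3;
	}
	if (p[0] == 0xdd && avail >= 5) {	/* array 32 */
		*len_p = ((uint32_t) p[1] << 24) | ((uint32_t) p[2] << 16) |
			 ((uint32_t) p[3] << 8) | p[4];
		return pos + 5;
	}
	return NULL;
}

/*
 * Skip a {[48] = ...} header. Simplification of this sketch: only a
 * one-element fixmap (0x81) with a positive fixint key (0x30) is
 * recognized, wider encodings of the same values are rejected.
 */
static const char *
sketch_check_iproto_data(const char *pos, const char *end)
{
	if (pos == NULL || end - pos < 2)
		return NULL;
	const unsigned char *p = (const unsigned char *) pos;
	if (p[0] != 0x81 || p[1] != SKETCH_IPROTO_DATA)
		return NULL;
	return pos + 2;
}
```

A caller would chain the two calls and stop at the first NULL, which is
the C counterpart of the 'nil, err_msg' return in the Lua functions
above.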

> > +#ifndef NDEBUG
> > +#include "say.h"
> > +/**
> > + * Heap insert/delete/update macros wrapped with debug prints.
> > + */
> > +#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
> > +	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
> > +		  tuple_str((source)->tuple));				\
> > +	merger_heap_insert((heap), (hnode));				\
> > +} while(0)
> > +#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
> > +	say_debug("merger: [source %p] delete", (source));	\
> > +	merger_heap_delete((heap), (hnode));			\
> > +} while(0)
> > +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
> > +	say_debug("merger: [source %p] update: tuple: %s", (source),	\
> > +		  tuple_str((source)->tuple));				\
> > +	merger_heap_update((heap), (hnode));				\
> > +} while(0)
> > +#else /* !defined(NDEBUG) */
> > +/**
> > + * Heap insert/delete/update macros wrappers w/o debug prints.
> > + */
> > +#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
> > +	merger_heap_insert((heap), (hnode));		\
> > +} while(0)
> > +#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
> > +	merger_heap_delete((heap), (hnode));		\
> > +} while(0)
> > +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
> > +	merger_heap_update((heap), (hnode));		\
> > +} while(0)
> > +#endif /* !defined(NDEBUG) */
> 
> say_debug() doesn't evaluate arguments if log_level < DEBUG so I think
> it's no use to disable it if NDEBUG. I'd remove these macros and call
> say_debug() directly at call sites.
> 

The difference is just one jump (because of the if statement), but it
is on the critical path: that is why I did it that way.

I had kept these prints because I used them in the past to debug a
problem in shard's merger, but now I think it is even easier to debug
buffers with something like:

```
yaml.encode(msgpackffi.decode(ffi.string(buf.rpos, buf.wpos - buf.rpos)))
```

I have removed these debug prints.
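
For the record, the behaviour we are talking about can be shown with a
standalone sketch. It does not use say.h: the SKETCH_* macro, the level
values and the helper names are all invented for the example. The
argument expression sits inside the if body, so it costs one branch and
nothing else when the level is too low.

```c
#include <assert.h>
#include <stdio.h>

/* Made-up log levels, only their order matters here. */
enum sketch_level { SKETCH_INFO = 5, SKETCH_DEBUG = 7 };

static int sketch_log_level = SKETCH_INFO;
static int sketch_format_calls = 0;

/* Stands in for a costly formatter such as tuple_str(). */
static const char *
sketch_expensive_str(int value)
{
	static char buf[32];
	++sketch_format_calls;
	int n = snprintf(buf, sizeof(buf), "tuple #%d", value);
	assert(n > 0 && (size_t) n < sizeof(buf));
	(void) n;
	return buf;
}

/* The arguments are evaluated only when the level check passes. */
#define SKETCH_DEBUG_LOG(...) do {				\
	if (sketch_log_level >= SKETCH_DEBUG)			\
		fprintf(stderr, __VA_ARGS__);			\
} while (0)

static void
sketch_insert(int value)
{
	SKETCH_DEBUG_LOG("insert: %s\n", sketch_expensive_str(value));
}
```

With sketch_log_level left at SKETCH_INFO a call to sketch_insert() never
reaches sketch_expensive_str(); the remaining cost is the comparison and
the jump mentioned above.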

> Also, heap function may fail with ENOMEM unless you use heap_reserve().

Added a check for the merger_heap_insert() return value.
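
The shape of that check, in a standalone sketch: a heap of sorted
sources where insertion can fail and the caller has to clean up. This is
not salad/heap.h and not the patch code; int arrays stand in for tuple
streams and every sketch_* name is an assumption of the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* A sorted array of ints stands in for a stream of tuples. */
struct sketch_source {
	const int *data;
	size_t len;
	size_t pos;
};

struct sketch_heap {
	struct sketch_source **nodes;
	size_t size;
	size_t capacity;
};

static int
sketch_head(const struct sketch_source *s)
{
	assert(s->pos < s->len);
	return s->data[s->pos];
}

static void
sketch_swap(struct sketch_heap *h, size_t a, size_t b)
{
	struct sketch_source *tmp = h->nodes[a];
	h->nodes[a] = h->nodes[b];
	h->nodes[b] = tmp;
}

/* Returns 0 on success, -1 when the node array cannot grow. */
static int
sketch_heap_insert(struct sketch_heap *h, struct sketch_source *s)
{
	if (h->size == h->capacity) {
		size_t cap = h->capacity == 0 ? 4 : h->capacity * 2;
		struct sketch_source **nodes =
			realloc(h->nodes, cap * sizeof(*nodes));
		if (nodes == NULL)
			return -1; /* h->nodes is untouched and still valid */
		h->nodes = nodes;
		h->capacity = cap;
	}
	size_t i = h->size++;
	h->nodes[i] = s;
	while (i > 0) {
		size_t parent = (i - 1) / 2;
		if (sketch_head(h->nodes[parent]) <= sketch_head(h->nodes[i]))
			break;
		sketch_swap(h, parent, i);
		i = parent;
	}
	return 0;
}

/* Move the root down after its head value has changed. */
static void
sketch_heap_update_root(struct sketch_heap *h)
{
	size_t i = 0;
	for (;;) {
		size_t min = i, l = 2 * i + 1, r = l + 1;
		if (l < h->size &&
		    sketch_head(h->nodes[l]) < sketch_head(h->nodes[min]))
			min = l;
		if (r < h->size &&
		    sketch_head(h->nodes[r]) < sketch_head(h->nodes[min]))
			min = r;
		if (min == i)
			return;
		sketch_swap(h, i, min);
		i = min;
	}
}

/* Merge n sorted sources into out; returns the count or -1 on ENOMEM. */
static long
sketch_merge(struct sketch_source *srcs, size_t n, int *out)
{
	struct sketch_heap h = {NULL, 0, 0};
	long count = 0;
	for (size_t i = 0; i < n; i++) {
		if (srcs[i].pos == srcs[i].len)
			continue; /* an empty source never enters the heap */
		if (sketch_heap_insert(&h, &srcs[i]) != 0) {
			free(h.nodes);
			return -1;
		}
	}
	while (h.size > 0) {
		struct sketch_source *top = h.nodes[0];
		out[count++] = sketch_head(top);
		if (++top->pos == top->len)
			h.nodes[0] = h.nodes[--h.size]; /* source is exhausted */
		sketch_heap_update_root(&h);
	}
	free(h.nodes);
	return count;
}
```

The alternative you mention, reserving the heap capacity up front, would
move the only failure point before the loop over sources.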

> 
> > +
> > +/**
> > + * Helper macros to push / throw out of memory errors to Lua.
> > + */
> > +#define push_out_of_memory_error(L, size, what_name) do {	\
> > +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> > +	luaT_pusherror(L, diag_last_error(diag_get()));		\
> > +} while(0)
> > +#define throw_out_of_memory_error(L, size, what_name) do {	\
> > +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> > +	luaT_error(L);						\
> > +	unreachable();						\
> > +	return -1;						\
> > +} while(0)
> 
> I wouldn't use these macros. They make a call only one line shorter:
> 
> 	throw_out_of_memory_error(L, size, "obj");
> 
> instead of
> 
> 	diag_set(OutOfMemory, size, "malloc", "obj");
> 	return luaT_error(L);
> 
> Not worth obscuring the code IMO.

Okay, expanded.

> 
> > +
> > +#define BOX_COLLATION_NAME_INDEX 1
> 
> Not used anywhere.

Removed in 2.1. It is used in 1.10.

> 
> > +
> > +/**
> > + * A type of data structure that holds source data.
> > + */
> > +enum merger_source_type {
> > +	SOURCE_TYPE_BUFFER,
> > +	SOURCE_TYPE_TABLE,
> > +	SOURCE_TYPE_ITERATOR,
> > +	SOURCE_TYPE_NONE,
> > +};
> 
> I'd prefer if you used vtab instead, because that would isolate code
> and data of each iterator type in a separate function/struct, making
> it easier to follow. Besides, you'll have to do that anyway provided
> you agree to move the merger logic to src/box/merger.c.
> 

Done.
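
Roughly, the idea is the following (a standalone sketch, not the actual
structures from the new version: the names, the int payload and the
'range' source are invented for illustration). Each source type keeps
its data and code together, and generic code calls through the vtab
instead of switching on a type enum.

```c
#include <assert.h>
#include <stddef.h>

struct sketch_source;

struct sketch_source_vtab {
	/*
	 * Store the next value in *out and return 1, or return 0
	 * when the source is exhausted.
	 */
	int (*next)(struct sketch_source *src, int *out);
};

/* Base 'class': must be the first member of each concrete source. */
struct sketch_source {
	const struct sketch_source_vtab *vtab;
};

/* A source backed by a plain array. */
struct sketch_array_source {
	struct sketch_source base;
	const int *data;
	size_t len;
	size_t pos;
};

static int
sketch_array_next(struct sketch_source *src, int *out)
{
	struct sketch_array_source *s = (struct sketch_array_source *) src;
	if (s->pos == s->len)
		return 0;
	*out = s->data[s->pos++];
	return 1;
}

static const struct sketch_source_vtab sketch_array_vtab = {
	sketch_array_next,
};

/* A source that counts from cur up to stop (exclusive). */
struct sketch_range_source {
	struct sketch_source base;
	int cur;
	int stop;
};

static int
sketch_range_next(struct sketch_source *src, int *out)
{
	struct sketch_range_source *s = (struct sketch_range_source *) src;
	if (s->cur >= s->stop)
		return 0;
	*out = s->cur++;
	return 1;
}

static const struct sketch_source_vtab sketch_range_vtab = {
	sketch_range_next,
};

/* Generic code: knows nothing about concrete source types. */
static int
sketch_sum(struct sketch_source *src)
{
	assert(src != NULL && src->vtab != NULL);
	int sum = 0;
	int value;
	while (src->vtab->next(src, &value))
		sum += value;
	return sum;
}
```

Adding a new kind of source then means adding one struct and one vtab,
without touching sketch_sum() or any other generic code.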

> > +
> > +/**
> > + * How data are encoded in a buffer.
> > + *
> > + * `decode` and `encode` options are parsed to values of this
> > + * enum.
> > + */
> > +enum merger_buffer_type {
> > +	BUFFER_TYPE_RAW,
> > +	BUFFER_TYPE_SELECT,
> > +	BUFFER_TYPE_CALL,
> > +	BUFFER_TYPE_CHAIN,
> > +	BUFFER_TYPE_NONE,
> > +};
> > +
> > +/**
> > + * Hold state of a merge source.
> > + */
> > +struct merger_source {
> > +	/*
> > +	 * A source is the heap node. Compared by the next tuple.
> > +	 */
> > +	struct heap_node hnode;
> > +	/* Union determinant. */
> > +	enum merger_source_type type;
> > +	/* Fields specific for certaint source types. */
> > +	union {
> > +		/* Buffer source. */
> > +		struct {
> > +			struct ibuf *buf;
> > +			/*
> > +			 * A merger stops before end of a buffer
> > +			 * when it is not the last merger in the
> > +			 * chain.
> > +			 */
> > +			size_t remaining_tuples_cnt;
> > +		} buf;
> > +		/* Table source. */
> > +		struct {
> > +			int ref;
> > +			int next_idx;
> > +		} tbl;
> > +		/* Iterator source. */
> > +		struct {
> > +			struct lua_iterator *it;
> > +		} it;
> > +	};
> > +	/* Next tuple. */
> > +	struct tuple *tuple;
> > +};
> > +
> > +/**
> > + * Holds immutable parameters of a merger.
> > + */
> > +struct merger {
> 
> Should be called merger_context IMO.

I agree. Changed.

> 
> > +	struct key_def *key_def;
> > +	box_tuple_format_t *format;
> > +};
> > +
> > +/**
> > + * Holds parameters of merge process, sources, result storage
> > + * (if any), heap of sources and utility flags / counters.
> > + */
> > +struct merger_iterator {
> 
> Should be called merger or merger_state IMO.

I'm hesitant about the name 'merger', because the term is very common.
However, I understand the concept of a 'central' structure of a module,
like struct tuple is for tuple.{c,h,lua}.

'merger_state' looks okay to me. This structure, wrapped into cdata, is
used as the 'param' (3rd) value in the iterator triplet when
merger_inst:pairs(...) / merger.pairs(ctx, ...) is called, while the
'state' (2nd) value is struct merger / struct merger_context. That
terminology is vague anyway, so using the context as 'state' and the
state as 'param' looks okay in the end.

I'll choose merger_state for now and we can discuss it again later.
It should be quite easy to change if we need to do so.

> 
> > +	/* Heap of sources. */
> > +	heap_t heap;
> > +	/*
> > +	 * key_def is copied from merger.
> > +	 *
> > +	 * A merger can be collected by LuaJIT GC independently
> > +	 * from a merger_iterator, so we cannot just save pointer
> > +	 * to merger here and so we copy key_def from merger.
> > +	 */
> > +	struct key_def *key_def;
> 
> And what about the tuple format? You don't seem to copy or reference it
> anywhere. Confusing. I think that if the iterator needs the merger to
> stay along, it'd better reference it.
> 

We need key_def in the source_less() function, which is why we copy it
here. In source_less() we have no struct merger / merger_context, but we
do have struct merger_iterator / merger_state (because it contains the
heap). I have updated the comment as follows:

/*
 * Copy of key_def from merger_context.
 *
 * A merger_context can be collected by LuaJIT GC
 * independently from a merger_state, so we need either
 * copy key_def or implement reference counting for
 * merger_context and save the pointer.
 *
 * key_def is needed in source_less(), where merger_state
 * is known, but merger_context is not.
 */
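
For comparison, the reference counting alternative mentioned in the
comment could look like the sketch below. It is not implemented in the
patch; the structures are reduced to the bare minimum and all names are
hypothetical (an int stands in for key_def and format).

```c
#include <assert.h>
#include <stdlib.h>

struct sketch_context {
	int refs;
	int key_def; /* stands in for key_def and format */
};

struct sketch_state {
	struct sketch_context *ctx;
};

static struct sketch_context *
sketch_context_new(int key_def)
{
	struct sketch_context *ctx = malloc(sizeof(*ctx));
	if (ctx == NULL)
		return NULL;
	ctx->refs = 1;
	ctx->key_def = key_def;
	return ctx;
}

static void
sketch_context_ref(struct sketch_context *ctx)
{
	assert(ctx->refs > 0);
	++ctx->refs;
}

/* Returns 1 when the last reference is dropped and ctx is freed. */
static int
sketch_context_unref(struct sketch_context *ctx)
{
	assert(ctx->refs > 0);
	if (--ctx->refs > 0)
		return 0;
	free(ctx);
	return 1;
}

/* The state takes its own reference, so the context outlives GC. */
static struct sketch_state *
sketch_state_new(struct sketch_context *ctx)
{
	struct sketch_state *state = malloc(sizeof(*state));
	if (state == NULL)
		return NULL;
	sketch_context_ref(ctx);
	state->ctx = ctx;
	return state;
}

/* Returns 1 when deleting the state also freed the context. */
static int
sketch_state_delete(struct sketch_state *state)
{
	int freed = sketch_context_unref(state->ctx);
	free(state);
	return freed;
}
```

Here the GC handler of the context cdata would call
sketch_context_unref() instead of freeing the context directly, and
source_less() could reach key_def through state->ctx.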

> > +	/* Parsed sources and decoding parameters. */
> > +	uint32_t sources_count;
> > +	struct merger_source **sources;
> > +	enum merger_buffer_type decode;
> > +	/* Ascending / descending order. */
> > +	int order;
> > +	/* Optional output buffer and encoding parameters. */
> > +	struct ibuf *obuf;
> > +	enum merger_buffer_type encode;
> > +	uint32_t encode_chain_len;
> > +};
> > +
> > +static uint32_t merger_type_id = 0;
> > +static uint32_t merger_iterator_type_id = 0;
> > +static uint32_t ibuf_type_id = 0;
> > +
> > +/* Forward declarations. */
> > +static bool
> > +source_less(const heap_t *heap, const struct heap_node *a,
> > +	    const struct heap_node *b);
> > +static int
> > +lbox_merger_gc(struct lua_State *L);
> > +static void
> > +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
> > +static int
> > +lbox_merger_iterator_gc(struct lua_State *L);
> > +
> > +#define HEAP_NAME merger_heap
> > +#define HEAP_LESS source_less
> > +#include "salad/heap.h"
> > +
> > +/**
> > + * Create the new tuple with specific format from a Lua table or a
> > + * tuple.
> > + *
> > + * In case of an error push the error message to the Lua stack and
> > + * return NULL.
> > + */
> > +static struct tuple *
> > +luaT_gettuple_with_format(struct lua_State *L, int idx,
> > +			  box_tuple_format_t *format)
> > +{
> > +	struct tuple *tuple;
> > +	if (lua_istable(L, idx)) {
> > +		/* Based on lbox_tuple_new() code. */
> 
> Please define this as a separate function somewhere in
> src/box/lua/tuple.c and reuse lbox_tuple_new() code instead
> of copying it. May be done in a separate patch, I guess.

Done.

The code became a bit weird, but the reason is the mix of my
requirements and those of lbox_tuple_new().

I need:

* To choose a table / a tuple by index on the Lua stack.
* Errors that are pushed (not thrown), so I can clean up before
  rethrowing.

lbox_tuple_new() needs:

* Support for the old parameters format, box.tuple.new(1, 2, 3), so I
  reserved idx == 0 for this (it is not a valid Lua stack index).

On the other hand, the code is no longer duplicated and the new function
(named luaT_newtuple()) is more flexible than lbox_tuple_new(): I hope
it'll help with reusing the code later.

> 
> > +		struct ibuf *buf = tarantool_lua_ibuf;
> > +		ibuf_reset(buf);
> > +		struct mpstream stream;
> > +		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
> > +		      luamp_error, L);
> > +		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
> > +		mpstream_flush(&stream);
> > +		tuple = box_tuple_new(format, buf->buf,
> > +				      buf->buf + ibuf_used(buf));
> > +		if (tuple == NULL) {
> > +			luaT_pusherror(L, diag_last_error(diag_get()));
> > +			return NULL;
> > +		}
> > +		ibuf_reinit(tarantool_lua_ibuf);
> > +		return tuple;
> > +	}
> > +	tuple = luaT_istuple(L, idx);
> > +	if (tuple == NULL) {
> > +		lua_pushfstring(L, "A tuple or a table expected, got %s",
> > +				lua_typename(L, lua_type(L, -1)));
> > +		return NULL;
> > +	}
> > +	/*
> > +	 * Create the new tuple with the format necessary for fast
> > +	 * comparisons.
> > +	 */
> > +	const char *tuple_beg = tuple_data(tuple);
> > +	const char *tuple_end = tuple_beg + tuple->bsize;
> > +	tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > +	if (tuple == NULL) {
> > +		luaT_pusherror(L, diag_last_error(diag_get()));
> > +		return NULL;
> > +	}
> > +	return tuple;
> > +}
> 
> > +#define RPOS_P(buf) ((const char **) &(buf)->rpos)
> 
> Please, let's somehow get along without this macro.

Done.

> 
> > +
> > +/**
> > + * Skip (and check) the wrapper around tuples array (and the array
> > + * itself).
> > + *
> > + * Expected different kind of wrapping depending of it->decode.
> > + */
> > +static int
> > +decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
> > +{
> > +	int ok = 1;
> > +	/* Decode {[IPROTO_DATA] = ...} header. */
> > +	if (it->decode != BUFFER_TYPE_RAW)
> > +		ok = mp_typeof(*buf->rpos) == MP_MAP &&
> > +			mp_decode_map(RPOS_P(buf)) == 1 &&
> > +			mp_typeof(*buf->rpos) == MP_UINT &&
> > +			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
> > +	/* Decode the array around call return values. */
> > +	if (ok && (it->decode == BUFFER_TYPE_CALL ||
> > +	    it->decode == BUFFER_TYPE_CHAIN))
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> > +			mp_decode_array(RPOS_P(buf)) > 0;
> > +	/* Decode the array around chained input. */
> > +	if (ok && it->decode == BUFFER_TYPE_CHAIN)
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> > +			mp_decode_array(RPOS_P(buf)) > 0;
> > +	/* Decode the array around tuples to merge. */
> > +	if (ok)
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
> > +	if (ok)
> > +		*len_p = mp_decode_array(RPOS_P(buf));
> > +	return ok;
> > +}
> > +
> > +#undef RPOS_P
> 
> > +/**
> > + * Determine type of a merger source on the Lua stack.
> > + *
> > + * Set *buf_p to buffer when the source is valid source of buffer
> > + * type and buf_p is not NULL.
> > + */
> > +static enum merger_source_type
> > +parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
> > +{
> > +	if (lua_type(L, idx) == LUA_TCDATA) {
> > +		struct ibuf *buf = check_ibuf(L, idx);
> 
> msgpack.decode takes ibuf.rpos rather than ibuf. May be, the merger
> should do the same, for consistency?

I have the following concerns here:

1. In this case the merger cannot check for the end of the buffer. See
   the comment around the mp_next() call. BTW, with ibuf.wpos we can
   replace it with mp_check() in the future.
2. I think the API looks better from a user perspective when the user
   just passes black-box objects between compatible APIs and doesn't
   need to understand the internal structure of these objects.
3. The callback contract becomes tricky in case of chunked data
   transfer, see below.

We could receive rpos, store a read position inside the merger and
return it to the user to let them set rpos manually (to allow
ibuf_reserve_slow() to reuse this memory). Then the merger should clear
its read position, but only if the user really updated rpos. So the
contract between the merger and the user becomes tricky.

So I'll leave it as is until we discuss it again.

BTW, msgpack/msgpackffi tend to be as universal as possible, so
cdata<const char *> as the parameter type looks appropriate. These
modules don't need to handle interleaved reads and writes, so data in
the buffer does not move between reads.

The merger needs to handle such cases (esp. after introducing chunked
data transfer support, i.e. a user callback that writes more data), so
using a buffer as a buffer (not just as const char *) is appropriate
here.
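
A standalone sketch of why a saved const char * is not enough once reads
and writes interleave. This is not the real ibuf implementation, just a
made-up buffer with the same rpos/wpos idea: reserving space may move
the unread data to a new allocation, so only positions re-read from the
buffer object stay valid.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

struct sketch_buf {
	char *buf;	/* start of the allocation */
	char *rpos;	/* read position */
	char *wpos;	/* write position */
	size_t capacity;
};

static int
sketch_buf_create(struct sketch_buf *b, size_t capacity)
{
	assert(capacity > 0);
	b->buf = malloc(capacity);
	if (b->buf == NULL)
		return -1;
	b->rpos = b->wpos = b->buf;
	b->capacity = capacity;
	return 0;
}

/*
 * Make room for size more bytes. When the tail is too small, the
 * unread bytes are moved to a new block and already consumed bytes
 * are dropped, so every previously saved pointer becomes stale.
 */
static int
sketch_buf_reserve(struct sketch_buf *b, size_t size)
{
	size_t used = (size_t) (b->wpos - b->buf);
	size_t unread = (size_t) (b->wpos - b->rpos);
	if (b->capacity - used >= size)
		return 0;
	size_t new_capacity = b->capacity * 2;
	while (new_capacity < unread + size)
		new_capacity *= 2;
	char *new_buf = malloc(new_capacity);
	if (new_buf == NULL)
		return -1;
	memcpy(new_buf, b->rpos, unread);
	free(b->buf);
	b->buf = new_buf;
	b->rpos = new_buf;
	b->wpos = new_buf + unread;
	b->capacity = new_capacity;
	return 0;
}

static int
sketch_buf_write(struct sketch_buf *b, const char *data, size_t size)
{
	if (sketch_buf_reserve(b, size) != 0)
		return -1;
	memcpy(b->wpos, data, size);
	b->wpos += size;
	return 0;
}
```

A reader that holds the struct sketch_buf pointer simply continues from
b->rpos after a write; a reader that saved the old rpos value would read
freed memory.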

> 
> > +		if (buf == NULL)
> > +			return SOURCE_TYPE_NONE;
> > +		if (buf_p != NULL)
> > +			*buf_p = buf;
> > +		return SOURCE_TYPE_BUFFER;
> > +	} else if (lua_istable(L, idx)) {
> > +		lua_rawgeti(L, idx, 1);
> > +		int iscallable = luaT_iscallable(L, idx);
> > +		lua_pop(L, 1);
> > +		if (iscallable)
> > +			return SOURCE_TYPE_ITERATOR;
> > +		return SOURCE_TYPE_TABLE;
> > +	}
> > +
> > +	return SOURCE_TYPE_NONE;
> > +}
> 
> > +/**
> > + * Parse sources table: second parameter pf merger_isnt:pairs()
> > + * and merger_inst:select() into the merger_iterator structure.
> > + *
> > + * Note: This function should be called when options are already
> > + * parsed (using parse_opts()).
> > + *
> > + * Returns 0 on success. In case of an error it pushes an error
> > + * message to the Lua stack and returns 1.
> > + */
> > +static int
> > +parse_sources(struct lua_State *L, int idx, struct merger *merger,
> > +	      struct merger_iterator *it)
> > +{
> > +	/* Allocate sources array. */
> > +	uint32_t capacity = 8;
> > +	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
> > +	it->sources = (struct merger_source **) malloc(sources_size);
> > +	if (it->sources == NULL) {
> > +		push_out_of_memory_error(L, sources_size, "it->sources");
> > +		return 1;
> > +	}
> > +
> > +	/* Fetch all sources. */
> > +	while (true) {
> > +		lua_pushinteger(L, it->sources_count + 1);
> > +		lua_gettable(L, idx);
> > +		if (lua_isnil(L, -1))
> > +			break;
> > +
> > +		/* Shrink sources array if needed. */
> 
> Grow

Fixed.

> 
> > +		if (it->sources_count == capacity) {
> > +			capacity *= 2;
> > +			struct merger_source **new_sources;
> > +			const ssize_t new_sources_size =
> > +				capacity * sizeof(struct merger_source *);
> > +			new_sources = (struct merger_source **) realloc(
> > +				it->sources, new_sources_size);
> > +			if (new_sources == NULL) {
> > +				push_out_of_memory_error(L, new_sources_size,
> > +							 "new_sources");
> > +				return 1;
> > +			}
> > +			it->sources = new_sources;
> > +		}
> > +
> > +		/* Allocate the new source. */
> > +		it->sources[it->sources_count] = (struct merger_source *)
> > +			malloc(sizeof(struct merger_source));
> > +		struct merger_source *current_source =
> > +			it->sources[it->sources_count];
> > +		if (current_source == NULL) {
> > +			push_out_of_memory_error(L,
> > +						 sizeof(struct merger_source),
> > +						 "merger_source");
> > +			return 1;
> > +		}
> > +
> > +		/*
> > +		 * Set type and tuple to correctly proceed in
> > +		 * merger_iterator_delete() in case of any further
> > +		 * error.
> > +		 */
> > +		struct ibuf *buf = NULL;
> > +		current_source->type = parse_source_type(L, -1, &buf);
> > +		current_source->tuple = NULL;
> > +
> > +		/*
> > +		 * Note: We need to increment sources count right
> > +		 * after successful malloc() of the new source
> > +		 * (before any further error check), because
> > +		 * merger_iterator_delete() frees that amount of
> > +		 * sources.
> > +		 */
> > +		++it->sources_count;
> > +
> > +		/* Initialize the new source. */
> > +		switch (current_source->type) {
> > +		case SOURCE_TYPE_BUFFER:
> > +			if (!decode_header(it, buf,
> > +			    &current_source->buf.remaining_tuples_cnt)) {
> > +				lua_pushstring(L, "Invalid merge source");
> > +				return 1;
> > +			}
> > +			current_source->buf.buf = buf;
> > +			break;
> > +		case SOURCE_TYPE_TABLE:
> > +			/* Save a table ref and a next index. */
> > +			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
> > +			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> > +			current_source->tbl.ref = tbl_ref;
> > +			current_source->tbl.next_idx = 1;
> > +			break;
> > +		case SOURCE_TYPE_ITERATOR:
> > +			/* Wrap and save iterator. */
> > +			current_source->it.it =
> > +				lua_iterator_new_fromtable(L, -1);
> > +			break;
> > +		case SOURCE_TYPE_NONE:
> > +			lua_pushfstring(L, "Unknown source type at index %d",
> > +					it->sources_count);
> > +			return 1;
> > +		default:
> > +			unreachable();
> > +			return 1;
> > +		}
> > +		if (source_fetch(L, current_source, merger->format) != 0)
> > +			return 1;
> > +		if (current_source->tuple != NULL)
> > +			MERGER_HEAP_INSERT(&it->heap,
> > +					   &current_source->hnode,
> > +					   current_source);
> > +	}
> > +	lua_pop(L, it->sources_count + 1);
> 
> This function is soo long. Let's split it.

I have wrapped source creation into the merger_source_new() function,
and the parse_sources() function now fits on one screen.

> 
> > +
> > +	return 0;
> > +}
> 
> > +/**
> > + * Create the new merger instance.
> > + *
> > + * Expected a table of key parts on the Lua stack.
> > + *
> > + * Returns the new instance.
> > + */
> > +static int
> > +lbox_merger_new(struct lua_State *L)
> > +{
> > +	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
> > +		return luaL_error(L, "Bad params, use: merger.new({"
> > +				  "{fieldno = fieldno, type = type"
> > +				  "[, is_nullable = is_nullable"
> > +				  "[, collation_id = collation_id"
> > +				  "[, collation = collation]]]}, ...}");
> > +	uint32_t key_parts_count = 0;
> > +	uint32_t capacity = 8;
> > +
> > +	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
> > +	struct key_part_def *parts = NULL;
> > +	parts = (struct key_part_def *) malloc(parts_size);
> > +	if (parts == NULL)
> > +		throw_out_of_memory_error(L, parts_size, "parts");
> > +
> > +	while (true) {
> > +		lua_pushinteger(L, key_parts_count + 1);
> > +		lua_gettable(L, 1);
> > +		if (lua_isnil(L, -1))
> > +			break;
> > +
> > +		/* Extend parts if necessary. */
> > +		if (key_parts_count == capacity) {
> 
> Let's factor out key_def creation and define it elsewhere
> (src/box/lua/key_def.[hc]?)

Good idea. I have added the code almost as is under the name
luaT_new_key_def() to src/box/lua/key_def.c and added it to exports (and
module.h) to be able to test it.

I don't much like the idea of exporting it, but I don't know how to test
it in a unit-test way: it depends on box stuff and requires a Lua state.

> 
> Can we somehow reuse the code of key_def_decode_parts for this?

In short: no, we cannot, as far as I can see.

Merger parses two formats:

* box.space.s.index.pk.parts Lua table;
* net_box_conn.space.s.index.pk.parts Lua table.

They are almost identical, but the former shows the collation by name in
the 'collation' field, while the latter shows it in the 'collation_id'
field as a number.

The box.space._index:get(<...>)[6] format is very different from them:

* it can be an array as well as a map;
* 'field' instead of 'fieldno';
* 'nullable_action' enum instead of 'is_nullable' flag.
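
To make the two accepted formats concrete, here is a standalone sketch
of how 'collation' and 'collation_id' resolve to a single id, with the
same error cases as in the patch (conflicting options, unknown name,
unknown id). The collation table and all sketch_* names are made up for
the example; the real code looks collations up in box.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_COLL_NONE UINT32_MAX

struct sketch_coll {
	uint32_t id;
	const char *name;
};

/* A made-up registry, only for this example. */
static const struct sketch_coll sketch_colls[] = {
	{1, "coll_a"},
	{2, "coll_b"},
};

/* Collation-related options of one key part. */
struct sketch_part_opts {
	uint32_t collation_id;	/* SKETCH_COLL_NONE when absent */
	const char *collation;	/* NULL when absent */
};

/*
 * Store the resolved id in *coll_id_p and return 0. Return -1 when
 * both options are given or when the collation is unknown.
 */
static int
sketch_resolve_collation(const struct sketch_part_opts *opts,
			 uint32_t *coll_id_p)
{
	assert(opts != NULL && coll_id_p != NULL);
	int has_name = opts->collation != NULL;
	int has_id = opts->collation_id != SKETCH_COLL_NONE;
	if (has_name && has_id)
		return -1; /* conflicting options */
	if (!has_name && !has_id) {
		*coll_id_p = SKETCH_COLL_NONE;
		return 0;
	}
	size_t count = sizeof(sketch_colls) / sizeof(sketch_colls[0]);
	for (size_t i = 0; i < count; i++) {
		const struct sketch_coll *coll = &sketch_colls[i];
		int match = has_name ?
			strcmp(coll->name, opts->collation) == 0 :
			coll->id == opts->collation_id;
		if (match) {
			*coll_id_p = coll->id;
			return 0;
		}
	}
	return -1; /* unknown collation name or id */
}
```

So a part taken from box.space.<...>.index.<...>.parts goes through the
name branch, and a part taken from a net.box connection goes through the
id branch.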

> 
> > +			capacity *= 2;
> > +			struct key_part_def *old_parts = parts;
> > +			const ssize_t parts_size =
> > +				sizeof(struct key_part_def) * capacity;
> > +			parts = (struct key_part_def *) realloc(parts,
> > +								parts_size);
> > +			if (parts == NULL) {
> > +				free(old_parts);
> > +				throw_out_of_memory_error(L, parts_size,
> > +							  "parts");
> > +			}
> > +		}
> > +
> > +		/* Set parts[key_parts_count].fieldno. */
> > +		lua_pushstring(L, "fieldno");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			free(parts);
> > +			return luaL_error(L, "fieldno must not be nil");
> > +		}
> > +		/*
> > +		 * Transform one-based Lua fieldno to zero-based
> > +		 * fieldno to use in key_def_new().
> > +		 */
> > +		parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1;
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].type. */
> > +		lua_pushstring(L, "type");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			free(parts);
> > +			return luaL_error(L, "type must not be nil");
> > +		}
> > +		size_t type_len;
> > +		const char *type_name = lua_tolstring(L, -1, &type_len);
> > +		lua_pop(L, 1);
> > +		parts[key_parts_count].type = field_type_by_name(type_name,
> > +								 type_len);
> > +		if (parts[key_parts_count].type == field_type_MAX) {
> > +			free(parts);
> > +			return luaL_error(L, "Unknown field type: %s",
> > +					  type_name);
> > +		}
> > +
> > +		/* Set parts[key_parts_count].is_nullable. */
> > +		lua_pushstring(L, "is_nullable");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			parts[key_parts_count].is_nullable = false;
> > +			parts[key_parts_count].nullable_action =
> > +				ON_CONFLICT_ACTION_DEFAULT;
> > +		} else {
> > +			parts[key_parts_count].is_nullable =
> > +				lua_toboolean(L, -1);
> > +			parts[key_parts_count].nullable_action =
> > +				ON_CONFLICT_ACTION_NONE;
> > +		}
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].coll_id using collation_id. */
> > +		lua_pushstring(L, "collation_id");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1))
> > +			parts[key_parts_count].coll_id = COLL_NONE;
> > +		else
> > +			parts[key_parts_count].coll_id = lua_tointeger(L, -1);
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].coll_id using collation. */
> > +		lua_pushstring(L, "collation");
> > +		lua_gettable(L, -2);
> > +		/* Check whether box.cfg{} was called. */
> > +		if ((parts[key_parts_count].coll_id != COLL_NONE ||
> > +		    !lua_isnil(L, -1)) && !box_is_configured()) {
> > +			free(parts);
> > +			return luaL_error(L, "Cannot use collations: "
> > +					  "please call box.cfg{}");
> > +		}
> > +		if (!lua_isnil(L, -1)) {
> > +			if (parts[key_parts_count].coll_id != COLL_NONE) {
> > +				free(parts);
> > +				return luaL_error(
> > +					L, "Conflicting options: collation_id "
> > +					"and collation");
> > +			}
> > +			size_t coll_name_len;
> > +			const char *coll_name = lua_tolstring(L, -1,
> > +							      &coll_name_len);
> > +			struct coll_id *coll_id = coll_by_name(coll_name,
> > +							       coll_name_len);
> > +			if (coll_id == NULL) {
> > +				free(parts);
> > +				return luaL_error(
> > +					L, "Unknown collation: \"%s\"",
> > +					coll_name);
> > +			}
> > +			parts[key_parts_count].coll_id = coll_id->id;
> > +		}
> > +		lua_pop(L, 1);
> > +
> > +		/* Check coll_id. */
> > +		struct coll_id *coll_id =
> > +			coll_by_id(parts[key_parts_count].coll_id);
> > +		if (parts[key_parts_count].coll_id != COLL_NONE &&
> > +		    coll_id == NULL) {
> > +			uint32_t collation_id = parts[key_parts_count].coll_id;
> > +			free(parts);
> > +			return luaL_error(L, "Unknown collation_id: %d",
> > +					  collation_id);
> > +		}
> > +
> > +		/* Set parts[key_parts_count].sort_order. */
> > +		parts[key_parts_count].sort_order = SORT_ORDER_ASC;
> > +
> > +		++key_parts_count;
> > +	}
> > +
> > +	struct merger *merger = calloc(1, sizeof(*merger));
> > +	if (merger == NULL) {
> > +		free(parts);
> > +		throw_out_of_memory_error(L, sizeof(*merger), "merger");
> > +	}
> > +	merger->key_def = key_def_new(parts, key_parts_count);
> > +	free(parts);
> > +	if (merger->key_def == NULL) {
> > +		return luaL_error(L, "Cannot create merger->key_def");
> > +	}
> > +
> > +	merger->format = box_tuple_format_new(&merger->key_def, 1);
> > +	if (merger->format == NULL) {
> > +		box_key_def_delete(merger->key_def);
> > +		free(merger);
> > +		return luaL_error(L, "Cannot create merger->format");
> > +	}
> > +
> > +	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
> > +
> > +	lua_pushcfunction(L, lbox_merger_gc);
> > +	luaL_setcdatagc(L, -2);
> > +
> > +	return 1;
> > +}


