[PATCH 3/3] Add merger for tuple streams

Vladimir Davydov vdavydov.dev at gmail.com
Wed Dec 26 23:11:10 MSK 2018


On Sun, Dec 16, 2018 at 11:17:26PM +0300, Alexander Turenko wrote:
> Fixes #3276.

The DocBot request is missing. I guess you could paste that nice piece
of documentation you wrote in the source file right into the commit
message.

> ---
>  src/CMakeLists.txt           |    2 +
>  src/lua/init.c               |    5 +
>  src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
>  src/lua/merger.h             |   39 +
>  src/lua/merger.lua           |   19 +

Merger depends on tuple, key_def, and tuple_format, which are box
objects, hence it should be defined in src/box.

My main concern about the merger code added by this patch is that the
merger logic and its Lua wrapper are mixed together in the same file.
This makes the code difficult to understand. It'd be great if you
could isolate the merger logic in src/box/merger.[hc] and leave only the
Lua wrapper code in src/box/lua/merger.[hc]. The merger could then be
submitted in a separate patch with an appropriate unit test - this would
also ease the review process.

A few comments regarding the API and the code are below.

>  test/app-tap/merger.test.lua |  693 ++++++++++++++
>  test/app-tap/suite.ini       |    1 +
>  7 files changed, 2402 insertions(+)
>  create mode 100644 src/lua/merger.c
>  create mode 100644 src/lua/merger.h
>  create mode 100644 src/lua/merger.lua
>  create mode 100755 test/app-tap/merger.test.lua

> diff --git a/src/lua/merger.c b/src/lua/merger.c
> +/**
> + * API and basic usage
> + * -------------------
> + *
> + * The following example demonstrates API of the module:
> + *
> + * ```
> + * local net_box = require('net.box')
> + * local buffer = require('buffer')
> + * local merger = require('merger')
> + *
> + * -- The format of key_parts parameter is the same as
> + * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
> + * -- net.box connection).
> + * local key_parts = {
> + *     {
> + *         fieldno = <number>,
> + *         type = <string>,
> + *         [ is_nullable = <boolean>, ]
> + *         [ collation_id = <number>, ]
> + *         [ collation = <string>, ]
> + *     },
> + *     ...
> + * }
> + *
> + * -- Create the merger instance.
> + * local merger_inst = merger.new(key_parts)

IMO it's not a merger instance, it's rather a merger context.

What about

  local ctx = merger.context.new(key_parts)
  merger.pairs(ctx, {src1, src2, ...})

?

Note, ctx is a mere argument here, not a class object.

> + * Decoding / encoding buffers
> + * ---------------------------
> + *
> + * A select response has the following structure:
> + * `{[48] = {tuples}}`, while a call response is
> + * `{[48] = {{tuples}}}` (because it should support multiple
> + * return values). A user should specify how merger will
> + * operate on buffers, so merger has `decode` (how to read buffer
> + * sources) and `encode` (how to write to a resulting buffer)
> + * options. These options accept the following values:
> + *
> + * Option value       | Buffer structure
> + * ------------------ | ----------------
> + * 'raw'              | tuples
> + * 'select' (default) | {[48] = {tuples}}
> + * 'call'             | {[48] = {{tuples}}}
> + * 'chain'            | {[48] = {{{tuples, ...}}}}

I don't think we should make merger dependent on iproto. I understand
that it must be able to take an ibuf for performance considerations, but
I think that the buffer must always be formatted as a msgpack array of
tuples, without any extra headers. Headers should be removed either by
the caller (with msgpack Lua lib) or by net.box itself.
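
To illustrate what I mean by "a msgpack array of tuples, without any
extra headers": the reader side then only has to parse one array
header. Below is a rough, self-contained sketch. raw_array_len() is a
made-up name, and the header parsing is hand-rolled from the MsgPack
format only to keep the example standalone; real code would use the
msgpuck decoders instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: read a MsgPack array header at *pos and
 * return the number of elements, advancing *pos past the header.
 * Return -1 and leave *pos untouched if the data is truncated or
 * does not start with an array (e.g. it is still wrapped in a map).
 */
static int64_t
raw_array_len(const unsigned char **pos, const unsigned char *end)
{
	assert(*pos <= end);
	const unsigned char *p = *pos;
	if (p == end)
		return -1;
	unsigned char c = *p++;
	int64_t len;
	if ((c & 0xf0) == 0x90) {
		/* fixarray: 1001xxxx, length in the low 4 bits. */
		len = c & 0x0f;
	} else if (c == 0xdc) {
		/* array 16: big-endian 16-bit length follows. */
		if (end - p < 2)
			return -1;
		len = ((int64_t)p[0] << 8) | p[1];
		p += 2;
	} else if (c == 0xdd) {
		/* array 32: big-endian 32-bit length follows. */
		if (end - p < 4)
			return -1;
		len = ((int64_t)p[0] << 24) | ((int64_t)p[1] << 16) |
		      ((int64_t)p[2] << 8) | p[3];
		p += 4;
	} else {
		return -1;
	}
	*pos = p;
	return len;
}
```

With this contract a buffer that still starts with the
{[48] = ...} map is simply rejected, and stripping the wrapper is the
caller's job.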

> +#ifndef NDEBUG
> +#include "say.h"
> +/**
> + * Heap insert/delete/update macros wrapped with debug prints.
> + */
> +#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
> +	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
> +		  tuple_str((source)->tuple));				\
> +	merger_heap_insert((heap), (hnode));				\
> +} while(0)
> +#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
> +	say_debug("merger: [source %p] delete", (source));	\
> +	merger_heap_delete((heap), (hnode));			\
> +} while(0)
> +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
> +	say_debug("merger: [source %p] update: tuple: %s", (source),	\
> +		  tuple_str((source)->tuple));				\
> +	merger_heap_update((heap), (hnode));				\
> +} while(0)
> +#else /* !defined(NDEBUG) */
> +/**
> + * Heap insert/delete/update macros wrappers w/o debug prints.
> + */
> +#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
> +	merger_heap_insert((heap), (hnode));		\
> +} while(0)
> +#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
> +	merger_heap_delete((heap), (hnode));		\
> +} while(0)
> +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
> +	merger_heap_update((heap), (hnode));		\
> +} while(0)
> +#endif /* !defined(NDEBUG) */

say_debug() doesn't evaluate its arguments if log_level < DEBUG, so I
think there's no point in disabling it under NDEBUG. I'd remove these
macros and call say_debug() directly at the call sites.

Also, heap functions may fail with ENOMEM unless you use heap_reserve().
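
The pattern I have in mind is "reserve first, insert later", so that
the only fallible step happens up front. Here is a toy sketch with a
plain int min-heap. The toy_heap_* names are invented for the example
and are not the salad/heap.h API; check the real header for the
actual signatures.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy binary min-heap of ints; a stand-in, not salad/heap.h. */
struct toy_heap {
	int *data;
	size_t size;
	size_t capacity;
};

/*
 * Make room for at least `count` elements in total.
 * Return 0 on success, -1 on allocation failure (heap unchanged).
 */
static int
toy_heap_reserve(struct toy_heap *h, size_t count)
{
	if (count <= h->capacity)
		return 0;
	int *data = realloc(h->data, count * sizeof(*data));
	if (data == NULL)
		return -1;
	h->data = data;
	h->capacity = count;
	return 0;
}

/* Cannot fail: the caller must have reserved space beforehand. */
static void
toy_heap_insert(struct toy_heap *h, int value)
{
	assert(h->size < h->capacity);
	size_t i = h->size++;
	/* Sift up: move larger parents down until the slot fits. */
	while (i > 0 && h->data[(i - 1) / 2] > value) {
		h->data[i] = h->data[(i - 1) / 2];
		i = (i - 1) / 2;
	}
	h->data[i] = value;
}
```

The error path then lives in one place (the reserve call), and the
insert sites need no cleanup code.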

> +
> +/**
> + * Helper macros to push / throw out of memory errors to Lua.
> + */
> +#define push_out_of_memory_error(L, size, what_name) do {	\
> +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> +	luaT_pusherror(L, diag_last_error(diag_get()));		\
> +} while(0)
> +#define throw_out_of_memory_error(L, size, what_name) do {	\
> +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> +	luaT_error(L);						\
> +	unreachable();						\
> +	return -1;						\
> +} while(0)

I wouldn't use these macros. They make a call only one line shorter:

	throw_out_of_memory_error(L, size, "obj");

instead of

	diag_set(OutOfMemory, size, "malloc", "obj");
	return luaT_error(L);

Not worth obscuring the code IMO.

> +
> +#define BOX_COLLATION_NAME_INDEX 1

Not used anywhere.

> +
> +/**
> + * A type of data structure that holds source data.
> + */
> +enum merger_source_type {
> +	SOURCE_TYPE_BUFFER,
> +	SOURCE_TYPE_TABLE,
> +	SOURCE_TYPE_ITERATOR,
> +	SOURCE_TYPE_NONE,
> +};

I'd prefer if you used vtab instead, because that would isolate code
and data of each iterator type in a separate function/struct, making
it easier to follow. Besides, you'll have to do that anyway provided
you agree to move the merger logic to src/box/merger.c.
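
Roughly what I mean by a vtab, as a self-contained sketch. Ints stand
in for tuples, and all names here (struct source, source_vtab,
array_source) are made up for illustration; this is not a proposal
for the exact interface.

```c
#include <assert.h>
#include <stddef.h>

struct source;

/* Virtual table: one implementation per kind of source. */
struct source_vtab {
	/* Store the next value in *out; return 1, or 0 at the end. */
	int (*next)(struct source *src, int *out);
	/* Release resources owned by the source. */
	void (*destroy)(struct source *src);
};

/* Base "class": generic code sees only this. */
struct source {
	const struct source_vtab *vtab;
};

/* Array-backed source; base must stay the first member. */
struct array_source {
	struct source base;
	const int *items;
	size_t count;
	size_t pos;
};

static int
array_source_next(struct source *src, int *out)
{
	struct array_source *as = (struct array_source *)src;
	if (as->pos == as->count)
		return 0;
	*out = as->items[as->pos++];
	return 1;
}

static void
array_source_destroy(struct source *src)
{
	(void)src; /* Nothing is owned in this toy example. */
}

static const struct source_vtab array_source_vtab = {
	array_source_next,
	array_source_destroy,
};

static void
array_source_create(struct array_source *as, const int *items,
		    size_t count)
{
	assert(items != NULL || count == 0);
	as->base.vtab = &array_source_vtab;
	as->items = items;
	as->count = count;
	as->pos = 0;
}

/* The merge loop calls this and never switches on a type enum. */
static int
source_next(struct source *src, int *out)
{
	return src->vtab->next(src, out);
}
```

A buffer, table or Lua iterator source would each get its own struct
and vtab in the same way, so the union and the type switch go away.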

> +
> +/**
> + * How data are encoded in a buffer.
> + *
> + * `decode` and `encode` options are parsed to values of this
> + * enum.
> + */
> +enum merger_buffer_type {
> +	BUFFER_TYPE_RAW,
> +	BUFFER_TYPE_SELECT,
> +	BUFFER_TYPE_CALL,
> +	BUFFER_TYPE_CHAIN,
> +	BUFFER_TYPE_NONE,
> +};
> +
> +/**
> + * Hold state of a merge source.
> + */
> +struct merger_source {
> +	/*
> +	 * A source is the heap node. Compared by the next tuple.
> +	 */
> +	struct heap_node hnode;
> +	/* Union determinant. */
> +	enum merger_source_type type;
> +	/* Fields specific for certaint source types. */
> +	union {
> +		/* Buffer source. */
> +		struct {
> +			struct ibuf *buf;
> +			/*
> +			 * A merger stops before end of a buffer
> +			 * when it is not the last merger in the
> +			 * chain.
> +			 */
> +			size_t remaining_tuples_cnt;
> +		} buf;
> +		/* Table source. */
> +		struct {
> +			int ref;
> +			int next_idx;
> +		} tbl;
> +		/* Iterator source. */
> +		struct {
> +			struct lua_iterator *it;
> +		} it;
> +	};
> +	/* Next tuple. */
> +	struct tuple *tuple;
> +};
> +
> +/**
> + * Holds immutable parameters of a merger.
> + */
> +struct merger {

Should be called merger_context IMO.

> +	struct key_def *key_def;
> +	box_tuple_format_t *format;
> +};
> +
> +/**
> + * Holds parameters of merge process, sources, result storage
> + * (if any), heap of sources and utility flags / counters.
> + */
> +struct merger_iterator {

Should be called merger or merger_state IMO.

> +	/* Heap of sources. */
> +	heap_t heap;
> +	/*
> +	 * key_def is copied from merger.
> +	 *
> +	 * A merger can be collected by LuaJIT GC independently
> +	 * from a merger_iterator, so we cannot just save pointer
> +	 * to merger here and so we copy key_def from merger.
> +	 */
> +	struct key_def *key_def;

And what about the tuple format? You don't seem to copy or reference it
anywhere. Confusing. I think that if the iterator needs the merger to
stay around, it had better hold a reference to it.
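
A minimal sketch of the referencing idea, using a plain counter. The
struct and function names are hypothetical, and how this would be
tied to the Lua GC handlers is left out.

```c
#include <assert.h>
#include <stdlib.h>

/* Stand-in for the shared immutable part (key_def, format). */
struct ctx {
	int refs;
};

/* Create a context owned by the caller (one reference). */
static struct ctx *
ctx_new(void)
{
	struct ctx *ctx = calloc(1, sizeof(*ctx));
	if (ctx != NULL)
		ctx->refs = 1;
	return ctx;
}

static void
ctx_ref(struct ctx *ctx)
{
	ctx->refs++;
}

/* Drop one reference; return 1 if the context was freed. */
static int
ctx_unref(struct ctx *ctx)
{
	assert(ctx->refs > 0);
	if (--ctx->refs > 0)
		return 0;
	free(ctx);
	return 1;
}

struct iter {
	/* Pins the context for as long as the iterator lives. */
	struct ctx *ctx;
};

static void
iter_create(struct iter *it, struct ctx *ctx)
{
	ctx_ref(ctx);
	it->ctx = ctx;
}

/* Release the iterator's reference; return 1 if ctx was freed. */
static int
iter_destroy(struct iter *it)
{
	int freed = ctx_unref(it->ctx);
	it->ctx = NULL;
	return freed;
}
```

This way neither key_def nor the format has to be copied: whichever
of the two objects is collected last frees the shared part.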

> +	/* Parsed sources and decoding parameters. */
> +	uint32_t sources_count;
> +	struct merger_source **sources;
> +	enum merger_buffer_type decode;
> +	/* Ascending / descending order. */
> +	int order;
> +	/* Optional output buffer and encoding parameters. */
> +	struct ibuf *obuf;
> +	enum merger_buffer_type encode;
> +	uint32_t encode_chain_len;
> +};
> +
> +static uint32_t merger_type_id = 0;
> +static uint32_t merger_iterator_type_id = 0;
> +static uint32_t ibuf_type_id = 0;
> +
> +/* Forward declarations. */
> +static bool
> +source_less(const heap_t *heap, const struct heap_node *a,
> +	    const struct heap_node *b);
> +static int
> +lbox_merger_gc(struct lua_State *L);
> +static void
> +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
> +static int
> +lbox_merger_iterator_gc(struct lua_State *L);
> +
> +#define HEAP_NAME merger_heap
> +#define HEAP_LESS source_less
> +#include "salad/heap.h"
> +
> +/**
> + * Create the new tuple with specific format from a Lua table or a
> + * tuple.
> + *
> + * In case of an error push the error message to the Lua stack and
> + * return NULL.
> + */
> +static struct tuple *
> +luaT_gettuple_with_format(struct lua_State *L, int idx,
> +			  box_tuple_format_t *format)
> +{
> +	struct tuple *tuple;
> +	if (lua_istable(L, idx)) {
> +		/* Based on lbox_tuple_new() code. */

Please define this as a separate function somewhere in
src/box/lua/tuple.c and reuse lbox_tuple_new() code instead
of copying it. This may be done in a separate patch, I guess.

> +		struct ibuf *buf = tarantool_lua_ibuf;
> +		ibuf_reset(buf);
> +		struct mpstream stream;
> +		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
> +		      luamp_error, L);
> +		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
> +		mpstream_flush(&stream);
> +		tuple = box_tuple_new(format, buf->buf,
> +				      buf->buf + ibuf_used(buf));
> +		if (tuple == NULL) {
> +			luaT_pusherror(L, diag_last_error(diag_get()));
> +			return NULL;
> +		}
> +		ibuf_reinit(tarantool_lua_ibuf);
> +		return tuple;
> +	}
> +	tuple = luaT_istuple(L, idx);
> +	if (tuple == NULL) {
> +		lua_pushfstring(L, "A tuple or a table expected, got %s",
> +				lua_typename(L, lua_type(L, -1)));
> +		return NULL;
> +	}
> +	/*
> +	 * Create the new tuple with the format necessary for fast
> +	 * comparisons.
> +	 */
> +	const char *tuple_beg = tuple_data(tuple);
> +	const char *tuple_end = tuple_beg + tuple->bsize;
> +	tuple = box_tuple_new(format, tuple_beg, tuple_end);
> +	if (tuple == NULL) {
> +		luaT_pusherror(L, diag_last_error(diag_get()));
> +		return NULL;
> +	}
> +	return tuple;
> +}

> +#define RPOS_P(buf) ((const char **) &(buf)->rpos)

Please, let's somehow get along without this macro.
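
One way to do that is to decode through a local const cursor and
write it back to rpos at the end. A small sketch follows; toy_ibuf
and decode_u8() are stand-ins I made up for struct ibuf and the
mp_decode_*() calls so that the example compiles on its own.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for struct ibuf: only rpos/wpos matter here. */
struct toy_ibuf {
	char *rpos;
	char *wpos;
};

/* Stand-in for an mp_decode_*() call taking a const char ** cursor. */
static uint8_t
decode_u8(const char **pos)
{
	return (uint8_t)*(*pos)++;
}

/*
 * Read two bytes through a local cursor and commit the new read
 * position only on success. Return 0 on success, -1 if the buffer
 * holds fewer than two bytes (rpos is left untouched then).
 */
static int
read_pair(struct toy_ibuf *buf, uint8_t *a, uint8_t *b)
{
	assert(buf->rpos <= buf->wpos);
	const char *pos = buf->rpos;
	if (buf->wpos - pos < 2)
		return -1;
	*a = decode_u8(&pos);
	*b = decode_u8(&pos);
	buf->rpos = (char *)pos;
	return 0;
}
```

Besides dropping the cast of &buf->rpos, this leaves the buffer
untouched when the header turns out to be invalid.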

> +
> +/**
> + * Skip (and check) the wrapper around tuples array (and the array
> + * itself).
> + *
> + * Expected different kind of wrapping depending of it->decode.
> + */
> +static int
> +decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
> +{
> +	int ok = 1;
> +	/* Decode {[IPROTO_DATA] = ...} header. */
> +	if (it->decode != BUFFER_TYPE_RAW)
> +		ok = mp_typeof(*buf->rpos) == MP_MAP &&
> +			mp_decode_map(RPOS_P(buf)) == 1 &&
> +			mp_typeof(*buf->rpos) == MP_UINT &&
> +			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
> +	/* Decode the array around call return values. */
> +	if (ok && (it->decode == BUFFER_TYPE_CALL ||
> +	    it->decode == BUFFER_TYPE_CHAIN))
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> +			mp_decode_array(RPOS_P(buf)) > 0;
> +	/* Decode the array around chained input. */
> +	if (ok && it->decode == BUFFER_TYPE_CHAIN)
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> +			mp_decode_array(RPOS_P(buf)) > 0;
> +	/* Decode the array around tuples to merge. */
> +	if (ok)
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
> +	if (ok)
> +		*len_p = mp_decode_array(RPOS_P(buf));
> +	return ok;
> +}
> +
> +#undef RPOS_P

> +/**
> + * Determine type of a merger source on the Lua stack.
> + *
> + * Set *buf_p to buffer when the source is valid source of buffer
> + * type and buf_p is not NULL.
> + */
> +static enum merger_source_type
> +parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
> +{
> +	if (lua_type(L, idx) == LUA_TCDATA) {
> +		struct ibuf *buf = check_ibuf(L, idx);

msgpack.decode takes ibuf.rpos rather than ibuf. Maybe the merger
should do the same, for consistency?

> +		if (buf == NULL)
> +			return SOURCE_TYPE_NONE;
> +		if (buf_p != NULL)
> +			*buf_p = buf;
> +		return SOURCE_TYPE_BUFFER;
> +	} else if (lua_istable(L, idx)) {
> +		lua_rawgeti(L, idx, 1);
> +		int iscallable = luaT_iscallable(L, idx);
> +		lua_pop(L, 1);
> +		if (iscallable)
> +			return SOURCE_TYPE_ITERATOR;
> +		return SOURCE_TYPE_TABLE;
> +	}
> +
> +	return SOURCE_TYPE_NONE;
> +}

> +/**
> + * Parse sources table: second parameter pf merger_isnt:pairs()
> + * and merger_inst:select() into the merger_iterator structure.
> + *
> + * Note: This function should be called when options are already
> + * parsed (using parse_opts()).
> + *
> + * Returns 0 on success. In case of an error it pushes an error
> + * message to the Lua stack and returns 1.
> + */
> +static int
> +parse_sources(struct lua_State *L, int idx, struct merger *merger,
> +	      struct merger_iterator *it)
> +{
> +	/* Allocate sources array. */
> +	uint32_t capacity = 8;
> +	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
> +	it->sources = (struct merger_source **) malloc(sources_size);
> +	if (it->sources == NULL) {
> +		push_out_of_memory_error(L, sources_size, "it->sources");
> +		return 1;
> +	}
> +
> +	/* Fetch all sources. */
> +	while (true) {
> +		lua_pushinteger(L, it->sources_count + 1);
> +		lua_gettable(L, idx);
> +		if (lua_isnil(L, -1))
> +			break;
> +
> +		/* Shrink sources array if needed. */

The array is grown here, so the comment should say "Grow", not "Shrink".

> +		if (it->sources_count == capacity) {
> +			capacity *= 2;
> +			struct merger_source **new_sources;
> +			const ssize_t new_sources_size =
> +				capacity * sizeof(struct merger_source *);
> +			new_sources = (struct merger_source **) realloc(
> +				it->sources, new_sources_size);
> +			if (new_sources == NULL) {
> +				push_out_of_memory_error(L, new_sources_size,
> +							 "new_sources");
> +				return 1;
> +			}
> +			it->sources = new_sources;
> +		}
> +
> +		/* Allocate the new source. */
> +		it->sources[it->sources_count] = (struct merger_source *)
> +			malloc(sizeof(struct merger_source));
> +		struct merger_source *current_source =
> +			it->sources[it->sources_count];
> +		if (current_source == NULL) {
> +			push_out_of_memory_error(L,
> +						 sizeof(struct merger_source),
> +						 "merger_source");
> +			return 1;
> +		}
> +
> +		/*
> +		 * Set type and tuple to correctly proceed in
> +		 * merger_iterator_delete() in case of any further
> +		 * error.
> +		 */
> +		struct ibuf *buf = NULL;
> +		current_source->type = parse_source_type(L, -1, &buf);
> +		current_source->tuple = NULL;
> +
> +		/*
> +		 * Note: We need to increment sources count right
> +		 * after successful malloc() of the new source
> +		 * (before any further error check), because
> +		 * merger_iterator_delete() frees that amount of
> +		 * sources.
> +		 */
> +		++it->sources_count;
> +
> +		/* Initialize the new source. */
> +		switch (current_source->type) {
> +		case SOURCE_TYPE_BUFFER:
> +			if (!decode_header(it, buf,
> +			    &current_source->buf.remaining_tuples_cnt)) {
> +				lua_pushstring(L, "Invalid merge source");
> +				return 1;
> +			}
> +			current_source->buf.buf = buf;
> +			break;
> +		case SOURCE_TYPE_TABLE:
> +			/* Save a table ref and a next index. */
> +			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
> +			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +			current_source->tbl.ref = tbl_ref;
> +			current_source->tbl.next_idx = 1;
> +			break;
> +		case SOURCE_TYPE_ITERATOR:
> +			/* Wrap and save iterator. */
> +			current_source->it.it =
> +				lua_iterator_new_fromtable(L, -1);
> +			break;
> +		case SOURCE_TYPE_NONE:
> +			lua_pushfstring(L, "Unknown source type at index %d",
> +					it->sources_count);
> +			return 1;
> +		default:
> +			unreachable();
> +			return 1;
> +		}
> +		if (source_fetch(L, current_source, merger->format) != 0)
> +			return 1;
> +		if (current_source->tuple != NULL)
> +			MERGER_HEAP_INSERT(&it->heap,
> +					   &current_source->hnode,
> +					   current_source);
> +	}
> +	lua_pop(L, it->sources_count + 1);

This function is too long. Let's split it.
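
For instance, the array growth could live in its own helper, which
would remove the realloc block from the loop body. A rough sketch
with invented names (src_array, src_array_grow, src_array_push) and
void pointers standing in for struct merger_source pointers:

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Growable array of source pointers. */
struct src_array {
	void **items;
	uint32_t count;
	uint32_t capacity;
};

/*
 * Ensure there is room for one more item, doubling the capacity
 * when the array is full (starting from 8). Return 0 on success,
 * -1 on allocation failure; the array is unchanged on failure.
 */
static int
src_array_grow(struct src_array *arr)
{
	assert(arr->count <= arr->capacity);
	if (arr->count < arr->capacity)
		return 0;
	uint32_t capacity = arr->capacity == 0 ? 8 : arr->capacity * 2;
	void **items = realloc(arr->items, capacity * sizeof(*items));
	if (items == NULL)
		return -1;
	arr->items = items;
	arr->capacity = capacity;
	return 0;
}

/* Append an item; return 0 on success, -1 on allocation failure. */
static int
src_array_push(struct src_array *arr, void *item)
{
	if (src_array_grow(arr) != 0)
		return -1;
	arr->items[arr->count++] = item;
	return 0;
}
```

Creating and initializing a single source (the switch on the source
type) looks like another natural candidate for a separate function.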

> +
> +	return 0;
> +}

> +/**
> + * Create the new merger instance.
> + *
> + * Expected a table of key parts on the Lua stack.
> + *
> + * Returns the new instance.
> + */
> +static int
> +lbox_merger_new(struct lua_State *L)
> +{
> +	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
> +		return luaL_error(L, "Bad params, use: merger.new({"
> +				  "{fieldno = fieldno, type = type"
> +				  "[, is_nullable = is_nullable"
> +				  "[, collation_id = collation_id"
> +				  "[, collation = collation]]]}, ...}");
> +	uint32_t key_parts_count = 0;
> +	uint32_t capacity = 8;
> +
> +	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
> +	struct key_part_def *parts = NULL;
> +	parts = (struct key_part_def *) malloc(parts_size);
> +	if (parts == NULL)
> +		throw_out_of_memory_error(L, parts_size, "parts");
> +
> +	while (true) {
> +		lua_pushinteger(L, key_parts_count + 1);
> +		lua_gettable(L, 1);
> +		if (lua_isnil(L, -1))
> +			break;
> +
> +		/* Extend parts if necessary. */
> +		if (key_parts_count == capacity) {

Let's factor out key_def creation and define it elsewhere
(src/box/lua/key_def.[hc]?)

Can we somehow reuse the code of key_def_decode_parts for this?

> +			capacity *= 2;
> +			struct key_part_def *old_parts = parts;
> +			const ssize_t parts_size =
> +				sizeof(struct key_part_def) * capacity;
> +			parts = (struct key_part_def *) realloc(parts,
> +								parts_size);
> +			if (parts == NULL) {
> +				free(old_parts);
> +				throw_out_of_memory_error(L, parts_size,
> +							  "parts");
> +			}
> +		}
> +
> +		/* Set parts[key_parts_count].fieldno. */
> +		lua_pushstring(L, "fieldno");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			free(parts);
> +			return luaL_error(L, "fieldno must not be nil");
> +		}
> +		/*
> +		 * Transform one-based Lua fieldno to zero-based
> +		 * fieldno to use in key_def_new().
> +		 */
> +		parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1;
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].type. */
> +		lua_pushstring(L, "type");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			free(parts);
> +			return luaL_error(L, "type must not be nil");
> +		}
> +		size_t type_len;
> +		const char *type_name = lua_tolstring(L, -1, &type_len);
> +		lua_pop(L, 1);
> +		parts[key_parts_count].type = field_type_by_name(type_name,
> +								 type_len);
> +		if (parts[key_parts_count].type == field_type_MAX) {
> +			free(parts);
> +			return luaL_error(L, "Unknown field type: %s",
> +					  type_name);
> +		}
> +
> +		/* Set parts[key_parts_count].is_nullable. */
> +		lua_pushstring(L, "is_nullable");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			parts[key_parts_count].is_nullable = false;
> +			parts[key_parts_count].nullable_action =
> +				ON_CONFLICT_ACTION_DEFAULT;
> +		} else {
> +			parts[key_parts_count].is_nullable =
> +				lua_toboolean(L, -1);
> +			parts[key_parts_count].nullable_action =
> +				ON_CONFLICT_ACTION_NONE;
> +		}
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].coll_id using collation_id. */
> +		lua_pushstring(L, "collation_id");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1))
> +			parts[key_parts_count].coll_id = COLL_NONE;
> +		else
> +			parts[key_parts_count].coll_id = lua_tointeger(L, -1);
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].coll_id using collation. */
> +		lua_pushstring(L, "collation");
> +		lua_gettable(L, -2);
> +		/* Check whether box.cfg{} was called. */
> +		if ((parts[key_parts_count].coll_id != COLL_NONE ||
> +		    !lua_isnil(L, -1)) && !box_is_configured()) {
> +			free(parts);
> +			return luaL_error(L, "Cannot use collations: "
> +					  "please call box.cfg{}");
> +		}
> +		if (!lua_isnil(L, -1)) {
> +			if (parts[key_parts_count].coll_id != COLL_NONE) {
> +				free(parts);
> +				return luaL_error(
> +					L, "Conflicting options: collation_id "
> +					"and collation");
> +			}
> +			size_t coll_name_len;
> +			const char *coll_name = lua_tolstring(L, -1,
> +							      &coll_name_len);
> +			struct coll_id *coll_id = coll_by_name(coll_name,
> +							       coll_name_len);
> +			if (coll_id == NULL) {
> +				free(parts);
> +				return luaL_error(
> +					L, "Unknown collation: \"%s\"",
> +					coll_name);
> +			}
> +			parts[key_parts_count].coll_id = coll_id->id;
> +		}
> +		lua_pop(L, 1);
> +
> +		/* Check coll_id. */
> +		struct coll_id *coll_id =
> +			coll_by_id(parts[key_parts_count].coll_id);
> +		if (parts[key_parts_count].coll_id != COLL_NONE &&
> +		    coll_id == NULL) {
> +			uint32_t collation_id = parts[key_parts_count].coll_id;
> +			free(parts);
> +			return luaL_error(L, "Unknown collation_id: %d",
> +					  collation_id);
> +		}
> +
> +		/* Set parts[key_parts_count].sort_order. */
> +		parts[key_parts_count].sort_order = SORT_ORDER_ASC;
> +
> +		++key_parts_count;
> +	}
> +
> +	struct merger *merger = calloc(1, sizeof(*merger));
> +	if (merger == NULL) {
> +		free(parts);
> +		throw_out_of_memory_error(L, sizeof(*merger), "merger");
> +	}
> +	merger->key_def = key_def_new(parts, key_parts_count);
> +	free(parts);
> +	if (merger->key_def == NULL) {
> +		return luaL_error(L, "Cannot create merger->key_def");
> +	}
> +
> +	merger->format = box_tuple_format_new(&merger->key_def, 1);
> +	if (merger->format == NULL) {
> +		box_key_def_delete(merger->key_def);
> +		free(merger);
> +		return luaL_error(L, "Cannot create merger->format");
> +	}
> +
> +	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
> +
> +	lua_pushcfunction(L, lbox_merger_gc);
> +	luaL_setcdatagc(L, -2);
> +
> +	return 1;
> +}


