Tarantool development patches archive
 help / color / mirror / Atom feed
From: Alexander Turenko <alexander.turenko@tarantool.org>
To: Vladimir Davydov <vdavydov.dev@gmail.com>
Cc: Alexander Turenko <alexander.turenko@tarantool.org>,
	tarantool-patches@freelists.org
Subject: [PATCH 3/3] Add merger for tuple streams
Date: Sun, 16 Dec 2018 23:17:26 +0300	[thread overview]
Message-ID: <29bd3bbb8544f5b0e070e6b7880dad71f709a7d9.1544989900.git.alexander.turenko@tarantool.org> (raw)
In-Reply-To: <cover.1544989900.git.alexander.turenko@tarantool.org>

Fixes #3276.
---
 src/CMakeLists.txt           |    2 +
 src/lua/init.c               |    5 +
 src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
 src/lua/merger.h             |   39 +
 src/lua/merger.lua           |   19 +
 test/app-tap/merger.test.lua |  693 ++++++++++++++
 test/app-tap/suite.ini       |    1 +
 7 files changed, 2402 insertions(+)
 create mode 100644 src/lua/merger.c
 create mode 100644 src/lua/merger.h
 create mode 100644 src/lua/merger.lua
 create mode 100755 test/app-tap/merger.test.lua

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..d2a5fc9c1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -47,6 +47,7 @@ lua_source(lua_sources lua/trigger.lua)
 lua_source(lua_sources lua/table.lua)
 lua_source(lua_sources ../third_party/luafun/fun.lua)
 lua_source(lua_sources lua/httpc.lua)
+lua_source(lua_sources lua/merger.lua)
 lua_source(lua_sources lua/iconv.lua)
 # LuaJIT jit.* library
 lua_source(lua_sources "${CMAKE_BINARY_DIR}/third_party/luajit/src/jit/bc.lua")
@@ -181,6 +182,7 @@ set (server_sources
      lua/fio.c
      lua/crypto.c
      lua/httpc.c
+     lua/merger.c
      lua/utf8.c
      lua/info.c
      ${lua_sources}
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..1eeab02b9 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -57,6 +57,7 @@
 #include "lua/pickle.h"
 #include "lua/fio.h"
 #include "lua/httpc.h"
+#include "lua/merger.h"
 #include "lua/utf8.h"
 #include "digest.h"
 #include <small/ibuf.h>
@@ -89,6 +90,7 @@ extern char strict_lua[],
 	errno_lua[],
 	fiber_lua[],
 	httpc_lua[],
+	merger_lua[],
 	log_lua[],
 	uri_lua[],
 	socket_lua[],
@@ -148,6 +150,7 @@ static const char *lua_modules[] = {
 	"internal.trigger", trigger_lua,
 	"pwd", pwd_lua,
 	"http.client", httpc_lua,
+	"merger", merger_lua,
 	"iconv", iconv_lua,
 	/* jit.* library */
 	"jit.vmdef", vmdef_lua,
@@ -452,6 +455,8 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
 	tarantool_lua_digest_init(L);
 	luaopen_http_client_driver(L);
 	lua_pop(L, 1);
+	luaopen_merger(L);
+	lua_pop(L, 1);
 	luaopen_msgpack(L);
 	lua_pop(L, 1);
 	luaopen_yaml(L);
diff --git a/src/lua/merger.c b/src/lua/merger.c
new file mode 100644
index 000000000..8caf8d47f
--- /dev/null
+++ b/src/lua/merger.c
@@ -0,0 +1,1643 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+/**
+ * API and basic usage
+ * -------------------
+ *
+ * The following example demonstrates API of the module:
+ *
+ * ```
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- The format of key_parts parameter is the same as
+ * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
+ * -- net.box connection).
+ * local key_parts = {
+ *     {
+ *         fieldno = <number>,
+ *         type = <string>,
+ *         [ is_nullable = <boolean>, ]
+ *         [ collation_id = <number>, ]
+ *         [ collation = <string>, ]
+ *     },
+ *     ...
+ * }
+ *
+ * -- Create the merger instance.
+ * local merger_inst = merger.new(key_parts)
+ *
+ * -- Optional parameters.
+ * local opts = {
+ *     -- Output buffer, only for merger_inst:select(<...>).
+ *     [ buffer = <buffer>, ]
+ *     -- Ascending (default) or descending result order.
+ *     [ descending = <boolean>, ]
+ *     -- Buffer encoding / decoding options are described below.
+ *     [ decode = 'raw' / 'select' / 'call' / 'chain', ]
+ *     [ encode = 'raw' / 'select' / 'call' / 'chain', ]
+ *     [ encode_chain_len = <number>, ]
+ * }
+ *
+ * -- Prepare buffer source.
+ * local conn = net_box.connect('localhost:3301')
+ * local buf = buffer.ibuf()
+ * conn.space.s:select(nil, {buffer = buf}) -- read to buffer
+ *
+ * -- We have three sources here.
+ * local sources = {
+ *     buf,                   -- buffer source
+ *     box.space.s:select(),  -- table source
+ *     {box.space.s:pairs()}, -- iterator source
+ * }
+ *
+ * -- Read the whole result at once.
+ * local res = merger_inst:select(sources, opts)
+ *
+ * -- Read the result tuple per tuple.
+ * local res = {}
+ * for _, tuple in merger_inst:pairs(sources, opts) do
+ *     -- Some stop merge condition.
+ *     if tuple[1] > MAX_VALUE then break end
+ *     table.insert(res, tuple)
+ * end
+ *
+ * -- The same in the functional style.
+ * local function cond(tuple)
+ *     return tuple[1] <= MAX_VALUE
+ * end
+ * local res = merger_inst:pairs(sources, opts):take(cond):totable()
+ * ```
+ *
+ * The basic case of using merger is when there are M storages and
+ * data are partitioned (sharded) across them. A client wants to
+ * fetch the data (tuples stream) from each storage and merge them
+ * into one tuple stream:
+ *
+ * ```
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- Prepare M sources.
+ * local connects = {
+ *     net_box.connect('localhost:3301'),
+ *     net_box.connect('localhost:3302'),
+ *     ...
+ *     net_box.connect('localhost:<...>'),
+ * }
+ * local sources = {}
+ * for _, conn in ipairs(connects) do
+ *     local buf = buffer.ibuf()
+ *     conn.space.<...>.index.<...>:select(<...>, {buffer = buf})
+ *     table.insert(sources, buf)
+ * end
+ *
+ * -- See the 'Notes...' section below.
+ * local key_parts = {}
+ * local space = connects[1].space.<...>
+ * local index = space.index.<...>
+ * for _, part in ipairs(index.parts) do
+ *     table.insert(key_parts, part)
+ * end
+ * if not index.unique then
+ *     for _, part in ipairs(space.index[0].parts) do
+ *         table.insert(key_parts, part)
+ *     end
+ * end
+ *
+ * -- Create the merger instance.
+ * local merger_inst = merger.new(key_parts)
+ *
+ * -- Merge.
+ * local res = merger_inst:select(sources)
+ * ```
+ *
+ * Notes re source sorting and key parts
+ * -------------------------------------
+ *
+ * The merger expects that each source tuples stream is sorted
+ * according to provided key parts and perform a kind of merge
+ * sort (choose minimal / maximal tuple across sources on each
+ * step). Tuples from select() from Tarantool's space are sorted
+ * according to key parts from index that was used. When secondary
+ * non-unique index is used tuples are sorted according to the key
+ * parts of the secondary index and, then, key parts of the
+ * primary index.
+ *
+ * Decoding / encoding buffers
+ * ---------------------------
+ *
+ * A select response has the following structure:
+ * `{[48] = {tuples}}`, while a call response is
+ * `{[48] = {{tuples}}}` (because it should support multiple
+ * return values). A user should specify how merger will
+ * operate on buffers, so merger has `decode` (how to read buffer
+ * sources) and `encode` (how to write to a resulting buffer)
+ * options. These options accept the following values:
+ *
+ * Option value       | Buffer structure
+ * ------------------ | ----------------
+ * 'raw'              | tuples
+ * 'select' (default) | {[48] = {tuples}}
+ * 'call'             | {[48] = {{tuples}}}
+ * 'chain'            | {[48] = {{{tuples, ...}}}}
+ *
+ * tuples is array of tuples. 'raw' and 'chain' options are about chaining
+ * mergers and they are described in the following section.
+ *
+ * How to check buffer data structure myself:
+ *
+ * ```
+ * #!/usr/bin/env tarantool
+ *
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local ffi = require('ffi')
+ * local msgpack = require('msgpack')
+ * local yaml = require('yaml')
+ *
+ * box.cfg{listen = 3301}
+ * box.once('load_data', function()
+ *     box.schema.user.grant('guest', 'read,write,execute', 'universe')
+ *     box.schema.space.create('s')
+ *     box.space.s:create_index('pk')
+ *     box.space.s:insert({1})
+ *     box.space.s:insert({2})
+ *     box.space.s:insert({3})
+ *     box.space.s:insert({4})
+ * end)
+ *
+ * local function foo()
+ *     return box.space.s:select()
+ * end
+ * _G.foo = foo
+ *
+ * local conn = net_box.connect('localhost:3301')
+ *
+ * local buf = buffer.ibuf()
+ * conn.space.s:select(nil, {buffer = buf})
+ * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
+ * local buf_lua = msgpack.decode(buf_str)
+ * print('select:\n' .. yaml.encode(buf_lua))
+ *
+ * local buf = buffer.ibuf()
+ * conn:call('foo', nil, {buffer = buf})
+ * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
+ * local buf_lua = msgpack.decode(buf_str)
+ * print('call:\n' .. yaml.encode(buf_lua))
+ *
+ * os.exit()
+ * ```
+ *
+ * The `decode` option changes decoding algorithm of source
+ * buffers and does nothing for sources of other types. It will be
+ * completely ignored if there are no buffer sources.
+ *
+ * The `encode` option changes encoding algorithm of resulting
+ * buffer. When the option is provided, the `buffer` option should
+ * be provided too. When `encode` is 'chain', the
+ * `encode_chain_len` option is mandatory.
+ *
+ * Chaining mergers
+ * ----------------
+ *
+ * Chaining mergers is needed to process a batch select request,
+ * when one response (buffer) contains several results (tuple
+ * arrays) to merge with another responses of this kind. Reshaping
+ * of such results into separate buffers or lua table would lead
+ * to extra data copies within Lua memory and extra time consuming
+ * msgpack decoding, so the merger supports this case of source
+ * data shape natively.
+ *
+ * When the `decode` option is 'select' (or nil) or 'call' the
+ * merger expects a usual net.box's select / call result in each
+ * of source buffers.
+ *
+ * When the `decode` option is 'chain' or 'raw' the merger expects
+ * an array of results instead of just result. Pass 'chain' for
+ * the first `:select()` (or `:pairs()`) call and 'raw' for the
+ * following ones. It is possible (but not mandatory) to use
+ * different mergers for each result, just reuse the same buffers
+ * for consequent calls.
+ *
+ * ```
+ * -- Storage script
+ * -- --------------
+ *
+ * -- Return N results in a table.
+ * -- Each result is table of tuples.
+ * local function batch_select(<...>)
+ *     local res = {}
+ *     for i = 1, N do
+ *         local tuples = box.space.<...>:select(<...>)
+ *         table.insert(res, tuples)
+ *     end
+ *     return res
+ * end
+ *
+ * -- Expose to call it using net.box.
+ * _G.batch_select = batch_select
+ *
+ * -- Client script
+ * -- -------------
+ *
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- Prepare M sources.
+ * local connects = <...>
+ * local sources = {}
+ * for _, conn in ipairs(connects) do
+ *     local buf = buffer.ibuf()
+ *     conn:call('batch_select', <...>, {buffer = buf})
+ *     table.insert(sources, buf)
+ * end
+ *
+ * -- Now we have M sources and each have N results. We want to
+ * -- merge all 1st results, all 2nd results, ..., all Nth
+ * -- results.
+ *
+ * local merger_inst = merger.new(<...>)
+ *
+ * local res = {}
+ * for i = 1, N do
+ *     -- We use the same merger instance for each merge, but it
+ *     -- is possible to use different ones.
+ *     local tuples = merger_inst:select(sources, {
+ *         decode = i == 1 and 'chain' or 'raw',
+ *     })
+ *     table.insert(res, tuples)
+ * end
+ * ```
+ *
+ * When `buffer` option is passed it is possible to write results
+ * of several consequent merges into that buffer in the format
+ * another merger can accept (see cascading mergers below for the
+ * idea). Set `encode` to 'chain' to encode the first result
+ * and to 'raw' to encode consequent results. It is necessary to
+ * also set `encode_chain_len`, because size of resulting array
+ * is not known to a merger when it writes the first result.
+ *
+ * Constraints are:
+ *
+ * - The `decode` option affects only how buffer sources are
+ *   interpreted (it is ignored for sources of other types).
+ * - `encode_*` options are applicable only when the `buffer`
+ *   option is provided.
+ *
+ * Cascading mergers
+ * -----------------
+ *
+ * The idea is simple: the merger output formats are the same as
+ * source formats, so it is possible to merge results of previous
+ * merges.
+ *
+ * The example below is synthetic to be simple. Real cases when
+ * cascading can be profitable likely involve additional layers
+ * of Tarantool instances between storages and clients or separate
+ * threads to merge blocks of each level.
+ *
+ * To be honest, no one uses this ability for now. It exists
+ * because having the same input and output formats looks like a
+ * good property of the API.
+ *
+ * ```
+ * <...requires...>
+ *
+ * local sources = <...100 buffers...>
+ * local merger_inst = merger.new(<...>)
+ *
+ * -- We use buffer sources at 1st and 2nd merge layers, but read
+ * -- the final result as the table.
+ *
+ * local sources_level_2 = {}
+ * for i = 1, 10 do
+ *     -- Take next 10 first level sources.
+ *     local sources_level_1 = {}
+ *     for j = 1, 10 do
+ *         sources_level_1[j] = sources[(i - 1) * 10 + j]
+ *     end
+ *
+ *     -- Merge 10 sources into a second level source.
+ *     local result_level_1 = buffer.ibuf()
+ *     merger_inst:select(sources_level_1, {buffer = result_level_1})
+ *     sources_level_2[i] = result_level_1
+ * end
+ *
+ * local res = merger_inst:select(sources_level_2)
+ * ```
+ */
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+
+#include <lua.h>
+#include <lauxlib.h>
+
+#include "lua/error.h"
+#include "lua/utils.h"
+#include "small/ibuf.h"
+#include "msgpuck.h"
+#include "mpstream.h"
+#include "lua/msgpack.h"
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "box/iproto_constants.h" /* IPROTO_DATA */
+#include "box/field_def.h"
+#include "box/key_def.h"
+#include "box/schema_def.h"
+#include "box/tuple.h"
+#include "box/lua/tuple.h"
+#include "box/box.h"
+#include "box/index.h"
+#include "box/coll_id_cache.h"
+#include "lua/lua_iterator.h"
+#include "diag.h"
+
+#ifndef NDEBUG
+#include "say.h"
+/**
+ * Heap insert/delete/update macros wrapped with debug prints.
+ */
+#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
+	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
+		  tuple_str((source)->tuple));				\
+	merger_heap_insert((heap), (hnode));				\
+} while(0)
+#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
+	say_debug("merger: [source %p] delete", (source));	\
+	merger_heap_delete((heap), (hnode));			\
+} while(0)
+#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
+	say_debug("merger: [source %p] update: tuple: %s", (source),	\
+		  tuple_str((source)->tuple));				\
+	merger_heap_update((heap), (hnode));				\
+} while(0)
+#else /* !defined(NDEBUG) */
+/**
+ * Heap insert/delete/update macros wrappers w/o debug prints.
+ */
+#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
+	merger_heap_insert((heap), (hnode));		\
+} while(0)
+#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
+	merger_heap_delete((heap), (hnode));		\
+} while(0)
+#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
+	merger_heap_update((heap), (hnode));		\
+} while(0)
+#endif /* !defined(NDEBUG) */
+
+/**
+ * Helper macros to push / throw out of memory errors to Lua.
+ */
+#define push_out_of_memory_error(L, size, what_name) do {	\
+	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
+	luaT_pusherror(L, diag_last_error(diag_get()));		\
+} while(0)
+#define throw_out_of_memory_error(L, size, what_name) do {	\
+	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
+	luaT_error(L);						\
+	unreachable();						\
+	return -1;						\
+} while(0)
+
+#define BOX_COLLATION_NAME_INDEX 1
+
+/**
+ * A type of data structure that holds source data.
+ *
+ * Stored in struct merger_source as the discriminant of its
+ * per-type union.
+ */
+enum merger_source_type {
+	/* A msgpack buffer (struct ibuf cdata). */
+	SOURCE_TYPE_BUFFER,
+	/* A Lua table holding tuples / tables. */
+	SOURCE_TYPE_TABLE,
+	/* A Lua table holding an iterator (its first item is callable). */
+	SOURCE_TYPE_ITERATOR,
+	/* Not a valid source; returned by parse_source_type() on failure. */
+	SOURCE_TYPE_NONE,
+};
+
+/**
+ * How data are encoded in a buffer.
+ *
+ * `decode` and `encode` options are parsed to values of this
+ * enum.
+ */
+enum merger_buffer_type {
+	/* 'raw': just an array of tuples, no wrapper. */
+	BUFFER_TYPE_RAW,
+	/* 'select' (default): {[IPROTO_DATA] = {tuples}}. */
+	BUFFER_TYPE_SELECT,
+	/* 'call': {[IPROTO_DATA] = {{tuples}}}. */
+	BUFFER_TYPE_CALL,
+	/* 'chain': {[IPROTO_DATA] = {{{tuples, ...}}}}. */
+	BUFFER_TYPE_CHAIN,
+	/* Unrecognized option value; returned by parse_buffer_type(). */
+	BUFFER_TYPE_NONE,
+};
+
+/**
+ * Hold state of a merge source.
+ *
+ * One instance exists per source passed to the merger; it is
+ * linked into the heap of a merger_iterator through hnode.
+ */
+struct merger_source {
+	/*
+	 * A source is the heap node. Compared by the next tuple.
+	 */
+	struct heap_node hnode;
+	/* Union determinant. */
+	enum merger_source_type type;
+	/* Fields specific for certain source types. */
+	union {
+		/* Buffer source. */
+		struct {
+			/* The buffer to read msgpack from (rpos is advanced). */
+			struct ibuf *buf;
+			/*
+			 * A merger stops before end of a buffer
+			 * when it is not the last merger in the
+			 * chain.
+			 */
+			size_t remaining_tuples_cnt;
+		} buf;
+		/* Table source. */
+		struct {
+			/* Reference to the table in LUA_REGISTRYINDEX. */
+			int ref;
+			/* Index of the table item to fetch next. */
+			int next_idx;
+		} tbl;
+		/* Iterator source. */
+		struct {
+			struct lua_iterator *it;
+		} it;
+	};
+	/*
+	 * Next tuple. NULL when nothing is fetched: source_fetch()
+	 * resets it to NULL and leaves it so when a source is
+	 * drained.
+	 */
+	struct tuple *tuple;
+};
+
+/**
+ * Holds immutable parameters of a merger.
+ */
+struct merger {
+	/* Key definition tuples are compared with. */
+	struct key_def *key_def;
+	/* Tuple format passed to source_fetch() to create tuples. */
+	box_tuple_format_t *format;
+};
+
+/**
+ * Holds parameters of merge process, sources, result storage
+ * (if any), heap of sources and utility flags / counters.
+ */
+struct merger_iterator {
+	/* Heap of sources. */
+	heap_t heap;
+	/*
+	 * key_def is copied from merger.
+	 *
+	 * A merger can be collected by LuaJIT GC independently
+	 * from a merger_iterator, so we cannot just save pointer
+	 * to merger here and so we copy key_def from merger.
+	 */
+	struct key_def *key_def;
+	/* Parsed sources and decoding parameters. */
+	uint32_t sources_count;
+	struct merger_source **sources;
+	enum merger_buffer_type decode;
+	/*
+	 * Ascending / descending order: 1 or -1 (see parse_opts()).
+	 * source_less() multiplies the comparison result by it.
+	 */
+	int order;
+	/* Optional output buffer and encoding parameters. */
+	struct ibuf *obuf;
+	enum merger_buffer_type encode;
+	/*
+	 * Length of the outer array written for {encode = 'chain'};
+	 * parse_opts() treats zero as 'not provided'.
+	 */
+	uint32_t encode_chain_len;
+};
+
+static uint32_t merger_type_id = 0;
+static uint32_t merger_iterator_type_id = 0;
+static uint32_t ibuf_type_id = 0;
+
+/* Forward declarations. */
+static bool
+source_less(const heap_t *heap, const struct heap_node *a,
+	    const struct heap_node *b);
+static int
+lbox_merger_gc(struct lua_State *L);
+static void
+merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
+static int
+lbox_merger_iterator_gc(struct lua_State *L);
+
+#define HEAP_NAME merger_heap
+#define HEAP_LESS source_less
+#include "salad/heap.h"
+
+/**
+ * Create the new tuple with specific format from a Lua table or a
+ * tuple located at the stack index idx.
+ *
+ * A table is encoded into msgpack using the shared
+ * tarantool_lua_ibuf buffer. A tuple is re-created from its
+ * msgpack data, so the result has the requested format in both
+ * cases.
+ *
+ * In case of an error push the error message to the Lua stack and
+ * return NULL.
+ */
+static struct tuple *
+luaT_gettuple_with_format(struct lua_State *L, int idx,
+			  box_tuple_format_t *format)
+{
+	struct tuple *tuple;
+	if (lua_istable(L, idx)) {
+		/* Based on lbox_tuple_new() code. */
+		struct ibuf *buf = tarantool_lua_ibuf;
+		ibuf_reset(buf);
+		struct mpstream stream;
+		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
+		      luamp_error, L);
+		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
+		mpstream_flush(&stream);
+		tuple = box_tuple_new(format, buf->buf,
+				      buf->buf + ibuf_used(buf));
+		if (tuple == NULL) {
+			luaT_pusherror(L, diag_last_error(diag_get()));
+			return NULL;
+		}
+		ibuf_reinit(tarantool_lua_ibuf);
+		return tuple;
+	}
+	tuple = luaT_istuple(L, idx);
+	if (tuple == NULL) {
+		/*
+		 * Report the type of the value that was actually
+		 * checked (the one at idx), not of the stack top:
+		 * they differ when idx does not point to the top.
+		 */
+		lua_pushfstring(L, "A tuple or a table expected, got %s",
+				lua_typename(L, lua_type(L, idx)));
+		return NULL;
+	}
+	/*
+	 * Create the new tuple with the format necessary for fast
+	 * comparisons.
+	 */
+	const char *tuple_beg = tuple_data(tuple);
+	const char *tuple_end = tuple_beg + tuple->bsize;
+	tuple = box_tuple_new(format, tuple_beg, tuple_end);
+	if (tuple == NULL) {
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return NULL;
+	}
+	return tuple;
+}
+
+/**
+ * Ordering predicate for the heap of sources: tell whether the
+ * source behind node a goes before the source behind node b.
+ *
+ * A source without a fetched tuple is never less than another
+ * one; a source with a tuple is always less than a source
+ * without one. Otherwise the tuples are compared using key_def
+ * of the owning merger_iterator and the result is multiplied by
+ * its order.
+ */
+static bool
+source_less(const heap_t *heap, const struct heap_node *a,
+	    const struct heap_node *b)
+{
+	struct merger_source *lhs = container_of(a, struct merger_source,
+						 hnode);
+	struct merger_source *rhs = container_of(b, struct merger_source,
+						 hnode);
+	/* No tuple on the left: not less, whatever is on the right. */
+	if (lhs->tuple == NULL)
+		return false;
+	/* A tuple on the left and none on the right: less. */
+	if (rhs->tuple == NULL)
+		return true;
+	struct merger_iterator *it = container_of(heap, struct merger_iterator,
+						  heap);
+	int cmp = box_tuple_compare(lhs->tuple, rhs->tuple, it->key_def);
+	return it->order * cmp < 0;
+}
+
+/**
+ * Update source->tuple of specific source.
+ *
+ * source->tuple is reset to NULL first and stays NULL when the
+ * source has no more data (this is not an error). A fetched
+ * tuple is created with the given format.
+ *
+ * Increases the reference counter of the tuple.
+ *
+ * Return 0 when successfully fetched a tuple or NULL. In case of
+ * an error push an error message to the Lua stack and return 1.
+ */
+static int
+source_fetch(struct lua_State *L, struct merger_source *source,
+	     box_tuple_format_t *format)
+{
+	source->tuple = NULL;
+
+	switch (source->type) {
+	case SOURCE_TYPE_BUFFER: {
+		/* The expected amount of tuples is already read. */
+		if (source->buf.remaining_tuples_cnt == 0)
+			return 0;
+		--source->buf.remaining_tuples_cnt;
+		/* A tuple is expected, but the buffer has no data. */
+		if (ibuf_used(source->buf.buf) == 0) {
+			lua_pushstring(L, "Unexpected msgpack buffer end");
+			return 1;
+		}
+		const char *tuple_beg = source->buf.buf->rpos;
+		const char *tuple_end = tuple_beg;
+		/*
+		 * mp_next() is faster than mp_check(), but can
+		 * read bytes outside of the buffer and so can
+		 * cause segmentation faults or incorrect result.
+		 *
+		 * We check buffer boundaries after the mp_next()
+		 * call and throw an error when the boundaries are
+		 * violated, but it does not save us from possible
+		 * segmentation faults.
+		 *
+		 * It is in a user responsibility to provide valid
+		 * msgpack.
+		 */
+		mp_next(&tuple_end);
+		if (tuple_end > source->buf.buf->wpos) {
+			lua_pushstring(L, "Unexpected msgpack buffer end");
+			return 1;
+		}
+		/* Consume the tuple from the buffer. */
+		source->buf.buf->rpos = (char *) tuple_end;
+		source->tuple = box_tuple_new(format, tuple_beg, tuple_end);
+		if (source->tuple == NULL) {
+			luaT_pusherror(L, diag_last_error(diag_get()));
+			return 1;
+		}
+		break;
+	}
+	case SOURCE_TYPE_TABLE: {
+		/* Push the source table and then its next_idx-th item. */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->tbl.ref);
+		lua_pushinteger(L, source->tbl.next_idx);
+		lua_gettable(L, -2);
+		/* A nil item means the end of the table. */
+		if (lua_isnil(L, -1)) {
+			lua_pop(L, 2);
+			return 0;
+		}
+		source->tuple = luaT_gettuple_with_format(L, -1, format);
+		/*
+		 * On failure the table and the item are left on the
+		 * stack below the error message.
+		 */
+		if (source->tuple == NULL)
+			return 1;
+		++source->tbl.next_idx;
+		lua_pop(L, 2);
+		break;
+	}
+	case SOURCE_TYPE_ITERATOR: {
+		int nresult = lua_iterator_next(L, source->it.it);
+		/* Zero results is treated as the end of the iterator. */
+		if (nresult == 0)
+			return 0;
+		/*
+		 * The tuple is taken from the second of nresult
+		 * values pushed by the iterator.
+		 *
+		 * NOTE(review): when nresult == 1 the index is 0,
+		 * which is not a valid stack index -- confirm that
+		 * lua_iterator_next() never returns 1.
+		 */
+		source->tuple = luaT_gettuple_with_format(L, -nresult + 1,
+							  format);
+		/*
+		 * On failure the iterator results are left on the
+		 * stack below the error message.
+		 */
+		if (source->tuple == NULL)
+			return 1;
+		lua_pop(L, nresult);
+		break;
+	}
+	case SOURCE_TYPE_NONE:
+	default:
+		unreachable();
+	}
+	box_tuple_ref(source->tuple);
+	return 0;
+}
+
+/**
+ * Fetch a merger object from the Lua stack slot idx.
+ *
+ * Return NULL when the cdata is not of the merger type.
+ */
+static struct merger *
+check_merger(struct lua_State *L, int idx)
+{
+	uint32_t ctypeid;
+	struct merger **pp = luaL_checkcdata(L, idx, &ctypeid);
+	if (pp != NULL && ctypeid == merger_type_id)
+		return *pp;
+	return NULL;
+}
+
+/**
+ * Fetch a merger_iterator object from the Lua stack slot idx.
+ *
+ * Return NULL when the cdata is not of the merger_iterator type.
+ */
+static struct merger_iterator *
+check_merger_iterator(struct lua_State *L, int idx)
+{
+	uint32_t ctypeid;
+	struct merger_iterator **pp = luaL_checkcdata(L, idx, &ctypeid);
+	if (pp != NULL && ctypeid == merger_iterator_type_id)
+		return *pp;
+	return NULL;
+}
+
+/**
+ * Fetch an ibuf object from the Lua stack slot idx.
+ *
+ * Return NULL when the value is not cdata or is cdata of another
+ * type.
+ */
+static struct ibuf *
+check_ibuf(struct lua_State *L, int idx)
+{
+	uint32_t ctypeid;
+	struct ibuf *buf;
+
+	/* Filter out non-cdata values before asking for cdata. */
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	buf = luaL_checkcdata(L, idx, &ctypeid);
+	if (buf != NULL && ctypeid == ibuf_type_id)
+		return buf;
+	return NULL;
+}
+
+/* Address of buf->rpos in the form mp_decode_*() functions accept. */
+#define RPOS_P(buf) ((const char **) &(buf)->rpos)
+
+/**
+ * Skip (and check) the wrapper around tuples array (and the array
+ * itself).
+ *
+ * Expected different kind of wrapping depending of it->decode.
+ *
+ * buf->rpos is advanced past every header item that was decoded,
+ * including the case when a later check fails.
+ *
+ * Return 1 on success and store the length of the tuples array
+ * into *len_p. Return 0 when the header does not match; *len_p is
+ * not written then. Note: unlike other helpers in this file a
+ * non-zero value means success here.
+ *
+ * NOTE(review): bytes are read at buf->rpos without comparing it
+ * against buf->wpos, so an empty or truncated buffer is not
+ * detected here -- confirm callers guarantee enough data.
+ */
+static int
+decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
+{
+	int ok = 1;
+	/* Decode {[IPROTO_DATA] = ...} header. */
+	if (it->decode != BUFFER_TYPE_RAW)
+		ok = mp_typeof(*buf->rpos) == MP_MAP &&
+			mp_decode_map(RPOS_P(buf)) == 1 &&
+			mp_typeof(*buf->rpos) == MP_UINT &&
+			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
+	/* Decode the array around call return values. */
+	if (ok && (it->decode == BUFFER_TYPE_CALL ||
+	    it->decode == BUFFER_TYPE_CHAIN))
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
+			mp_decode_array(RPOS_P(buf)) > 0;
+	/* Decode the array around chained input. */
+	if (ok && it->decode == BUFFER_TYPE_CHAIN)
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
+			mp_decode_array(RPOS_P(buf)) > 0;
+	/* Decode the array around tuples to merge. */
+	if (ok)
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+	if (ok)
+		*len_p = mp_decode_array(RPOS_P(buf));
+	return ok;
+}
+
+#undef RPOS_P
+
+/**
+ * Write the wrapper around the tuples array (and the header of
+ * the array itself) into it->obuf.
+ *
+ * The layers are chosen by it->encode:
+ *
+ * - anything except 'raw': the {[IPROTO_DATA] = ...} map;
+ * - 'call' and 'chain': an array of one item;
+ * - 'chain': an array of it->encode_chain_len items;
+ * - always: an array of result_len items.
+ *
+ * Results of ibuf_reserve() are not checked here.
+ */
+static void
+encode_header(struct merger_iterator *it, uint32_t result_len)
+{
+	struct ibuf *out = it->obuf;
+	const bool is_chain = it->encode == BUFFER_TYPE_CHAIN;
+	const bool has_call_array = is_chain ||
+				    it->encode == BUFFER_TYPE_CALL;
+
+	if (it->encode != BUFFER_TYPE_RAW) {
+		/* {[IPROTO_DATA] = ...} header. */
+		size_t map_size = mp_sizeof_map(1) +
+				  mp_sizeof_uint(IPROTO_DATA);
+		ibuf_reserve(out, map_size);
+		out->wpos = mp_encode_map(out->wpos, 1);
+		out->wpos = mp_encode_uint(out->wpos, IPROTO_DATA);
+	}
+	if (has_call_array) {
+		/* The array around call return values. */
+		ibuf_reserve(out, mp_sizeof_array(1));
+		out->wpos = mp_encode_array(out->wpos, 1);
+	}
+	if (is_chain) {
+		/* The array around chained output. */
+		uint32_t chain_len = it->encode_chain_len;
+		ibuf_reserve(out, mp_sizeof_array(chain_len));
+		out->wpos = mp_encode_array(out->wpos, chain_len);
+	}
+	/* The array around resulting tuples. */
+	ibuf_reserve(out, mp_sizeof_array(result_len));
+	out->wpos = mp_encode_array(out->wpos, result_len);
+}
+
+/**
+ * Push an error message with the usage info to the Lua stack.
+ *
+ * The message starts with 'Bad params' when param_name is NULL
+ * and with 'Bad param "<param_name>"' otherwise.
+ *
+ * Always return 1, so a caller can return the result as its own
+ * error code.
+ */
+static int
+merger_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger_inst:{ipairs,pairs,select}("
+				   "{source, source, ...}[, {"
+				   "descending = <boolean> or <nil>, "
+				   "decode = 'raw' / 'select' / 'call' / "
+				   "'chain' / <nil>, "
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "encode = 'raw' / 'select' / 'call' / "
+				   "'chain' / <nil>, "
+				   "encode_chain_len = <number> or <nil>}])";
+	if (param_name != NULL) {
+		lua_pushfstring(L, "Bad param \"%s\", use: %s", param_name,
+				usage);
+		return 1;
+	}
+	lua_pushfstring(L, "Bad params, use: %s", usage);
+	return 1;
+}
+
+/**
+ * Take the tuple of the source at the top of the heap, fetch the
+ * next tuple into that source and fix the heap up: the source is
+ * removed from the heap when it gave no new tuple and is
+ * repositioned otherwise.
+ *
+ * The reference counter of the returned tuple was increased when
+ * the tuple was fetched (in source_fetch).
+ *
+ * Raise a Lua error when fetching fails. Return NULL when the
+ * heap is empty, i.e. all sources are drained.
+ */
+static struct tuple *
+merger_next(struct lua_State *L, struct merger *merger,
+	    struct merger_iterator *it)
+{
+	struct heap_node *top = merger_heap_top(&it->heap);
+	if (top == NULL)
+		return NULL;
+
+	struct merger_source *src = container_of(top, struct merger_source,
+						 hnode);
+	/* Remember the result before it is replaced by the fetch. */
+	struct tuple *result = src->tuple;
+	assert(result != NULL);
+
+	if (source_fetch(L, src, merger->format) != 0) {
+		/* The error message is on the top of the stack. */
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	if (src->tuple != NULL) {
+		MERGER_HEAP_UPDATE(&it->heap, top, src);
+	} else {
+		MERGER_HEAP_DELETE(&it->heap, top, src);
+	}
+
+	return result;
+}
+
+/**
+ * Determine type of a merger source on the Lua stack.
+ *
+ * - cdata of the ibuf type is a buffer source;
+ * - a table whose first item is callable is an iterator source;
+ * - any other table is a table source;
+ * - everything else is not a source (SOURCE_TYPE_NONE).
+ *
+ * Set *buf_p to buffer when the source is valid source of buffer
+ * type and buf_p is not NULL.
+ *
+ * The Lua stack is left unchanged.
+ */
+static enum merger_source_type
+parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
+{
+	if (lua_type(L, idx) == LUA_TCDATA) {
+		struct ibuf *buf = check_ibuf(L, idx);
+		if (buf == NULL)
+			return SOURCE_TYPE_NONE;
+		if (buf_p != NULL)
+			*buf_p = buf;
+		return SOURCE_TYPE_BUFFER;
+	} else if (lua_istable(L, idx)) {
+		lua_rawgeti(L, idx, 1);
+		/*
+		 * The first item of the table is on the top of the
+		 * stack now, so check it at -1. The idx slot still
+		 * refers to the table itself when idx is positive
+		 * and to an unrelated value when idx is below -1.
+		 */
+		int iscallable = luaT_iscallable(L, -1);
+		lua_pop(L, 1);
+		if (iscallable)
+			return SOURCE_TYPE_ITERATOR;
+		return SOURCE_TYPE_TABLE;
+	}
+
+	return SOURCE_TYPE_NONE;
+}
+
+/**
+ * Parse 'decode' / 'encode' options.
+ *
+ * Return BUFFER_TYPE_SELECT (the default) when the value at idx
+ * is absent or nil, the corresponding type for 'raw', 'select',
+ * 'call' and 'chain' strings and BUFFER_TYPE_NONE for any other
+ * value.
+ *
+ * A name must match entirely: an empty string, a prefix of a
+ * name (say, 'c') or a string with an embedded zero byte is not
+ * accepted.
+ */
+static enum merger_buffer_type
+parse_buffer_type(lua_State *L, int idx)
+{
+	static const struct {
+		const char *name;
+		enum merger_buffer_type type;
+	} known_types[] = {
+		{"raw", BUFFER_TYPE_RAW},
+		{"select", BUFFER_TYPE_SELECT},
+		{"call", BUFFER_TYPE_CALL},
+		{"chain", BUFFER_TYPE_CHAIN},
+	};
+
+	if (lua_isnoneornil(L, idx))
+		return BUFFER_TYPE_SELECT;
+
+	if (lua_type(L, idx) != LUA_TSTRING)
+		return BUFFER_TYPE_NONE;
+
+	size_t len;
+	const char *param = lua_tolstring(L, idx, &len);
+
+	/*
+	 * Compare lengths first: strncmp() limited by the length
+	 * of the user-provided string would accept any prefix of
+	 * a name, including the empty string.
+	 */
+	size_t count = sizeof(known_types) / sizeof(known_types[0]);
+	for (size_t i = 0; i < count; ++i) {
+		const char *name = known_types[i].name;
+		if (len == strlen(name) && memcmp(param, name, len) == 0)
+			return known_types[i].type;
+	}
+
+	return BUFFER_TYPE_NONE;
+}
+
+/**
+ * Parse optional third parameter of merger_inst:pairs() and
+ * merger_inst:select() into the merger_iterator structure.
+ *
+ * Recognized keys of the options table at idx are 'descending',
+ * 'decode', 'buffer', 'encode' and 'encode_chain_len'. Fields of
+ * it that correspond to absent (nil) keys are left untouched.
+ *
+ * Constraints that are verified:
+ *
+ * - 'encode' requires 'buffer';
+ * - 'encode_chain_len' requires {encode = 'chain'} and must be a
+ *   number that fits into uint32_t;
+ * - 'buffer' with {encode = 'chain'} requires a non-zero
+ *   'encode_chain_len'.
+ *
+ * Returns 0 on success. In case of an error it pushes an error
+ * message to the Lua stack and returns 1. The option value that
+ * was under inspection can be left on the stack below the
+ * message in this case.
+ */
+static int
+parse_opts(struct lua_State *L, int idx, struct merger_iterator *it)
+{
+	/* No opts: use defaults. */
+	if (lua_isnoneornil(L, idx))
+		return 0;
+
+	/* Not a table: error. */
+	if (!lua_istable(L, idx))
+		return merger_usage(L, NULL);
+
+	/*
+	 * lua_getfield() is used below instead of pushing a key
+	 * and calling lua_gettable(): it does not shift the stack
+	 * before the table is accessed and so idx is allowed to
+	 * be relative (negative).
+	 */
+
+	/* Parse descending to it->order. */
+	lua_getfield(L, idx, "descending");
+	if (!lua_isnil(L, -1)) {
+		if (!lua_isboolean(L, -1))
+			return merger_usage(L, "descending");
+		it->order = lua_toboolean(L, -1) ? -1 : 1;
+	}
+	lua_pop(L, 1);
+
+	/* Parse decode to it->decode. */
+	lua_getfield(L, idx, "decode");
+	if (!lua_isnil(L, -1)) {
+		it->decode = parse_buffer_type(L, -1);
+		if (it->decode == BUFFER_TYPE_NONE)
+			return merger_usage(L, "decode");
+	}
+	lua_pop(L, 1);
+
+	/* Parse buffer. */
+	lua_getfield(L, idx, "buffer");
+	if (!lua_isnil(L, -1)) {
+		if ((it->obuf = check_ibuf(L, -1)) == NULL)
+			return merger_usage(L, "buffer");
+	}
+	lua_pop(L, 1);
+
+	/* Parse encode to it->encode. */
+	lua_getfield(L, idx, "encode");
+	if (!lua_isnil(L, -1)) {
+		if (it->obuf == NULL) {
+			lua_pushstring(L, "\"buffer\" option is mandatory "
+					  "when \"encode\" is used");
+			return 1;
+		}
+		it->encode = parse_buffer_type(L, -1);
+		if (it->encode == BUFFER_TYPE_NONE)
+			return merger_usage(L, "encode");
+	}
+	lua_pop(L, 1);
+
+	/* Parse encode_chain_len. */
+	lua_getfield(L, idx, "encode_chain_len");
+	if (!lua_isnil(L, -1)) {
+		if (it->encode != BUFFER_TYPE_CHAIN) {
+			lua_pushstring(L, "\"encode_chain_len\" is "
+					  "forbidden without "
+					  "{encode = 'chain'}");
+			return 1;
+		}
+		if (!lua_isnumber(L, -1))
+			return merger_usage(L, "encode_chain_len");
+		/*
+		 * Reject values that do not fit into uint32_t
+		 * (negative, too large, NaN) rather than let them
+		 * wrap around and end up as a bogus array length
+		 * in the output header.
+		 */
+		lua_Number chain_len = lua_tonumber(L, -1);
+		if (!(chain_len >= 0 && chain_len <= UINT32_MAX))
+			return merger_usage(L, "encode_chain_len");
+		it->encode_chain_len = (uint32_t) chain_len;
+	}
+	lua_pop(L, 1);
+
+	/*
+	 * Verify encode_chain_len is provided when we are going
+	 * to use it for output buffer header encoding.
+	 */
+	if (it->obuf != NULL && it->encode == BUFFER_TYPE_CHAIN &&
+	    it->encode_chain_len == 0) {
+		lua_pushstring(L, "\"encode_chain_len\" is mandatory when "
+				  "\"buffer\" and {encode = 'chain'} are "
+				  "used");
+		return 1;
+	}
+
+	return 0;
+}
+
+/**
+ * Parse sources table: second parameter of merger_inst:pairs()
+ * and merger_inst:select() into the merger_iterator structure.
+ *
+ * Each source is allocated, initialized according to its type,
+ * asked for its first tuple and, when it has one, inserted into
+ * the iterator's heap.
+ *
+ * Note: This function should be called when options are already
+ * parsed (using parse_opts()).
+ *
+ * Note: it->sources is allocated even when the sources table is
+ * empty, so after the call it may be non-NULL while
+ * it->sources_count is zero.
+ *
+ * Returns 0 on success. In case of an error it pushes an error
+ * message to the Lua stack and returns 1. Partially parsed
+ * sources are left in @a it for the caller to free.
+ */
+static int
+parse_sources(struct lua_State *L, int idx, struct merger *merger,
+	      struct merger_iterator *it)
+{
+	/* Allocate sources array. */
+	uint32_t capacity = 8;
+	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
+	it->sources = (struct merger_source **) malloc(sources_size);
+	if (it->sources == NULL) {
+		push_out_of_memory_error(L, sources_size, "it->sources");
+		return 1;
+	}
+
+	/*
+	 * Fetch all sources. Each fetched source stays on the Lua
+	 * stack until the loop ends; they are popped all at once
+	 * after the loop.
+	 */
+	while (true) {
+		lua_pushinteger(L, it->sources_count + 1);
+		lua_gettable(L, idx);
+		if (lua_isnil(L, -1))
+			break;
+
+		/* Extend sources array if needed. */
+		if (it->sources_count == capacity) {
+			capacity *= 2;
+			struct merger_source **new_sources;
+			const ssize_t new_sources_size =
+				capacity * sizeof(struct merger_source *);
+			new_sources = (struct merger_source **) realloc(
+				it->sources, new_sources_size);
+			if (new_sources == NULL) {
+				push_out_of_memory_error(L, new_sources_size,
+							 "new_sources");
+				return 1;
+			}
+			it->sources = new_sources;
+		}
+
+		/* Allocate the new source. */
+		it->sources[it->sources_count] = (struct merger_source *)
+			malloc(sizeof(struct merger_source));
+		struct merger_source *current_source =
+			it->sources[it->sources_count];
+		if (current_source == NULL) {
+			push_out_of_memory_error(L,
+						 sizeof(struct merger_source),
+						 "merger_source");
+			return 1;
+		}
+
+		/*
+		 * Set type and tuple to correctly proceed in
+		 * merger_iterator_delete() in case of any further
+		 * error.
+		 */
+		struct ibuf *buf = NULL;
+		current_source->type = parse_source_type(L, -1, &buf);
+		current_source->tuple = NULL;
+
+		/*
+		 * Note: We need to increment sources count right
+		 * after successful malloc() of the new source
+		 * (before any further error check), because
+		 * merger_iterator_delete() frees that amount of
+		 * sources.
+		 */
+		++it->sources_count;
+
+		/* Initialize the new source. */
+		switch (current_source->type) {
+		case SOURCE_TYPE_BUFFER:
+			if (!decode_header(it, buf,
+			    &current_source->buf.remaining_tuples_cnt)) {
+				lua_pushstring(L, "Invalid merge source");
+				return 1;
+			}
+			current_source->buf.buf = buf;
+			break;
+		case SOURCE_TYPE_TABLE:
+			/* Save a table ref and a next index. */
+			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
+			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+			current_source->tbl.ref = tbl_ref;
+			current_source->tbl.next_idx = 1;
+			break;
+		case SOURCE_TYPE_ITERATOR:
+			/* Wrap and save iterator. */
+			current_source->it.it =
+				lua_iterator_new_fromtable(L, -1);
+			break;
+		case SOURCE_TYPE_NONE:
+			/*
+			 * sources_count is already incremented, so
+			 * it is the one-based index of the source.
+			 */
+			lua_pushfstring(L, "Unknown source type at index %d",
+					it->sources_count);
+			return 1;
+		default:
+			unreachable();
+			return 1;
+		}
+		/*
+		 * Fetch the first tuple. A source that gives no
+		 * tuple is kept in it->sources, but is not added
+		 * to the heap.
+		 */
+		if (source_fetch(L, current_source, merger->format) != 0)
+			return 1;
+		if (current_source->tuple != NULL)
+			MERGER_HEAP_INSERT(&it->heap,
+					   &current_source->hnode,
+					   current_source);
+	}
+	/* Pop all fetched sources and the terminating nil. */
+	lua_pop(L, it->sources_count + 1);
+
+	return 0;
+}
+
+/**
+ * Parse sources and options on Lua stack and create the new
+ * merger_iterator instance.
+ *
+ * Expected stack layout: a merger instance (1), a sources table
+ * (2) and optional options (3, a table or nil).
+ *
+ * Raises a Lua error on bad parameters, on a memory allocation
+ * failure and when options or sources parsing fails. So the
+ * returned pointer is never NULL.
+ */
+static struct merger_iterator *
+merger_iterator_new(struct lua_State *L)
+{
+	struct merger *merger;
+	int ok = (lua_gettop(L) == 2 || lua_gettop(L) == 3) &&
+		/* Merger. */
+		(merger = check_merger(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok) {
+		merger_usage(L, NULL);
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	struct merger_iterator *it = (struct merger_iterator *)
+		malloc(sizeof(struct merger_iterator));
+	if (it == NULL) {
+		push_out_of_memory_error(L, sizeof(struct merger_iterator),
+					 "merger_iterator");
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	/*
+	 * Duplicate the key_def before anything else is set up,
+	 * so a failure needs no cleanup besides free().
+	 */
+	it->key_def = key_def_dup(merger->key_def);
+	if (it->key_def == NULL) {
+		free(it);
+		luaL_error(L, "Cannot duplicate merger->key_def");
+		unreachable();
+		return NULL;
+	}
+
+	/* Defaults; parse_opts() overwrites only provided options. */
+	merger_heap_create(&it->heap);
+	it->sources_count = 0;
+	it->sources = NULL;
+	it->decode = BUFFER_TYPE_NONE;
+	it->order = 1;
+	it->obuf = NULL;
+	it->encode = BUFFER_TYPE_NONE;
+	it->encode_chain_len = 0;
+
+	/* Both parsers leave an error message on the stack. */
+	if (parse_opts(L, 3, it) != 0 || parse_sources(L, 2, merger, it) != 0) {
+		merger_iterator_delete(L, it);
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	return it;
+}
+
+/**
+ * Iterator gen function to traverse merger results.
+ *
+ * The two topmost values on the Lua stack must be a merger
+ * instance (at -2) and a merger_iterator (at -1).
+ *
+ * Push the merger_iterator and the next tuple. When there are no
+ * more tuples push two nils instead.
+ */
+static int
+lbox_merger_iterator_gen(struct lua_State *L)
+{
+	struct merger *merger = check_merger(L, -2);
+	struct merger_iterator *it = NULL;
+	/* The iterator is checked only when the merger is valid. */
+	if (merger != NULL)
+		it = check_merger_iterator(L, -1);
+	if (merger == NULL || it == NULL)
+		return luaL_error(L, "Bad params, use: "
+				     "lbox_merger_iterator_gen(merger, "
+				     "merger_iterator)");
+
+	struct tuple *next = merger_next(L, merger, it);
+	if (next != NULL) {
+		/* Push merger_iterator, tuple. */
+		struct merger_iterator **it_ptr = (struct merger_iterator **)
+			luaL_pushcdata(L, merger_iterator_type_id);
+		*it_ptr = it;
+		luaT_pushtuple(L, next);
+
+		/* Drop the reference obtained from merger_next(). */
+		box_tuple_unref(next);
+		return 2;
+	}
+
+	/* The merge is exhausted. */
+	lua_pushnil(L);
+	lua_pushnil(L);
+	return 2;
+}
+
+/**
+ * Iterate over merge results from Lua.
+ *
+ * Expected a merger instance, a sources table and options
+ * (optional) on the Lua stack. The 'buffer' option is forbidden
+ * here and leads to a Lua error.
+ *
+ * Push three values to the Lua stack:
+ *
+ * 1. gen (lbox_merger_iterator_gen wrapped by fun.wrap());
+ * 2. param (merger);
+ * 3. state (merger_iterator).
+ */
+static int
+lbox_merger_ipairs(struct lua_State *L)
+{
+	/* Create merger_iterator. */
+	struct merger_iterator *it = merger_iterator_new(L);
+	lua_settop(L, 1); /* Pop sources, [opts]. */
+	/* Stack: merger. */
+
+	/*
+	 * parse_sources() allocates the sources array even for an
+	 * empty sources table, while merger_iterator_delete()
+	 * asserts that an allocated array is not empty. Drop the
+	 * empty array to keep the finalizer below valid.
+	 */
+	if (it->sources_count == 0) {
+		free(it->sources);
+		it->sources = NULL;
+	}
+
+	/*
+	 * Hand the iterator over to the Lua GC right away, so it
+	 * is freed even when an error is raised below.
+	 */
+	*(struct merger_iterator **)
+		luaL_pushcdata(L, merger_iterator_type_id) = it;
+	lua_pushcfunction(L, lbox_merger_iterator_gc);
+	luaL_setcdatagc(L, -2);
+	/* Stack: merger, merger_iterator. */
+
+	if (it->obuf != NULL)
+		return luaL_error(L, "\"buffer\" option is forbidden with "
+				  "merger_inst:pairs(<...>)");
+
+	luaL_loadstring(L, "return require('fun').wrap");
+	lua_call(L, 0, 1);
+	lua_insert(L, 1); /* Move wrap to the bottom. */
+	/* Stack: wrap, merger, merger_iterator. */
+
+	lua_pushcfunction(L, lbox_merger_iterator_gen);
+	lua_insert(L, 2); /* Move gen right after wrap. */
+	/* Stack: wrap, gen, merger, merger_iterator. */
+
+	/* Call fun.wrap(gen, merger, merger_iterator). */
+	lua_call(L, 3, 3);
+	return 3;
+}
+
+/**
+ * Write merge results into ibuf.
+ *
+ * A header is encoded first with a placeholder array length,
+ * then all merged tuples are copied into it->obuf and the
+ * placeholder is overwritten with the real tuples count.
+ *
+ * Raises a Lua error when the output buffer cannot be extended.
+ */
+static void
+encode_result_buffer(struct lua_State *L, struct merger *merger,
+		     struct merger_iterator *it)
+{
+	struct ibuf *obuf = it->obuf;
+	uint32_t result_len = 0;
+	/*
+	 * Distance from the current write position back to the
+	 * 4-byte array length. size_t: the sum of tuple sizes may
+	 * exceed 32 bits.
+	 */
+	size_t result_len_offset = 4;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later.
+	 */
+	encode_header(it, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	while ((tuple = merger_next(L, merger, it)) != NULL) {
+		uint32_t bsize = tuple->bsize;
+		if (ibuf_reserve(obuf, bsize) == NULL) {
+			/* Do not write through an unextended buffer. */
+			box_tuple_unref(tuple);
+			push_out_of_memory_error(L, bsize, "obuf");
+			lua_error(L);
+			unreachable();
+			return;
+		}
+		memcpy(obuf->wpos, tuple_data(tuple), bsize);
+		obuf->wpos += bsize;
+		result_len_offset += bsize;
+		box_tuple_unref(tuple);
+		++result_len;
+	}
+
+	/* Write the real array size. */
+	mp_store_u32(obuf->wpos - result_len_offset, result_len);
+}
+
+/**
+ * Collect merge results into a new Lua table.
+ *
+ * The table is left on top of the Lua stack; tuples are stored
+ * under consecutive integer keys starting from 1.
+ *
+ * Returns the number of pushed values (always 1).
+ */
+static int
+create_result_table(struct lua_State *L, struct merger *merger,
+		    struct merger_iterator *it)
+{
+	lua_newtable(L);
+
+	/* Drain the merger, appending each tuple to the table. */
+	struct tuple *next = merger_next(L, merger, it);
+	for (uint32_t idx = 1; next != NULL; ++idx) {
+		luaT_pushtuple(L, next);
+		lua_rawseti(L, -2, idx);
+		box_tuple_unref(next);
+		next = merger_next(L, merger, it);
+	}
+
+	return 1;
+}
+
+/**
+ * Perform the merge.
+ *
+ * Write results into a buffer or a Lua table depending on options.
+ *
+ * Expected merger instance, sources table and options (optional)
+ * on the Lua stack.
+ *
+ * Return the Lua table or nothing when the 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merger_select(struct lua_State *L)
+{
+	struct merger *merger = check_merger(L, 1);
+	if (merger == NULL) {
+		merger_usage(L, NULL);
+		lua_error(L);
+	}
+
+	struct merger_iterator *it = merger_iterator_new(L);
+	/*
+	 * The arguments are intentionally left on the stack: the
+	 * merger and buffer sources are used through raw pointers
+	 * below, so keep them reachable until the merge is done.
+	 */
+
+	/*
+	 * parse_sources() allocates the sources array even for an
+	 * empty sources table, while merger_iterator_delete()
+	 * asserts that an allocated array is not empty. Drop the
+	 * empty array to keep the finalizer below valid.
+	 */
+	if (it->sources_count == 0) {
+		free(it->sources);
+		it->sources = NULL;
+	}
+
+	/*
+	 * Nothing else frees the iterator, so hand it over to the
+	 * Lua GC. This covers a normal return as well as a Lua
+	 * error raised in the middle of the merge.
+	 */
+	*(struct merger_iterator **)
+		luaL_pushcdata(L, merger_iterator_type_id) = it;
+	lua_pushcfunction(L, lbox_merger_iterator_gc);
+	luaL_setcdatagc(L, -2);
+
+	if (it->obuf == NULL) {
+		/* The result table is pushed on top of the stack. */
+		return create_result_table(L, merger, it);
+	} else {
+		encode_result_buffer(L, merger, it);
+		return 0;
+	}
+}
+
+/**
+ * Create the new merger instance.
+ *
+ * Expected a table of key parts on the Lua stack. Each key part
+ * is a table with the following fields:
+ *
+ * - fieldno: one-based field number (mandatory);
+ * - type: a field type name (mandatory);
+ * - is_nullable: optional, false when omitted;
+ * - collation_id or collation: optional and mutually exclusive,
+ *   both require box.cfg{} to be called.
+ *
+ * Raises a Lua error on invalid parameters and when a memory
+ * allocation fails.
+ *
+ * Returns the new instance.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
+		return luaL_error(L, "Bad params, use: merger.new({"
+				  "{fieldno = fieldno, type = type"
+				  "[, is_nullable = is_nullable"
+				  "[, collation_id = collation_id"
+				  "[, collation = collation]]]}, ...}");
+	uint32_t key_parts_count = 0;
+	uint32_t capacity = 8;
+
+	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
+	struct key_part_def *parts = NULL;
+	parts = (struct key_part_def *) malloc(parts_size);
+	if (parts == NULL)
+		throw_out_of_memory_error(L, parts_size, "parts");
+
+	while (true) {
+		lua_pushinteger(L, key_parts_count + 1);
+		lua_gettable(L, 1);
+		if (lua_isnil(L, -1))
+			break;
+
+		/*
+		 * Indexing a non-table below would raise a Lua
+		 * error and leak parts, so check it explicitly.
+		 */
+		if (!lua_istable(L, -1)) {
+			free(parts);
+			return luaL_error(L, "A key part must be a table");
+		}
+
+		/* Extend parts if necessary. */
+		if (key_parts_count == capacity) {
+			capacity *= 2;
+			const ssize_t new_parts_size =
+				sizeof(struct key_part_def) * capacity;
+			struct key_part_def *new_parts =
+				(struct key_part_def *) realloc(
+					parts, new_parts_size);
+			if (new_parts == NULL) {
+				free(parts);
+				throw_out_of_memory_error(L, new_parts_size,
+							  "parts");
+			}
+			parts = new_parts;
+		}
+
+		/* Valid until the next realloc(). */
+		struct key_part_def *part = &parts[key_parts_count];
+
+		/* Set part->fieldno. */
+		lua_pushstring(L, "fieldno");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			free(parts);
+			return luaL_error(L, "fieldno must not be nil");
+		}
+		if (!lua_isnumber(L, -1) || lua_tointeger(L, -1) < 1) {
+			free(parts);
+			return luaL_error(L, "fieldno must be a positive "
+					  "number");
+		}
+		/*
+		 * Transform one-based Lua fieldno to zero-based
+		 * fieldno to use in key_def_new().
+		 */
+		part->fieldno = lua_tointeger(L, -1) - 1;
+		lua_pop(L, 1);
+
+		/* Set part->type. */
+		lua_pushstring(L, "type");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			free(parts);
+			return luaL_error(L, "type must not be nil");
+		}
+		/* lua_tolstring() gives NULL for other types. */
+		if (!lua_isstring(L, -1)) {
+			free(parts);
+			return luaL_error(L, "type must be a string");
+		}
+		size_t type_len;
+		const char *type_name = lua_tolstring(L, -1, &type_len);
+		part->type = field_type_by_name(type_name, type_len);
+		if (part->type == field_type_MAX) {
+			free(parts);
+			return luaL_error(L, "Unknown field type: %s",
+					  type_name);
+		}
+		/* Pop only now: type_name points into this value. */
+		lua_pop(L, 1);
+
+		/* Set part->is_nullable. */
+		lua_pushstring(L, "is_nullable");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			part->is_nullable = false;
+			part->nullable_action = ON_CONFLICT_ACTION_DEFAULT;
+		} else {
+			part->is_nullable = lua_toboolean(L, -1);
+			part->nullable_action = ON_CONFLICT_ACTION_NONE;
+		}
+		lua_pop(L, 1);
+
+		/* Set part->coll_id using collation_id. */
+		lua_pushstring(L, "collation_id");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1))
+			part->coll_id = COLL_NONE;
+		else
+			part->coll_id = lua_tointeger(L, -1);
+		lua_pop(L, 1);
+
+		/* Set part->coll_id using collation. */
+		lua_pushstring(L, "collation");
+		lua_gettable(L, -2);
+		/* Check whether box.cfg{} was called. */
+		if ((part->coll_id != COLL_NONE || !lua_isnil(L, -1)) &&
+		    !box_is_configured()) {
+			free(parts);
+			return luaL_error(L, "Cannot use collations: "
+					  "please call box.cfg{}");
+		}
+		if (!lua_isnil(L, -1)) {
+			if (part->coll_id != COLL_NONE) {
+				free(parts);
+				return luaL_error(
+					L, "Conflicting options: collation_id "
+					"and collation");
+			}
+			/* lua_tolstring() gives NULL for other types. */
+			if (!lua_isstring(L, -1)) {
+				free(parts);
+				return luaL_error(L, "collation must be "
+						  "a string");
+			}
+			size_t coll_name_len;
+			const char *coll_name = lua_tolstring(L, -1,
+							      &coll_name_len);
+			struct coll_id *coll_id = coll_by_name(coll_name,
+							       coll_name_len);
+			if (coll_id == NULL) {
+				free(parts);
+				return luaL_error(
+					L, "Unknown collation: \"%s\"",
+					coll_name);
+			}
+			part->coll_id = coll_id->id;
+		}
+		lua_pop(L, 1);
+
+		/* Check coll_id. */
+		struct coll_id *coll_id = coll_by_id(part->coll_id);
+		if (part->coll_id != COLL_NONE && coll_id == NULL) {
+			uint32_t collation_id = part->coll_id;
+			free(parts);
+			return luaL_error(L, "Unknown collation_id: %d",
+					  collation_id);
+		}
+
+		/* Set part->sort_order. */
+		part->sort_order = SORT_ORDER_ASC;
+
+		++key_parts_count;
+
+		/* Pop the key part to keep the stack size constant. */
+		lua_pop(L, 1);
+	}
+	/* Pop the nil that terminated the loop. */
+	lua_pop(L, 1);
+
+	struct merger *merger = calloc(1, sizeof(*merger));
+	if (merger == NULL) {
+		free(parts);
+		throw_out_of_memory_error(L, sizeof(*merger), "merger");
+	}
+	merger->key_def = key_def_new(parts, key_parts_count);
+	free(parts);
+	if (merger->key_def == NULL) {
+		/* The instance is not handed over to Lua yet. */
+		free(merger);
+		return luaL_error(L, "Cannot create merger->key_def");
+	}
+
+	merger->format = box_tuple_format_new(&merger->key_def, 1);
+	if (merger->format == NULL) {
+		box_key_def_delete(merger->key_def);
+		free(merger);
+		return luaL_error(L, "Cannot create merger->format");
+	}
+
+	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
+
+	/* From now on the instance is freed by lbox_merger_gc(). */
+	lua_pushcfunction(L, lbox_merger_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * GC handler of a merger instance: release the key_def, the
+ * tuple format and the instance itself.
+ *
+ * Does nothing when the first argument is not a merger.
+ */
+static int
+lbox_merger_gc(struct lua_State *L)
+{
+	struct merger *merger = check_merger(L, 1);
+	if (merger != NULL) {
+		box_key_def_delete(merger->key_def);
+		box_tuple_format_unref(merger->format);
+		free(merger);
+	}
+	return 0;
+}
+
+/**
+ * Free the merger iterator.
+ *
+ * Releases the heap, the key_def, every parsed source (together
+ * with the tuple it currently holds), the sources array and the
+ * iterator itself.
+ *
+ * We need to know Lua state here, because sources of table and
+ * iterator types are saved as references within the Lua state.
+ */
+static void
+merger_iterator_delete(struct lua_State *L, struct merger_iterator *it)
+{
+	merger_heap_destroy(&it->heap);
+	box_key_def_delete(it->key_def);
+
+	for (uint32_t i = 0; i < it->sources_count; ++i) {
+		assert(it->sources != NULL);
+		switch (it->sources[i]->type) {
+		case SOURCE_TYPE_BUFFER:
+			/* No-op. */
+			break;
+		case SOURCE_TYPE_TABLE:
+			luaL_unref(L, LUA_REGISTRYINDEX,
+				   it->sources[i]->tbl.ref);
+			break;
+		case SOURCE_TYPE_ITERATOR:
+			lua_iterator_free(L, it->sources[i]->it.it);
+			break;
+		case SOURCE_TYPE_NONE:
+			/*
+			 * We can reach this block when
+			 * parse_sources() find a bad source. Do
+			 * nothing, just free the memory.
+			 */
+			break;
+		default:
+			unreachable();
+		}
+		if (it->sources[i]->tuple != NULL)
+			box_tuple_unref(it->sources[i]->tuple);
+		free(it->sources[i]);
+	}
+
+	/*
+	 * parse_sources() allocates the array before looking at
+	 * the sources table, so it can be allocated while
+	 * sources_count is zero (an empty sources table); do not
+	 * assert the opposite. free(NULL) is a no-op.
+	 */
+	free(it->sources);
+
+	free(it);
+}
+
+/**
+ * GC handler of a merger iterator.
+ *
+ * Does nothing when the first argument is not a merger_iterator.
+ */
+static int
+lbox_merger_iterator_gc(struct lua_State *L)
+{
+	struct merger_iterator *it = check_merger_iterator(L, 1);
+	if (it != NULL)
+		merger_iterator_delete(L, it);
+	return 0;
+}
+
+/**
+ * Register the module.
+ *
+ * Declares the opaque C types for LuaJIT FFI, remembers their
+ * ctype ids, registers the 'merger' module with the 'new'
+ * function and exports select / ipairs C functions in the
+ * 'internal' table of the module.
+ *
+ * Returns 1: the value on top of the Lua stack.
+ */
+LUA_API int
+luaopen_merger(lua_State *L)
+{
+	luaL_cdef(L, "struct merger;");
+	luaL_cdef(L, "struct merger_iterator;");
+	luaL_cdef(L, "struct ibuf;");
+	/* The ids are used to create and to check cdata objects. */
+	merger_type_id = luaL_ctypeid(L, "struct merger&");
+	merger_iterator_type_id = luaL_ctypeid(L, "struct merger_iterator&");
+	ibuf_type_id = luaL_ctypeid(L, "struct ibuf");
+	/*
+	 * NOTE(review): nothing visible here consumes this table;
+	 * confirm whether luaL_register_module() needs it.
+	 */
+	lua_newtable(L);
+	static const struct luaL_Reg meta[] = {
+		{"new", lbox_merger_new},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "merger", meta);
+
+	/* Export C functions to Lua. */
+	lua_newtable(L); /* merger.internal */
+	lua_pushcfunction(L, lbox_merger_select);
+	lua_setfield(L, -2, "select");
+	lua_pushcfunction(L, lbox_merger_ipairs);
+	lua_setfield(L, -2, "ipairs");
+	/* Pops the 'internal' table and stores it below it. */
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/lua/merger.h b/src/lua/merger.h
new file mode 100644
index 000000000..6d7ca0957
--- /dev/null
+++ b/src/lua/merger.h
@@ -0,0 +1,39 @@
+#ifndef TARANTOOL_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_LUA_MERGER_H_INCLUDED 1
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+struct lua_State;
+
+/**
+ * Register the 'merger' Lua module in the given Lua state.
+ */
+int
+luaopen_merger(struct lua_State *L);
+
+#endif /* TARANTOOL_LUA_MERGER_H_INCLUDED */
diff --git a/src/lua/merger.lua b/src/lua/merger.lua
new file mode 100644
index 000000000..173cf4154
--- /dev/null
+++ b/src/lua/merger.lua
@@ -0,0 +1,19 @@
+local ffi = require('ffi')
+local merger = require('merger')
+
+-- C functions exported by the built-in part of the module.
+local internal = merger.internal
+
+-- Methods available on a merger instance.
+local methods = {
+    select = internal.select,
+    pairs = internal.ipairs,
+    ipairs = internal.ipairs,
+}
+
+-- Attach the methods to 'struct merger' cdata objects.
+ffi.metatype(ffi.typeof('struct merger'), {
+    __index = function(_, name)
+        return methods[name]
+    end,
+    -- Lua 5.2 compatibility
+    __pairs = internal.ipairs,
+    __ipairs = internal.ipairs,
+})
diff --git a/test/app-tap/merger.test.lua b/test/app-tap/merger.test.lua
new file mode 100755
index 000000000..7ec6e072c
--- /dev/null
+++ b/test/app-tap/merger.test.lua
@@ -0,0 +1,693 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local merger = require('merger')
+local crypto = require('crypto')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+
+local IPROTO_DATA = 48
+
+-- Build the expected usage error message of merger instance
+-- methods. 'param' is the name of a bad option; pass nil for the
+-- generic message.
+local function merger_usage(param)
+    local signature = table.concat({
+        'merger_inst:{ipairs,pairs,select}(',
+        '{source, source, ...}[, {',
+        'descending = <boolean> or <nil>, ',
+        'decode = \'raw\' / \'select\' / \'call\' / \'chain\' / <nil>, ',
+        'buffer = <cdata<struct ibuf>> or <nil>, ',
+        'encode = \'raw\' / \'select\' / \'call\' / \'chain\' / <nil>, ',
+        'encode_chain_len = <number> or <nil>}])',
+    })
+    if param then
+        return ('Bad param "%s", use: %s'):format(param, signature)
+    end
+    return ('Bad params, use: %s'):format(signature)
+end
+
+-- Encode 'data' to msgpack, cut the last 'trunc' bytes off and
+-- return an ibuf holding the rest. The cut bytes are replaced
+-- with zeros written right after the buffer content.
+local function truncated_msgpack_buffer(data, trunc)
+    local encoded = msgpackffi.encode(data)
+    local payload = encoded:sub(1, encoded:len() - trunc)
+    local payload_len = payload:len()
+    local buf = buffer.ibuf()
+    -- Make room for the payload and the zero padding.
+    buf:reserve(payload_len + trunc)
+    local wpos = buf:alloc(payload_len)
+    -- Only payload_len bytes are allocated; the padding goes to
+    -- the reserved area after them.
+    local padding = string.rep('\0', trunc)
+    ffi.copy(wpos, payload .. padding, payload_len + trunc)
+    return buf
+end
+
+-- Cases of merger.new() calls that are expected to fail.
+--
+-- A table entry holds a case description at index 1, key parts
+-- to pass in 'parts' and the expected error message in
+-- 'exp_err'. A function entry is presumably called in place
+-- when the list is traversed (the runner is outside of this
+-- chunk -- confirm); it is used here to call box.cfg{} between
+-- the two groups of cases.
+local bad_merger_new_calls = {
+    -- Cases to call before box.cfg{}.
+    {
+        'Pass a field on an unknown type',
+        parts = {{
+            fieldno = 2,
+            type = 'unknown',
+        }},
+        exp_err = 'Unknown field type: unknown',
+    },
+    {
+        'Try to use collation_id before box.cfg{}',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 2,
+        }},
+        exp_err = 'Cannot use collations: please call box.cfg{}',
+    },
+    {
+        'Try to use collation before box.cfg{}',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation = 'unicode_ci',
+        }},
+        exp_err = 'Cannot use collations: please call box.cfg{}',
+    },
+    function()
+        -- For collations.
+        box.cfg{}
+    end,
+    -- Cases to call after box.cfg{}.
+    {
+        'Try to use both collation_id and collation',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 2,
+            collation = 'unicode_ci',
+        }},
+        exp_err = 'Conflicting options: collation_id and collation',
+    },
+    {
+        'Unknown collation_id',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 42,
+        }},
+        exp_err = 'Unknown collation_id: 42',
+    },
+    {
+        'Unknown collation name',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation = 'unknown',
+        }},
+        exp_err = 'Unknown collation: "unknown"',
+    },
+}
+
+-- Cases of merger instance method calls with bad (or just
+-- unusual) parameters.
+--
+-- Each entry holds a case description at index 1, 'sources' and
+-- 'opts' to pass to a method and the expected error message in
+-- 'exp_err'. 'funcs' is an optional list of method names;
+-- presumably all methods are checked when it is omitted and a
+-- nil 'exp_err' means the call must succeed (the runner is
+-- outside of this chunk -- confirm).
+local bad_merger_methods_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_usage(nil),
+    },
+    {
+        'Bad opts.descending',
+        sources = {},
+        opts = {descending = 1},
+        exp_err = merger_usage('descending'),
+    },
+    -- NOTE(review): the same input as in the 'Bad decode type'
+    -- case below.
+    {
+        'Bad opts.decode',
+        sources = {},
+        opts = {decode = 1},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'Bad source',
+        sources = {1},
+        opts = nil,
+        exp_err = 'Unknown source type at index 1',
+    },
+    {
+        'Bad cdata source',
+        sources = {ffi.new('char *')},
+        opts = nil,
+        exp_err = 'Unknown source type at index 1',
+    },
+    {
+        'Missed encode_chain_len',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'chain'},
+        exp_err = '"encode_chain_len" is mandatory when "buffer" and ' ..
+            '{encode = \'chain\'} are used',
+    },
+    {
+        'Wrong source of table type',
+        sources = {{1}},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Use buffer with an iterator result',
+        sources = {},
+        opts = {buffer = buffer.ibuf()},
+        funcs = {'pairs', 'ipairs'},
+        exp_err = '"buffer" option is forbidden with merger_inst:pairs(<...>)',
+    },
+    {
+        'Bad decode type',
+        sources = {},
+        opts = {decode = 1},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'Bad decode string',
+        sources = {},
+        opts = {decode = 'bad value'},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'A table source ignores {decode = \'chain\'}',
+        sources = {{{''}}},
+        opts = {decode = 'chain'},
+        exp_err = nil,
+    },
+    {
+        'An iterator source ignores {decode = \'chain\'}',
+        sources = {{pairs({})}},
+        opts = {decode = 'chain'},
+        exp_err = nil,
+    },
+    {
+        'Bad encode type',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 1},
+        funcs = {'select'},
+        exp_err = merger_usage('encode'),
+    },
+    {
+        'Bad encode string',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'bad value'},
+        funcs = {'select'},
+        exp_err = merger_usage('encode'),
+    },
+    {
+        -- Any encode value should lead to an error, but we check
+        -- only 'select' here.
+        'Use "encode" without "buffer"',
+        sources = {},
+        opts = {encode = 'select'},
+        exp_err = '"buffer" option is mandatory when "encode" is used',
+    },
+    {
+        'Use "encode_chain_len" without "encode"',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode_chain_len = 1},
+        exp_err = '"encode_chain_len" is forbidden without ' ..
+            '{encode = \'chain\'}',
+    },
+    {
+        -- Any encode value except "chain" should lead to an
+        -- error, but we check only 'select' here.
+        'Use "encode_chain_len" with "encode" != "chain"',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'select',
+            encode_chain_len = 1},
+        exp_err = '"encode_chain_len" is forbidden without ' ..
+            '{encode = \'chain\'}',
+    },
+    {
+        'Bad encode_chain_len type',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'chain',
+            encode_chain_len = 'bad value'},
+        exp_err = merger_usage('encode_chain_len'),
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Remove the last tuple from msgpack data, but keep old
+        -- tuples array size.
+        sources = {
+            truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 2),
+        },
+        opts = {},
+        funcs = {'select'},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Remove half of the last tuple, but keep old tuple size.
+        sources = {
+            truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 1),
+        },
+        opts = {},
+        funcs = {'select'},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+}
+
+-- Schemas to generate test data for.
+--
+-- Each schema holds a name, key parts to create a merger with
+-- ('parts') and a 'gen_tuple' function that gives a tuple (as a
+-- Lua table) by its one-based number. An optional 'tuples_cnt'
+-- field holds a tuples count for the schema.
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Merger allocates a memory for 8 parts by default.
+    -- Test that reallocation works properly.
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 128 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 127 constant parts
+            for i = 1, 127 do
+                tuple[i] = i
+            end
+            -- 128th part is varying
+            tuple[128] = tupleno
+            return tuple
+        end,
+        -- reduce tuples count to decrease test run time
+        tuples_cnt = 16,
+    },
+    -- Test null value in nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        gen_tuple = function(i)
+            -- Odd tuples hold a string, even ones hold null, so
+            -- both kinds of values are compared with each other.
+            -- (i % 1 is always zero and would give only nulls.)
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+-- Whether a key part uses the unicode_ci collation, set either
+-- by id (2) or by name.
+local function is_unicode_ci_part(part)
+    if part.collation_id == 2 then
+        return true
+    end
+    return part.collation == 'unicode_ci'
+end
+
+-- Sort 'tuples' in place by the key 'parts'. The order is
+-- ascending unless opts.descending is set.
+local function sort_tuples(tuples, parts, opts)
+    -- Three-way comparison: a negative number, zero or a
+    -- positive number. The first part whose values differ
+    -- decides the result; nil goes before any other value.
+    local function compare(lhs, rhs)
+        for _, part in ipairs(parts) do
+            local fieldno = part.fieldno
+            local l = lhs[fieldno]
+            local r = rhs[fieldno]
+            if l ~= r then
+                if l == nil then
+                    return -1
+                elseif r == nil then
+                    return 1
+                elseif is_unicode_ci_part(part) then
+                    return utf8.casecmp(l, r)
+                elseif l < r then
+                    return -1
+                else
+                    return 1
+                end
+            end
+        end
+
+        return 0
+    end
+
+    -- Adapt the three-way comparison to the 'less than'
+    -- predicate expected by table.sort().
+    table.sort(tuples, function(lhs, rhs)
+        local cmp = compare(lhs, rhs)
+        if cmp == 0 then
+            return false
+        end
+        if cmp < 0 then
+            return not opts.descending
+        end
+        return opts.descending
+    end)
+end
+
+-- Lowercase in place the fields of 'tuples' that are indexed by
+-- the unicode_ci key parts.
+local function lowercase_unicode_ci_fields(tuples, parts)
+    for idx = 1, #tuples do
+        local tuple = tuples[idx]
+        for _, part in ipairs(parts) do
+            local fieldno = part.fieldno
+            -- Workaround #3709: skip empty strings.
+            if is_unicode_ci_part(part) and tuple[fieldno]:len() > 0 then
+                tuple[fieldno] = utf8.lower(tuple[fieldno])
+            end
+        end
+    end
+end
+
+-- Generate input sources for a merger and the expected result.
+--
+-- `tuples_cnt` tuples are produced by schema.gen_tuple() and spread
+-- over `sources_cnt` sources according to digest.guava(). Each
+-- source and the expected result are sorted with sort_tuples().
+--
+-- Options read from `opts`:
+-- * input_type: 'table', 'buffer' or 'iterator';
+-- * use_table_as_tuple: keep tuples as Lua tables instead of
+--   converting them with box.tuple.new();
+-- * decode: 'raw', 'select', 'call', 'chain' or nil, selects the
+--   layout of an encoded buffer (buffer input only);
+-- * the whole table is also passed to sort_tuples().
+--
+-- Returns the sources and the expected result. The sources are nil
+-- when input_type is none of the values listed above.
+local function prepare_data(schema, tuples_cnt, sources_cnt, opts)
+    opts = opts or {}
+    local input_type = opts.input_type
+    local decode = opts.decode
+    local wrap_in_box_tuple = not opts.use_table_as_tuple
+
+    -- One table per source: a source that receives no tuples stays
+    -- an empty table and not nil.
+    local tuples = {}
+    for source_no = 1, sources_cnt do
+        tuples[source_no] = {}
+    end
+
+    -- Generate tuples and distribute them over the sources.
+    local exp_result = {}
+    for i = 1, tuples_cnt do
+        -- [1, sources_cnt]
+        local source_no = digest.guava(i, sources_cnt) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if wrap_in_box_tuple then
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[source_no], tuple)
+    end
+
+    -- Sort tuples within each source, then the expected result.
+    for source_no = 1, sources_cnt do
+        sort_tuples(tuples[source_no], schema.parts, opts)
+    end
+    sort_tuples(exp_result, schema.parts, opts)
+
+    if input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        return tuples, exp_result
+    end
+
+    if input_type ~= 'buffer' and input_type ~= 'iterator' then
+        return nil, exp_result
+    end
+
+    -- Wrap tuples of one source into the layout that corresponds to
+    -- the 'decode' option.
+    local function wrap(source_tuples)
+        if decode == 'raw' then
+            return source_tuples
+        elseif decode == nil or decode == 'select' then
+            return {[IPROTO_DATA] = source_tuples}
+        elseif decode == 'call' then
+            return {[IPROTO_DATA] = {source_tuples}}
+        elseif decode == 'chain' then
+            return {[IPROTO_DATA] = {{source_tuples}}}
+        end
+        assert(false)
+    end
+
+    local sources = {}
+    for source_no = 1, sources_cnt do
+        if input_type == 'buffer' then
+            -- Imitate netbox's select with {buffer = ...}.
+            local data = wrap(tuples[source_no])
+            sources[source_no] = buffer.ibuf()
+            msgpackffi.internal.encode_r(sources[source_no], data, 0)
+        else
+            -- Lua iterator: gen is next(), param is the table with
+            -- tuples, the initial state is nil.
+            sources[source_no] = {next, tuples[source_no]}
+        end
+    end
+
+    return sources, exp_result
+end
+
+-- Build a human-readable suffix describing the test case options.
+--
+-- Options with a value are rendered as 'name: value', boolean flags
+-- just as 'name'. The result looks like ' (a: x, b)'; it is an
+-- empty string when no listed option is set.
+local function test_case_opts_str(opts)
+    local valued_opts = {'decode', 'encode', 'input_type', 'output_type'}
+    local flag_opts = {'descending', 'use_table_as_tuple'}
+
+    local params = {}
+    for _, name in ipairs(valued_opts) do
+        if opts[name] then
+            params[#params + 1] = name .. ': ' .. opts[name]
+        end
+    end
+    for _, name in ipairs(flag_opts) do
+        if opts[name] then
+            params[#params + 1] = name
+        end
+    end
+
+    if #params == 0 then
+        return ''
+    end
+
+    return ' (' .. table.concat(params, ', ') .. ')'
+end
+
+-- Run one merge and check the order of the resulting tuples.
+--
+-- Input sources and the expected result come from prepare_data().
+-- The merge is performed by a fresh merger created from
+-- schema.parts: via select() for the 'table' and 'buffer' output
+-- types and via pairs() for the 'iterator' one. One is_deeply()
+-- check is reported to `test`.
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+    fiber.yield()
+
+    opts = opts or {}
+    local encode = opts.encode
+    local output_type = opts.output_type
+
+    -- Input data and the reference result.
+    local sources, exp_result =
+        prepare_data(schema, tuples_cnt, sources_cnt, opts)
+
+    -- The merger instance and its options.
+    local merger_inst = merger.new(schema.parts)
+    local merger_opts = {
+        decode = opts.decode,
+        encode = encode,
+        descending = opts.descending,
+    }
+    if output_type == 'buffer' then
+        merger_opts.buffer = buffer.ibuf()
+    end
+    if encode == 'chain' then
+        merger_opts.encode_chain_len = 1
+    end
+
+    -- Extract the tuples from a decoded output buffer according to
+    -- the 'encode' option.
+    local function unwrap(data)
+        if encode == 'raw' then
+            return data
+        elseif encode == nil or encode == 'select' then
+            return data[IPROTO_DATA]
+        elseif encode == 'call' then
+            return data[IPROTO_DATA][1]
+        elseif encode == 'chain' then
+            return data[IPROTO_DATA][1][1]
+        end
+        assert(false)
+    end
+
+    -- Run the merger and bring the output to a table.
+    local res
+    if output_type == 'buffer' then
+        merger_inst:select(sources, merger_opts)
+        -- Only the first value returned by decode() is needed.
+        res = unwrap((msgpackffi.decode(merger_opts.buffer.rpos)))
+    elseif output_type == 'table' then
+        res = merger_inst:select(sources, merger_opts)
+    else
+        assert(output_type == 'iterator')
+        res = merger_inst:pairs(sources, merger_opts):totable()
+    end
+
+    -- Anything that is not a Lua table is converted with totable()
+    -- to make the deep comparison possible.
+    for idx = 1, #res do
+        local tuple = res[idx]
+        if type(tuple) ~= 'table' then
+            res[idx] = tuple:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate between 'A' and 'a', so
+    -- their relative order is arbitrary. Fields under unicode_ci
+    -- parts are lowercased on both sides before the comparison.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    local description = ('check order on %3d tuples in %4d sources%s')
+        :format(tuples_cnt, sources_cnt, test_case_opts_str(opts))
+    test:is_deeply(res, exp_result, description)
+end
+
+-- Run a subtest for one schema and one combination of options.
+--
+-- Meaningless combinations are skipped silently, without
+-- registering a subtest. Otherwise the subtest calls run_merger()
+-- for several source counts, using schema.tuples_cnt tuples
+-- (100 when it is not set).
+local function run_case(test, schema, opts)
+    opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuples_cnt = schema.tuples_cnt or 100
+
+    local buffer_input = opts.input_type == 'buffer'
+    local buffer_output = opts.output_type == 'buffer'
+
+    -- 'decode' is only meaningful for a buffer input, 'encode' is
+    -- only meaningful for a buffer output, and a buffer input is
+    -- built only from tuples represented as Lua tables.
+    local meaningless =
+        (not buffer_input and opts.decode ~= nil) or
+        (not buffer_output and opts.encode ~= nil) or
+        (buffer_input and not opts.use_table_as_tuple)
+    if meaningless then
+        return
+    end
+
+    -- A few small source counts and one that is greater than the
+    -- tuples count.
+    local sources_cnts = {1, 2, 3, 4, 5, 1000}
+
+    test:test(case_name, function(test)
+        test:plan(#sources_cnts)
+        for _, sources_cnt in ipairs(sources_cnts) do
+            run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+        end
+    end)
+end
+
+local test = tap.test('merger')
+-- 126 is the number of option combinations that run_case() does
+-- not skip for one schema: 9 input variants (5 decode values for a
+-- buffer; table and iterator, each with and without
+-- use_table_as_tuple) * 7 output variants (5 encode values for a
+-- buffer; table; iterator) * 2 sort orders.
+-- NOTE(review): '- 1' presumably accounts for an entry of
+-- bad_merger_new_calls that is a function -- verify.
+test:plan(#bad_merger_new_calls - 1 + #bad_merger_methods_calls +
+    #schemas * 126)
+
+-- Bad merger.new() calls. A case is either a function, which is
+-- just called, or a table with parameters and an expected error.
+for _, case in ipairs(bad_merger_new_calls) do
+    if type(case) ~= 'function' then
+        local ok, err = pcall(merger.new, case.parts)
+        test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+    else
+        case()
+    end
+end
+
+-- Create the instance to use in testing merger's methods below.
+local merger_inst = merger.new({{
+    fieldno = 1,
+    type = 'string',
+}})
+
+-- Bad source or/and opts parameters for merger's methods. A case
+-- without exp_err is expected to succeed.
+for _, case in ipairs(bad_merger_methods_calls) do
+    test:test(case[1], function(test)
+        local funcs = case.funcs or {'pairs', 'ipairs', 'select'}
+        local exp_ok = case.exp_err == nil
+        test:plan(#funcs)
+        for _, func in ipairs(funcs) do
+            local ok, err = pcall(merger_inst[func], merger_inst,
+                case.sources, case.opts)
+            -- On success the second value is a result, not an error.
+            if ok then
+                err = nil
+            end
+            test:is_deeply({ok, err}, {exp_ok, case.exp_err}, func)
+        end
+    end)
+end
+
+-- Merging cases.
+
+-- The string 'nil' stands for an omitted option: a real nil cannot
+-- be stored in a table traversed with ipairs().
+local codecs = {'nil', 'raw', 'select', 'call', 'chain'}
+local io_types = {'buffer', 'table', 'iterator'}
+local flags = {false, true}
+
+local function codec_opt(name)
+    if name == 'nil' then
+        return nil
+    end
+    return name
+end
+
+for _, decode_name in ipairs(codecs) do
+    local decode = codec_opt(decode_name)
+    for _, encode_name in ipairs(codecs) do
+        local encode = codec_opt(encode_name)
+        for _, input_type in ipairs(io_types) do
+            for _, output_type in ipairs(io_types) do
+                for _, descending in ipairs(flags) do
+                    for _, use_table_as_tuple in ipairs(flags) do
+                        for _, schema in ipairs(schemas) do
+                            run_case(test, schema, {
+                                decode = decode,
+                                encode = encode,
+                                input_type = input_type,
+                                output_type = output_type,
+                                descending = descending,
+                                use_table_as_tuple = use_table_as_tuple,
+                            })
+                        end
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
diff --git a/test/app-tap/suite.ini b/test/app-tap/suite.ini
index 86af82637..1a2abb79f 100644
--- a/test/app-tap/suite.ini
+++ b/test/app-tap/suite.ini
@@ -2,4 +2,5 @@
 core = app
 description = application server tests (TAP)
 lua_libs = lua/require_mod.lua lua/serializer_test.lua
+long_run = merger.test.lua
 is_parallel = True
-- 
2.19.2

  parent reply	other threads:[~2018-12-16 20:17 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
2018-12-26 18:35   ` Vladimir Davydov
2018-12-28  1:46     ` Alexander Turenko
2018-12-28  8:00       ` Vladimir Davydov
2018-12-16 20:17 ` [PATCH 2/3] Add module to ease using Lua iterators from C Alexander Turenko
2018-12-26 18:45   ` Vladimir Davydov
2018-12-31  6:43     ` Alexander Turenko
2018-12-16 20:17 ` Alexander Turenko [this message]
2018-12-26 20:11   ` [PATCH 3/3] Add merger for tuple streams Vladimir Davydov
2019-01-09 21:36     ` Alexander Turenko
2018-12-18 12:16 ` [PATCH 0/3] Merger Alexander Turenko
2019-03-22 14:24 ` [tarantool-patches] [PATCH 0/3] lua: add key_def lua module Kirill Shcherbatov
2019-03-22 16:20   ` Alexander Turenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=29bd3bbb8544f5b0e070e6b7880dad71f709a7d9.1544989900.git.alexander.turenko@tarantool.org \
    --to=alexander.turenko@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH 3/3] Add merger for tuple streams' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox