[PATCH v3 7/7] Add merger for tuple streams (Lua part)

Alexander Turenko alexander.turenko at tarantool.org
Wed Apr 10 18:21:25 MSK 2019


Fixes #3276.

@TarantoolBot document
Title: Merger for tuple streams

The main concept of the merger is a source. It is an object that
provides a stream of tuples. There are four types of sources: a tuple
source, a table source, a buffer source and a merger itself.

A tuple source yields one tuple per chunk. Like the table and buffer
sources, it supports fetching the next data chunk, so the API creates
it from a Lua iterator:
`merger.new_tuple_source(gen, param, state)`. A `gen` function should
return `state, tuple` on each call and return `nil` when no more
tuples are available. Consider the example:

```lua
box.cfg({})
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})

s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
  - [2]
  - [3]
...

s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
  - [2]
  - [3]
...
```
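
A custom `gen` function follows the usual Lua iterator protocol. The
sketch below is plain Lua and does not call the merger; `array_gen` and
`rows` are hypothetical names used only for illustration. It returns
`state, tuple` on each call and `nil` at the end, which is the shape
that `merger.new_tuple_source(gen, param, state)` is described to
accept:

```lua
-- Hypothetical generator: walks over an array of tuple-like tables.
-- Called as gen(param, state), it returns the new state and the next
-- tuple, or nil when the data are exhausted.
local function array_gen(param, state)
    local next_state = state + 1
    local tuple = param[next_state]
    if tuple == nil then
        return nil
    end
    return next_state, tuple
end

local rows = {{1}, {2}, {3}}

-- A generic `for` loop drives the (gen, param, state) triplet here
-- only to show the protocol; no merger call is involved.
local collected = {}
for _, tuple in array_gen, rows, 0 do
    table.insert(collected, tuple[1])
end
print(table.concat(collected, ', ')) -- prints: 1, 2, 3
```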

As the example shows, every source has `:select()` and `:pairs()`
methods. The first one accepts two options, `buffer` and `limit`, with
the same meaning as in net.box `:select()`. The `:pairs()` method (or
its `:ipairs()` alias) returns a luafun iterator: a Lua iterator that
also provides a set of handy methods for operating in a functional
style.

The same API exists to create a table source and a buffer source:
`merger.new_table_source(gen, param, state)` and
`merger.new_buffer_source(gen, param, state)`. A `gen` function should
return `state, table` or `state, buffer` respectively on each call.
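
For a table source each chunk is a table of tuples rather than a single
tuple. A minimal plain Lua sketch of such a generator follows;
`chunk_gen` and `chunks` are hypothetical names, and the loop only
models how chunks are consumed one after another:

```lua
-- Hypothetical chunked generator: each call returns the new state and
-- one chunk, which is a table of tuple-like tables.
local function chunk_gen(param, state)
    local next_state = state + 1
    local chunk = param[next_state]
    if chunk == nil then
        return nil
    end
    return next_state, chunk
end

local chunks = {
    {{1}, {2}},
    {},        -- an empty chunk contributes no tuples
    {{3}},
}

-- Flatten the chunks in the order they are fetched.
local flat = {}
for _, chunk in chunk_gen, chunks, 0 do
    for _, tuple in ipairs(chunk) do
        table.insert(flat, tuple[1])
    end
end
print(table.concat(flat, ', ')) -- prints: 1, 2, 3
```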

There are also helpers that are useful when all data are available at
once: `merger.new_source_fromtable(tbl)` and
`merger.new_source_frombuffer(buf)`.

A merger is a special kind of source, which is created from a key_def
object and a set of sources. It performs a kind of merge sort: on each
step it chooses the source with the minimal (or maximal) tuple,
consumes a tuple from that source, and repeats. The API to create a
merger is the following:

```lua
local ctx = merger.context.new(key_def.new(<...>))
local sources = {<...>}
local merger_inst = merger.new(ctx, sources, {
    -- Ascending (false) or descending (true) order.
    -- Default is ascending.
    reverse = <boolean> or <nil>,
})
```

An instance of a merger has the same `:select()` and `:pairs()` methods
as any other source. A merger context is the part of a merger state
that is immutable and can be cached across requests with the same
ordering rules (typically requests to the same space).
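
The merge step can be modelled in plain Lua. The sketch below is an
illustration only: `merge_sorted` is a hypothetical helper, it compares
plain numbers instead of tuples ordered by a key_def, and it uses a
linear scan over the sources for brevity, which says nothing about how
the C implementation selects the next source.

```lua
-- Hypothetical model of the merge step. Each source is an array of
-- numbers already sorted in the requested order.
local function merge_sorted(sources, reverse)
    local pos = {}
    for i = 1, #sources do
        pos[i] = 1
    end
    local result = {}
    while true do
        -- Choose the source whose head is minimal (or maximal when
        -- reverse is true).
        local best
        for i, src in ipairs(sources) do
            local head = src[pos[i]]
            if head ~= nil then
                local best_head = best and sources[best][pos[best]]
                if best == nil or
                   (not reverse and head < best_head) or
                   (reverse and head > best_head) then
                    best = i
                end
            end
        end
        if best == nil then
            break -- all sources are exhausted
        end
        -- Consume one value from the chosen source and repeat.
        table.insert(result, sources[best][pos[best]])
        pos[best] = pos[best] + 1
    end
    return result
end

local merged = merge_sorted({{1, 4, 7}, {2, 5}, {3, 6}})
print(table.concat(merged, ' ')) -- prints: 1 2 3 4 5 6 7
```

With `reverse` set to true the inputs are expected to be sorted in
descending order, mirroring the `reverse` option described above.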

The `key_def.new()` function takes a table of key parts as an argument
in the same format as box.space.<...>.index.<...>.parts or
conn.space.<...>.index.<...>.parts (where conn is a net.box connection):

```lua
local key_parts = {
    {
        fieldno = <number>,
        type = <string>,
        [ is_nullable = <boolean>, ]
        [ collation_id = <number>, ]
        [ collation = <string>, ]
    },
    ...
}
local key_def_inst = key_def.new(key_parts)
```
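
As an illustration of the format, a filled-in `key_parts` table might
look as follows. The concrete field numbers and types are hypothetical
and chosen only for the example: order by the first field as an
unsigned number, then by the second field as a nullable string.

```lua
-- Hypothetical key parts; the values are illustrative only.
local key_parts = {
    {fieldno = 1, type = 'unsigned'},
    {fieldno = 2, type = 'string', is_nullable = true},
}
```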
---
 src/box/CMakeLists.txt       |    2 +
 src/box/lua/init.c           |    7 +-
 src/box/lua/merger.c         | 1184 ++++++++++++++++++++++++++++++++++
 src/box/lua/merger.h         |   47 ++
 src/box/lua/merger.lua       |   41 ++
 test/box-tap/merger.test.lua |  725 +++++++++++++++++++++
 6 files changed, 2005 insertions(+), 1 deletion(-)
 create mode 100644 src/box/lua/merger.c
 create mode 100644 src/box/lua/merger.h
 create mode 100644 src/box/lua/merger.lua
 create mode 100755 test/box-tap/merger.test.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d1251c326..491e3d160 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua)
 lua_source(lua_sources lua/console.lua)
 lua_source(lua_sources lua/xlog.lua)
 lua_source(lua_sources lua/key_def.lua)
+lua_source(lua_sources lua/merger.lua)
 set(bin_sources)
 bin_source(bin_sources bootstrap.snap bootstrap.h)
 
@@ -143,6 +144,7 @@ add_library(box STATIC
     lua/xlog.c
     lua/execute.c
     lua/key_def.c
+    lua/merger.c
     ${bin_sources})
 
 target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 6b8be5096..76b987b4b 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -60,6 +60,7 @@
 #include "box/lua/tuple.h"
 #include "box/lua/execute.h"
 #include "box/lua/key_def.h"
+#include "box/lua/merger.h"
 
 extern char session_lua[],
 	tuple_lua[],
@@ -70,7 +71,8 @@ extern char session_lua[],
 	feedback_daemon_lua[],
 	net_box_lua[],
 	upgrade_lua[],
-	console_lua[];
+	console_lua[],
+	merger_lua[];
 
 static const char *lua_sources[] = {
 	"box/session", session_lua,
@@ -83,6 +85,7 @@ static const char *lua_sources[] = {
 	"box/load_cfg", load_cfg_lua,
 	"box/xlog", xlog_lua,
 	"box/key_def", key_def_lua,
+	"box/merger", merger_lua,
 	NULL
 };
 
@@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L)
 	lua_pop(L, 1);
 	luaopen_key_def(L);
 	lua_pop(L, 1);
+	luaopen_merger(L);
+	lua_pop(L, 1);
 
 	/* Load Lua extension */
 	for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..ebe60bc8d
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1184 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <lua.h>             /* lua_*() */
+#include <lauxlib.h>         /* luaL_*() */
+
+#include "fiber.h"           /* fiber() */
+#include "diag.h"            /* diag_set() */
+
+#include "box/tuple.h"       /* box_tuple_*() */
+
+#include "lua/error.h"       /* luaT_error() */
+#include "lua/utils.h"       /* luaL_pushcdata(),
+				luaL_iterator_*() */
+
+#include "box/lua/key_def.h" /* check_key_def() */
+#include "box/lua/tuple.h"   /* luaT_tuple_new() */
+
+#include "small/ibuf.h"      /* struct ibuf */
+#include "msgpuck.h"         /* mp_*() */
+
+#include "box/merger.h"      /* merger_*() */
+
+static uint32_t merger_source_type_id = 0;
+static uint32_t merger_context_type_id = 0;
+static uint32_t ibuf_type_id = 0;
+
+/**
+ * A type of a function to create a source from a Lua iterator on
+ * a Lua stack.
+ *
+ * Such function is to be passed to lbox_merger_source_new() as
+ * a parameter.
+ */
+typedef struct merger_source *(*luaL_merger_source_new_f)(struct lua_State *L);
+
+/* {{{ Helpers */
+
+/**
+ * Extract an ibuf object from the Lua stack.
+ */
+static struct ibuf *
+luaT_check_ibuf(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ibuf_ptr == NULL || cdata_type != ibuf_type_id)
+		return NULL;
+	return ibuf_ptr;
+}
+
+/**
+ * Extract a merger source from the Lua stack.
+ */
+static struct merger_source *
+luaT_check_merger_source(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger_source **source_ptr = luaL_checkcdata(L, idx,
+							    &cdata_type);
+	if (source_ptr == NULL || cdata_type != merger_source_type_id)
+		return NULL;
+	return *source_ptr;
+}
+
+/**
+ * Extract a merger context from the Lua stack.
+ */
+static struct merger_context *
+luaT_check_merger_context(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger_context **ctx_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ctx_ptr == NULL || cdata_type != merger_context_type_id)
+		return NULL;
+	return *ctx_ptr;
+}
+
+/**
+ * Skip an array around tuples and save its length.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+	/* Check the buffer is correct. */
+	if (buf->rpos > buf->wpos)
+		return -1;
+
+	/* Skip decoding if the buffer is empty. */
+	if (ibuf_used(buf) == 0) {
+		*len_p = 0;
+		return 0;
+	}
+
+	/* Check and skip the array around tuples. */
+	int ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+	if (ok)
+		ok = mp_check_array(buf->rpos, buf->wpos) <= 0;
+	if (ok)
+		*len_p = mp_decode_array((const char **) &buf->rpos);
+	return ok ? 0 : -1;
+}
+
+/**
+ * Encode an array around tuples.
+ */
+static void
+encode_header(struct ibuf *obuf, uint32_t result_len)
+{
+	ibuf_reserve(obuf, mp_sizeof_array(result_len));
+	obuf->wpos = mp_encode_array(obuf->wpos, result_len);
+}
+
+/* }}} */
+
+/* {{{ Create, delete structures from Lua */
+
+/**
+ * Free a merger source from a Lua code.
+ */
+static int
+lbox_merger_source_gc(struct lua_State *L)
+{
+	struct merger_source *source;
+	if ((source = luaT_check_merger_source(L, 1)) == NULL)
+		return 0;
+	merger_source_unref(source);
+	return 0;
+}
+
+/**
+ * Free a merger context from a Lua code.
+ */
+static int
+lbox_merger_context_gc(struct lua_State *L)
+{
+	struct merger_context *ctx;
+	if ((ctx = luaT_check_merger_context(L, 1)) == NULL)
+		return 0;
+	merger_context_unref(ctx);
+	return 0;
+}
+
+/**
+ * Create a new source from a Lua iterator and push it onto the
+ * Lua stack.
+ *
+ * It is the helper for lbox_merger_new_buffer_source(),
+ * lbox_merger_new_table_source() and
+ * lbox_merger_new_tuple_source().
+ */
+static int
+lbox_merger_source_new(struct lua_State *L, const char *func_name,
+		       luaL_merger_source_new_f luaL_merger_source_new)
+{
+	int top = lua_gettop(L);
+	if (top < 1 || top > 3 || !luaL_iscallable(L, 1))
+		return luaL_error(L, "Usage: %s(gen, param, state)", func_name);
+
+	/*
+	 * luaL_merger_source_new() reads exactly three top
+	 * values.
+	 */
+	while (lua_gettop(L) < 3)
+		lua_pushnil(L);
+
+	struct merger_source *source = luaL_merger_source_new(L);
+	if (source == NULL)
+		return luaT_error(L);
+	merger_source_ref(source);
+	*(struct merger_source **) luaL_pushcdata(L, merger_source_type_id) =
+		source;
+	lua_pushcfunction(L, lbox_merger_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Create a new merger context and push it to a Lua stack.
+ *
+ * Expect cdata<struct key_def> on a Lua stack.
+ */
+static int
+lbox_merger_context_new(struct lua_State *L)
+{
+	struct key_def *key_def;
+	if (lua_gettop(L) != 1 || (key_def = check_key_def(L, 1)) == NULL)
+		return luaL_error(L, "Usage: merger.context.new(key_def)");
+
+	struct merger_context *ctx = merger_context_new(key_def);
+	if (ctx == NULL)
+		return luaT_error(L);
+
+	merger_context_ref(ctx);
+	*(struct merger_context **) luaL_pushcdata(L, merger_context_type_id) =
+		ctx;
+	lua_pushcfunction(L, lbox_merger_context_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error with merger.new() usage info.
+ */
+static int
+lbox_merger_new_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger.new(merger_context, "
+				   "{source, source, ...}[, {"
+				   "reverse = <boolean> or <nil>}])";
+	if (param_name == NULL)
+		return luaL_error(L, "Bad params, use: %s", usage);
+	else
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+}
+
+/**
+ * Parse a second parameter of merger.new() into an array of
+ * sources.
+ *
+ * Return an array of pointers to sources and set @a
+ * sources_count_ptr. In case of an error set a diag and return
+ * NULL.
+ *
+ * It is the helper for lbox_merger_new().
+ */
+static struct merger_source **
+luaT_merger_new_parse_sources(struct lua_State *L, int idx,
+			      uint32_t *sources_count_ptr)
+{
+	/* Allocate sources array. */
+	uint32_t sources_count = lua_objlen(L, idx);
+	const ssize_t sources_size =
+		sources_count * sizeof(struct merger_source *);
+	struct merger_source **sources =
+		(struct merger_source **) malloc(sources_size);
+	if (sources == NULL) {
+		diag_set(OutOfMemory, sources_size, "malloc", "sources");
+		return NULL;
+	}
+
+	/* Save all sources. */
+	for (uint32_t i = 0; i < sources_count; ++i) {
+		lua_pushinteger(L, i + 1);
+		lua_gettable(L, idx);
+
+		/* Extract a source from a Lua stack. */
+		struct merger_source *source = luaT_check_merger_source(L, -1);
+		if (source == NULL) {
+			free(sources);
+			diag_set(IllegalParams,
+				 "Unknown source type at index %d", i + 1);
+			return NULL;
+		}
+		sources[i] = source;
+	}
+	lua_pop(L, sources_count);
+
+	*sources_count_ptr = sources_count;
+	return sources;
+}
+
+/**
+ * Create a new merger and push it to a Lua stack as a merger
+ * source.
+ *
+ * Expect cdata<struct context>, a table of sources and
+ * (optionally) a table of options on a Lua stack.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	struct merger_context *ctx;
+	int top = lua_gettop(L);
+	bool ok = (top == 2 || top == 3) &&
+		/* Merger context. */
+		(ctx = luaT_check_merger_context(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok)
+		return lbox_merger_new_usage(L, NULL);
+
+	/* Options. */
+	bool reverse = false;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 3)) {
+		/* Parse reverse. */
+		lua_pushstring(L, "reverse");
+		lua_gettable(L, 3);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isboolean(L, -1))
+				reverse = lua_toboolean(L, -1);
+			else
+				return lbox_merger_new_usage(L, "reverse");
+		}
+		lua_pop(L, 1);
+	}
+
+	struct merger_source *merger = merger_new(ctx);
+	if (merger == NULL)
+		return luaT_error(L);
+
+	uint32_t sources_count = 0;
+	struct merger_source **sources = luaT_merger_new_parse_sources(L, 2,
+		&sources_count);
+	if (sources == NULL) {
+		merger->vtab->delete(merger);
+		return luaT_error(L);
+	}
+
+	if (merger_set_sources(merger, sources, sources_count) != 0) {
+		free(sources);
+		merger->vtab->delete(merger);
+		return luaT_error(L);
+	}
+	free(sources);
+	merger_set_reverse(merger, reverse);
+
+	merger_source_ref(merger);
+	*(struct merger_source **)
+		luaL_pushcdata(L, merger_source_type_id) = merger;
+	lua_pushcfunction(L, lbox_merger_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/* }}} */
+
+/* {{{ Buffer merger source */
+
+struct merger_source_buffer {
+	struct merger_source base;
+	/*
+	 * A reference to a Lua iterator to fetch a next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+	/*
+	 * A reference to a buffer with a current chunk of tuples.
+	 * It is needed to prevent LuaJIT from collecting the
+	 * buffer while the source considers it as the current
+	 * one.
+	 */
+	int ref;
+	/*
+	 * A buffer with a current chunk of tuples.
+	 */
+	struct ibuf *buf;
+	/*
+	 * A merger stops before end of a buffer when it is not
+	 * the last merger in the chain.
+	 */
+	size_t remaining_tuples_cnt;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base);
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+			       box_tuple_format_t *format,
+			       struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the buffer type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merger_source *
+luaL_merger_source_buffer_new(struct lua_State *L)
+{
+	static struct merger_source_vtab merger_source_buffer_vtab = {
+		.delete = luaL_merger_source_buffer_delete,
+		.next = luaL_merger_source_buffer_next,
+	};
+
+	struct merger_source_buffer *source = (struct merger_source_buffer *)
+		malloc(sizeof(struct merger_source_buffer));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_buffer),
+			 "malloc", "merger_source_buffer");
+		return NULL;
+	}
+
+	merger_source_new(&source->base, &merger_source_buffer_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+	source->ref = 0;
+	source->buf = NULL;
+	source->remaining_tuples_cnt = 0;
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to get a next data chunk (a
+ * buffer).
+ *
+ * Return 1 when a new buffer is received, 0 when the buffers
+ * iterator ends, and -1 on error (a diag is set).
+ */
+static int
+luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new buffer as the current chunk. */
+	if (source->ref > 0)
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->buf = luaT_check_ibuf(L, -1);
+	assert(source->buf != NULL);
+	lua_pop(L, nresult);
+
+	/* Update remaining_tuples_cnt and skip the header. */
+	if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
+		diag_set(IllegalParams, "Invalid merger source %p",
+			 &source->base);
+		return -1;
+	}
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base)
+{
+	struct merger_source_buffer *source = container_of(base,
+		struct merger_source_buffer, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+			       box_tuple_format_t *format,
+			       struct tuple **out)
+{
+	struct merger_source_buffer *source = container_of(base,
+		struct merger_source_buffer, base);
+
+	/*
+	 * Handle the case when all data were processed: ask a
+	 * next chunk until a non-empty chunk is received or a
+	 * chunks iterator ends.
+	 */
+	while (source->remaining_tuples_cnt == 0) {
+		int rc = luaL_merger_source_buffer_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+	}
+	if (ibuf_used(source->buf) == 0) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	const char *tuple_beg = source->buf->rpos;
+	const char *tuple_end = tuple_beg;
+	/*
+	 * mp_next() is faster than mp_check(), but can read bytes
+	 * outside of the buffer and so can cause segmentation
+	 * faults or an incorrect result.
+	 *
+	 * We check buffer boundaries after the mp_next() call and
+	 * throw an error when the boundaries are violated, but it
+	 * does not save us from possible segmentation faults.
+	 *
+	 * It is the user's responsibility to provide valid
+	 * msgpack.
+	 */
+	mp_next(&tuple_end);
+	--source->remaining_tuples_cnt;
+	if (tuple_end > source->buf->wpos) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	source->buf->rpos = (char *) tuple_end;
+	if (format == NULL)
+		format = box_tuple_format_default();
+	struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
+	if (tuple == NULL)
+		return -1;
+
+	box_tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new buffer source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_buffer_source(struct lua_State *L)
+{
+	return lbox_merger_source_new(L, "merger.new_buffer_source",
+				      luaL_merger_source_buffer_new);
+}
+
+/* }}} */
+
+/* {{{ Table merger source */
+
+struct merger_source_table {
+	struct merger_source base;
+	/*
+	 * A reference to a Lua iterator to fetch a next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+	/*
+	 * A reference to a table with a current chunk of tuples.
+	 */
+	int ref;
+	/* An index of current tuples within a current chunk. */
+	int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base);
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+			      box_tuple_format_t *format,
+			      struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the table type.
+ *
+ * In case of an error it returns NULL and set a diag.
+ */
+static struct merger_source *
+luaL_merger_source_table_new(struct lua_State *L)
+{
+	static struct merger_source_vtab merger_source_table_vtab = {
+		.delete = luaL_merger_source_table_delete,
+		.next = luaL_merger_source_table_next,
+	};
+
+	struct merger_source_table *source = (struct merger_source_table *)
+		malloc(sizeof(struct merger_source_table));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_table),
+			 "malloc", "merger_source_table");
+		return NULL;
+	}
+
+	merger_source_new(&source->base, &merger_source_table_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+	source->ref = 0;
+	source->next_idx = 1;
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 0 when a tables iterator ends, 1 when a new table is
+ * received and -1 at an error (set a diag).
+ */
+static int
+luaL_merger_source_table_fetch(struct merger_source_table *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <table>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new table as the current chunk. */
+	if (source->ref > 0)
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->next_idx = 1;
+	lua_pop(L, nresult);
+
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base)
+{
+	struct merger_source_table *source = container_of(base,
+		struct merger_source_table, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+			      box_tuple_format_t *format,
+			      struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merger_source_table *source = container_of(base,
+		struct merger_source_table, base);
+
+	if (source->ref > 0) {
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+	/*
+	 * If all data were processed, try to fetch more.
+	 */
+	while (source->ref == 0 || lua_isnil(L, -1)) {
+		if (source->ref > 0)
+			lua_pop(L, 2);
+		int rc = luaL_merger_source_table_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+		/*
+		 * Retry tuple extracting when a next table is
+		 * received.
+		 */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+	if (format == NULL)
+		format = box_tuple_format_default();
+	struct tuple *tuple = luaT_tuple_new(L, -1, format);
+	if (tuple == NULL)
+		return -1;
+	++source->next_idx;
+	lua_pop(L, 2);
+
+	box_tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new table source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_table_source(struct lua_State *L)
+{
+	return lbox_merger_source_new(L, "merger.new_table_source",
+				      luaL_merger_source_table_new);
+}
+
+/* }}} */
+
+/* {{{ Tuple merger source */
+
+struct merger_source_tuple {
+	struct merger_source base;
+	/*
+	 * A reference to a Lua iterator to fetch a next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_tuple_delete(struct merger_source *base);
+static int
+luaL_merger_source_tuple_next(struct merger_source *base,
+			      box_tuple_format_t *format,
+			      struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the tuple type.
+ *
+ * In case of an error it returns NULL and set a diag.
+ */
+static struct merger_source *
+luaL_merger_source_tuple_new(struct lua_State *L)
+{
+	static struct merger_source_vtab merger_source_tuple_vtab = {
+		.delete = luaL_merger_source_tuple_delete,
+		.next = luaL_merger_source_tuple_next,
+	};
+
+	struct merger_source_tuple *source =
+		(struct merger_source_tuple *) malloc(
+		sizeof(struct merger_source_tuple));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_tuple),
+			 "malloc", "merger_source_tuple");
+		return NULL;
+	}
+
+	merger_source_new(&source->base, &merger_source_tuple_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 1 on success and push the resulting tuple onto the Lua
+ * stack.
+ * Return 0 when no more data.
+ * Return -1 at error (set a diag).
+ */
+static int
+luaL_merger_source_tuple_fetch(struct merger_source_tuple *source,
+			       struct lua_State *L)
+{
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <tuple> got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new tuple as the current chunk. */
+	lua_insert(L, -2); /* Swap state and tuple. */
+	lua_pop(L, 1); /* Pop state. */
+
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_tuple_delete(struct merger_source *base)
+{
+	struct merger_source_tuple *source = container_of(base,
+		struct merger_source_tuple, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+
+	free(source);
+}
+
+static int
+luaL_merger_source_tuple_next(struct merger_source *base,
+				 box_tuple_format_t *format,
+				 struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merger_source_tuple *source = container_of(base,
+		struct merger_source_tuple, base);
+
+	int rc = luaL_merger_source_tuple_fetch(source, L);
+	if (rc < 0)
+		return -1;
+	/*
+	 * Check whether a tuple appears after the fetch.
+	 */
+	if (rc == 0) {
+		*out = NULL;
+		return 0;
+	}
+
+	if (format == NULL)
+		format = box_tuple_format_default();
+	struct tuple *tuple = luaT_tuple_new(L, -1, format);
+	if (tuple == NULL)
+		return -1;
+	lua_pop(L, 1);
+
+	box_tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new tuple source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_tuple_source(struct lua_State *L)
+{
+	return lbox_merger_source_new(L, "merger.new_tuple_source",
+				      luaL_merger_source_tuple_new);
+}
+
+/* }}} */
+
+/* {{{ Merger source Lua methods */
+
+/**
+ * Iterator gen function to traverse source results.
+ *
+ * Expected a nil as the first parameter (param) and a
+ * merger_source as the second parameter (state) on a Lua stack.
+ *
+ * Push the original merger_source (as a new state) and a next
+ * tuple.
+ */
+static int
+lbox_merger_source_gen(struct lua_State *L)
+{
+	struct merger_source *source;
+	bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) &&
+		(source = luaT_check_merger_source(L, 2)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Bad params, use: lbox_merger_source_gen("
+				  "nil, merger_source)");
+
+	struct tuple *tuple;
+	if (source->vtab->next(source, NULL, &tuple) != 0)
+		return luaT_error(L);
+	if (tuple == NULL) {
+		lua_pushnil(L);
+		lua_pushnil(L);
+		return 2;
+	}
+
+	/* Push merger_source, tuple. */
+	*(struct merger_source **)
+		luaL_pushcdata(L, merger_source_type_id) = source;
+	luaT_pushtuple(L, tuple);
+
+	/*
+	 * luaT_pushtuple() references the tuple, so we
+	 * unreference it on merger's side.
+	 */
+	box_tuple_unref(tuple);
+
+	return 2;
+}
+
+/**
+ * Iterate over merger source results from Lua.
+ *
+ * Push three values to the Lua stack:
+ *
+ * 1. gen (lbox_merger_source_gen wrapped by fun.wrap());
+ * 2. param (nil);
+ * 3. state (merger_source).
+ */
+static int
+lbox_merger_source_ipairs(struct lua_State *L)
+{
+	struct merger_source *source;
+	bool ok = lua_gettop(L) == 1 &&
+		(source = luaT_check_merger_source(L, 1)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Usage: merger_source:ipairs()");
+	/* Stack: merger_source. */
+
+	luaL_loadstring(L, "return require('fun').wrap");
+	lua_call(L, 0, 1);
+	lua_insert(L, -2); /* Swap merger_source and wrap. */
+	/* Stack: wrap, merger_source. */
+
+	lua_pushcfunction(L, lbox_merger_source_gen);
+	lua_insert(L, -2); /* Swap merger_source and gen. */
+	/* Stack: wrap, gen, merger_source. */
+
+	/*
+	 * Push nil as an iterator param, because all needed state
+	 * is in a merger source.
+	 */
+	lua_pushnil(L);
+	/* Stack: wrap, gen, merger_source, nil. */
+
+	lua_insert(L, -2); /* Swap merger_source and nil. */
+	/* Stack: wrap, gen, nil, merger_source. */
+
+	/* Call fun.wrap(gen, nil, merger_source). */
+	lua_call(L, 3, 3);
+	return 3;
+}
+
+/**
+ * Write source results into ibuf.
+ *
+ * It is the helper for lbox_merger_source_select().
+ */
+static int
+encode_result_buffer(struct lua_State *L, struct merger_source *source,
+		     struct ibuf *obuf, uint32_t limit)
+{
+	uint32_t result_len = 0;
+	uint32_t result_len_offset = 4;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later.
+	 */
+	encode_header(obuf, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (result_len < limit && (rc =
+	       source->vtab->next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		uint32_t bsize = tuple->bsize;
+		ibuf_reserve(obuf, bsize);
+		memcpy(obuf->wpos, tuple_data(tuple), bsize);
+		obuf->wpos += bsize;
+		result_len_offset += bsize;
+		++result_len;
+
+		/* The received tuple is no longer needed. */
+		box_tuple_unref(tuple);
+	}
+
+	if (rc != 0)
+		return luaT_error(L);
+
+	/* Write the real array size. */
+	mp_store_u32(obuf->wpos - result_len_offset, result_len);
+
+	return 0;
+}
+
+/**
+ * Write source results into a new Lua table.
+ *
+ * It is a helper for lbox_merger_source_select().
+ */
+static int
+create_result_table(struct lua_State *L, struct merger_source *source,
+		    uint32_t limit)
+{
+	/* Create result table. */
+	lua_newtable(L);
+
+	uint32_t cur = 1;
+
+	/* Fetch, merge and save tuples to the table. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (cur - 1 < limit && (rc =
+	       source->vtab->next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		luaT_pushtuple(L, tuple);
+		lua_rawseti(L, -2, cur);
+		++cur;
+
+		/*
+		 * luaT_pushtuple() references the tuple, so we
+		 * unreference it on merger's side.
+		 */
+		box_tuple_unref(tuple);
+	}
+
+	if (rc != 0)
+		return luaT_error(L);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error with merger_source:select() usage info.
+ */
+static int
+lbox_merger_source_select_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger_source:select([{"
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "limit = <number> or <nil>}])";
+	if (param_name == NULL)
+		return luaL_error(L, "Bad params, use: %s", usage);
+	else
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+}
+
+/**
+ * Pull results of a merger source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expects a merger source and, optionally, options on the Lua
+ * stack.
+ *
+ * Returns a Lua table, or nothing when the 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merger_source_select(struct lua_State *L)
+{
+	struct merger_source *source;
+	int top = lua_gettop(L);
+	bool ok = (top == 1 || top == 2) &&
+		/* Merger source. */
+		(source = luaT_check_merger_source(L, 1)) != NULL &&
+		/* Opts. */
+		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+	if (!ok)
+		return lbox_merger_source_select_usage(L, NULL);
+
+	uint32_t limit = 0xFFFFFFFF;
+	struct ibuf *obuf = NULL;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 2)) {
+		/* Parse buffer. */
+		lua_pushstring(L, "buffer");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if ((obuf = luaT_check_ibuf(L, -1)) == NULL)
+				return lbox_merger_source_select_usage(L,
+					"buffer");
+		}
+		lua_pop(L, 1);
+
+		/* Parse limit. */
+		lua_pushstring(L, "limit");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isnumber(L, -1))
+				limit = lua_tointeger(L, -1);
+			else
+				return lbox_merger_source_select_usage(L,
+					"limit");
+		}
+		lua_pop(L, 1);
+	}
+
+	if (obuf == NULL)
+		return create_result_table(L, source, limit);
+	else
+		return encode_result_buffer(L, source, obuf, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+	luaL_cdef(L, "struct merger_source;");
+	luaL_cdef(L, "struct merger_context;");
+	luaL_cdef(L, "struct ibuf;");
+
+	merger_source_type_id = luaL_ctypeid(L, "struct merger_source&");
+	merger_context_type_id = luaL_ctypeid(L, "struct merger_context&");
+	ibuf_type_id = luaL_ctypeid(L, "struct ibuf");
+
+	/* Export C functions to Lua. */
+	static const struct luaL_Reg meta[] = {
+		{"new_buffer_source", lbox_merger_new_buffer_source},
+		{"new_table_source", lbox_merger_new_table_source},
+		{"new_tuple_source", lbox_merger_new_tuple_source},
+		{"new", lbox_merger_new},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "merger", meta);
+
+	/* Add context.new(). */
+	lua_newtable(L); /* merger.context */
+	lua_pushcfunction(L, lbox_merger_context_new);
+	lua_setfield(L, -2, "new");
+	lua_setfield(L, -2, "context");
+
+	/* Add internal.{select,ipairs}(). */
+	lua_newtable(L); /* merger.internal */
+	lua_pushcfunction(L, lbox_merger_source_select);
+	lua_setfield(L, -2, "select");
+	lua_pushcfunction(L, lbox_merger_source_ipairs);
+	lua_setfield(L, -2, "ipairs");
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..9bf7cc019
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+local ibuf_t = ffi.typeof('struct ibuf')
+local merger_source_t = ffi.typeof('struct merger_source')
+
+-- Create a source from one buffer.
+merger.new_source_frombuffer = function(buf)
+    local func_name = 'merger.new_source_frombuffer'
+    if type(buf) ~= 'cdata' or not ffi.istype(ibuf_t, buf) then
+        error(('Usage: %s(<cdata<struct ibuf>>)'):format(func_name), 0)
+    end
+
+    return merger.new_buffer_source(fun.iter({buf}))
+end
+
+-- Create a source from one table.
+merger.new_source_fromtable = function(tbl)
+    local func_name = 'merger.new_source_fromtable'
+    if type(tbl) ~= 'table' then
+        error(('Usage: %s(<table>)'):format(func_name), 0)
+    end
+
+    return merger.new_table_source(fun.iter({tbl}))
+end
+
+local methods = {
+    ['select'] = merger.internal.select,
+    ['pairs']  = merger.internal.ipairs,
+    ['ipairs']  = merger.internal.ipairs,
+}
+
+ffi.metatype(merger_source_t, {
+    __index = function(self, key)
+        return methods[key]
+    end,
+    -- Lua 5.2 compatibility
+    __pairs = merger.internal.ipairs,
+    __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..1412c6126
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,725 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- A chunk size for table and buffer sources. A chunk size for
+-- tuple source is always 1.
+local FETCH_BLOCK_SIZE = 10
+
+local function merger_new_usage(param)
+    local msg = 'merger.new(merger_context, ' ..
+        '{source, source, ...}[, {' ..
+        'reverse = <boolean> or <nil>}])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+local function merger_select_usage(param)
+    local msg = 'merger_source:select([{' ..
+                'buffer = <cdata<struct ibuf>> or <nil>, ' ..
+                'limit = <number> or <nil>}])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+-- Get buffer with data encoded without last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+    local data = msgpackffi.encode(data)
+    data = data:sub(1, data:len() - trunc)
+    local len = data:len()
+    local buf = buffer.ibuf()
+    -- Ensure we have enough buffer to write len + trunc bytes.
+    buf:reserve(len + trunc)
+    local p = buf:alloc(len)
+    -- Ensure the len bytes are followed by trunc zero bytes.
+    ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+    return buf
+end
+
+local function truncated_msgpack_source(data, trunc)
+    local buf = truncated_msgpack_buffer(data, trunc)
+    return merger.new_source_frombuffer(buf)
+end
+
+local bad_source_new_calls = {
+    {
+        'Bad fetch iterator',
+        funcs = {'new_buffer_source', 'new_table_source',
+                 'new_tuple_source'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+    },
+    {
+        'Bad chunk type',
+        funcs = {'new_source_frombuffer', 'new_source_fromtable'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+    },
+    {
+        'Bad buffer chunk',
+        funcs = {'new_source_frombuffer'},
+        params = {ffi.new('char *')},
+        exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+    },
+}
+
+local bad_merger_new_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_new_usage(nil),
+    },
+    {
+        'Bad opts.reverse',
+        sources = {},
+        opts = {reverse = 1},
+        exp_err = merger_new_usage('reverse'),
+    },
+}
+
+local bad_merger_select_calls = {
+    {
+        'Wrong source of table type',
+        sources = {merger.new_source_fromtable({1})},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Remove the last tuple from msgpack data, but keep old
+        -- tuples array size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 2),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Remove half of the last tuple, but keep old tuple size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 1),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad opts.buffer (wrong type)',
+        sources = {},
+        opts = {buffer = 1},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.buffer (wrong cdata type)',
+        sources = {},
+        opts = {buffer = ffi.new('char *')},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.limit (wrong type)',
+        sources = {},
+        opts = {limit = 'hello'},
+        exp_err = merger_select_usage('limit'),
+    }
+}
+
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 16 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 15 constant parts
+            for i = 1, 15 do
+                tuple[i] = i
+            end
+            -- 16th part is varying
+            tuple[16] = tupleno
+            return tuple
+        end,
+        -- reduce tuples count to decrease test run time
+        tuples_cnt = 16,
+    },
+    -- Test null value in nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        gen_tuple = function(i)
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+local function is_unicode_ci_part(part)
+    return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+local function tuple_comparator(a, b, parts)
+    for _, part in ipairs(parts) do
+        local fieldno = part.fieldno
+        if a[fieldno] ~= b[fieldno] then
+            if a[fieldno] == nil then
+                return -1
+            end
+            if b[fieldno] == nil then
+                return 1
+            end
+            if is_unicode_ci_part(part) then
+                return utf8.casecmp(a[fieldno], b[fieldno])
+            end
+            return a[fieldno] < b[fieldno] and -1 or 1
+        end
+    end
+
+    return 0
+end
+
+local function sort_tuples(tuples, parts, opts)
+    local function tuple_comparator_wrapper(a, b)
+        local cmp = tuple_comparator(a, b, parts)
+        if cmp < 0 then
+            return not opts.reverse
+        elseif cmp > 0 then
+            return opts.reverse
+        else
+            return false
+        end
+    end
+
+    table.sort(tuples, tuple_comparator_wrapper)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+    for i = 1, #tuples do
+        local tuple = tuples[i]
+        for _, part in ipairs(parts) do
+            if is_unicode_ci_part(part) then
+                -- Workaround #3709.
+                if tuple[part.fieldno]:len() > 0 then
+                    tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+                end
+            end
+        end
+    end
+end
+
+local function fetch_source_gen(param, state)
+    local input_type = param.input_type
+    local tuples = param.tuples
+    local last_pos = state.last_pos
+    local fetch_block_size = FETCH_BLOCK_SIZE
+    -- A chunk size is always 1 for a tuple source.
+    if input_type == 'tuple' then
+        fetch_block_size = 1
+    end
+    local data = fun.iter(tuples):drop(last_pos):take(
+        fetch_block_size):totable()
+    if #data == 0 then
+        return
+    end
+    local new_state = {last_pos = last_pos + #data}
+    if input_type == 'table' then
+        return new_state, data
+    elseif input_type == 'buffer' then
+        local buf = buffer.ibuf()
+        msgpackffi.internal.encode_r(buf, data, 0)
+        return new_state, buf
+    elseif input_type == 'tuple' then
+        assert(#data <= 1)
+        if #data == 0 then return end
+        return new_state, data[1]
+    else
+        assert(false)
+    end
+end
+
+local function fetch_source_iterator(input_type, tuples)
+    local param = {
+        input_type = input_type,
+        tuples = tuples,
+    }
+    local state = {
+        last_pos = 0,
+    }
+    return fetch_source_gen, param, state
+end
+
+local function prepare_data(schema, tuples_cnt, sources_cnt, opts)
+    local opts = opts or {}
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    local tuples = {}
+    local exp_result = {}
+
+    -- Ensure empty sources are empty tables and not nil.
+    for i = 1, sources_cnt do
+        if tuples[i] == nil then
+            tuples[i] = {}
+        end
+    end
+
+    -- Prepare N tables with tuples as input for merger.
+    for i = 1, tuples_cnt do
+        -- [1, sources_cnt]
+        local guava = digest.guava(i, sources_cnt) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if not use_table_as_tuple then
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[guava], tuple)
+    end
+
+    -- Sort tuples within each source.
+    for _, source_tuples in pairs(tuples) do
+        sort_tuples(source_tuples, schema.parts, opts)
+    end
+
+    -- Sort expected result.
+    sort_tuples(exp_result, schema.parts, opts)
+
+    -- Fill sources.
+    local sources
+    if use_fetch_source then
+        sources = {}
+        for i = 1, sources_cnt do
+            local func = ('new_%s_source'):format(input_type)
+            sources[i] = merger[func](fetch_source_iterator(input_type,
+                tuples[i]))
+        end
+    elseif input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        sources = {}
+        for i = 1, sources_cnt do
+            sources[i] = merger.new_source_fromtable(tuples[i])
+        end
+    elseif input_type == 'buffer' then
+        -- Imitate netbox's select with {buffer = ...}.
+        sources = {}
+        for i = 1, sources_cnt do
+            local buf = buffer.ibuf()
+            sources[i] = merger.new_source_frombuffer(buf)
+            msgpackffi.internal.encode_r(buf, tuples[i], 0)
+        end
+    elseif input_type == 'tuple' then
+        assert(false)
+    else
+        assert(false)
+    end
+
+    return sources, exp_result
+end
+
+local function test_case_opts_str(opts)
+    local params = {}
+
+    if opts.input_type then
+        table.insert(params, 'input_type: ' .. opts.input_type)
+    end
+
+    if opts.output_type then
+        table.insert(params, 'output_type: ' .. opts.output_type)
+    end
+
+    if opts.reverse then
+        table.insert(params, 'reverse')
+    end
+
+    if opts.use_table_as_tuple then
+        table.insert(params, 'use_table_as_tuple')
+    end
+
+    if opts.use_fetch_source then
+        table.insert(params, 'use_fetch_source')
+    end
+
+    if next(params) == nil then
+        return ''
+    end
+
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+    fiber.yield()
+
+    local opts = opts or {}
+
+    -- Prepare data.
+    local sources, exp_result = prepare_data(schema, tuples_cnt, sources_cnt,
+                                             opts)
+
+    -- Create a merger instance.
+    local merger_inst = merger.new(schema.merger_context, sources,
+        {reverse = opts.reverse})
+
+    local res
+
+    -- Run merger and prepare output for compare.
+    if opts.output_type == 'table' then
+        -- Table output.
+        res = merger_inst:select()
+    elseif opts.output_type == 'buffer' then
+        -- Buffer output.
+        local obuf = buffer.ibuf()
+        merger_inst:select({buffer = obuf})
+        res = msgpackffi.decode(obuf.rpos)
+    else
+        -- Tuple output.
+        assert(opts.output_type == 'tuple')
+        res = merger_inst:pairs():totable()
+    end
+
+    -- A bit more postprocessing to compare.
+    for i = 1, #res do
+        if type(res[i]) ~= 'table' then
+            res[i] = res[i]:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate between 'A' and 'a', so
+    -- order is arbitrary. We transform fields with unicode_ci
+    -- collation in parts to lower case before comparing.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    test:is_deeply(res, exp_result,
+        ('check order on %3d tuples in %4d sources%s')
+        :format(tuples_cnt, sources_cnt, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+    local opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuples_cnt = schema.tuples_cnt or 100
+
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    -- Skip meaningless flags combinations.
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+    if input_type == 'tuple' and not use_fetch_source then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(4)
+
+        -- Check with a small number of sources.
+        run_merger(test, schema, tuples_cnt, 1, opts)
+        run_merger(test, schema, tuples_cnt, 2, opts)
+        run_merger(test, schema, tuples_cnt, 5, opts)
+
+        -- Check with more sources than tuples.
+        run_merger(test, schema, tuples_cnt, 128, opts)
+    end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_source_new_calls + #bad_merger_new_calls +
+    #bad_merger_select_calls + 6 + #schemas * 48)
+
+-- For collations.
+box.cfg{}
+
+for _, case in ipairs(bad_source_new_calls) do
+    test:test(case[1], function(test)
+        local funcs = case.funcs
+        test:plan(#funcs)
+        for _, func in ipairs(funcs) do
+            local ok, err = pcall(merger[func], unpack(case.params))
+            test:ok(ok == false and err:match(case.exp_err), func)
+        end
+    end)
+end
+
+-- Create the merger context for the test cases below.
+local ctx = merger.context.new(key_def.new({{
+    fieldno = 1,
+    type = 'string',
+}}))
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+    local ok, err = pcall(merger.new, ctx, case.sources, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Bad source and/or opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_select_calls) do
+    local merger_inst = merger.new(ctx, case.sources)
+    local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Create a merger context for each schema.
+for _, schema in ipairs(schemas) do
+    schema.merger_context = merger.context.new(key_def.new(schema.parts))
+end
+
+test:test('use a source in two mergers', function(test)
+    test:plan(5)
+
+    local data = {{'a'}, {'b'}, {'c'}}
+    local source = merger.new_source_fromtable(data)
+    local i1 = merger.new(ctx, {source}):pairs()
+    local i2 = merger.new(ctx, {source}):pairs()
+
+    local t1 = i1:head():totable()
+    test:is_deeply(t1, data[1], 'tuple 1 from merger 1')
+
+    local t3 = i2:head():totable()
+    test:is_deeply(t3, data[3], 'tuple 3 from merger 2')
+
+    local t2 = i1:head():totable()
+    test:is_deeply(t2, data[2], 'tuple 2 from merger 1')
+
+    test:ok(i1:is_null(), 'merger 1 ends')
+    test:ok(i2:is_null(), 'merger 2 ends')
+end)
+
+local function reusable_source_gen(param)
+    local chunks = param.chunks
+    local idx = param.idx or 1
+
+    if idx > table.maxn(chunks) then
+        return
+    end
+
+    local chunk = chunks[idx]
+    param.idx = idx + 1
+
+    if chunk == nil then
+        return
+    end
+    return box.NULL, chunk
+end
+
+local function verify_reusable_source(test, source)
+    test:plan(3)
+
+    local exp = {{1}, {2}}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '1st use')
+
+    local exp = {{3}, {4}, {5}}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '2nd use')
+
+    local exp = {}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, 'end')
+end
+
+test:test('reuse a tuple source', function(test)
+    local tuples = {{1}, {2}, nil, {3}, {4}, {5}}
+    local source = merger.new_tuple_source(reusable_source_gen,
+        {chunks = tuples})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+    local chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local source = merger.new_table_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+    local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local chunks = {}
+    for i = 1, table.maxn(chunks_tbl) do
+        if chunks_tbl[i] == nil then
+            chunks[i] = nil
+        else
+            chunks[i] = buffer.ibuf()
+            msgpackffi.internal.encode_r(chunks[i], chunks_tbl[i], 0)
+        end
+    end
+    local source = merger.new_buffer_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('use limit', function(test)
+    test:plan(6)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(ctx, {source})
+    local res = m:select({limit = 0})
+    test:is(#res, 0, 'table output with limit 0')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(ctx, {source})
+    local res = m:select({limit = 1})
+    test:is(#res, 1, 'table output with limit 1')
+    test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(ctx, {source})
+    local obuf = buffer.ibuf()
+    m:select({buffer = obuf, limit = 0})
+    local res = msgpackffi.decode(obuf.rpos)
+    test:is(#res, 0, 'buffer output with limit 0')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(ctx, {source})
+    obuf:recycle()
+    m:select({buffer = obuf, limit = 1})
+    local res = msgpackffi.decode(obuf.rpos)
+    test:is(#res, 1, 'buffer output with limit 1')
+    test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
+test:test('cascade mergers', function(test)
+    test:plan(2)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m1 = merger.new(ctx, {source})
+    local m2 = merger.new(ctx, {m1})
+
+    local res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'same context')
+
+    local ctx_unicode = merger.context.new(key_def.new({{
+        fieldno = 1,
+        type = 'string',
+        collation = 'unicode',
+    }}))
+
+    local source = merger.new_source_fromtable(data)
+    local m1 = merger.new(ctx, {source})
+    local m2 = merger.new(ctx_unicode, {m1})
+
+    local res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'different contexts')
+end)
+
+-- Merging cases.
+for _, input_type in ipairs({'buffer', 'table', 'tuple'}) do
+    for _, output_type in ipairs({'buffer', 'table', 'tuple'}) do
+        for _, reverse in ipairs({false, true}) do
+            for _, use_table_as_tuple in ipairs({false, true}) do
+                for _, use_fetch_source in ipairs({false, true}) do
+                    for _, schema in ipairs(schemas) do
+                        run_case(test, schema, {
+                            input_type = input_type,
+                            output_type = output_type,
+                            reverse = reverse,
+                            use_table_as_tuple = use_table_as_tuple,
+                            use_fetch_source = use_fetch_source,
+                        })
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
-- 
2.20.1
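
A note on encode_result_buffer() from this patch: it writes an array
header with a placeholder length, appends the tuples, and then
overwrites the length once the real count is known. The standalone
sketch below shows that back-patching technique in isolation. It is
not the code from the patch: `struct out_buf`, `store_be32()` and
`encode_items()` are hypothetical names, a fixed-size array stands in
for `struct ibuf`, and each item is assumed to be a one-byte MsgPack
positive fixint rather than a tuple.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed-size output buffer standing in for struct ibuf. */
struct out_buf {
	uint8_t data[64];
	size_t used;
};

/* Store a 32-bit value in big-endian order, as MsgPack requires. */
static void
store_be32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t)(v >> 24);
	p[1] = (uint8_t)(v >> 16);
	p[2] = (uint8_t)(v >> 8);
	p[3] = (uint8_t)v;
}

/*
 * Encode at most `limit` of the `count` items as a MsgPack array32.
 * Each item must be in the range 0..127, so it is a valid one-byte
 * positive fixint. Returns the number of items written.
 */
static uint32_t
encode_items(struct out_buf *buf, const uint8_t *items, uint32_t count,
	     uint32_t limit)
{
	/* Write the array32 marker and a placeholder length. */
	assert(buf->used + 5 <= sizeof(buf->data));
	buf->data[buf->used++] = 0xdd;
	size_t len_pos = buf->used;
	store_be32(buf->data + len_pos, UINT32_MAX);
	buf->used += 4;

	/* Append items until the input or the limit is exhausted. */
	uint32_t written = 0;
	while (written < limit && written < count) {
		assert(buf->used < sizeof(buf->data));
		buf->data[buf->used++] = items[written];
		++written;
	}

	/* Overwrite the placeholder with the real array length. */
	store_be32(buf->data + len_pos, written);
	return written;
}
```

Remembering the position of the length field (`len_pos`) plays the
same role as `result_len_offset` in the patch, which instead tracks
the distance from the current write position back to that field.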



