Tarantool development patches archive
 help / color / mirror / Atom feed
* [PATCH 0/3] Merger
@ 2018-12-16 20:17 Alexander Turenko
  2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
                   ` (4 more replies)
  0 siblings, 5 replies; 14+ messages in thread
From: Alexander Turenko @ 2018-12-16 20:17 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

https://github.com/tarantool/tarantool/issues/3276
On top of 2.1: https://github.com/tarantool/tarantool/tree/Totktonada/gh-3276-on-board-merger
On top of 1.10: https://github.com/tarantool/tarantool/tree/Totktonada/gh-3276-on-board-merger-1.10

The emails below consist content for the 2.1 branch.

This is adaptation of [shard's merger][1] to be the built-in tarantool
module. The main reason of have this module built-in is to create
key_defs with nullability and collation support (which is not availiable
in the external API). The second reason is vague plan to have some more
general map-reduce support built-in in the future.

Most of changes made to the original merger are splitted into the
following categories:

* Support buffer/table/iterator input/output to allow merger's output
  being input for an another merger.
* Change API to net.box-like one (:select(), :pairs()) and change index
  parts format to be the same as for local spaces and as for net.box
  spaces.
* Support 'chain' format for input / output buffers to handle the case
  when tuples are part of the wrapping array, see comment in merger.c re
  chaining mergers and the [real usage][2].
* Split merger and merger_iterator to allow yield during merge process
  and use the same merger instance in an another merge.
* Document, test and comment the code.

Consider usage information at top of merger.c file to get the idea what
the module is.

[1]: https://github.com/tarantool/shard/blob/180948e99148973e89f75f8e4784315e183e3fa2/shard/driver.c
[2]: https://github.com/tarantool/graphql/pull/258

Alexander Turenko (3):
  Add luaT_iscallable with support of cdata metatype
  Add module to ease using Lua iterators from C
  Add merger for tuple streams

 extra/exports                    |    1 +
 src/CMakeLists.txt               |    2 +
 src/lua/init.c                   |    5 +
 src/lua/lua_iterator.h           |  117 +++
 src/lua/merger.c                 | 1643 ++++++++++++++++++++++++++++++
 src/lua/merger.h                 |   39 +
 src/lua/merger.lua               |   19 +
 src/lua/utils.c                  |   43 +
 src/lua/utils.h                  |   10 +
 test/app-tap/merger.test.lua     |  693 +++++++++++++
 test/app-tap/module_api.c        |   10 +
 test/app-tap/module_api.test.lua |   85 +-
 test/app-tap/suite.ini           |    1 +
 13 files changed, 2666 insertions(+), 2 deletions(-)
 create mode 100644 src/lua/lua_iterator.h
 create mode 100644 src/lua/merger.c
 create mode 100644 src/lua/merger.h
 create mode 100644 src/lua/merger.lua
 create mode 100755 test/app-tap/merger.test.lua

-- 
2.19.2

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [PATCH 1/3] Add luaT_iscallable with support of cdata metatype
  2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
@ 2018-12-16 20:17 ` Alexander Turenko
  2018-12-26 18:35   ` Vladimir Davydov
  2018-12-16 20:17 ` [PATCH 2/3] Add module to ease using Lua iterators from C Alexander Turenko
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Alexander Turenko @ 2018-12-16 20:17 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Needed for #3276.
---
 extra/exports                    |  1 +
 src/lua/utils.c                  | 43 ++++++++++++++++
 src/lua/utils.h                  | 10 ++++
 test/app-tap/module_api.c        | 10 ++++
 test/app-tap/module_api.test.lua | 85 +++++++++++++++++++++++++++++++-
 5 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/extra/exports b/extra/exports
index 5f69e0730..52f0b2378 100644
--- a/extra/exports
+++ b/extra/exports
@@ -134,6 +134,7 @@ luaT_call
 luaT_cpcall
 luaT_state
 luaT_tolstring
+luaT_iscallable
 box_txn
 box_txn_begin
 box_txn_commit
diff --git a/src/lua/utils.c b/src/lua/utils.c
index 978fe61f1..7a6069fbb 100644
--- a/src/lua/utils.c
+++ b/src/lua/utils.c
@@ -920,6 +920,49 @@ luaT_tolstring(lua_State *L, int idx, size_t *len)
 	return lua_tolstring(L, -1, len);
 }
 
+/* Based on ffi_meta___call() from luajit/src/lib_ffi.c. */
+static int
+luaT_cdata_iscallable(lua_State *L, int idx)
+{
+	/* Calculate absolute value in the stack. */
+	if (idx < 0)
+		idx = lua_gettop(L) + idx + 1;
+
+	/* Get cdata from the stack. */
+	assert(lua_type(L, idx) == LUA_TCDATA);
+	GCcdata *cd = cdataV(L->base + idx - 1);
+
+	CTState *cts = ctype_cts(L);
+	CTypeID id = cd->ctypeid;
+	CType *ct = ctype_raw(cts, id);
+	if (ctype_isptr(ct->info))
+		id = ctype_cid(ct->info);
+
+	/* Get ctype metamethod. */
+	cTValue *tv = lj_ctype_meta(cts, id, MM_call);
+
+	return tv != NULL;
+}
+
+int
+luaT_iscallable(lua_State *L, int idx)
+{
+	/* Whether it is function. */
+	int res = lua_isfunction(L, idx);
+	if (res == 1)
+		return 1;
+
+	/* Whether it is cdata with metatype with __call field. */
+	if (lua_type(L, idx) == LUA_TCDATA)
+		return luaT_cdata_iscallable(L, idx);
+
+	/* Whether it has metatable with __call field. */
+	res = luaL_getmetafield(L, idx, "__call");
+	if (res == 1)
+		lua_pop(L, 1); /* Pop __call value. */
+	return res;
+}
+
 lua_State *
 luaT_state(void)
 {
diff --git a/src/lua/utils.h b/src/lua/utils.h
index a47e3d2b4..b62327217 100644
--- a/src/lua/utils.h
+++ b/src/lua/utils.h
@@ -438,6 +438,16 @@ luaT_state(void);
 LUA_API const char *
 luaT_tolstring(lua_State *L, int idx, size_t *ssize);
 
+/**
+ * Check whether a Lua object is a function or has
+ * metatable/metatype with a __call field.
+ *
+ * Note: It does not check type of __call metatable/metatype
+ * field.
+ */
+LUA_API int
+luaT_iscallable(lua_State *L, int idx);
+
 /** \endcond public */
 
 /**
diff --git a/test/app-tap/module_api.c b/test/app-tap/module_api.c
index 4abe1af48..ef0c292e8 100644
--- a/test/app-tap/module_api.c
+++ b/test/app-tap/module_api.c
@@ -440,6 +440,15 @@ test_tostring(lua_State *L)
 	return 1;
 }
 
+static int
+test_iscallable(lua_State *L)
+{
+	int exp = lua_toboolean(L, 2);
+	int res = luaT_iscallable(L, 1);
+	lua_pushboolean(L, res == exp);
+	return 1;
+}
+
 LUA_API int
 luaopen_module_api(lua_State *L)
 {
@@ -467,6 +476,7 @@ luaopen_module_api(lua_State *L)
 		{"test_cpcall", test_cpcall},
 		{"test_state", test_state},
 		{"test_tostring", test_tostring},
+		{"iscallable", test_iscallable},
 		{NULL, NULL}
 	};
 	luaL_register(L, "module_api", lib);
diff --git a/test/app-tap/module_api.test.lua b/test/app-tap/module_api.test.lua
index f93257236..a6658cc61 100755
--- a/test/app-tap/module_api.test.lua
+++ b/test/app-tap/module_api.test.lua
@@ -3,7 +3,9 @@
 local fio = require('fio')
 
 box.cfg{log = "tarantool.log"}
-build_path = os.getenv("BUILDDIR")
+-- Use BUILDDIR passed from test-run or cwd when run w/o
+-- test-run to find test/app-tap/module_api.{so,dylib}.
+build_path = os.getenv("BUILDDIR") or '.'
 package.cpath = fio.pathjoin(build_path, 'test/app-tap/?.so'   ) .. ';' ..
                 fio.pathjoin(build_path, 'test/app-tap/?.dylib') .. ';' ..
                 package.cpath
@@ -36,8 +38,86 @@ local function test_pushcdata(test, module)
     test:is(gc_counter, 1, 'pushcdata gc')
 end
 
+local function test_iscallable(test, module)
+    local ffi = require('ffi')
+
+    ffi.cdef([[
+        struct cdata_1 { int foo; };
+        struct cdata_2 { int foo; };
+    ]])
+
+    local cdata_1 = ffi.new('struct cdata_1')
+    local cdata_1_ref = ffi.new('struct cdata_1 &')
+    local cdata_2 = ffi.new('struct cdata_2')
+    local cdata_2_ref = ffi.new('struct cdata_2 &')
+
+    local nop = function() end
+
+    ffi.metatype('struct cdata_2', {
+        __call = nop,
+    })
+
+    local cases = {
+        {
+            obj = nop,
+            exp = true,
+            description = 'function',
+        },
+        {
+            obj = nil,
+            exp = false,
+            description = 'nil',
+        },
+        {
+            obj = 1,
+            exp = false,
+            description = 'number',
+        },
+        {
+            obj = {},
+            exp = false,
+            description = 'table without metatable',
+        },
+        {
+            obj = setmetatable({}, {}),
+            exp = false,
+            description = 'table without __call metatable field',
+        },
+        {
+            obj = setmetatable({}, {__call = nop}),
+            exp = true,
+            description = 'table with __call metatable field'
+        },
+        {
+            obj = cdata_1,
+            exp = false,
+            description = 'cdata without __call metatable field',
+        },
+        {
+            obj = cdata_1_ref,
+            exp = false,
+            description = 'cdata reference without __call metatable field',
+        },
+        {
+            obj = cdata_2,
+            exp = true,
+            description = 'cdata with __call metatable field',
+        },
+        {
+            obj = cdata_2_ref,
+            exp = true,
+            description = 'cdata reference with __call metatable field',
+        },
+    }
+
+    test:plan(#cases)
+    for _, case in ipairs(cases) do
+        test:ok(module.iscallable(case.obj, case.exp), case.description)
+    end
+end
+
 local test = require('tap').test("module_api", function(test)
-    test:plan(23)
+    test:plan(24)
     local status, module = pcall(require, 'module_api')
     test:is(status, true, "module")
     test:ok(status, "module is loaded")
@@ -62,6 +142,7 @@ local test = require('tap').test("module_api", function(test)
     test:like(msg, 'luaT_error', 'luaT_error')
 
     test:test("pushcdata", test_pushcdata, module)
+    test:test("iscallable", test_iscallable, module)
 
     space:drop()
 end)
-- 
2.19.2

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [PATCH 2/3] Add module to ease using Lua iterators from C
  2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
  2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
@ 2018-12-16 20:17 ` Alexander Turenko
  2018-12-26 18:45   ` Vladimir Davydov
  2018-12-16 20:17 ` [PATCH 3/3] Add merger for tuple streams Alexander Turenko
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Alexander Turenko @ 2018-12-16 20:17 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Needed for #3276.
---
 src/lua/lua_iterator.h | 117 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 117 insertions(+)
 create mode 100644 src/lua/lua_iterator.h

diff --git a/src/lua/lua_iterator.h b/src/lua/lua_iterator.h
new file mode 100644
index 000000000..3e1a88c93
--- /dev/null
+++ b/src/lua/lua_iterator.h
@@ -0,0 +1,117 @@
+#ifndef TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED
+#define TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED 1
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+/**
+ * This module contains helper functions to interact with a Lua
+ * iterator from C.
+ */
+
+#include <lua.h>
+
+/**
+ * Holds iterator state (references to Lua objects).
+ */
+struct lua_iterator {
+	int gen;
+	int param;
+	int state;
+};
+
+/**
+ * Create a Lua iterator from {gen, param, state}.
+ */
+struct lua_iterator *
+lua_iterator_new_fromtable(lua_State *L, int idx)
+{
+	struct lua_iterator *it = (struct lua_iterator *) malloc(
+		sizeof(struct lua_iterator));
+
+	lua_rawgeti(L, idx, 1); /* Popped by luaL_ref(). */
+	it->gen = luaL_ref(L, LUA_REGISTRYINDEX);
+	lua_rawgeti(L, idx, 2); /* Popped by luaL_ref(). */
+	it->param = luaL_ref(L, LUA_REGISTRYINDEX);
+	lua_rawgeti(L, idx, 3); /* Popped by luaL_ref(). */
+	it->state = luaL_ref(L, LUA_REGISTRYINDEX);
+
+	return it;
+}
+
+/**
+ * Move iterator to the next value. Push values returned by
+ * gen(param, state) and return its count. Zero means no more
+ * results available.
+ */
+int
+lua_iterator_next(lua_State *L, struct lua_iterator *it)
+{
+	int frame_start = lua_gettop(L);
+
+	/* Call gen(param, state). */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, it->gen);
+	lua_rawgeti(L, LUA_REGISTRYINDEX, it->param);
+	lua_rawgeti(L, LUA_REGISTRYINDEX, it->state);
+	lua_call(L, 2, LUA_MULTRET);
+	int nresults = lua_gettop(L) - frame_start;
+	if (nresults == 0) {
+		luaL_error(L, "lua_iterator_next: gen(param, state) must "
+			      "return at least one result");
+		unreachable();
+		return 0;
+	}
+
+	/* The call above returns nil as the first result. */
+	if (lua_isnil(L, frame_start + 1)) {
+		lua_settop(L, frame_start);
+		return 0;
+	}
+
+	/* Save the first result to it->state. */
+	luaL_unref(L, LUA_REGISTRYINDEX, it->state);
+	lua_pushvalue(L, frame_start + 1); /* Popped by luaL_ref(). */
+	it->state = luaL_ref(L, LUA_REGISTRYINDEX);
+
+	return nresults;
+}
+
+/**
+ * Free all resources hold by the iterator.
+ */
+void lua_iterator_free(lua_State *L, struct lua_iterator *it)
+{
+	luaL_unref(L, LUA_REGISTRYINDEX, it->gen);
+	luaL_unref(L, LUA_REGISTRYINDEX, it->param);
+	luaL_unref(L, LUA_REGISTRYINDEX, it->state);
+	free(it);
+}
+
+#endif /* TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED */
-- 
2.19.2

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [PATCH 3/3] Add merger for tuple streams
  2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
  2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
  2018-12-16 20:17 ` [PATCH 2/3] Add module to ease using Lua iterators from C Alexander Turenko
@ 2018-12-16 20:17 ` Alexander Turenko
  2018-12-26 20:11   ` Vladimir Davydov
  2018-12-18 12:16 ` [PATCH 0/3] Merger Alexander Turenko
  2019-03-22 14:24 ` [tarantool-patches] [PATCH 0/3] lua: add key_def lua module Kirill Shcherbatov
  4 siblings, 1 reply; 14+ messages in thread
From: Alexander Turenko @ 2018-12-16 20:17 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Fixes #3276.
---
 src/CMakeLists.txt           |    2 +
 src/lua/init.c               |    5 +
 src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
 src/lua/merger.h             |   39 +
 src/lua/merger.lua           |   19 +
 test/app-tap/merger.test.lua |  693 ++++++++++++++
 test/app-tap/suite.ini       |    1 +
 7 files changed, 2402 insertions(+)
 create mode 100644 src/lua/merger.c
 create mode 100644 src/lua/merger.h
 create mode 100644 src/lua/merger.lua
 create mode 100755 test/app-tap/merger.test.lua

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..d2a5fc9c1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -47,6 +47,7 @@ lua_source(lua_sources lua/trigger.lua)
 lua_source(lua_sources lua/table.lua)
 lua_source(lua_sources ../third_party/luafun/fun.lua)
 lua_source(lua_sources lua/httpc.lua)
+lua_source(lua_sources lua/merger.lua)
 lua_source(lua_sources lua/iconv.lua)
 # LuaJIT jit.* library
 lua_source(lua_sources "${CMAKE_BINARY_DIR}/third_party/luajit/src/jit/bc.lua")
@@ -181,6 +182,7 @@ set (server_sources
      lua/fio.c
      lua/crypto.c
      lua/httpc.c
+     lua/merger.c
      lua/utf8.c
      lua/info.c
      ${lua_sources}
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..1eeab02b9 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -57,6 +57,7 @@
 #include "lua/pickle.h"
 #include "lua/fio.h"
 #include "lua/httpc.h"
+#include "lua/merger.h"
 #include "lua/utf8.h"
 #include "digest.h"
 #include <small/ibuf.h>
@@ -89,6 +90,7 @@ extern char strict_lua[],
 	errno_lua[],
 	fiber_lua[],
 	httpc_lua[],
+	merger_lua[],
 	log_lua[],
 	uri_lua[],
 	socket_lua[],
@@ -148,6 +150,7 @@ static const char *lua_modules[] = {
 	"internal.trigger", trigger_lua,
 	"pwd", pwd_lua,
 	"http.client", httpc_lua,
+	"merger", merger_lua,
 	"iconv", iconv_lua,
 	/* jit.* library */
 	"jit.vmdef", vmdef_lua,
@@ -452,6 +455,8 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
 	tarantool_lua_digest_init(L);
 	luaopen_http_client_driver(L);
 	lua_pop(L, 1);
+	luaopen_merger(L);
+	lua_pop(L, 1);
 	luaopen_msgpack(L);
 	lua_pop(L, 1);
 	luaopen_yaml(L);
diff --git a/src/lua/merger.c b/src/lua/merger.c
new file mode 100644
index 000000000..8caf8d47f
--- /dev/null
+++ b/src/lua/merger.c
@@ -0,0 +1,1643 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+/**
+ * API and basic usage
+ * -------------------
+ *
+ * The following example demonstrates API of the module:
+ *
+ * ```
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- The format of key_parts parameter is the same as
+ * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
+ * -- net.box connection).
+ * local key_parts = {
+ *     {
+ *         fieldno = <number>,
+ *         type = <string>,
+ *         [ is_nullable = <boolean>, ]
+ *         [ collation_id = <number>, ]
+ *         [ collation = <string>, ]
+ *     },
+ *     ...
+ * }
+ *
+ * -- Create the merger instance.
+ * local merger_inst = merger.new(key_parts)
+ *
+ * -- Optional parameters.
+ * local opts = {
+ *     -- Output buffer, only for merger_inst:select(<...>).
+ *     [ buffer = <buffer>, ]
+ *     -- Ascending (default) or descending result order.
+ *     [ descending = <boolean>, ]
+ *     -- Buffer encoding / decoding options are described below.
+ *     [ decode = 'raw' / 'select' / 'call' / 'chain', ]
+ *     [ encode = 'raw' / 'select' / 'call' / 'chain', ]
+ *     [ encode_chain_len = <number>, ]
+ * }
+ *
+ * -- Prepare buffer source.
+ * local conn = net_box.connect('localhost:3301')
+ * local buf = buffer.ibuf()
+ * conn.space.s:select(nil, {buffer = buf}) -- read to buffer
+ *
+ * -- We have three sources here.
+ * local sources = {
+ *     buf,                   -- buffer source
+ *     box.space.s:select(),  -- table source
+ *     {box.space.s:pairs()}, -- iterator source
+ * }
+ *
+ * -- Read the whole result at once.
+ * local res = merger_inst:select(sources, opts)
+ *
+ * -- Read the result tuple per tuple.
+ * local res = {}
+ * for _, tuple in merger_inst:pairs(sources, opts) do
+ *     -- Some stop merge condition.
+ *     if tuple[1] > MAX_VALUE then break end
+ *     table.insert(res, tuple)
+ * end
+ *
+ * -- The same in the functional style.
+ * local function cond(tuple)
+ *     return tuple[1] <= MAX_VALUE
+ * end
+ * local res = merger_inst:pairs(sources, opts):take(cond):totable()
+ * ```
+ *
+ * The basic case of using merger is when there are M storages and
+ * data are partitioned (sharded) across them. A client want to
+ * fetch the data (tuples stream) from each storage and merge them
+ * into one tuple stream:
+ *
+ * ```
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- Prepare M sources.
+ * local connects = {
+ *     net_box.connect('localhost:3301'),
+ *     net_box.connect('localhost:3302'),
+ *     ...
+ *     net_box.connect('localhost:<...>'),
+ * }
+ * local sources = {}
+ * for _, conn in ipairs(connects) do
+ *     local buf = buffer.ibuf()
+ *     conn.space.<...>.index.<...>:select(<...>, {buffer = buf})
+ *     table.insert(sources, buf)
+ * end
+ *
+ * -- See the 'Notes...' section below.
+ * local key_parts = {}
+ * local space = connects[1].space.<...>
+ * local index = space.index.<...>
+ * for _, part in ipairs(index.parts) do
+ *     table.insert(key_parts, part)
+ * end
+ * if not index.unique then
+ *     for _, part in ipairs(space.index[0]) do
+ *         table.insert(key_parts, part)
+ *     end
+ * end
+ *
+ * -- Create the merger instance.
+ * local merger_inst = merger.new(key_parts)
+ *
+ * -- Merge.
+ * local res = merger_inst:select(sources)
+ * ```
+ *
+ * Notes re source sorting and key parts
+ * -------------------------------------
+ *
+ * The merger expects that each source tuples stream is sorted
+ * according to provided key parts and perform a kind of merge
+ * sort (choose minimal / maximal tuple across sources on each
+ * step). Tuples from select() from Tarantool's space are sorted
+ * according to key parts from index that was used. When secondary
+ * non-unique index is used tuples are sorted according to the key
+ * parts of the secondary index and, then, key parts of the
+ * primary index.
+ *
+ * Decoding / encoding buffers
+ * ---------------------------
+ *
+ * A select response has the following structure:
+ * `{[48] = {tuples}}`, while a call response is
+ * `{[48] = {{tuples}}}` (because it should support multiple
+ * return values). A user should specify how merger will
+ * operate on buffers, so merger has `decode` (how to read buffer
+ * sources) and `encode` (how to write to a resulting buffer)
+ * options. These options accept the following values:
+ *
+ * Option value       | Buffer structure
+ * ------------------ | ----------------
+ * 'raw'              | tuples
+ * 'select' (default) | {[48] = {tuples}}
+ * 'call'             | {[48] = {{tuples}}}
+ * 'chain'            | {[48] = {{{tuples, ...}}}}
+ *
+ * tuples is array of tuples. 'raw' and 'chain' options are about chaining
+ * mergers and they are described in the following section.
+ *
+ * How to check buffer data structure myself:
+ *
+ * ```
+ * #!usr/bin/env tarantool
+ *
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local ffi = require('ffi')
+ * local msgpack = require('msgpack')
+ * local yaml = require('yaml')
+ *
+ * box.cfg{listen = 3301}
+ * box.once('load_data', function()
+ *     box.schema.user.grant('guest', 'read,write,execute', 'universe')
+ *     box.schema.space.create('s')
+ *     box.space.s:create_index('pk')
+ *     box.space.s:insert({1})
+ *     box.space.s:insert({2})
+ *     box.space.s:insert({3})
+ *     box.space.s:insert({4})
+ * end)
+ *
+ * local function foo()
+ *     return box.space.s:select()
+ * end
+ * _G.foo = foo
+ *
+ * local conn = net_box.connect('localhost:3301')
+ *
+ * local buf = buffer.ibuf()
+ * conn.space.s:select(nil, {buffer = buf})
+ * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
+ * local buf_lua = msgpack.decode(buf_str)
+ * print('select:\n' .. yaml.encode(buf_lua))
+ *
+ * local buf = buffer.ibuf()
+ * conn:call('foo', nil, {buffer = buf})
+ * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
+ * local buf_lua = msgpack.decode(buf_str)
+ * print('call:\n' .. yaml.encode(buf_lua))
+ *
+ * os.exit()
+ * ```
+ *
+ * The `decode` option changes decoding algorithm of source
+ * buffers and does nothing for sources of other types. It will be
+ * completely ignored if there are no buffer sources.
+ *
+ * The `encode` option changes encoding algorithm of resulting
+ * buffer. When the option is provided, the `buffer` option should
+ * be provided too. When `encode` is 'chain', the
+ * `encode_chain_len` option is mandatory.
+ *
+ * Chaining mergers
+ * ----------------
+ *
+ * Chaining mergers is needed to process a batch select request,
+ * when one response (buffer) contains several results (tuple
+ * arrays) to merge with another responses of this kind. Reshaping
+ * of such results into separate buffers or lua table would lead
+ * to extra data copies within Lua memory and extra time consuming
+ * msgpack decoding, so the merger supports this case of source
+ * data shape natively.
+ *
+ * When the `decode` option is 'select' (or nil) or 'call' the
+ * merger expects a usual net.box's select / call result in each
+ * of source buffers.
+ *
+ * When the `decode` option is 'chain' or 'raw' the merger expects
+ * an array of results instead of just result. Pass 'chain' for
+ * the first `:select()` (or `:pairs()`) call and 'raw' for the
+ * following ones. It is possible (but not mandatory) to use
+ * different mergers for each result, just reuse the same buffers
+ * for consequent calls.
+ *
+ * ```
+ * -- Storage script
+ * -- --------------
+ *
+ * -- Return N results in a table.
+ * -- Each result is table of tuples.
+ * local function batch_select(<...>)
+ *     local res = {}
+ *     for i = 1, N do
+ *         local tuples = box.space.<...>:select(<...>)
+ *         table.insert(res, tuples)
+ *     end
+ *     return res
+ * end
+ *
+ * -- Expose to call it using net.box.
+ * _G.batch_select = batch_select
+ *
+ * -- Client script
+ * -- -------------
+ *
+ * local net_box = require('net.box')
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * -- Prepare M sources.
+ * local connects = <...>
+ * local sources = {}
+ * for _, conn in ipairs(connects) do
+ *     local buf = buffer.ibuf()
+ *     conn:call('batch_select', <...>, {buffer = buf})
+ *     table.insert(sources, buf)
+ * end
+ *
+ * -- Now we have M sources and each have N results. We want to
+ * -- merge all 1st results, all 2nd results, ..., all Nth
+ * -- results.
+ *
+ * local merger_inst = merger.new(<...>)
+ *
+ * local res = {}
+ * for i = 1, N do
+ *     -- We use the same merger instance for each merge, but it
+ *     -- is possible to use different ones.
+ *     local tuples = merger_inst:select(sources, {
+ *         decode = i == 1 and 'chain' or 'raw',
+ *     })
+ *     table.insert(res, tuples)
+ * end
+ * ```
+ *
+ * When `buffer` option is passed it is possible to write results
+ * of several consequent merges into that buffer in the format
+ * another merger can accept (see cascading mergers below for the
+ * idea). Set `encode` to 'chain' to encode the first result
+ * and to 'raw' to encode consequent results. It is necessary to
+ * also set `encode_chain_len`, because size of resulting array
+ * is not known to a merger when it writes the first result.
+ *
+ * Constraints are:
+ *
+ * - `decode` option influences only on buffer sources
+ *   interpretation (ignored for sources of other types).
+ * - `encode_*` options are applicable only when `buffer`
+ *   options is provided.
+ *
+ * Cascading mergers
+ * -----------------
+ *
+ * The idea is simple: the merger output formats are the same as
+ * source formats, so it is possible to merge results of previous
+ * merges.
+ *
+ * The example below is synthetic to be simple. Real cases when
+ * cascading can be profitable likely involve additional layers
+ * of Tarantool instances between storages and clients or separate
+ * threads to merge blocks of each level.
+ *
+ * To be honest no one use this ability for now. It exists,
+ * because the same input and output formats looks as good
+ * property of the API.
+ *
+ * ```
+ * <...requires...>
+ *
+ * local sources = <...100 buffers...>
+ * local merger_inst = merger.new(<...>)
+ *
+ * -- We use buffer sources at 1st and 2nd merge layers, but read
+ * -- the final result as the table.
+ *
+ * local sources_level_2 = {}
+ * for i = 1, 10 do
+ *     -- Take next 10 first level sources.
+ *     local sources_level_1 = {}
+ *     for j = 1, 10 do
+ *         sources_level_1[j] = sources[(i - 1) * 10 + j]
+ *     end
+ *
+ *     -- Merge 10 sources into a second level source.
+ *     local result_level_1 = buffer.ibuf()
+ *     merger_inst:select(sources_level_1, {buffer = result_level_1})
+ *     sources_level_2[i] = result_level_1
+ * end
+ *
+ * local res = merger_inst:select(sources_level_2)
+ * ```
+ */
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+
+#include <lua.h>
+#include <lauxlib.h>
+
+#include "lua/error.h"
+#include "lua/utils.h"
+#include "small/ibuf.h"
+#include "msgpuck.h"
+#include "mpstream.h"
+#include "lua/msgpack.h"
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "box/iproto_constants.h" /* IPROTO_DATA */
+#include "box/field_def.h"
+#include "box/key_def.h"
+#include "box/schema_def.h"
+#include "box/tuple.h"
+#include "box/lua/tuple.h"
+#include "box/box.h"
+#include "box/index.h"
+#include "box/coll_id_cache.h"
+#include "lua/lua_iterator.h"
+#include "diag.h"
+
+#ifndef NDEBUG
+#include "say.h"
+/**
+ * Heap insert/delete/update macros wrapped with debug prints.
+ */
+#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
+	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
+		  tuple_str((source)->tuple));				\
+	merger_heap_insert((heap), (hnode));				\
+} while(0)
+#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
+	say_debug("merger: [source %p] delete", (source));	\
+	merger_heap_delete((heap), (hnode));			\
+} while(0)
+#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
+	say_debug("merger: [source %p] update: tuple: %s", (source),	\
+		  tuple_str((source)->tuple));				\
+	merger_heap_update((heap), (hnode));				\
+} while(0)
+#else /* !defined(NDEBUG) */
+/**
+ * Heap insert/delete/update macros wrappers w/o debug prints.
+ */
+#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
+	merger_heap_insert((heap), (hnode));		\
+} while(0)
+#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
+	merger_heap_delete((heap), (hnode));		\
+} while(0)
+#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
+	merger_heap_update((heap), (hnode));		\
+} while(0)
+#endif /* !defined(NDEBUG) */
+
+/**
+ * Helper macros to push / throw out of memory errors to Lua.
+ */
+#define push_out_of_memory_error(L, size, what_name) do {	\
+	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
+	luaT_pusherror(L, diag_last_error(diag_get()));		\
+} while(0)
+#define throw_out_of_memory_error(L, size, what_name) do {	\
+	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
+	luaT_error(L);						\
+	unreachable();						\
+	return -1;						\
+} while(0)
+
+#define BOX_COLLATION_NAME_INDEX 1
+
+/**
+ * A type of data structure that holds source data.
+ */
+enum merger_source_type {
+	SOURCE_TYPE_BUFFER,
+	SOURCE_TYPE_TABLE,
+	SOURCE_TYPE_ITERATOR,
+	SOURCE_TYPE_NONE,
+};
+
+/**
+ * How data are encoded in a buffer.
+ *
+ * `decode` and `encode` options are parsed to values of this
+ * enum.
+ */
+enum merger_buffer_type {
+	BUFFER_TYPE_RAW,
+	BUFFER_TYPE_SELECT,
+	BUFFER_TYPE_CALL,
+	BUFFER_TYPE_CHAIN,
+	BUFFER_TYPE_NONE,
+};
+
+/**
+ * Hold state of a merge source.
+ */
+struct merger_source {
+	/*
+	 * A source is the heap node. Compared by the next tuple.
+	 */
+	struct heap_node hnode;
+	/* Union determinant. */
+	enum merger_source_type type;
+	/* Fields specific for certaint source types. */
+	union {
+		/* Buffer source. */
+		struct {
+			struct ibuf *buf;
+			/*
+			 * A merger stops before end of a buffer
+			 * when it is not the last merger in the
+			 * chain.
+			 */
+			size_t remaining_tuples_cnt;
+		} buf;
+		/* Table source. */
+		struct {
+			int ref;
+			int next_idx;
+		} tbl;
+		/* Iterator source. */
+		struct {
+			struct lua_iterator *it;
+		} it;
+	};
+	/* Next tuple. */
+	struct tuple *tuple;
+};
+
+/**
+ * Holds immutable parameters of a merger.
+ */
+struct merger {
+	struct key_def *key_def;
+	box_tuple_format_t *format;
+};
+
+/**
+ * Holds parameters of merge process, sources, result storage
+ * (if any), heap of sources and utility flags / counters.
+ */
+struct merger_iterator {
+	/* Heap of sources. */
+	heap_t heap;
+	/*
+	 * key_def is copied from merger.
+	 *
+	 * A merger can be collected by LuaJIT GC independently
+	 * from a merger_iterator, so we cannot just save pointer
+	 * to merger here and so we copy key_def from merger.
+	 */
+	struct key_def *key_def;
+	/* Parsed sources and decoding parameters. */
+	uint32_t sources_count;
+	struct merger_source **sources;
+	enum merger_buffer_type decode;
+	/* Ascending / descending order. */
+	int order;
+	/* Optional output buffer and encoding parameters. */
+	struct ibuf *obuf;
+	enum merger_buffer_type encode;
+	uint32_t encode_chain_len;
+};
+
+static uint32_t merger_type_id = 0;
+static uint32_t merger_iterator_type_id = 0;
+static uint32_t ibuf_type_id = 0;
+
+/* Forward declarations. */
+static bool
+source_less(const heap_t *heap, const struct heap_node *a,
+	    const struct heap_node *b);
+static int
+lbox_merger_gc(struct lua_State *L);
+static void
+merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
+static int
+lbox_merger_iterator_gc(struct lua_State *L);
+
+#define HEAP_NAME merger_heap
+#define HEAP_LESS source_less
+#include "salad/heap.h"
+
+/**
+ * Create the new tuple with specific format from a Lua table or a
+ * tuple.
+ *
+ * In case of an error push the error message to the Lua stack and
+ * return NULL.
+ */
+static struct tuple *
+luaT_gettuple_with_format(struct lua_State *L, int idx,
+			  box_tuple_format_t *format)
+{
+	struct tuple *tuple;
+	if (lua_istable(L, idx)) {
+		/* Based on lbox_tuple_new() code. */
+		struct ibuf *buf = tarantool_lua_ibuf;
+		ibuf_reset(buf);
+		struct mpstream stream;
+		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
+		      luamp_error, L);
+		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
+		mpstream_flush(&stream);
+		tuple = box_tuple_new(format, buf->buf,
+				      buf->buf + ibuf_used(buf));
+		if (tuple == NULL) {
+			luaT_pusherror(L, diag_last_error(diag_get()));
+			return NULL;
+		}
+		ibuf_reinit(tarantool_lua_ibuf);
+		return tuple;
+	}
+	tuple = luaT_istuple(L, idx);
+	if (tuple == NULL) {
+		lua_pushfstring(L, "A tuple or a table expected, got %s",
+				lua_typename(L, lua_type(L, -1)));
+		return NULL;
+	}
+	/*
+	 * Create the new tuple with the format necessary for fast
+	 * comparisons.
+	 */
+	const char *tuple_beg = tuple_data(tuple);
+	const char *tuple_end = tuple_beg + tuple->bsize;
+	tuple = box_tuple_new(format, tuple_beg, tuple_end);
+	if (tuple == NULL) {
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return NULL;
+	}
+	return tuple;
+}
+
+/**
+ * Data comparing function to construct heap of sources.
+ */
+static bool
+source_less(const heap_t *heap, const struct heap_node *a,
+	    const struct heap_node *b)
+{
+	struct merger_source *left = container_of(a, struct merger_source,
+						  hnode);
+	struct merger_source *right = container_of(b, struct merger_source,
+						   hnode);
+	if (left->tuple == NULL && right->tuple == NULL)
+		return false;
+	if (left->tuple == NULL)
+		return false;
+	if (right->tuple == NULL)
+		return true;
+	struct merger_iterator *it = container_of(heap, struct merger_iterator,
+						  heap);
+	return it->order * box_tuple_compare(left->tuple, right->tuple,
+					     it->key_def) < 0;
+}
+
+/**
+ * Update source->tuple of specific source.
+ *
+ * Increases the reference counter of the tuple.
+ *
+ * Return 0 when successfully fetched a tuple or NULL. In case of
+ * an error push an error message to the Lua stack and return 1.
+ */
+static int
+source_fetch(struct lua_State *L, struct merger_source *source,
+	     box_tuple_format_t *format)
+{
+	source->tuple = NULL;
+
+	switch (source->type) {
+	case SOURCE_TYPE_BUFFER: {
+		if (source->buf.remaining_tuples_cnt == 0)
+			return 0;
+		--source->buf.remaining_tuples_cnt;
+		if (ibuf_used(source->buf.buf) == 0) {
+			lua_pushstring(L, "Unexpected msgpack buffer end");
+			return 1;
+		}
+		const char *tuple_beg = source->buf.buf->rpos;
+		const char *tuple_end = tuple_beg;
+		/*
+		 * mp_next() is faster then mp_check(), but can
+		 * read bytes outside of the buffer and so can
+		 * cause segmentation faults or incorrect result.
+		 *
+		 * We check buffer boundaries after the mp_next()
+		 * call and throw an error when the boundaries are
+		 * violated, but it does not save us from possible
+		 * segmentation faults.
+		 *
+		 * It is in a user responsibility to provide valid
+		 * msgpack.
+		 */
+		mp_next(&tuple_end);
+		if (tuple_end > source->buf.buf->wpos) {
+			lua_pushstring(L, "Unexpected msgpack buffer end");
+			return 1;
+		}
+		source->buf.buf->rpos = (char *) tuple_end;
+		source->tuple = box_tuple_new(format, tuple_beg, tuple_end);
+		if (source->tuple == NULL) {
+			luaT_pusherror(L, diag_last_error(diag_get()));
+			return 1;
+		}
+		break;
+	}
+	case SOURCE_TYPE_TABLE: {
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->tbl.ref);
+		lua_pushinteger(L, source->tbl.next_idx);
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			lua_pop(L, 2);
+			return 0;
+		}
+		source->tuple = luaT_gettuple_with_format(L, -1, format);
+		if (source->tuple == NULL)
+			return 1;
+		++source->tbl.next_idx;
+		lua_pop(L, 2);
+		break;
+	}
+	case SOURCE_TYPE_ITERATOR: {
+		int nresult = lua_iterator_next(L, source->it.it);
+		if (nresult == 0)
+			return 0;
+		source->tuple = luaT_gettuple_with_format(L, -nresult + 1,
+							  format);
+		if (source->tuple == NULL)
+			return 1;
+		lua_pop(L, nresult);
+		break;
+	}
+	case SOURCE_TYPE_NONE:
+	default:
+		unreachable();
+	}
+	box_tuple_ref(source->tuple);
+	return 0;
+}
+
+/**
+ * Extract a merger object from the Lua stack.
+ */
+static struct merger *
+check_merger(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger **merger_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (merger_ptr == NULL || cdata_type != merger_type_id)
+		return NULL;
+	return *merger_ptr;
+}
+
+/**
+ * Extract a merger_iterator object from the Lua stack.
+ */
+static struct merger_iterator *
+check_merger_iterator(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger_iterator **it_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (it_ptr == NULL || cdata_type != merger_iterator_type_id)
+		return NULL;
+	return *it_ptr;
+}
+
+/**
+ * Extract an ibuf object from the Lua stack.
+ */
+static struct ibuf *
+check_ibuf(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ibuf_ptr == NULL || cdata_type != ibuf_type_id)
+		return NULL;
+	return ibuf_ptr;
+}
+
+#define RPOS_P(buf) ((const char **) &(buf)->rpos)
+
+/**
+ * Skip (and check) the wrapper around tuples array (and the array
+ * itself).
+ *
+ * Expected different kind of wrapping depending of it->decode.
+ */
+static int
+decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
+{
+	int ok = 1;
+	/* Decode {[IPROTO_DATA] = ...} header. */
+	if (it->decode != BUFFER_TYPE_RAW)
+		ok = mp_typeof(*buf->rpos) == MP_MAP &&
+			mp_decode_map(RPOS_P(buf)) == 1 &&
+			mp_typeof(*buf->rpos) == MP_UINT &&
+			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
+	/* Decode the array around call return values. */
+	if (ok && (it->decode == BUFFER_TYPE_CALL ||
+	    it->decode == BUFFER_TYPE_CHAIN))
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
+			mp_decode_array(RPOS_P(buf)) > 0;
+	/* Decode the array around chained input. */
+	if (ok && it->decode == BUFFER_TYPE_CHAIN)
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
+			mp_decode_array(RPOS_P(buf)) > 0;
+	/* Decode the array around tuples to merge. */
+	if (ok)
+		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+	if (ok)
+		*len_p = mp_decode_array(RPOS_P(buf));
+	return ok;
+}
+
+#undef RPOS_P
+
+/**
+ * Encode the wrapper around tuples array (and the array itself).
+ *
+ * The written msgpack depends on it->encode.
+ */
+static void
+encode_header(struct merger_iterator *it, uint32_t result_len)
+{
+	struct ibuf *obuf = it->obuf;
+
+	/* Encode {[IPROTO_DATA] = ...} header. */
+	if (it->encode != BUFFER_TYPE_RAW) {
+		ibuf_reserve(obuf, mp_sizeof_map(1) +
+			     mp_sizeof_uint(IPROTO_DATA));
+		obuf->wpos = mp_encode_map(obuf->wpos, 1);
+		obuf->wpos = mp_encode_uint(obuf->wpos, IPROTO_DATA);
+	}
+	/* Encode the array around call return values. */
+	if (it->encode == BUFFER_TYPE_CALL || it->encode == BUFFER_TYPE_CHAIN) {
+		ibuf_reserve(obuf, mp_sizeof_array(1));
+		obuf->wpos = mp_encode_array(obuf->wpos, 1);
+	}
+	/* Encode the array around chained output. */
+	if (it->encode == BUFFER_TYPE_CHAIN) {
+		ibuf_reserve(obuf, mp_sizeof_array(it->encode_chain_len));
+		obuf->wpos = mp_encode_array(obuf->wpos, it->encode_chain_len);
+	}
+	/* Encode the array around resulting tuples. */
+	ibuf_reserve(obuf, mp_sizeof_array(result_len));
+	obuf->wpos = mp_encode_array(obuf->wpos, result_len);
+}
+
+/**
+ * Push 'bad params' / 'bad param X' and the usage info to the Lua
+ * stack.
+ */
+static int
+merger_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger_inst:{ipairs,pairs,select}("
+				   "{source, source, ...}[, {"
+				   "descending = <boolean> or <nil>, "
+				   "decode = 'raw' / 'select' / 'call' / "
+				   "'chain' / <nil>, "
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "encode = 'raw' / 'select' / 'call' / "
+				   "'chain' / <nil>, "
+				   "encode_chain_len = <number> or <nil>}])";
+	if (param_name == NULL)
+		lua_pushfstring(L, "Bad params, use: %s", usage);
+	else
+		lua_pushfstring(L, "Bad param \"%s\", use: %s", param_name,
+				usage);
+	return 1;
+}
+
+/**
+ * Get a tuple from a top source, update the source, update the
+ * heap.
+ *
+ * The reference counter of the tuple is increased (in
+ * source_fetch).
+ *
+ * Return NULL when all sources are drained.
+ */
+static struct tuple *
+merger_next(struct lua_State *L, struct merger *merger,
+	    struct merger_iterator *it)
+{
+	struct heap_node *hnode = merger_heap_top(&it->heap);
+	if (hnode == NULL)
+		return NULL;
+
+	struct merger_source *source = container_of(hnode, struct merger_source,
+						    hnode);
+	struct tuple *tuple = source->tuple;
+	assert(tuple != NULL);
+	if (source_fetch(L, source, merger->format) != 0) {
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+	if (source->tuple == NULL)
+		MERGER_HEAP_DELETE(&it->heap, hnode, source);
+	else
+		MERGER_HEAP_UPDATE(&it->heap, hnode, source);
+
+	return tuple;
+}
+
+/**
+ * Determine type of a merger source on the Lua stack.
+ *
+ * Set *buf_p to buffer when the source is valid source of buffer
+ * type and buf_p is not NULL.
+ */
+static enum merger_source_type
+parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
+{
+	if (lua_type(L, idx) == LUA_TCDATA) {
+		struct ibuf *buf = check_ibuf(L, idx);
+		if (buf == NULL)
+			return SOURCE_TYPE_NONE;
+		if (buf_p != NULL)
+			*buf_p = buf;
+		return SOURCE_TYPE_BUFFER;
+	} else if (lua_istable(L, idx)) {
+		lua_rawgeti(L, idx, 1);
+		int iscallable = luaT_iscallable(L, idx);
+		lua_pop(L, 1);
+		if (iscallable)
+			return SOURCE_TYPE_ITERATOR;
+		return SOURCE_TYPE_TABLE;
+	}
+
+	return SOURCE_TYPE_NONE;
+}
+
+/**
+ * Parse 'decode' / 'encode' options.
+ */
+static enum merger_buffer_type
+parse_buffer_type(lua_State *L, int idx)
+{
+	if (lua_isnoneornil(L, idx))
+		return BUFFER_TYPE_SELECT;
+
+	if (lua_type(L, idx) != LUA_TSTRING)
+		return BUFFER_TYPE_NONE;
+
+	size_t len;
+	const char *param = lua_tolstring(L, idx, &len);
+
+	if (!strncmp(param, "raw", len))
+		return BUFFER_TYPE_RAW;
+	else if (!strncmp(param, "select", len))
+		return BUFFER_TYPE_SELECT;
+	else if (!strncmp(param, "call", len))
+		return BUFFER_TYPE_CALL;
+	else if (!strncmp(param, "chain", len))
+		return BUFFER_TYPE_CHAIN;
+
+	return BUFFER_TYPE_NONE;
+}
+
+/**
+ * Parse optional third parameter of merger_inst:pairs() and
+ * merger_inst:select() into the merger_iterator structure.
+ *
+ * Returns 0 on success. In case of an error it pushes an error
+ * message to the Lua stack and returns 1.
+ */
+static int
+parse_opts(struct lua_State *L, int idx, struct merger_iterator *it)
+{
+	/* No opts: use defaults. */
+	if (lua_isnoneornil(L, idx))
+		return 0;
+
+	/* Not a table: error. */
+	if (!lua_istable(L, idx))
+		return merger_usage(L, NULL);
+
+	/* Parse descending to it->order. */
+	lua_pushstring(L, "descending");
+	lua_gettable(L, idx);
+	if (!lua_isnil(L, -1)) {
+		if (lua_isboolean(L, -1))
+			it->order = lua_toboolean(L, -1) ? -1 : 1;
+		else
+			return merger_usage(L, "descending");
+	}
+	lua_pop(L, 1);
+
+	/* Parse decode to it->decode. */
+	lua_pushstring(L, "decode");
+	lua_gettable(L, idx);
+	if (!lua_isnil(L, -1)) {
+		it->decode = parse_buffer_type(L, -1);
+		if (it->decode == BUFFER_TYPE_NONE)
+			return merger_usage(L, "decode");
+	}
+	lua_pop(L, 1);
+
+	/* Parse buffer. */
+	lua_pushstring(L, "buffer");
+	lua_gettable(L, idx);
+	if (!lua_isnil(L, -1)) {
+		if ((it->obuf = check_ibuf(L, -1)) == NULL)
+			return merger_usage(L, "buffer");
+	}
+	lua_pop(L, 1);
+
+	/* Parse encode to it->encode. */
+	lua_pushstring(L, "encode");
+	lua_gettable(L, idx);
+	if (!lua_isnil(L, -1)) {
+		if (it->obuf == NULL) {
+			lua_pushfstring(L, "\"buffer\" option is mandatory "
+					   "when \"encode\" is used");
+			return 1;
+		}
+		it->encode = parse_buffer_type(L, -1);
+		if (it->encode == BUFFER_TYPE_NONE)
+			return merger_usage(L, "encode");
+	}
+	lua_pop(L, 1);
+
+	/* Parse encode_chain_len. */
+	lua_pushstring(L, "encode_chain_len");
+	lua_gettable(L, idx);
+	if (!lua_isnil(L, -1)) {
+		if (it->encode != BUFFER_TYPE_CHAIN) {
+			lua_pushfstring(L, "\"encode_chain_len\" is "
+					   "forbidden without "
+					   "{encode = 'chain'}");
+			return 1;
+		}
+		if (lua_isnumber(L, -1))
+			it->encode_chain_len =
+				(uint32_t) lua_tointeger(L, -1);
+		else
+			return merger_usage(L, "encode_chain_len");
+	}
+	lua_pop(L, 1);
+
+	/* Verify output_chain_len is provided when we
+	 * going to use it for output buffer header
+	 * encoding. */
+	if (it->obuf != NULL && it->encode == BUFFER_TYPE_CHAIN &&
+	    it->encode_chain_len == 0) {
+		lua_pushfstring(L, "\"encode_chain_len\" is mandatory when "
+				   "\"buffer\" and {encode = 'chain'} are "
+				   "used");
+		return 1;
+	}
+
+	return 0;
+}
+
+/**
+ * Parse sources table: second parameter pf merger_isnt:pairs()
+ * and merger_inst:select() into the merger_iterator structure.
+ *
+ * Note: This function should be called when options are already
+ * parsed (using parse_opts()).
+ *
+ * Returns 0 on success. In case of an error it pushes an error
+ * message to the Lua stack and returns 1.
+ */
+static int
+parse_sources(struct lua_State *L, int idx, struct merger *merger,
+	      struct merger_iterator *it)
+{
+	/* Allocate sources array. */
+	uint32_t capacity = 8;
+	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
+	it->sources = (struct merger_source **) malloc(sources_size);
+	if (it->sources == NULL) {
+		push_out_of_memory_error(L, sources_size, "it->sources");
+		return 1;
+	}
+
+	/* Fetch all sources. */
+	while (true) {
+		lua_pushinteger(L, it->sources_count + 1);
+		lua_gettable(L, idx);
+		if (lua_isnil(L, -1))
+			break;
+
+		/* Shrink sources array if needed. */
+		if (it->sources_count == capacity) {
+			capacity *= 2;
+			struct merger_source **new_sources;
+			const ssize_t new_sources_size =
+				capacity * sizeof(struct merger_source *);
+			new_sources = (struct merger_source **) realloc(
+				it->sources, new_sources_size);
+			if (new_sources == NULL) {
+				push_out_of_memory_error(L, new_sources_size,
+							 "new_sources");
+				return 1;
+			}
+			it->sources = new_sources;
+		}
+
+		/* Allocate the new source. */
+		it->sources[it->sources_count] = (struct merger_source *)
+			malloc(sizeof(struct merger_source));
+		struct merger_source *current_source =
+			it->sources[it->sources_count];
+		if (current_source == NULL) {
+			push_out_of_memory_error(L,
+						 sizeof(struct merger_source),
+						 "merger_source");
+			return 1;
+		}
+
+		/*
+		 * Set type and tuple to correctly proceed in
+		 * merger_iterator_delete() in case of any further
+		 * error.
+		 */
+		struct ibuf *buf = NULL;
+		current_source->type = parse_source_type(L, -1, &buf);
+		current_source->tuple = NULL;
+
+		/*
+		 * Note: We need to increment sources count right
+		 * after successful malloc() of the new source
+		 * (before any further error check), because
+		 * merger_iterator_delete() frees that amount of
+		 * sources.
+		 */
+		++it->sources_count;
+
+		/* Initialize the new source. */
+		switch (current_source->type) {
+		case SOURCE_TYPE_BUFFER:
+			if (!decode_header(it, buf,
+			    &current_source->buf.remaining_tuples_cnt)) {
+				lua_pushstring(L, "Invalid merge source");
+				return 1;
+			}
+			current_source->buf.buf = buf;
+			break;
+		case SOURCE_TYPE_TABLE:
+			/* Save a table ref and a next index. */
+			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
+			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+			current_source->tbl.ref = tbl_ref;
+			current_source->tbl.next_idx = 1;
+			break;
+		case SOURCE_TYPE_ITERATOR:
+			/* Wrap and save iterator. */
+			current_source->it.it =
+				lua_iterator_new_fromtable(L, -1);
+			break;
+		case SOURCE_TYPE_NONE:
+			lua_pushfstring(L, "Unknown source type at index %d",
+					it->sources_count);
+			return 1;
+		default:
+			unreachable();
+			return 1;
+		}
+		if (source_fetch(L, current_source, merger->format) != 0)
+			return 1;
+		if (current_source->tuple != NULL)
+			MERGER_HEAP_INSERT(&it->heap,
+					   &current_source->hnode,
+					   current_source);
+	}
+	lua_pop(L, it->sources_count + 1);
+
+	return 0;
+}
+
+/**
+ * Parse sources and options on Lua stack and create the new
+ * merger_interator instance.
+ */
+static struct merger_iterator *
+merger_iterator_new(struct lua_State *L)
+{
+	struct merger *merger;
+	int ok = (lua_gettop(L) == 2 || lua_gettop(L) == 3) &&
+		/* Merger. */
+		(merger = check_merger(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok) {
+		merger_usage(L, NULL);
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	struct merger_iterator *it = (struct merger_iterator *)
+		malloc(sizeof(struct merger_iterator));
+	merger_heap_create(&it->heap);
+	it->key_def = key_def_dup(merger->key_def);
+	it->sources_count = 0;
+	it->sources = NULL;
+	it->decode = BUFFER_TYPE_NONE;
+	it->order = 1;
+	it->obuf = NULL;
+	it->encode = BUFFER_TYPE_NONE;
+	it->encode_chain_len = 0;
+
+	if (parse_opts(L, 3, it) != 0 || parse_sources(L, 2, merger, it) != 0) {
+		merger_iterator_delete(L, it);
+		lua_error(L);
+		unreachable();
+		return NULL;
+	}
+
+	return it;
+}
+
+/**
+ * Iterator gen function to traverse merger results.
+ *
+ * Expected a merger instance as the first parameter (state) and a
+ * merger_iterator as the second parameter (param) on the Lua
+ * stack.
+ *
+ * Push the merger_iterator (the new param) and the next tuple.
+ */
+static int
+lbox_merger_iterator_gen(struct lua_State *L)
+{
+	struct merger *merger;
+	struct merger_iterator *it;
+	bool ok = (merger = check_merger(L, -2)) != NULL &&
+		(it = check_merger_iterator(L, -1)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Bad params, use: "
+				     "lbox_merger_iterator_gen(merger, "
+				     "merger_iterator)");
+
+	struct tuple *tuple = merger_next(L, merger, it);
+	if (tuple == NULL) {
+		lua_pushnil(L);
+		lua_pushnil(L);
+		return 2;
+	}
+
+	/* Push merger_iterator, tuple. */
+	*(struct merger_iterator **)
+		luaL_pushcdata(L, merger_iterator_type_id) = it;
+	luaT_pushtuple(L, tuple);
+
+	box_tuple_unref(tuple);
+	return 2;
+}
+
+/**
+ * Iterate over merge results from Lua.
+ *
+ * Push three values to the Lua stack:
+ *
+ * 1. gen (lbox_merger_iterator_gen wrapped by fun.wrap());
+ * 2. param (merger);
+ * 3. state (merger_iterator).
+ */
+static int
+lbox_merger_ipairs(struct lua_State *L)
+{
+	/* Create merger_iterator. */
+	struct merger_iterator *it = merger_iterator_new(L);
+	lua_settop(L, 1); /* Pop sources, [opts]. */
+	/* Stack: merger. */
+
+	if (it->obuf != NULL)
+		return luaL_error(L, "\"buffer\" option is forbidden with "
+				  "merger_inst:pairs(<...>)");
+
+	luaL_loadstring(L, "return require('fun').wrap");
+	lua_call(L, 0, 1);
+	lua_insert(L, -2); /* Swap merger and wrap. */
+	/* Stack: wrap, merger. */
+
+	lua_pushcfunction(L, lbox_merger_iterator_gen);
+	lua_insert(L, -2); /* Swap merger and gen. */
+	/* Stack: wrap, gen, merger. */
+
+	*(struct merger_iterator **)
+		luaL_pushcdata(L, merger_iterator_type_id) = it;
+	lua_pushcfunction(L, lbox_merger_iterator_gc);
+	luaL_setcdatagc(L, -2);
+	/* Stack: wrap, gen, merger, merger_iterator. */
+
+	/* Call fun.wrap(gen, merger, merger_iterator). */
+	lua_call(L, 3, 3);
+	return 3;
+}
+
+/**
+ * Write merge results into ibuf.
+ */
+static void
+encode_result_buffer(struct lua_State *L, struct merger *merger,
+		     struct merger_iterator *it)
+{
+	struct ibuf *obuf = it->obuf;
+	uint32_t result_len = 0;
+	uint32_t result_len_offset = 4;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later.
+	 */
+	encode_header(it, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	while ((tuple = merger_next(L, merger, it)) != NULL) {
+		uint32_t bsize = tuple->bsize;
+		ibuf_reserve(obuf, bsize);
+		memcpy(obuf->wpos, tuple_data(tuple), bsize);
+		obuf->wpos += bsize;
+		result_len_offset += bsize;
+		box_tuple_unref(tuple);
+		++result_len;
+	}
+
+	/* Write the real array size. */
+	mp_store_u32(obuf->wpos - result_len_offset, result_len);
+}
+
+/**
+ * Write merge results into the new Lua table.
+ */
+static int
+create_result_table(struct lua_State *L, struct merger *merger,
+		    struct merger_iterator *it)
+{
+	/* Create result table. */
+	lua_newtable(L);
+
+	uint32_t cur = 1;
+
+	/* Fetch, merge and save tuples to the table. */
+	struct tuple *tuple;
+	while ((tuple = merger_next(L, merger, it)) != NULL) {
+		luaT_pushtuple(L, tuple);
+		lua_rawseti(L, -2, cur);
+		box_tuple_unref(tuple);
+		++cur;
+	}
+
+	return 1;
+}
+
+/**
+ * Perform the merge.
+ *
+ * Write results into a buffer or a Lua table depending on options.
+ *
+ * Expected merger instance, sources table and options (optional)
+ * on the Lua stack.
+ *
+ * Return the Lua table or nothing when the 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merger_select(struct lua_State *L)
+{
+	struct merger *merger = check_merger(L, 1);
+	if (merger == NULL) {
+		merger_usage(L, NULL);
+		lua_error(L);
+	}
+
+	struct merger_iterator *it = merger_iterator_new(L);
+	lua_settop(L, 0); /* Pop merger, sources, [opts]. */
+
+	if (it->obuf == NULL) {
+		return create_result_table(L, merger, it);
+	} else {
+		encode_result_buffer(L, merger, it);
+		return 0;
+	}
+}
+
+/**
+ * Create the new merger instance.
+ *
+ * Expected a table of key parts on the Lua stack.
+ *
+ * Returns the new instance.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
+		return luaL_error(L, "Bad params, use: merger.new({"
+				  "{fieldno = fieldno, type = type"
+				  "[, is_nullable = is_nullable"
+				  "[, collation_id = collation_id"
+				  "[, collation = collation]]]}, ...}");
+	uint32_t key_parts_count = 0;
+	uint32_t capacity = 8;
+
+	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
+	struct key_part_def *parts = NULL;
+	parts = (struct key_part_def *) malloc(parts_size);
+	if (parts == NULL)
+		throw_out_of_memory_error(L, parts_size, "parts");
+
+	while (true) {
+		lua_pushinteger(L, key_parts_count + 1);
+		lua_gettable(L, 1);
+		if (lua_isnil(L, -1))
+			break;
+
+		/* Extend parts if necessary. */
+		if (key_parts_count == capacity) {
+			capacity *= 2;
+			struct key_part_def *old_parts = parts;
+			const ssize_t parts_size =
+				sizeof(struct key_part_def) * capacity;
+			parts = (struct key_part_def *) realloc(parts,
+								parts_size);
+			if (parts == NULL) {
+				free(old_parts);
+				throw_out_of_memory_error(L, parts_size,
+							  "parts");
+			}
+		}
+
+		/* Set parts[key_parts_count].fieldno. */
+		lua_pushstring(L, "fieldno");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			free(parts);
+			return luaL_error(L, "fieldno must not be nil");
+		}
+		/*
+		 * Transform one-based Lua fieldno to zero-based
+		 * fieldno to use in key_def_new().
+		 */
+		parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1;
+		lua_pop(L, 1);
+
+		/* Set parts[key_parts_count].type. */
+		lua_pushstring(L, "type");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			free(parts);
+			return luaL_error(L, "type must not be nil");
+		}
+		size_t type_len;
+		const char *type_name = lua_tolstring(L, -1, &type_len);
+		lua_pop(L, 1);
+		parts[key_parts_count].type = field_type_by_name(type_name,
+								 type_len);
+		if (parts[key_parts_count].type == field_type_MAX) {
+			free(parts);
+			return luaL_error(L, "Unknown field type: %s",
+					  type_name);
+		}
+
+		/* Set parts[key_parts_count].is_nullable. */
+		lua_pushstring(L, "is_nullable");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			parts[key_parts_count].is_nullable = false;
+			parts[key_parts_count].nullable_action =
+				ON_CONFLICT_ACTION_DEFAULT;
+		} else {
+			parts[key_parts_count].is_nullable =
+				lua_toboolean(L, -1);
+			parts[key_parts_count].nullable_action =
+				ON_CONFLICT_ACTION_NONE;
+		}
+		lua_pop(L, 1);
+
+		/* Set parts[key_parts_count].coll_id using collation_id. */
+		lua_pushstring(L, "collation_id");
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1))
+			parts[key_parts_count].coll_id = COLL_NONE;
+		else
+			parts[key_parts_count].coll_id = lua_tointeger(L, -1);
+		lua_pop(L, 1);
+
+		/* Set parts[key_parts_count].coll_id using collation. */
+		lua_pushstring(L, "collation");
+		lua_gettable(L, -2);
+		/* Check whether box.cfg{} was called. */
+		if ((parts[key_parts_count].coll_id != COLL_NONE ||
+		    !lua_isnil(L, -1)) && !box_is_configured()) {
+			free(parts);
+			return luaL_error(L, "Cannot use collations: "
+					  "please call box.cfg{}");
+		}
+		if (!lua_isnil(L, -1)) {
+			if (parts[key_parts_count].coll_id != COLL_NONE) {
+				free(parts);
+				return luaL_error(
+					L, "Conflicting options: collation_id "
+					"and collation");
+			}
+			size_t coll_name_len;
+			const char *coll_name = lua_tolstring(L, -1,
+							      &coll_name_len);
+			struct coll_id *coll_id = coll_by_name(coll_name,
+							       coll_name_len);
+			if (coll_id == NULL) {
+				free(parts);
+				return luaL_error(
+					L, "Unknown collation: \"%s\"",
+					coll_name);
+			}
+			parts[key_parts_count].coll_id = coll_id->id;
+		}
+		lua_pop(L, 1);
+
+		/* Check coll_id. */
+		struct coll_id *coll_id =
+			coll_by_id(parts[key_parts_count].coll_id);
+		if (parts[key_parts_count].coll_id != COLL_NONE &&
+		    coll_id == NULL) {
+			uint32_t collation_id = parts[key_parts_count].coll_id;
+			free(parts);
+			return luaL_error(L, "Unknown collation_id: %d",
+					  collation_id);
+		}
+
+		/* Set parts[key_parts_count].sort_order. */
+		parts[key_parts_count].sort_order = SORT_ORDER_ASC;
+
+		++key_parts_count;
+	}
+
+	struct merger *merger = calloc(1, sizeof(*merger));
+	if (merger == NULL) {
+		free(parts);
+		throw_out_of_memory_error(L, sizeof(*merger), "merger");
+	}
+	merger->key_def = key_def_new(parts, key_parts_count);
+	free(parts);
+	if (merger->key_def == NULL) {
+		return luaL_error(L, "Cannot create merger->key_def");
+	}
+
+	merger->format = box_tuple_format_new(&merger->key_def, 1);
+	if (merger->format == NULL) {
+		box_key_def_delete(merger->key_def);
+		free(merger);
+		return luaL_error(L, "Cannot create merger->format");
+	}
+
+	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
+
+	lua_pushcfunction(L, lbox_merger_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Free the merger instance from a Lua code.
+ */
+static int
+lbox_merger_gc(struct lua_State *L)
+{
+	struct merger *merger;
+	if ((merger = check_merger(L, 1)) == NULL)
+		return 0;
+	box_key_def_delete(merger->key_def);
+	box_tuple_format_unref(merger->format);
+	free(merger);
+	return 0;
+}
+
+/**
+ * Free the merger iterator.
+ *
+ * We need to know Lua state here, because sources of table and
+ * iterator types are saved as references within the Lua state.
+ */
+static void
+merger_iterator_delete(struct lua_State *L, struct merger_iterator *it)
+{
+	merger_heap_destroy(&it->heap);
+	box_key_def_delete(it->key_def);
+
+	for (uint32_t i = 0; i < it->sources_count; ++i) {
+		assert(it->sources != NULL);
+		switch (it->sources[i]->type) {
+		case SOURCE_TYPE_BUFFER:
+			/* No-op. */
+			break;
+		case SOURCE_TYPE_TABLE:
+			luaL_unref(L, LUA_REGISTRYINDEX,
+				   it->sources[i]->tbl.ref);
+			break;
+		case SOURCE_TYPE_ITERATOR:
+			lua_iterator_free(L, it->sources[i]->it.it);
+			break;
+		case SOURCE_TYPE_NONE:
+			/*
+			 * We can reach this block when
+			 * parse_sources() find a bad source. Do
+			 * nothing, just free the memory.
+			 */
+			break;
+		default:
+			unreachable();
+		}
+		if (it->sources[i]->tuple != NULL)
+			box_tuple_unref(it->sources[i]->tuple);
+		free(it->sources[i]);
+	}
+
+	if (it->sources != NULL) {
+		assert(it->sources_count > 0);
+		free(it->sources);
+	}
+
+	free(it);
+}
+
+/**
+ * Free the merger iterator from a Lua code.
+ */
+static int
+lbox_merger_iterator_gc(struct lua_State *L)
+{
+	struct merger_iterator *it;
+	if ((it = check_merger_iterator(L, 1)) == NULL)
+		return 0;
+	merger_iterator_delete(L, it);
+	return 0;
+}
+
+/**
+ * Register the module.
+ */
+LUA_API int
+luaopen_merger(lua_State *L)
+{
+	luaL_cdef(L, "struct merger;");
+	luaL_cdef(L, "struct merger_iterator;");
+	luaL_cdef(L, "struct ibuf;");
+	merger_type_id = luaL_ctypeid(L, "struct merger&");
+	merger_iterator_type_id = luaL_ctypeid(L, "struct merger_iterator&");
+	ibuf_type_id = luaL_ctypeid(L, "struct ibuf");
+	lua_newtable(L);
+	static const struct luaL_Reg meta[] = {
+		{"new", lbox_merger_new},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "merger", meta);
+
+	/* Export C functions to Lua. */
+	lua_newtable(L); /* merger.internal */
+	lua_pushcfunction(L, lbox_merger_select);
+	lua_setfield(L, -2, "select");
+	lua_pushcfunction(L, lbox_merger_ipairs);
+	lua_setfield(L, -2, "ipairs");
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/lua/merger.h b/src/lua/merger.h
new file mode 100644
index 000000000..6d7ca0957
--- /dev/null
+++ b/src/lua/merger.h
@@ -0,0 +1,39 @@
+#ifndef TARANTOOL_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_LUA_MERGER_H_INCLUDED 1
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+struct lua_State;
+
+int
+luaopen_merger(struct lua_State *L);
+
+#endif /* TARANTOOL_LUA_MERGER_H_INCLUDED */
diff --git a/src/lua/merger.lua b/src/lua/merger.lua
new file mode 100644
index 000000000..173cf4154
--- /dev/null
+++ b/src/lua/merger.lua
@@ -0,0 +1,19 @@
+local ffi = require('ffi')
+local merger = require('merger')
+
+local merger_t = ffi.typeof('struct merger')
+
+local methods = {
+    ['select'] = merger.internal.select,
+    ['pairs']  = merger.internal.ipairs,
+    ['ipairs']  = merger.internal.ipairs,
+}
+
+ffi.metatype(merger_t, {
+    __index = function(self, key)
+        return methods[key]
+    end,
+    -- Lua 5.2 compatibility
+    __pairs = merger.internal.ipairs,
+    __ipairs = merger.internal.ipairs,
+})
diff --git a/test/app-tap/merger.test.lua b/test/app-tap/merger.test.lua
new file mode 100755
index 000000000..7ec6e072c
--- /dev/null
+++ b/test/app-tap/merger.test.lua
@@ -0,0 +1,693 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local merger = require('merger')
+local crypto = require('crypto')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+
+local IPROTO_DATA = 48
+
+local function merger_usage(param)
+    local msg = 'merger_inst:{ipairs,pairs,select}(' ..
+        '{source, source, ...}[, {' ..
+        'descending = <boolean> or <nil>, ' ..
+        'decode = \'raw\' / \'select\' / \'call\' / \'chain\' / <nil>, ' ..
+        'buffer = <cdata<struct ibuf>> or <nil>, ' ..
+        'encode = \'raw\' / \'select\' / \'call\' / \'chain\' / <nil>, ' ..
+        'encode_chain_len = <number> or <nil>}])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+-- Get buffer with data encoded without last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+    local data = msgpackffi.encode(data)
+    data = data:sub(1, data:len() - trunc)
+    local len = data:len()
+    local buf = buffer.ibuf()
+    -- Ensure we have enough buffer to write len + trunc bytes.
+    buf:reserve(len + trunc)
+    local p = buf:alloc(len)
+    -- Ensure len bytes follows with trunc zero bytes.
+    ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+    return buf
+end
+
+local bad_merger_new_calls = {
+    -- Cases to call before box.cfg{}.
+    {
+        'Pass a field on an unknown type',
+        parts = {{
+            fieldno = 2,
+            type = 'unknown',
+        }},
+        exp_err = 'Unknown field type: unknown',
+    },
+    {
+        'Try to use collation_id before box.cfg{}',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 2,
+        }},
+        exp_err = 'Cannot use collations: please call box.cfg{}',
+    },
+    {
+        'Try to use collation before box.cfg{}',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation = 'unicode_ci',
+        }},
+        exp_err = 'Cannot use collations: please call box.cfg{}',
+    },
+    function()
+        -- For collations.
+        box.cfg{}
+    end,
+    -- Cases to call after box.cfg{}.
+    {
+        'Try to use both collation_id and collation',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 2,
+            collation = 'unicode_ci',
+        }},
+        exp_err = 'Conflicting options: collation_id and collation',
+    },
+    {
+        'Unknown collation_id',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation_id = 42,
+        }},
+        exp_err = 'Unknown collation_id: 42',
+    },
+    {
+        'Unknown collation name',
+        parts = {{
+            fieldno = 1,
+            type = 'string',
+            collation = 'unknown',
+        }},
+        exp_err = 'Unknown collation: "unknown"',
+    },
+}
+
+local bad_merger_methods_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_usage(nil),
+    },
+    {
+        'Bad opts.descending',
+        sources = {},
+        opts = {descending = 1},
+        exp_err = merger_usage('descending'),
+    },
+    {
+        'Bad opts.decode',
+        sources = {},
+        opts = {decode = 1},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'Bad source',
+        sources = {1},
+        opts = nil,
+        exp_err = 'Unknown source type at index 1',
+    },
+    {
+        'Bad cdata source',
+        sources = {ffi.new('char *')},
+        opts = nil,
+        exp_err = 'Unknown source type at index 1',
+    },
+    {
+        'Missed encode_chain_len',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'chain'},
+        exp_err = '"encode_chain_len" is mandatory when "buffer" and ' ..
+            '{encode = \'chain\'} are used',
+    },
+    {
+        'Wrong source of table type',
+        sources = {{1}},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Use buffer with an iterator result',
+        sources = {},
+        opts = {buffer = buffer.ibuf()},
+        funcs = {'pairs', 'ipairs'},
+        exp_err = '"buffer" option is forbidden with merger_inst:pairs(<...>)',
+    },
+    {
+        'Bad decode type',
+        sources = {},
+        opts = {decode = 1},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'Bad decode string',
+        sources = {},
+        opts = {decode = 'bad value'},
+        exp_err = merger_usage('decode'),
+    },
+    {
+        'A table source ignores {decode = \'chain\'}',
+        sources = {{{''}}},
+        opts = {decode = 'chain'},
+        exp_err = nil,
+    },
+    {
+        'An iterator source ignores {decode = \'chain\'}',
+        sources = {{pairs({})}},
+        opts = {decode = 'chain'},
+        exp_err = nil,
+    },
+    {
+        'Bad encode type',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 1},
+        funcs = {'select'},
+        exp_err = merger_usage('encode'),
+    },
+    {
+        'Bad encode string',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'bad value'},
+        funcs = {'select'},
+        exp_err = merger_usage('encode'),
+    },
+    {
+        -- Any encode value should lead to an error, but we check
+        -- only 'select' here.
+        'Use "encode" without "buffer"',
+        sources = {},
+        opts = {encode = 'select'},
+        exp_err = '"buffer" option is mandatory when "encode" is used',
+    },
+    {
+        'Use "encode_chain_len" without "encode"',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode_chain_len = 1},
+        exp_err = '"encode_chain_len" is forbidden without ' ..
+            '{encode = \'chain\'}',
+    },
+    {
+        -- Any encode value except "chain" should lead to an
+        -- error, but we check only 'select' here.
+        'Use "encode_chain_len" with "encode" != "chain"',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'select',
+            encode_chain_len = 1},
+        exp_err = '"encode_chain_len" is forbidden without ' ..
+            '{encode = \'chain\'}',
+    },
+    {
+        'Bad encode_chain_len type',
+        sources = {},
+        opts = {buffer = buffer.ibuf(), encode = 'chain',
+            encode_chain_len = 'bad value'},
+        exp_err = merger_usage('encode_chain_len'),
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Remove the last tuple from msgpack data, but keep old
+        -- tuples array size.
+        sources = {
+            truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 2),
+        },
+        opts = {},
+        funcs = {'select'},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Remove half of the last tuple, but keep old tuple size.
+        sources = {
+            truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 1),
+        },
+        opts = {},
+        funcs = {'select'},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+}
+
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Merger allocates a memory for 8 parts by default.
+    -- Test that reallocation works properly.
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 128 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 127 constant parts
+            for i = 1, 127 do
+                tuple[i] = i
+            end
+            -- 128th part is varying
+            tuple[128] = tupleno
+            return tuple
+        end,
+        -- reduce tuples count to decrease test run time
+        tuples_cnt = 16,
+    },
+    -- Test null value in nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        gen_tuple = function(i)
+            if i % 1 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+local function is_unicode_ci_part(part)
+    return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+local function sort_tuples(tuples, parts, opts)
+    local function tuple_comparator(a, b)
+        for _, part in ipairs(parts) do
+            local fieldno = part.fieldno
+            if a[fieldno] ~= b[fieldno] then
+                if a[fieldno] == nil then
+                    return -1
+                end
+                if b[fieldno] == nil then
+                    return 1
+                end
+                if is_unicode_ci_part(part) then
+                    return utf8.casecmp(a[fieldno], b[fieldno])
+                end
+                return a[fieldno] < b[fieldno] and -1 or 1
+            end
+        end
+
+        return 0
+    end
+
+    local function tuple_comparator_wrapper(a, b)
+        local cmp = tuple_comparator(a, b)
+        if cmp < 0 then
+            return not opts.descending
+        elseif cmp > 0 then
+            return opts.descending
+        else
+            return false
+        end
+    end
+
+    table.sort(tuples, tuple_comparator_wrapper)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+    for i = 1, #tuples do
+        local tuple = tuples[i]
+        for _, part in ipairs(parts) do
+            if is_unicode_ci_part(part) then
+                -- Workaround #3709.
+                if tuple[part.fieldno]:len() > 0 then
+                    tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+                end
+            end
+        end
+    end
+end
+
+local function prepare_data(schema, tuples_cnt, sources_cnt, opts)
+    local opts = opts or {}
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+
+    local tuples = {}
+    local exp_result = {}
+
+    -- Ensure empty sources are empty table and not nil.
+    for i = 1, sources_cnt do
+        if tuples[i] == nil then
+            tuples[i] = {}
+        end
+    end
+
+    -- Prepare N tables with tuples as input for merger.
+    for i = 1, tuples_cnt do
+        -- [1, sources_cnt]
+        local guava = digest.guava(i, sources_cnt) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if not use_table_as_tuple then
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[guava], tuple)
+    end
+
+    -- Sort tuples within each source.
+    for _, source_tuples in pairs(tuples) do
+        sort_tuples(source_tuples, schema.parts, opts)
+    end
+
+    -- Sort expected result.
+    sort_tuples(exp_result, schema.parts, opts)
+
+    -- Fill sources.
+    local sources
+    if input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        sources = tuples
+    elseif input_type == 'buffer' then
+        -- Imitate netbox's select with {buffer = ...}.
+        sources = {}
+        for i = 1, sources_cnt do
+            local data
+            if opts.decode == 'raw' then
+                data = tuples[i]
+            elseif opts.decode == nil or opts.decode == 'select' then
+                data = {[IPROTO_DATA] = tuples[i]}
+            elseif opts.decode == 'call' then
+                data = {[IPROTO_DATA] = {tuples[i]}}
+            elseif opts.decode == 'chain' then
+                data = {[IPROTO_DATA] = {{tuples[i]}}}
+            else
+                assert(false)
+            end
+            sources[i] = buffer.ibuf()
+            msgpackffi.internal.encode_r(sources[i], data, 0)
+        end
+    elseif input_type == 'iterator' then
+        -- Lua iterator.
+        sources = {}
+        for i = 1, sources_cnt do
+            sources[i] = {
+                -- gen (next)
+                next,
+                -- param (tuples)
+                tuples[i],
+                -- state (idx)
+                nil
+            }
+        end
+    end
+
+    return sources, exp_result
+end
+
+local function test_case_opts_str(opts)
+    local params = {}
+
+    if opts.decode then
+        table.insert(params, 'decode: ' .. opts.decode)
+    end
+
+    if opts.encode then
+        table.insert(params, 'encode: ' .. opts.encode)
+    end
+
+    if opts.input_type then
+        table.insert(params, 'input_type: ' .. opts.input_type)
+    end
+
+    if opts.output_type then
+        table.insert(params, 'output_type: ' .. opts.output_type)
+    end
+
+    if opts.descending then
+        table.insert(params, 'descending')
+    end
+
+    if opts.use_table_as_tuple then
+        table.insert(params, 'use_table_as_tuple')
+    end
+
+    if next(params) == nil then
+        return ''
+    end
+
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+    fiber.yield()
+
+    local opts = opts or {}
+
+    -- Prepare data.
+    local sources, exp_result =
+        prepare_data(schema, tuples_cnt, sources_cnt, opts)
+
+    -- Create a merger instance and fill options.
+    local merger_inst = merger.new(schema.parts)
+    local merger_opts = {
+        decode = opts.decode,
+        encode = opts.encode,
+        descending = opts.descending,
+    }
+    if opts.output_type == 'buffer' then
+        merger_opts.buffer = buffer.ibuf()
+    end
+    if opts.encode == 'chain' then
+        merger_opts.encode_chain_len = 1
+    end
+
+    local res
+
+    -- Run merger and prepare output for compare.
+    if opts.output_type == 'table' then
+        -- Table output.
+        res = merger_inst:select(sources, merger_opts)
+    elseif opts.output_type == 'buffer' then
+        -- Buffer output.
+        merger_inst:select(sources, merger_opts)
+        local obuf = merger_opts.buffer
+        local data = msgpackffi.decode(obuf.rpos)
+
+        if opts.encode == 'raw' then
+            res = data
+        elseif opts.encode == nil or opts.encode == 'select' then
+            res = data[IPROTO_DATA]
+        elseif opts.encode == 'call' then
+            res = data[IPROTO_DATA][1]
+        elseif opts.encode == 'chain' then
+            res = data[IPROTO_DATA][1][1]
+        else
+            assert(false)
+        end
+    else
+        -- Iterator output.
+        assert(opts.output_type == 'iterator')
+        res = merger_inst:pairs(sources, merger_opts):totable()
+    end
+
+    -- A bit more postprocessing to compare.
+    for i = 1, #res do
+        if type(res[i]) ~= 'table' then
+            res[i] = res[i]:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate btw 'A' and 'a', so the
+    -- order is arbitrary. We transform fields with unicode_ci
+    -- collation in parts to lower case before comparing.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    test:is_deeply(res, exp_result,
+        ('check order on %3d tuples in %4d sources%s')
+        :format(tuples_cnt, sources_cnt, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+    local opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuples_cnt = schema.tuples_cnt or 100
+
+    local encode = opts.encode
+    local decode = opts.decode
+    local input_type = opts.input_type
+    local output_type = opts.output_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+
+    -- Skip meaningless flags combinations.
+    if input_type ~= 'buffer' and decode ~= nil then
+        return
+    end
+    if output_type ~= 'buffer' and encode ~= nil then
+        return
+    end
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(6)
+
+        -- Check with small buffers count.
+        run_merger(test, schema, tuples_cnt, 1, opts)
+        run_merger(test, schema, tuples_cnt, 2, opts)
+        run_merger(test, schema, tuples_cnt, 3, opts)
+        run_merger(test, schema, tuples_cnt, 4, opts)
+        run_merger(test, schema, tuples_cnt, 5, opts)
+
+        -- Check more buffers then tuples count.
+        run_merger(test, schema, tuples_cnt, 1000, opts)
+    end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_merger_new_calls - 1 + #bad_merger_methods_calls +
+    #schemas * 126)
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+    if type(case) == 'function' then
+        case()
+    else
+        local ok, err = pcall(merger.new, case.parts)
+        test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+    end
+end
+
+-- Create the instance to use in testing merger's methods below.
+local merger_inst = merger.new({{
+    fieldno = 1,
+    type = 'string',
+}})
+
+-- Bad source or/and opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_methods_calls) do
+    test:test(case[1], function(test)
+        local funcs = case.funcs or {'pairs', 'ipairs', 'select'}
+        test:plan(#funcs)
+        for _, func in ipairs(funcs) do
+            local exp_ok = case.exp_err == nil
+            local ok, err = pcall(merger_inst[func], merger_inst, case.sources,
+                case.opts)
+            if ok then
+                err = nil
+            end
+            test:is_deeply({ok, err}, {exp_ok, case.exp_err}, func)
+        end
+    end)
+end
+
+-- Merging cases.
+for _, decode in ipairs({'nil', 'raw', 'select', 'call', 'chain'}) do
+    for _, encode in ipairs({'nil', 'raw', 'select', 'call', 'chain'}) do
+        for _, input_type in ipairs({'buffer', 'table', 'iterator'}) do
+            for _, output_type in ipairs({'buffer', 'table', 'iterator'}) do
+                for _, descending in ipairs({false, true}) do
+                    for _, use_table_as_tuple in ipairs({false, true}) do
+                        for _, schema in ipairs(schemas) do
+                            decode = decode ~= 'nil' and decode or nil
+                            encode = encode ~= 'nil' and encode or nil
+                            run_case(test, schema, {
+                                decode = decode,
+                                encode = encode,
+                                input_type = input_type,
+                                output_type = output_type,
+                                descending = descending,
+                                use_table_as_tuple = use_table_as_tuple,
+                            })
+                        end
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
diff --git a/test/app-tap/suite.ini b/test/app-tap/suite.ini
index 86af82637..1a2abb79f 100644
--- a/test/app-tap/suite.ini
+++ b/test/app-tap/suite.ini
@@ -2,4 +2,5 @@
 core = app
 description = application server tests (TAP)
 lua_libs = lua/require_mod.lua lua/serializer_test.lua
+long_run = merger.test.lua
 is_parallel = True
-- 
2.19.2

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 0/3] Merger
  2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
                   ` (2 preceding siblings ...)
  2018-12-16 20:17 ` [PATCH 3/3] Add merger for tuple streams Alexander Turenko
@ 2018-12-18 12:16 ` Alexander Turenko
  2019-03-22 14:24 ` [tarantool-patches] [PATCH 0/3] lua: add key_def lua module Kirill Shcherbatov
  4 siblings, 0 replies; 14+ messages in thread
From: Alexander Turenko @ 2018-12-18 12:16 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

Added fixup:

diff --git a/src/lua/merger.c b/src/lua/merger.c
index 8caf8d47f..433859b0b 100644
--- a/src/lua/merger.c
+++ b/src/lua/merger.c
@@ -1352,9 +1352,12 @@ lbox_merger_select(struct lua_State *L)
 	lua_settop(L, 0); /* Pop merger, sources, [opts]. */
 
 	if (it->obuf == NULL) {
-		return create_result_table(L, merger, it);
+		create_result_table(L, merger, it);
+		merger_iterator_delete(L, it);
+		return 1;
 	} else {
 		encode_result_buffer(L, merger, it);
+		merger_iterator_delete(L, it);
 		return 0;
 	}
 }

WBR, Alexander Turenko.

On Sun, Dec 16, 2018 at 11:17:23PM +0300, Alexander Turenko wrote:
> https://github.com/tarantool/tarantool/issues/3276
> On top of 2.1: https://github.com/tarantool/tarantool/tree/Totktonada/gh-3276-on-board-merger
> On top of 1.10: https://github.com/tarantool/tarantool/tree/Totktonada/gh-3276-on-board-merger-1.10

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 1/3] Add luaT_iscallable with support of cdata metatype
  2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
@ 2018-12-26 18:35   ` Vladimir Davydov
  2018-12-28  1:46     ` Alexander Turenko
  0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2018-12-26 18:35 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

On Sun, Dec 16, 2018 at 11:17:24PM +0300, Alexander Turenko wrote:
> Needed for #3276.
> ---
>  extra/exports                    |  1 +
>  src/lua/utils.c                  | 43 ++++++++++++++++
>  src/lua/utils.h                  | 10 ++++
>  test/app-tap/module_api.c        | 10 ++++
>  test/app-tap/module_api.test.lua | 85 +++++++++++++++++++++++++++++++-
>  5 files changed, 147 insertions(+), 2 deletions(-)
> 
> diff --git a/extra/exports b/extra/exports
> index 5f69e0730..52f0b2378 100644
> --- a/extra/exports
> +++ b/extra/exports
> @@ -134,6 +134,7 @@ luaT_call
>  luaT_cpcall
>  luaT_state
>  luaT_tolstring
> +luaT_iscallable

Do we really need to export it?

>  box_txn
>  box_txn_begin
>  box_txn_commit
> diff --git a/src/lua/utils.c b/src/lua/utils.c
> index 978fe61f1..7a6069fbb 100644
> --- a/src/lua/utils.c
> +++ b/src/lua/utils.c
> @@ -920,6 +920,49 @@ luaT_tolstring(lua_State *L, int idx, size_t *len)
>  	return lua_tolstring(L, -1, len);
>  }
>  
> +/* Based on ffi_meta___call() from luajit/src/lib_ffi.c. */
> +static int
> +luaT_cdata_iscallable(lua_State *L, int idx)

I think this function should have prefix luaL_ rather than luaT_.
Other than that, this particular patch looks OK to me.

> +{
> +	/* Calculate absolute value in the stack. */
> +	if (idx < 0)
> +		idx = lua_gettop(L) + idx + 1;
> +
> +	/* Get cdata from the stack. */
> +	assert(lua_type(L, idx) == LUA_TCDATA);
> +	GCcdata *cd = cdataV(L->base + idx - 1);
> +
> +	CTState *cts = ctype_cts(L);
> +	CTypeID id = cd->ctypeid;
> +	CType *ct = ctype_raw(cts, id);
> +	if (ctype_isptr(ct->info))
> +		id = ctype_cid(ct->info);
> +
> +	/* Get ctype metamethod. */
> +	cTValue *tv = lj_ctype_meta(cts, id, MM_call);
> +
> +	return tv != NULL;
> +}

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 2/3] Add module to ease using Lua iterators from C
  2018-12-16 20:17 ` [PATCH 2/3] Add module to ease using Lua iterators from C Alexander Turenko
@ 2018-12-26 18:45   ` Vladimir Davydov
  2018-12-31  6:43     ` Alexander Turenko
  0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2018-12-26 18:45 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

On Sun, Dec 16, 2018 at 11:17:25PM +0300, Alexander Turenko wrote:
> Needed for #3276.
> ---
>  src/lua/lua_iterator.h | 117 +++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 117 insertions(+)
>  create mode 100644 src/lua/lua_iterator.h
> 
> diff --git a/src/lua/lua_iterator.h b/src/lua/lua_iterator.h
> new file mode 100644
> index 000000000..3e1a88c93
> --- /dev/null
> +++ b/src/lua/lua_iterator.h
> @@ -0,0 +1,117 @@
> +#ifndef TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED
> +#define TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED 1

Nit: we don't set an include guard macro to any particular value.

> +/*
> + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +/**
> + * This module contains helper functions to interact with a Lua
> + * iterator from C.
> + */
> +
> +#include <lua.h>

extern 'C' is missing.

> +
> +/**
> + * Holds iterator state (references to Lua objects).
> + */
> +struct lua_iterator {

I guess this structure as well as the functions below should have
luaL_ prefix.

> +	int gen;
> +	int param;
> +	int state;
> +};
> +
> +/**
> + * Create a Lua iterator from {gen, param, state}.
> + */
> +struct lua_iterator *
> +lua_iterator_new_fromtable(lua_State *L, int idx)

If you include this header into two different source files, you'll get a
linker error. All functions defined in a header should be marked static
inline. Anyway, I don't think that defining these functions in a header
is a good idea - they are heavy and not supposed to be blazing fast.

May be, just move them to lua/util.c?

> +{
> +	struct lua_iterator *it = (struct lua_iterator *) malloc(
> +		sizeof(struct lua_iterator));
> +
> +	lua_rawgeti(L, idx, 1); /* Popped by luaL_ref(). */
> +	it->gen = luaL_ref(L, LUA_REGISTRYINDEX);
> +	lua_rawgeti(L, idx, 2); /* Popped by luaL_ref(). */
> +	it->param = luaL_ref(L, LUA_REGISTRYINDEX);
> +	lua_rawgeti(L, idx, 3); /* Popped by luaL_ref(). */
> +	it->state = luaL_ref(L, LUA_REGISTRYINDEX);
> +
> +	return it;
> +}
> +
> +/**
> + * Move iterator to the next value. Push values returned by
> + * gen(param, state) and return its count. Zero means no more
> + * results available.
> + */
> +int
> +lua_iterator_next(lua_State *L, struct lua_iterator *it)
> +{
> +	int frame_start = lua_gettop(L);
> +
> +	/* Call gen(param, state). */
> +	lua_rawgeti(L, LUA_REGISTRYINDEX, it->gen);
> +	lua_rawgeti(L, LUA_REGISTRYINDEX, it->param);
> +	lua_rawgeti(L, LUA_REGISTRYINDEX, it->state);
> +	lua_call(L, 2, LUA_MULTRET);
> +	int nresults = lua_gettop(L) - frame_start;
> +	if (nresults == 0) {
> +		luaL_error(L, "lua_iterator_next: gen(param, state) must "
> +			      "return at least one result");
> +		unreachable();
> +		return 0;
> +	}
> +
> +	/* The call above returns nil as the first result. */
> +	if (lua_isnil(L, frame_start + 1)) {
> +		lua_settop(L, frame_start);
> +		return 0;
> +	}
> +
> +	/* Save the first result to it->state. */
> +	luaL_unref(L, LUA_REGISTRYINDEX, it->state);
> +	lua_pushvalue(L, frame_start + 1); /* Popped by luaL_ref(). */
> +	it->state = luaL_ref(L, LUA_REGISTRYINDEX);
> +
> +	return nresults;
> +}
> +
> +/**
> + * Free all resources hold by the iterator.

Nit: s/hold/held

> + */
> +void lua_iterator_free(lua_State *L, struct lua_iterator *it)
> +{
> +	luaL_unref(L, LUA_REGISTRYINDEX, it->gen);
> +	luaL_unref(L, LUA_REGISTRYINDEX, it->param);
> +	luaL_unref(L, LUA_REGISTRYINDEX, it->state);
> +	free(it);
> +}
> +
> +#endif /* TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED */

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 3/3] Add merger for tuple streams
  2018-12-16 20:17 ` [PATCH 3/3] Add merger for tuple streams Alexander Turenko
@ 2018-12-26 20:11   ` Vladimir Davydov
  2019-01-09 21:36     ` Alexander Turenko
  0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2018-12-26 20:11 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

On Sun, Dec 16, 2018 at 11:17:26PM +0300, Alexander Turenko wrote:
> Fixes #3276.

DocBot request is missing. I guess you could paste that nice piece of
documentation you wrote in the source file right in the commit message.

> ---
>  src/CMakeLists.txt           |    2 +
>  src/lua/init.c               |    5 +
>  src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
>  src/lua/merger.h             |   39 +
>  src/lua/merger.lua           |   19 +

Merger depends on tuple, key_def, and tuple_format, which are box
objects, hence it should be defined in src/box.

My main concern about the merger code added by this patch is that the
merger logic and its Lua wrapper are mixed together in the same file.
This makes the code difficult for understanding. It'd be great if you
could isolate the merger logic in src/box/merger.[hc] and leave only the
Lua wrapper code in src/box/lua/merger.[hc]. The merger could then be
submitted in a separate patch with an appropriate unit test - this would
also ease the review process.

A few comments regarding the API and the code are below.

>  test/app-tap/merger.test.lua |  693 ++++++++++++++
>  test/app-tap/suite.ini       |    1 +
>  7 files changed, 2402 insertions(+)
>  create mode 100644 src/lua/merger.c
>  create mode 100644 src/lua/merger.h
>  create mode 100644 src/lua/merger.lua
>  create mode 100755 test/app-tap/merger.test.lua

> diff --git a/src/lua/merger.c b/src/lua/merger.c
> +/**
> + * API and basic usage
> + * -------------------
> + *
> + * The following example demonstrates API of the module:
> + *
> + * ```
> + * local net_box = require('net.box')
> + * local buffer = require('buffer')
> + * local merger = require('merger')
> + *
> + * -- The format of key_parts parameter is the same as
> + * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
> + * -- net.box connection).
> + * local key_parts = {
> + *     {
> + *         fieldno = <number>,
> + *         type = <string>,
> + *         [ is_nullable = <boolean>, ]
> + *         [ collation_id = <number>, ]
> + *         [ collation = <string>, ]
> + *     },
> + *     ...
> + * }
> + *
> + * -- Create the merger instance.
> + * local merger_inst = merger.new(key_parts)

IMO it's not a merger instance, it's rather a merger context.

What about

  local ctx = merger.context.new(key_parts)
  merger.pairs(ctx, {src1, src2, ...})

?

Note, ctx is a mere argument here, not a class object.

> + * Decoding / encoding buffers
> + * ---------------------------
> + *
> + * A select response has the following structure:
> + * `{[48] = {tuples}}`, while a call response is
> + * `{[48] = {{tuples}}}` (because it should support multiple
> + * return values). A user should specify how merger will
> + * operate on buffers, so merger has `decode` (how to read buffer
> + * sources) and `encode` (how to write to a resulting buffer)
> + * options. These options accept the following values:
> + *
> + * Option value       | Buffer structure
> + * ------------------ | ----------------
> + * 'raw'              | tuples
> + * 'select' (default) | {[48] = {tuples}}
> + * 'call'             | {[48] = {{tuples}}}
> + * 'chain'            | {[48] = {{{tuples, ...}}}}

I don't think we should make merger dependent on iproto. I understand
that it must be able to take an ibuf for performance considerations, but
I think that the buffer must always be formatted as a msgpack array of
tuples, without any extra headers. Headers should be removed either by
the caller (with msgpack Lua lib) or by net.box itself.

> +#ifndef NDEBUG
> +#include "say.h"
> +/**
> + * Heap insert/delete/update macros wrapped with debug prints.
> + */
> +#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
> +	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
> +		  tuple_str((source)->tuple));				\
> +	merger_heap_insert((heap), (hnode));				\
> +} while(0)
> +#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
> +	say_debug("merger: [source %p] delete", (source));	\
> +	merger_heap_delete((heap), (hnode));			\
> +} while(0)
> +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
> +	say_debug("merger: [source %p] update: tuple: %s", (source),	\
> +		  tuple_str((source)->tuple));				\
> +	merger_heap_update((heap), (hnode));				\
> +} while(0)
> +#else /* !defined(NDEBUG) */
> +/**
> + * Heap insert/delete/update macros wrappers w/o debug prints.
> + */
> +#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
> +	merger_heap_insert((heap), (hnode));		\
> +} while(0)
> +#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
> +	merger_heap_delete((heap), (hnode));		\
> +} while(0)
> +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
> +	merger_heap_update((heap), (hnode));		\
> +} while(0)
> +#endif /* !defined(NDEBUG) */

say_debug() doesn't evaluate arguments if log_level < DEBUG so I think
it's no use to disable it if NDEBUG. I'd remove these macros and call
say_debug() directly at call sites.

Also, heap function may fail with ENOMEM unless you use heap_reserve().

> +
> +/**
> + * Helper macros to push / throw out of memory errors to Lua.
> + */
> +#define push_out_of_memory_error(L, size, what_name) do {	\
> +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> +	luaT_pusherror(L, diag_last_error(diag_get()));		\
> +} while(0)
> +#define throw_out_of_memory_error(L, size, what_name) do {	\
> +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> +	luaT_error(L);						\
> +	unreachable();						\
> +	return -1;						\
> +} while(0)

I wouldn't use these macros. They make a call only one line shorter:

	throw_out_of_memory_error(L, size, "obj");

instead of

	diag_set(OutOfMemory, size, "malloc", "obj");
	return luaT_error(L);

Not worth obscuring the code IMO.

> +
> +#define BOX_COLLATION_NAME_INDEX 1

Not used anywhere.

> +
> +/**
> + * A type of data structure that holds source data.
> + */
> +enum merger_source_type {
> +	SOURCE_TYPE_BUFFER,
> +	SOURCE_TYPE_TABLE,
> +	SOURCE_TYPE_ITERATOR,
> +	SOURCE_TYPE_NONE,
> +};

I'd prefer if you used vtab instead, because that would isolate code
and data of each iterator type in a separate function/struct, making
it easier to follow. Besides, you'll have to do that anyway provided
you agree to move the merger logic to src/box/merger.c.

> +
> +/**
> + * How data are encoded in a buffer.
> + *
> + * `decode` and `encode` options are parsed to values of this
> + * enum.
> + */
> +enum merger_buffer_type {
> +	BUFFER_TYPE_RAW,
> +	BUFFER_TYPE_SELECT,
> +	BUFFER_TYPE_CALL,
> +	BUFFER_TYPE_CHAIN,
> +	BUFFER_TYPE_NONE,
> +};
> +
> +/**
> + * Hold state of a merge source.
> + */
> +struct merger_source {
> +	/*
> +	 * A source is the heap node. Compared by the next tuple.
> +	 */
> +	struct heap_node hnode;
> +	/* Union determinant. */
> +	enum merger_source_type type;
> +	/* Fields specific for certaint source types. */
> +	union {
> +		/* Buffer source. */
> +		struct {
> +			struct ibuf *buf;
> +			/*
> +			 * A merger stops before end of a buffer
> +			 * when it is not the last merger in the
> +			 * chain.
> +			 */
> +			size_t remaining_tuples_cnt;
> +		} buf;
> +		/* Table source. */
> +		struct {
> +			int ref;
> +			int next_idx;
> +		} tbl;
> +		/* Iterator source. */
> +		struct {
> +			struct lua_iterator *it;
> +		} it;
> +	};
> +	/* Next tuple. */
> +	struct tuple *tuple;
> +};
> +
> +/**
> + * Holds immutable parameters of a merger.
> + */
> +struct merger {

Should be called merger_context IMO.

> +	struct key_def *key_def;
> +	box_tuple_format_t *format;
> +};
> +
> +/**
> + * Holds parameters of merge process, sources, result storage
> + * (if any), heap of sources and utility flags / counters.
> + */
> +struct merger_iterator {

Should be called merger or merger_state IMO.

> +	/* Heap of sources. */
> +	heap_t heap;
> +	/*
> +	 * key_def is copied from merger.
> +	 *
> +	 * A merger can be collected by LuaJIT GC independently
> +	 * from a merger_iterator, so we cannot just save pointer
> +	 * to merger here and so we copy key_def from merger.
> +	 */
> +	struct key_def *key_def;

And what about the tuple format? You don't seem to copy or reference it
anywhere. Confusing. I think that if the iterator needs the merger to
stay along, it'd better reference it.

> +	/* Parsed sources and decoding parameters. */
> +	uint32_t sources_count;
> +	struct merger_source **sources;
> +	enum merger_buffer_type decode;
> +	/* Ascending / descending order. */
> +	int order;
> +	/* Optional output buffer and encoding parameters. */
> +	struct ibuf *obuf;
> +	enum merger_buffer_type encode;
> +	uint32_t encode_chain_len;
> +};
> +
> +static uint32_t merger_type_id = 0;
> +static uint32_t merger_iterator_type_id = 0;
> +static uint32_t ibuf_type_id = 0;
> +
> +/* Forward declarations. */
> +static bool
> +source_less(const heap_t *heap, const struct heap_node *a,
> +	    const struct heap_node *b);
> +static int
> +lbox_merger_gc(struct lua_State *L);
> +static void
> +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
> +static int
> +lbox_merger_iterator_gc(struct lua_State *L);
> +
> +#define HEAP_NAME merger_heap
> +#define HEAP_LESS source_less
> +#include "salad/heap.h"
> +
> +/**
> + * Create the new tuple with specific format from a Lua table or a
> + * tuple.
> + *
> + * In case of an error push the error message to the Lua stack and
> + * return NULL.
> + */
> +static struct tuple *
> +luaT_gettuple_with_format(struct lua_State *L, int idx,
> +			  box_tuple_format_t *format)
> +{
> +	struct tuple *tuple;
> +	if (lua_istable(L, idx)) {
> +		/* Based on lbox_tuple_new() code. */

Please define this as a separate function somewhere in
src/box/lua/tuple.c and reuse lbox_tuple_new() code instead
of copying it. May be done in a separate patch, I guess.

> +		struct ibuf *buf = tarantool_lua_ibuf;
> +		ibuf_reset(buf);
> +		struct mpstream stream;
> +		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
> +		      luamp_error, L);
> +		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
> +		mpstream_flush(&stream);
> +		tuple = box_tuple_new(format, buf->buf,
> +				      buf->buf + ibuf_used(buf));
> +		if (tuple == NULL) {
> +			luaT_pusherror(L, diag_last_error(diag_get()));
> +			return NULL;
> +		}
> +		ibuf_reinit(tarantool_lua_ibuf);
> +		return tuple;
> +	}
> +	tuple = luaT_istuple(L, idx);
> +	if (tuple == NULL) {
> +		lua_pushfstring(L, "A tuple or a table expected, got %s",
> +				lua_typename(L, lua_type(L, -1)));
> +		return NULL;
> +	}
> +	/*
> +	 * Create the new tuple with the format necessary for fast
> +	 * comparisons.
> +	 */
> +	const char *tuple_beg = tuple_data(tuple);
> +	const char *tuple_end = tuple_beg + tuple->bsize;
> +	tuple = box_tuple_new(format, tuple_beg, tuple_end);
> +	if (tuple == NULL) {
> +		luaT_pusherror(L, diag_last_error(diag_get()));
> +		return NULL;
> +	}
> +	return tuple;
> +}

> +#define RPOS_P(buf) ((const char **) &(buf)->rpos)

Please, let's somehow get along without this macro.

> +
> +/**
> + * Skip (and check) the wrapper around tuples array (and the array
> + * itself).
> + *
> + * Expected different kind of wrapping depending of it->decode.
> + */
> +static int
> +decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
> +{
> +	int ok = 1;
> +	/* Decode {[IPROTO_DATA] = ...} header. */
> +	if (it->decode != BUFFER_TYPE_RAW)
> +		ok = mp_typeof(*buf->rpos) == MP_MAP &&
> +			mp_decode_map(RPOS_P(buf)) == 1 &&
> +			mp_typeof(*buf->rpos) == MP_UINT &&
> +			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
> +	/* Decode the array around call return values. */
> +	if (ok && (it->decode == BUFFER_TYPE_CALL ||
> +	    it->decode == BUFFER_TYPE_CHAIN))
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> +			mp_decode_array(RPOS_P(buf)) > 0;
> +	/* Decode the array around chained input. */
> +	if (ok && it->decode == BUFFER_TYPE_CHAIN)
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> +			mp_decode_array(RPOS_P(buf)) > 0;
> +	/* Decode the array around tuples to merge. */
> +	if (ok)
> +		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
> +	if (ok)
> +		*len_p = mp_decode_array(RPOS_P(buf));
> +	return ok;
> +}
> +
> +#undef RPOS_P

> +/**
> + * Determine type of a merger source on the Lua stack.
> + *
> + * Set *buf_p to buffer when the source is valid source of buffer
> + * type and buf_p is not NULL.
> + */
> +static enum merger_source_type
> +parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
> +{
> +	if (lua_type(L, idx) == LUA_TCDATA) {
> +		struct ibuf *buf = check_ibuf(L, idx);

msgpack.decode takes ibuf.rpos rather than ibuf. May be, the merger
should do the same, for consistency?

> +		if (buf == NULL)
> +			return SOURCE_TYPE_NONE;
> +		if (buf_p != NULL)
> +			*buf_p = buf;
> +		return SOURCE_TYPE_BUFFER;
> +	} else if (lua_istable(L, idx)) {
> +		lua_rawgeti(L, idx, 1);
> +		int iscallable = luaT_iscallable(L, idx);
> +		lua_pop(L, 1);
> +		if (iscallable)
> +			return SOURCE_TYPE_ITERATOR;
> +		return SOURCE_TYPE_TABLE;
> +	}
> +
> +	return SOURCE_TYPE_NONE;
> +}

> +/**
> + * Parse sources table: second parameter pf merger_isnt:pairs()
> + * and merger_inst:select() into the merger_iterator structure.
> + *
> + * Note: This function should be called when options are already
> + * parsed (using parse_opts()).
> + *
> + * Returns 0 on success. In case of an error it pushes an error
> + * message to the Lua stack and returns 1.
> + */
> +static int
> +parse_sources(struct lua_State *L, int idx, struct merger *merger,
> +	      struct merger_iterator *it)
> +{
> +	/* Allocate sources array. */
> +	uint32_t capacity = 8;
> +	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
> +	it->sources = (struct merger_source **) malloc(sources_size);
> +	if (it->sources == NULL) {
> +		push_out_of_memory_error(L, sources_size, "it->sources");
> +		return 1;
> +	}
> +
> +	/* Fetch all sources. */
> +	while (true) {
> +		lua_pushinteger(L, it->sources_count + 1);
> +		lua_gettable(L, idx);
> +		if (lua_isnil(L, -1))
> +			break;
> +
> +		/* Shrink sources array if needed. */

Grow

> +		if (it->sources_count == capacity) {
> +			capacity *= 2;
> +			struct merger_source **new_sources;
> +			const ssize_t new_sources_size =
> +				capacity * sizeof(struct merger_source *);
> +			new_sources = (struct merger_source **) realloc(
> +				it->sources, new_sources_size);
> +			if (new_sources == NULL) {
> +				push_out_of_memory_error(L, new_sources_size,
> +							 "new_sources");
> +				return 1;
> +			}
> +			it->sources = new_sources;
> +		}
> +
> +		/* Allocate the new source. */
> +		it->sources[it->sources_count] = (struct merger_source *)
> +			malloc(sizeof(struct merger_source));
> +		struct merger_source *current_source =
> +			it->sources[it->sources_count];
> +		if (current_source == NULL) {
> +			push_out_of_memory_error(L,
> +						 sizeof(struct merger_source),
> +						 "merger_source");
> +			return 1;
> +		}
> +
> +		/*
> +		 * Set type and tuple to correctly proceed in
> +		 * merger_iterator_delete() in case of any further
> +		 * error.
> +		 */
> +		struct ibuf *buf = NULL;
> +		current_source->type = parse_source_type(L, -1, &buf);
> +		current_source->tuple = NULL;
> +
> +		/*
> +		 * Note: We need to increment sources count right
> +		 * after successful malloc() of the new source
> +		 * (before any further error check), because
> +		 * merger_iterator_delete() frees that amount of
> +		 * sources.
> +		 */
> +		++it->sources_count;
> +
> +		/* Initialize the new source. */
> +		switch (current_source->type) {
> +		case SOURCE_TYPE_BUFFER:
> +			if (!decode_header(it, buf,
> +			    &current_source->buf.remaining_tuples_cnt)) {
> +				lua_pushstring(L, "Invalid merge source");
> +				return 1;
> +			}
> +			current_source->buf.buf = buf;
> +			break;
> +		case SOURCE_TYPE_TABLE:
> +			/* Save a table ref and a next index. */
> +			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
> +			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +			current_source->tbl.ref = tbl_ref;
> +			current_source->tbl.next_idx = 1;
> +			break;
> +		case SOURCE_TYPE_ITERATOR:
> +			/* Wrap and save iterator. */
> +			current_source->it.it =
> +				lua_iterator_new_fromtable(L, -1);
> +			break;
> +		case SOURCE_TYPE_NONE:
> +			lua_pushfstring(L, "Unknown source type at index %d",
> +					it->sources_count);
> +			return 1;
> +		default:
> +			unreachable();
> +			return 1;
> +		}
> +		if (source_fetch(L, current_source, merger->format) != 0)
> +			return 1;
> +		if (current_source->tuple != NULL)
> +			MERGER_HEAP_INSERT(&it->heap,
> +					   &current_source->hnode,
> +					   current_source);
> +	}
> +	lua_pop(L, it->sources_count + 1);

This function is soo long. Let's split it.

> +
> +	return 0;
> +}

> +/**
> + * Create the new merger instance.
> + *
> + * Expected a table of key parts on the Lua stack.
> + *
> + * Returns the new instance.
> + */
> +static int
> +lbox_merger_new(struct lua_State *L)
> +{
> +	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
> +		return luaL_error(L, "Bad params, use: merger.new({"
> +				  "{fieldno = fieldno, type = type"
> +				  "[, is_nullable = is_nullable"
> +				  "[, collation_id = collation_id"
> +				  "[, collation = collation]]]}, ...}");
> +	uint32_t key_parts_count = 0;
> +	uint32_t capacity = 8;
> +
> +	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
> +	struct key_part_def *parts = NULL;
> +	parts = (struct key_part_def *) malloc(parts_size);
> +	if (parts == NULL)
> +		throw_out_of_memory_error(L, parts_size, "parts");
> +
> +	while (true) {
> +		lua_pushinteger(L, key_parts_count + 1);
> +		lua_gettable(L, 1);
> +		if (lua_isnil(L, -1))
> +			break;
> +
> +		/* Extend parts if necessary. */
> +		if (key_parts_count == capacity) {

Let's factor out key_def creation and define it elsewhere
(src/box/lua/key_def.[hc]?)

Can we somehow reuse the code of key_def_decode_parts for this?

> +			capacity *= 2;
> +			struct key_part_def *old_parts = parts;
> +			const ssize_t parts_size =
> +				sizeof(struct key_part_def) * capacity;
> +			parts = (struct key_part_def *) realloc(parts,
> +								parts_size);
> +			if (parts == NULL) {
> +				free(old_parts);
> +				throw_out_of_memory_error(L, parts_size,
> +							  "parts");
> +			}
> +		}
> +
> +		/* Set parts[key_parts_count].fieldno. */
> +		lua_pushstring(L, "fieldno");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			free(parts);
> +			return luaL_error(L, "fieldno must not be nil");
> +		}
> +		/*
> +		 * Transform one-based Lua fieldno to zero-based
> +		 * fieldno to use in key_def_new().
> +		 */
> +		parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1;
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].type. */
> +		lua_pushstring(L, "type");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			free(parts);
> +			return luaL_error(L, "type must not be nil");
> +		}
> +		size_t type_len;
> +		const char *type_name = lua_tolstring(L, -1, &type_len);
> +		lua_pop(L, 1);
> +		parts[key_parts_count].type = field_type_by_name(type_name,
> +								 type_len);
> +		if (parts[key_parts_count].type == field_type_MAX) {
> +			free(parts);
> +			return luaL_error(L, "Unknown field type: %s",
> +					  type_name);
> +		}
> +
> +		/* Set parts[key_parts_count].is_nullable. */
> +		lua_pushstring(L, "is_nullable");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1)) {
> +			parts[key_parts_count].is_nullable = false;
> +			parts[key_parts_count].nullable_action =
> +				ON_CONFLICT_ACTION_DEFAULT;
> +		} else {
> +			parts[key_parts_count].is_nullable =
> +				lua_toboolean(L, -1);
> +			parts[key_parts_count].nullable_action =
> +				ON_CONFLICT_ACTION_NONE;
> +		}
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].coll_id using collation_id. */
> +		lua_pushstring(L, "collation_id");
> +		lua_gettable(L, -2);
> +		if (lua_isnil(L, -1))
> +			parts[key_parts_count].coll_id = COLL_NONE;
> +		else
> +			parts[key_parts_count].coll_id = lua_tointeger(L, -1);
> +		lua_pop(L, 1);
> +
> +		/* Set parts[key_parts_count].coll_id using collation. */
> +		lua_pushstring(L, "collation");
> +		lua_gettable(L, -2);
> +		/* Check whether box.cfg{} was called. */
> +		if ((parts[key_parts_count].coll_id != COLL_NONE ||
> +		    !lua_isnil(L, -1)) && !box_is_configured()) {
> +			free(parts);
> +			return luaL_error(L, "Cannot use collations: "
> +					  "please call box.cfg{}");
> +		}
> +		if (!lua_isnil(L, -1)) {
> +			if (parts[key_parts_count].coll_id != COLL_NONE) {
> +				free(parts);
> +				return luaL_error(
> +					L, "Conflicting options: collation_id "
> +					"and collation");
> +			}
> +			size_t coll_name_len;
> +			const char *coll_name = lua_tolstring(L, -1,
> +							      &coll_name_len);
> +			struct coll_id *coll_id = coll_by_name(coll_name,
> +							       coll_name_len);
> +			if (coll_id == NULL) {
> +				free(parts);
> +				return luaL_error(
> +					L, "Unknown collation: \"%s\"",
> +					coll_name);
> +			}
> +			parts[key_parts_count].coll_id = coll_id->id;
> +		}
> +		lua_pop(L, 1);
> +
> +		/* Check coll_id. */
> +		struct coll_id *coll_id =
> +			coll_by_id(parts[key_parts_count].coll_id);
> +		if (parts[key_parts_count].coll_id != COLL_NONE &&
> +		    coll_id == NULL) {
> +			uint32_t collation_id = parts[key_parts_count].coll_id;
> +			free(parts);
> +			return luaL_error(L, "Unknown collation_id: %d",
> +					  collation_id);
> +		}
> +
> +		/* Set parts[key_parts_count].sort_order. */
> +		parts[key_parts_count].sort_order = SORT_ORDER_ASC;
> +
> +		++key_parts_count;
> +	}
> +
> +	struct merger *merger = calloc(1, sizeof(*merger));
> +	if (merger == NULL) {
> +		free(parts);
> +		throw_out_of_memory_error(L, sizeof(*merger), "merger");
> +	}
> +	merger->key_def = key_def_new(parts, key_parts_count);
> +	free(parts);
> +	if (merger->key_def == NULL) {
> +		return luaL_error(L, "Cannot create merger->key_def");
> +	}
> +
> +	merger->format = box_tuple_format_new(&merger->key_def, 1);
> +	if (merger->format == NULL) {
> +		box_key_def_delete(merger->key_def);
> +		free(merger);
> +		return luaL_error(L, "Cannot create merger->format");
> +	}
> +
> +	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
> +
> +	lua_pushcfunction(L, lbox_merger_gc);
> +	luaL_setcdatagc(L, -2);
> +
> +	return 1;
> +}

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 1/3] Add luaT_iscallable with support of cdata metatype
  2018-12-26 18:35   ` Vladimir Davydov
@ 2018-12-28  1:46     ` Alexander Turenko
  2018-12-28  8:00       ` Vladimir Davydov
  0 siblings, 1 reply; 14+ messages in thread
From: Alexander Turenko @ 2018-12-28  1:46 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

Updated Totktonada/gh-3276-on-board-merger and
Totktonada/gh-3276-on-board-merger-1.10.

On Wed, Dec 26, 2018 at 09:35:56PM +0300, Vladimir Davydov wrote:
> On Sun, Dec 16, 2018 at 11:17:24PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> > ---
> >  extra/exports                    |  1 +
> >  src/lua/utils.c                  | 43 ++++++++++++++++
> >  src/lua/utils.h                  | 10 ++++
> >  test/app-tap/module_api.c        | 10 ++++
> >  test/app-tap/module_api.test.lua | 85 +++++++++++++++++++++++++++++++-
> >  5 files changed, 147 insertions(+), 2 deletions(-)
> > 
> > diff --git a/extra/exports b/extra/exports
> > index 5f69e0730..52f0b2378 100644
> > --- a/extra/exports
> > +++ b/extra/exports
> > @@ -134,6 +134,7 @@ luaT_call
> >  luaT_cpcall
> >  luaT_state
> >  luaT_tolstring
> > +luaT_iscallable
> 
> Do we really need to export it?

No, we don't obligated to export it. I'll remove it and rewrite the test
case (at least I'll try) if you sure we shouldn't. But I think it is
better to leave it as the public function.

I think luajit's support of cdata (in C/Lua APIs) could be better and we
can invest some effort to make it better for users of our C API and ffi.
Also filed #3915.

> 
> >  box_txn
> >  box_txn_begin
> >  box_txn_commit
> > diff --git a/src/lua/utils.c b/src/lua/utils.c
> > index 978fe61f1..7a6069fbb 100644
> > --- a/src/lua/utils.c
> > +++ b/src/lua/utils.c
> > @@ -920,6 +920,49 @@ luaT_tolstring(lua_State *L, int idx, size_t *len)
> >  	return lua_tolstring(L, -1, len);
> >  }
> >  
> > +/* Based on ffi_meta___call() from luajit/src/lib_ffi.c. */
> > +static int
> > +luaT_cdata_iscallable(lua_State *L, int idx)
> 
> I think this function should have prefix luaL_ rather than luaT_.
> Other than that, this particular patch looks OK to me.
> 

Fixed.

> > +{
> > +	/* Calculate absolute value in the stack. */
> > +	if (idx < 0)
> > +		idx = lua_gettop(L) + idx + 1;
> > +
> > +	/* Get cdata from the stack. */
> > +	assert(lua_type(L, idx) == LUA_TCDATA);
> > +	GCcdata *cd = cdataV(L->base + idx - 1);
> > +
> > +	CTState *cts = ctype_cts(L);
> > +	CTypeID id = cd->ctypeid;
> > +	CType *ct = ctype_raw(cts, id);
> > +	if (ctype_isptr(ct->info))
> > +		id = ctype_cid(ct->info);
> > +
> > +	/* Get ctype metamethod. */
> > +	cTValue *tv = lj_ctype_meta(cts, id, MM_call);
> > +
> > +	return tv != NULL;
> > +}

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 1/3] Add luaT_iscallable with support of cdata metatype
  2018-12-28  1:46     ` Alexander Turenko
@ 2018-12-28  8:00       ` Vladimir Davydov
  0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2018-12-28  8:00 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

On Fri, Dec 28, 2018 at 04:46:50AM +0300, Alexander Turenko wrote:
> Updated Totktonada/gh-3276-on-board-merger and
> Totktonada/gh-3276-on-board-merger-1.10.
> 
> On Wed, Dec 26, 2018 at 09:35:56PM +0300, Vladimir Davydov wrote:
> > On Sun, Dec 16, 2018 at 11:17:24PM +0300, Alexander Turenko wrote:
> > > Needed for #3276.
> > > ---
> > >  extra/exports                    |  1 +
> > >  src/lua/utils.c                  | 43 ++++++++++++++++
> > >  src/lua/utils.h                  | 10 ++++
> > >  test/app-tap/module_api.c        | 10 ++++
> > >  test/app-tap/module_api.test.lua | 85 +++++++++++++++++++++++++++++++-
> > >  5 files changed, 147 insertions(+), 2 deletions(-)
> > > 
> > > diff --git a/extra/exports b/extra/exports
> > > index 5f69e0730..52f0b2378 100644
> > > --- a/extra/exports
> > > +++ b/extra/exports
> > > @@ -134,6 +134,7 @@ luaT_call
> > >  luaT_cpcall
> > >  luaT_state
> > >  luaT_tolstring
> > > +luaT_iscallable
> > 
> > Do we really need to export it?
> 
> No, we don't obligated to export it. I'll remove it and rewrite the test
> case (at least I'll try) if you sure we shouldn't. But I think it is

Ah, you need it for a unit test. OK, let's leave it then.

> better to leave it as the public function.
> 
> I think luajit's support of cdata (in C/Lua APIs) could be better and we
> can invest some effort to make it better for users of our C API and ffi.
> Also filed #3915.

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 2/3] Add module to ease using Lua iterators from C
  2018-12-26 18:45   ` Vladimir Davydov
@ 2018-12-31  6:43     ` Alexander Turenko
  0 siblings, 0 replies; 14+ messages in thread
From: Alexander Turenko @ 2018-12-31  6:43 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

On Wed, Dec 26, 2018 at 09:45:09PM +0300, Vladimir Davydov wrote:
> On Sun, Dec 16, 2018 at 11:17:25PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> > ---
> >  src/lua/lua_iterator.h | 117 +++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 117 insertions(+)
> >  create mode 100644 src/lua/lua_iterator.h
> > 
> > diff --git a/src/lua/lua_iterator.h b/src/lua/lua_iterator.h
> > new file mode 100644
> > index 000000000..3e1a88c93
> > --- /dev/null
> > +++ b/src/lua/lua_iterator.h
> > @@ -0,0 +1,117 @@
> > +#ifndef TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED
> > +#define TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED 1
> 
> Nit: we don't set an include guard macro to any particular value.
> 

I removed this file now, but I fixed guard in merger.h.

> > > > +/**
> > + * This module contains helper functions to interact with a Lua
> > + * iterator from C.
> > + */
> > +
> > +#include <lua.h>
> 
> extern 'C' is missing.
> 

The declarations was moved to utils.h, under extern 'C'.

> > +
> > +/**
> > + * Holds iterator state (references to Lua objects).
> > + */
> > +struct lua_iterator {
> 
> I guess this structure as well as the functions below should have
> luaL_ prefix.
> 

The idea was to name the class 'lua iterator', but now I see it looks
like namespace/prefix 'lua'. Replaced 'lua' with 'luaL'.

> > +	int gen;
> > +	int param;
> > +	int state;
> > +};
> > +
> > +/**
> > + * Create a Lua iterator from {gen, param, state}.
> > + */
> > +struct lua_iterator *
> > +lua_iterator_new_fromtable(lua_State *L, int idx)
> 
> If you include this header into two different source files, you'll get a
> linker error. All functions defined in a header should be marked static
> inline. Anyway, I don't think that defining these functions in a header
> is a good idea - they are heavy and not supposed to be blazing fast.
> 

Ouch, thanks.

> May be, just move them to lua/util.c?

Moved to src/lua/utils.[ch]. I feel 'one class is one module' approach
as kinda more structured, but this is really small one. I wrapped this
functions into {{{ }}} block instead.

> > +/**
> > + * Free all resources hold by the iterator.
> 
> Nit: s/hold/held

Fixed.

> 
> > + */
> > +void lua_iterator_free(lua_State *L, struct lua_iterator *it)
> > +{
> > +	luaL_unref(L, LUA_REGISTRYINDEX, it->gen);
> > +	luaL_unref(L, LUA_REGISTRYINDEX, it->param);
> > +	luaL_unref(L, LUA_REGISTRYINDEX, it->state);
> > +	free(it);
> > +}
> > +
> > +#endif /* TARANTOOL_LUA_LUA_ITERATOR_H_INCLUDED */

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [PATCH 3/3] Add merger for tuple streams
  2018-12-26 20:11   ` Vladimir Davydov
@ 2019-01-09 21:36     ` Alexander Turenko
  0 siblings, 0 replies; 14+ messages in thread
From: Alexander Turenko @ 2019-01-09 21:36 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

Updated most of aspects you point me.

Now the patchset looks so:

* Add luaL_iscallable with support of cdata metatype
* Add functions to ease using Lua iterators from C
* lua: add luaT_newtuple()
* lua: add luaT_new_key_def()
* net.box: add helpers to decode msgpack headers
* Add merger for tuple streams

I'll resend it with v2. It is more to end the iteration in some clean
way then to review, because the chunked data transfer API research is
still on my side.

Totktonada/gh-3276-on-board-merger-1.10 branch stale for now. I'll
update it if it will be requested explicitly.
Totktonada/gh-3276-on-board-merger was updated.

I updated graphql usage of merger (it still don't merged awaiting
stabilizing the merger API):

https://github.com/tarantool/graphql/commit/a442f6451d407c5833abef5b14303662cfd16a62
https://github.com/tarantool/graphql/commit/738501641a43e7602567469109ab0032b3c78896

WBR, Alexander Turenko.

On Wed, Dec 26, 2018 at 11:11:10PM +0300, Vladimir Davydov wrote:
> On Sun, Dec 16, 2018 at 11:17:26PM +0300, Alexander Turenko wrote:
> > Fixes #3276.
> 
> DocBot request is missing. I guess you could paste that nice piece of
> documentation you wrote in the source file right in the commit message.
> 

Thanks. Done.

> > ---
> >  src/CMakeLists.txt           |    2 +
> >  src/lua/init.c               |    5 +
> >  src/lua/merger.c             | 1643 ++++++++++++++++++++++++++++++++++
> >  src/lua/merger.h             |   39 +
> >  src/lua/merger.lua           |   19 +
> 
> Merger depends on tuple, key_def, and tuple_format, which are box
> objects, hence it should be defined in src/box.
> 

Moved to src/box/lua/merger.[ch].

> My main concern about the merger code added by this patch is that the
> merger logic and its Lua wrapper are mixed together in the same file.
> This makes the code difficult for understanding. It'd be great if you
> could isolate the merger logic in src/box/merger.[hc] and leave only the
> Lua wrapper code in src/box/lua/merger.[hc]. The merger could then be
> submitted in a separate patch with an appropriate unit test - this would
> also ease the review process.

The merger will be very tiny, because most of logic interoperates with
Lua, but this should look more structured, so I agree.

We discussed with Vladimir how to split the code. He dislike the idea to
store basic fields in struct merger_state in box/merger.[ch] and extend
this structure with fetch_source_ref in box/lua/merger.[ch].

Also discussed: fetch_source has source.idx parameter and a user should
map source index to net.box connections or some other kind of 'upstream
source'. Vladimir don't think it is good API.

We also discussed whether it is possible to define a source as an object
with methods like next/fetch or next/next_batch. It is unclear how it
should work with buffer. I'll investigate possibilities and will get
back to this in the following email.

So this is not done.

> 
> A few comments regarding the API and the code are below.
> 
> >  test/app-tap/merger.test.lua |  693 ++++++++++++++
> >  test/app-tap/suite.ini       |    1 +
> >  7 files changed, 2402 insertions(+)
> >  create mode 100644 src/lua/merger.c
> >  create mode 100644 src/lua/merger.h
> >  create mode 100644 src/lua/merger.lua
> >  create mode 100755 test/app-tap/merger.test.lua
> 
> > diff --git a/src/lua/merger.c b/src/lua/merger.c
> > +/**
> > + * API and basic usage
> > + * -------------------
> > + *
> > + * The following example demonstrates API of the module:
> > + *
> > + * ```
> > + * local net_box = require('net.box')
> > + * local buffer = require('buffer')
> > + * local merger = require('merger')
> > + *
> > + * -- The format of key_parts parameter is the same as
> > + * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
> > + * -- net.box connection).
> > + * local key_parts = {
> > + *     {
> > + *         fieldno = <number>,
> > + *         type = <string>,
> > + *         [ is_nullable = <boolean>, ]
> > + *         [ collation_id = <number>, ]
> > + *         [ collation = <string>, ]
> > + *     },
> > + *     ...
> > + * }
> > + *
> > + * -- Create the merger instance.
> > + * local merger_inst = merger.new(key_parts)
> 
> IMO it's not a merger instance, it's rather a merger context.
> 
> What about
> 
>   local ctx = merger.context.new(key_parts)
>   merger.pairs(ctx, {src1, src2, ...})
> 
> ?
> 
> Note, ctx is a mere argument here, not a class object.
> 

I agreed: key_def + format feels more as a context then as an instance.
And writing '_inst' postfixes in a user code was distracting a bit.

Done.

> > + * Decoding / encoding buffers
> > + * ---------------------------
> > + *
> > + * A select response has the following structure:
> > + * `{[48] = {tuples}}`, while a call response is
> > + * `{[48] = {{tuples}}}` (because it should support multiple
> > + * return values). A user should specify how merger will
> > + * operate on buffers, so merger has `decode` (how to read buffer
> > + * sources) and `encode` (how to write to a resulting buffer)
> > + * options. These options accept the following values:
> > + *
> > + * Option value       | Buffer structure
> > + * ------------------ | ----------------
> > + * 'raw'              | tuples
> > + * 'select' (default) | {[48] = {tuples}}
> > + * 'call'             | {[48] = {{tuples}}}
> > + * 'chain'            | {[48] = {{{tuples, ...}}}}
> 
> I don't think we should make merger dependent on iproto. I understand
> that it must be able to take an ibuf for performance considerations, but
> I think that the buffer must always be formatted as a msgpack array of
> tuples, without any extra headers. Headers should be removed either by
> the caller (with msgpack Lua lib) or by net.box itself.
> 

I understand your concern and agreed with arguments, but from user
perspective it is convenient to just say {decode = 'call'}. The new way
to decode should be convenient too, I think.

Added two functions (see the commit message for the documentation and
examples).

net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
    -> new_rpos
    -> nil, err_msg
msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
    -> new_rpos, arr_len
    -> nil, err_msg

I think this API has several advantages that need to be taken into
consideration if we'll want to change it:

* It is simple for a user and don't bloat examples, because allows to
  check and skip iproto_data/array headers in 1-3 lines of code (see the
  example in docbot comment).
* It follows msgpack lua module convention and receives buf.rpos as
  input (however does not allow a string argument).
* It checks for out-of-buffer errors: so it will not crash in case of,
  say, empty buffer.

> > +#ifndef NDEBUG
> > +#include "say.h"
> > +/**
> > + * Heap insert/delete/update macros wrapped with debug prints.
> > + */
> > +#define MERGER_HEAP_INSERT(heap, hnode, source) do {			\
> > +	say_debug("merger: [source %p] insert: tuple: %s", (source),	\
> > +		  tuple_str((source)->tuple));				\
> > +	merger_heap_insert((heap), (hnode));				\
> > +} while(0)
> > +#define MERGER_HEAP_DELETE(heap, hnode, source) do {		\
> > +	say_debug("merger: [source %p] delete", (source));	\
> > +	merger_heap_delete((heap), (hnode));			\
> > +} while(0)
> > +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {			\
> > +	say_debug("merger: [source %p] update: tuple: %s", (source),	\
> > +		  tuple_str((source)->tuple));				\
> > +	merger_heap_update((heap), (hnode));				\
> > +} while(0)
> > +#else /* !defined(NDEBUG) */
> > +/**
> > + * Heap insert/delete/update macros wrappers w/o debug prints.
> > + */
> > +#define MERGER_HEAP_INSERT(heap, hnode, source) do {	\
> > +	merger_heap_insert((heap), (hnode));		\
> > +} while(0)
> > +#define MERGER_HEAP_DELETE(heap, hnode, source) do {	\
> > +	merger_heap_delete((heap), (hnode));		\
> > +} while(0)
> > +#define MERGER_HEAP_UPDATE(heap, hnode, source) do {	\
> > +	merger_heap_update((heap), (hnode));		\
> > +} while(0)
> > +#endif /* !defined(NDEBUG) */
> 
> say_debug() doesn't evaluate arguments if log_level < DEBUG so I think
> it's no use to disable it if NDEBUG. I'd remove these macros and call
> say_debug() directly at call sites.
> 

The difference just in one jump (because of if statement), but it is on
the critical path: that is why I do it in that way.

I had left these prints, because I was debugging a problem in shard's
merger in the past with them, but now I think it is even easier to debug
buffers with something like:

```
yaml.encode(msgpackffi.decode(ffi.string(buf.rpos, buf.wpos - buf.rpos)))
```

I have removed these debug prints.

> Also, heap function may fail with ENOMEM unless you use heap_reserve().

Added check for merger_heap_insert() return value.

> 
> > +
> > +/**
> > + * Helper macros to push / throw out of memory errors to Lua.
> > + */
> > +#define push_out_of_memory_error(L, size, what_name) do {	\
> > +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> > +	luaT_pusherror(L, diag_last_error(diag_get()));		\
> > +} while(0)
> > +#define throw_out_of_memory_error(L, size, what_name) do {	\
> > +	diag_set(OutOfMemory, (size), "malloc", (what_name));	\
> > +	luaT_error(L);						\
> > +	unreachable();						\
> > +	return -1;						\
> > +} while(0)
> 
> I wouldn't use these macros. They make a call only one line shorter:
> 
> 	throw_out_of_memory_error(L, size, "obj");
> 
> instead of
> 
> 	diag_set(OutOfMemory, size, "malloc", "obj");
> 	return luaT_error(L);
> 
> Not worth obscuring the code IMO.

Okay, expanded.

> 
> > +
> > +#define BOX_COLLATION_NAME_INDEX 1
> 
> Not used anywhere.

Removed in 2.1. It is used in 1.10.

> 
> > +
> > +/**
> > + * A type of data structure that holds source data.
> > + */
> > +enum merger_source_type {
> > +	SOURCE_TYPE_BUFFER,
> > +	SOURCE_TYPE_TABLE,
> > +	SOURCE_TYPE_ITERATOR,
> > +	SOURCE_TYPE_NONE,
> > +};
> 
> I'd prefer if you used vtab instead, because that would isolate code
> and data of each iterator type in a separate function/struct, making
> it easier to follow. Besides, you'll have to do that anyway provided
> you agree to move the merger logic to src/box/merger.c.
> 

Done.

> > +
> > +/**
> > + * How data are encoded in a buffer.
> > + *
> > + * `decode` and `encode` options are parsed to values of this
> > + * enum.
> > + */
> > +enum merger_buffer_type {
> > +	BUFFER_TYPE_RAW,
> > +	BUFFER_TYPE_SELECT,
> > +	BUFFER_TYPE_CALL,
> > +	BUFFER_TYPE_CHAIN,
> > +	BUFFER_TYPE_NONE,
> > +};
> > +
> > +/**
> > + * Hold state of a merge source.
> > + */
> > +struct merger_source {
> > +	/*
> > +	 * A source is the heap node. Compared by the next tuple.
> > +	 */
> > +	struct heap_node hnode;
> > +	/* Union determinant. */
> > +	enum merger_source_type type;
> > +	/* Fields specific for certaint source types. */
> > +	union {
> > +		/* Buffer source. */
> > +		struct {
> > +			struct ibuf *buf;
> > +			/*
> > +			 * A merger stops before end of a buffer
> > +			 * when it is not the last merger in the
> > +			 * chain.
> > +			 */
> > +			size_t remaining_tuples_cnt;
> > +		} buf;
> > +		/* Table source. */
> > +		struct {
> > +			int ref;
> > +			int next_idx;
> > +		} tbl;
> > +		/* Iterator source. */
> > +		struct {
> > +			struct lua_iterator *it;
> > +		} it;
> > +	};
> > +	/* Next tuple. */
> > +	struct tuple *tuple;
> > +};
> > +
> > +/**
> > + * Holds immutable parameters of a merger.
> > + */
> > +struct merger {
> 
> Should be called merger_context IMO.

I agree. Changed.

> 
> > +	struct key_def *key_def;
> > +	box_tuple_format_t *format;
> > +};
> > +
> > +/**
> > + * Holds parameters of merge process, sources, result storage
> > + * (if any), heap of sources and utility flags / counters.
> > + */
> > +struct merger_iterator {
> 
> Should be called merger or merger_state IMO.

I'm tentative about the name 'merger', because the term is very common.
However I understand the concept of 'central' structure of the module
like it is struct tuple for tuple.{c,h,lua}.

'merger_state' looks okay for me. This structure, wrapped into cdata, is
used as 'param' (3rd) value in the iterator triplet when
merger_inst:pairs(...) / merger.pairs(ctx, ...) is called, but 'state'
(2nd) value is struct merger / struct merger_context, but this
terminology is vague. So using context as 'state' and state as 'param'
looks okay eventually.

I'll choose merger_state for now and we can discuss it later again.
It should be quite easy to change it later if we need to do so.

> 
> > +	/* Heap of sources. */
> > +	heap_t heap;
> > +	/*
> > +	 * key_def is copied from merger.
> > +	 *
> > +	 * A merger can be collected by LuaJIT GC independently
> > +	 * from a merger_iterator, so we cannot just save pointer
> > +	 * to merger here and so we copy key_def from merger.
> > +	 */
> > +	struct key_def *key_def;
> 
> And what about the tuple format? You don't seem to copy or reference it
> anywhere. Confusing. I think that if the iterator needs the merger to
> stay along, it'd better reference it.
> 

We need key_def in the source_less() function, this is why we copy it
here. In source_less() we have no struct merger / merger_context, but
have struct merger_iterator / merger_state (because it contains the
heap). I have updated the comment as follows:

/*
 * Copy of key_def from merger_context.
 *
 * A merger_context can be collected by LuaJIT GC
 * independently from a merger_state, so we need either
 * copy key_def or implement reference counting for
 * merger_context and save the pointer.
 *
 * key_def is needed in source_less(), where merger_state
 * is known, but merger_context is not.
 */

> > +	/* Parsed sources and decoding parameters. */
> > +	uint32_t sources_count;
> > +	struct merger_source **sources;
> > +	enum merger_buffer_type decode;
> > +	/* Ascending / descending order. */
> > +	int order;
> > +	/* Optional output buffer and encoding parameters. */
> > +	struct ibuf *obuf;
> > +	enum merger_buffer_type encode;
> > +	uint32_t encode_chain_len;
> > +};
> > +
> > +static uint32_t merger_type_id = 0;
> > +static uint32_t merger_iterator_type_id = 0;
> > +static uint32_t ibuf_type_id = 0;
> > +
> > +/* Forward declarations. */
> > +static bool
> > +source_less(const heap_t *heap, const struct heap_node *a,
> > +	    const struct heap_node *b);
> > +static int
> > +lbox_merger_gc(struct lua_State *L);
> > +static void
> > +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it);
> > +static int
> > +lbox_merger_iterator_gc(struct lua_State *L);
> > +
> > +#define HEAP_NAME merger_heap
> > +#define HEAP_LESS source_less
> > +#include "salad/heap.h"
> > +
> > +/**
> > + * Create the new tuple with specific format from a Lua table or a
> > + * tuple.
> > + *
> > + * In case of an error push the error message to the Lua stack and
> > + * return NULL.
> > + */
> > +static struct tuple *
> > +luaT_gettuple_with_format(struct lua_State *L, int idx,
> > +			  box_tuple_format_t *format)
> > +{
> > +	struct tuple *tuple;
> > +	if (lua_istable(L, idx)) {
> > +		/* Based on lbox_tuple_new() code. */
> 
> Please define this as a separate function somewhere in
> src/box/lua/tuple.c and reuse lbox_tuple_new() code instead
> of copying it. May be done in a separate patch, I guess.

Done.

The code becomes a bit weird, but the reason is the mix of mine
requirements and lbox_tuple_new() ones.

I need:

* Choose a table / a tuple with index on the lua stack.
* Pushed (don't throwed) errors to perform clean-up before rethrow.

lbox_tuple_new() need:

* Support old parameters format: box.tuple.new(1, 2, 3), so I booked
  idx == 0 for this (it is not valid lua stack index).

From the other side, the code now is not duplicated and the new funtion
(named luaT_newtuple()) is more flexible then lbox_tuple_new(): hope
it'll help with reusing the code later.

> 
> > +		struct ibuf *buf = tarantool_lua_ibuf;
> > +		ibuf_reset(buf);
> > +		struct mpstream stream;
> > +		mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
> > +		      luamp_error, L);
> > +		luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx);
> > +		mpstream_flush(&stream);
> > +		tuple = box_tuple_new(format, buf->buf,
> > +				      buf->buf + ibuf_used(buf));
> > +		if (tuple == NULL) {
> > +			luaT_pusherror(L, diag_last_error(diag_get()));
> > +			return NULL;
> > +		}
> > +		ibuf_reinit(tarantool_lua_ibuf);
> > +		return tuple;
> > +	}
> > +	tuple = luaT_istuple(L, idx);
> > +	if (tuple == NULL) {
> > +		lua_pushfstring(L, "A tuple or a table expected, got %s",
> > +				lua_typename(L, lua_type(L, -1)));
> > +		return NULL;
> > +	}
> > +	/*
> > +	 * Create the new tuple with the format necessary for fast
> > +	 * comparisons.
> > +	 */
> > +	const char *tuple_beg = tuple_data(tuple);
> > +	const char *tuple_end = tuple_beg + tuple->bsize;
> > +	tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > +	if (tuple == NULL) {
> > +		luaT_pusherror(L, diag_last_error(diag_get()));
> > +		return NULL;
> > +	}
> > +	return tuple;
> > +}
> 
> > +#define RPOS_P(buf) ((const char **) &(buf)->rpos)
> 
> Please, let's somehow get along without this macro.

Done.

> 
> > +
> > +/**
> > + * Skip (and check) the wrapper around tuples array (and the array
> > + * itself).
> > + *
> > + * Expected different kind of wrapping depending of it->decode.
> > + */
> > +static int
> > +decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p)
> > +{
> > +	int ok = 1;
> > +	/* Decode {[IPROTO_DATA] = ...} header. */
> > +	if (it->decode != BUFFER_TYPE_RAW)
> > +		ok = mp_typeof(*buf->rpos) == MP_MAP &&
> > +			mp_decode_map(RPOS_P(buf)) == 1 &&
> > +			mp_typeof(*buf->rpos) == MP_UINT &&
> > +			mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA;
> > +	/* Decode the array around call return values. */
> > +	if (ok && (it->decode == BUFFER_TYPE_CALL ||
> > +	    it->decode == BUFFER_TYPE_CHAIN))
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> > +			mp_decode_array(RPOS_P(buf)) > 0;
> > +	/* Decode the array around chained input. */
> > +	if (ok && it->decode == BUFFER_TYPE_CHAIN)
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY &&
> > +			mp_decode_array(RPOS_P(buf)) > 0;
> > +	/* Decode the array around tuples to merge. */
> > +	if (ok)
> > +		ok = mp_typeof(*buf->rpos) == MP_ARRAY;
> > +	if (ok)
> > +		*len_p = mp_decode_array(RPOS_P(buf));
> > +	return ok;
> > +}
> > +
> > +#undef RPOS_P
> 
> > +/**
> > + * Determine type of a merger source on the Lua stack.
> > + *
> > + * Set *buf_p to buffer when the source is valid source of buffer
> > + * type and buf_p is not NULL.
> > + */
> > +static enum merger_source_type
> > +parse_source_type(lua_State *L, int idx, struct ibuf **buf_p)
> > +{
> > +	if (lua_type(L, idx) == LUA_TCDATA) {
> > +		struct ibuf *buf = check_ibuf(L, idx);
> 
> msgpack.decode takes ibuf.rpos rather than ibuf. May be, the merger
> should do the same, for consistency?

I have the following concerns here:

1. In this case it cannot check for end of buffer. See comment around
   mp_next() call. BTW, with ibuf.wpos we can replace it with mp_check()
   in the future.
2. I think the API looks better from a user perspective when the user
   just passes blackbox objects between compatible APIs and don't need
   to understand the internal structure of these objects.
3. Tricky callback contract in case of chunked data transfer, see below.

We can receive rpos, store a read position inside merger and return it
to the user to let him set rpos manually (to allow ibuf_reserve_slow()
reuse this memory). Then merger should clear its read position, but only
if the user really update rpos. So the contract between merger and user
becomes tricky.

So I leave it as is until we discuss it again.

BTW, msgpack/msgpackffi tends to be universal as possible, so
cdata<const char *> as the parameter type looks appropriate. These
modules don't need to handle interlaced reads and writes, so data in the
buffer do not move between reads.

Merger need to handle such cases (esp. after introducing chunked data
transfer support -- user callback to write more data), so using a buffer
as a buffer (not just as const char *) is appropriate here.

> 
> > +		if (buf == NULL)
> > +			return SOURCE_TYPE_NONE;
> > +		if (buf_p != NULL)
> > +			*buf_p = buf;
> > +		return SOURCE_TYPE_BUFFER;
> > +	} else if (lua_istable(L, idx)) {
> > +		lua_rawgeti(L, idx, 1);
> > +		int iscallable = luaT_iscallable(L, idx);
> > +		lua_pop(L, 1);
> > +		if (iscallable)
> > +			return SOURCE_TYPE_ITERATOR;
> > +		return SOURCE_TYPE_TABLE;
> > +	}
> > +
> > +	return SOURCE_TYPE_NONE;
> > +}
> 
> > +/**
> > + * Parse sources table: second parameter pf merger_isnt:pairs()
> > + * and merger_inst:select() into the merger_iterator structure.
> > + *
> > + * Note: This function should be called when options are already
> > + * parsed (using parse_opts()).
> > + *
> > + * Returns 0 on success. In case of an error it pushes an error
> > + * message to the Lua stack and returns 1.
> > + */
> > +static int
> > +parse_sources(struct lua_State *L, int idx, struct merger *merger,
> > +	      struct merger_iterator *it)
> > +{
> > +	/* Allocate sources array. */
> > +	uint32_t capacity = 8;
> > +	const ssize_t sources_size = capacity * sizeof(struct merger_source *);
> > +	it->sources = (struct merger_source **) malloc(sources_size);
> > +	if (it->sources == NULL) {
> > +		push_out_of_memory_error(L, sources_size, "it->sources");
> > +		return 1;
> > +	}
> > +
> > +	/* Fetch all sources. */
> > +	while (true) {
> > +		lua_pushinteger(L, it->sources_count + 1);
> > +		lua_gettable(L, idx);
> > +		if (lua_isnil(L, -1))
> > +			break;
> > +
> > +		/* Shrink sources array if needed. */
> 
> Grow

Fixed.

> 
> > +		if (it->sources_count == capacity) {
> > +			capacity *= 2;
> > +			struct merger_source **new_sources;
> > +			const ssize_t new_sources_size =
> > +				capacity * sizeof(struct merger_source *);
> > +			new_sources = (struct merger_source **) realloc(
> > +				it->sources, new_sources_size);
> > +			if (new_sources == NULL) {
> > +				push_out_of_memory_error(L, new_sources_size,
> > +							 "new_sources");
> > +				return 1;
> > +			}
> > +			it->sources = new_sources;
> > +		}
> > +
> > +		/* Allocate the new source. */
> > +		it->sources[it->sources_count] = (struct merger_source *)
> > +			malloc(sizeof(struct merger_source));
> > +		struct merger_source *current_source =
> > +			it->sources[it->sources_count];
> > +		if (current_source == NULL) {
> > +			push_out_of_memory_error(L,
> > +						 sizeof(struct merger_source),
> > +						 "merger_source");
> > +			return 1;
> > +		}
> > +
> > +		/*
> > +		 * Set type and tuple to correctly proceed in
> > +		 * merger_iterator_delete() in case of any further
> > +		 * error.
> > +		 */
> > +		struct ibuf *buf = NULL;
> > +		current_source->type = parse_source_type(L, -1, &buf);
> > +		current_source->tuple = NULL;
> > +
> > +		/*
> > +		 * Note: We need to increment sources count right
> > +		 * after successful malloc() of the new source
> > +		 * (before any further error check), because
> > +		 * merger_iterator_delete() frees that amount of
> > +		 * sources.
> > +		 */
> > +		++it->sources_count;
> > +
> > +		/* Initialize the new source. */
> > +		switch (current_source->type) {
> > +		case SOURCE_TYPE_BUFFER:
> > +			if (!decode_header(it, buf,
> > +			    &current_source->buf.remaining_tuples_cnt)) {
> > +				lua_pushstring(L, "Invalid merge source");
> > +				return 1;
> > +			}
> > +			current_source->buf.buf = buf;
> > +			break;
> > +		case SOURCE_TYPE_TABLE:
> > +			/* Save a table ref and a next index. */
> > +			lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
> > +			int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> > +			current_source->tbl.ref = tbl_ref;
> > +			current_source->tbl.next_idx = 1;
> > +			break;
> > +		case SOURCE_TYPE_ITERATOR:
> > +			/* Wrap and save iterator. */
> > +			current_source->it.it =
> > +				lua_iterator_new_fromtable(L, -1);
> > +			break;
> > +		case SOURCE_TYPE_NONE:
> > +			lua_pushfstring(L, "Unknown source type at index %d",
> > +					it->sources_count);
> > +			return 1;
> > +		default:
> > +			unreachable();
> > +			return 1;
> > +		}
> > +		if (source_fetch(L, current_source, merger->format) != 0)
> > +			return 1;
> > +		if (current_source->tuple != NULL)
> > +			MERGER_HEAP_INSERT(&it->heap,
> > +					   &current_source->hnode,
> > +					   current_source);
> > +	}
> > +	lua_pop(L, it->sources_count + 1);
> 
> This function is soo long. Let's split it.

I have wrapped sources creation into the merger_source_new() function
and the parse_sources() function now is within the screen of code.

> 
> > +
> > +	return 0;
> > +}
> 
> > +/**
> > + * Create the new merger instance.
> > + *
> > + * Expected a table of key parts on the Lua stack.
> > + *
> > + * Returns the new instance.
> > + */
> > +static int
> > +lbox_merger_new(struct lua_State *L)
> > +{
> > +	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
> > +		return luaL_error(L, "Bad params, use: merger.new({"
> > +				  "{fieldno = fieldno, type = type"
> > +				  "[, is_nullable = is_nullable"
> > +				  "[, collation_id = collation_id"
> > +				  "[, collation = collation]]]}, ...}");
> > +	uint32_t key_parts_count = 0;
> > +	uint32_t capacity = 8;
> > +
> > +	const ssize_t parts_size = sizeof(struct key_part_def) * capacity;
> > +	struct key_part_def *parts = NULL;
> > +	parts = (struct key_part_def *) malloc(parts_size);
> > +	if (parts == NULL)
> > +		throw_out_of_memory_error(L, parts_size, "parts");
> > +
> > +	while (true) {
> > +		lua_pushinteger(L, key_parts_count + 1);
> > +		lua_gettable(L, 1);
> > +		if (lua_isnil(L, -1))
> > +			break;
> > +
> > +		/* Extend parts if necessary. */
> > +		if (key_parts_count == capacity) {
> 
> Let's factor out key_def creation and define it elsewhere
> (src/box/lua/key_def.[hc]?)

Good idea. I have added the code almost as is under name
luaT_new_key_def() to src/box/lua/key_def.c, added to exports (and
module.h) to test.

I don't much like the idea of exporting it, but I don't know how to test
it in the unit-test way: it depends on box stuff and requires lua state.

> 
> Can we somehow reuse the code of key_def_decode_parts for this?

In short: no, we cannot, as I see.

Merger parses two formats:

* box.space.s.index.pk.parts Lua table;
* net_box_conn.space.s.index.pk.parts Lua table.

They are almost identical, but former shows collation by name in
'collation' field, while latter shows it in 'collation_id' field as a
number.

box.space._index:get(<...>)[6] format is very different from them:

* it can be an array as well as map;
* 'field' instead of 'fieldno';
* 'nullable_action' enum instead of 'is_nullable' flag.

> 
> > +			capacity *= 2;
> > +			struct key_part_def *old_parts = parts;
> > +			const ssize_t parts_size =
> > +				sizeof(struct key_part_def) * capacity;
> > +			parts = (struct key_part_def *) realloc(parts,
> > +								parts_size);
> > +			if (parts == NULL) {
> > +				free(old_parts);
> > +				throw_out_of_memory_error(L, parts_size,
> > +							  "parts");
> > +			}
> > +		}
> > +
> > +		/* Set parts[key_parts_count].fieldno. */
> > +		lua_pushstring(L, "fieldno");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			free(parts);
> > +			return luaL_error(L, "fieldno must not be nil");
> > +		}
> > +		/*
> > +		 * Transform one-based Lua fieldno to zero-based
> > +		 * fieldno to use in key_def_new().
> > +		 */
> > +		parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1;
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].type. */
> > +		lua_pushstring(L, "type");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			free(parts);
> > +			return luaL_error(L, "type must not be nil");
> > +		}
> > +		size_t type_len;
> > +		const char *type_name = lua_tolstring(L, -1, &type_len);
> > +		lua_pop(L, 1);
> > +		parts[key_parts_count].type = field_type_by_name(type_name,
> > +								 type_len);
> > +		if (parts[key_parts_count].type == field_type_MAX) {
> > +			free(parts);
> > +			return luaL_error(L, "Unknown field type: %s",
> > +					  type_name);
> > +		}
> > +
> > +		/* Set parts[key_parts_count].is_nullable. */
> > +		lua_pushstring(L, "is_nullable");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1)) {
> > +			parts[key_parts_count].is_nullable = false;
> > +			parts[key_parts_count].nullable_action =
> > +				ON_CONFLICT_ACTION_DEFAULT;
> > +		} else {
> > +			parts[key_parts_count].is_nullable =
> > +				lua_toboolean(L, -1);
> > +			parts[key_parts_count].nullable_action =
> > +				ON_CONFLICT_ACTION_NONE;
> > +		}
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].coll_id using collation_id. */
> > +		lua_pushstring(L, "collation_id");
> > +		lua_gettable(L, -2);
> > +		if (lua_isnil(L, -1))
> > +			parts[key_parts_count].coll_id = COLL_NONE;
> > +		else
> > +			parts[key_parts_count].coll_id = lua_tointeger(L, -1);
> > +		lua_pop(L, 1);
> > +
> > +		/* Set parts[key_parts_count].coll_id using collation. */
> > +		lua_pushstring(L, "collation");
> > +		lua_gettable(L, -2);
> > +		/* Check whether box.cfg{} was called. */
> > +		if ((parts[key_parts_count].coll_id != COLL_NONE ||
> > +		    !lua_isnil(L, -1)) && !box_is_configured()) {
> > +			free(parts);
> > +			return luaL_error(L, "Cannot use collations: "
> > +					  "please call box.cfg{}");
> > +		}
> > +		if (!lua_isnil(L, -1)) {
> > +			if (parts[key_parts_count].coll_id != COLL_NONE) {
> > +				free(parts);
> > +				return luaL_error(
> > +					L, "Conflicting options: collation_id "
> > +					"and collation");
> > +			}
> > +			size_t coll_name_len;
> > +			const char *coll_name = lua_tolstring(L, -1,
> > +							      &coll_name_len);
> > +			struct coll_id *coll_id = coll_by_name(coll_name,
> > +							       coll_name_len);
> > +			if (coll_id == NULL) {
> > +				free(parts);
> > +				return luaL_error(
> > +					L, "Unknown collation: \"%s\"",
> > +					coll_name);
> > +			}
> > +			parts[key_parts_count].coll_id = coll_id->id;
> > +		}
> > +		lua_pop(L, 1);
> > +
> > +		/* Check coll_id. */
> > +		struct coll_id *coll_id =
> > +			coll_by_id(parts[key_parts_count].coll_id);
> > +		if (parts[key_parts_count].coll_id != COLL_NONE &&
> > +		    coll_id == NULL) {
> > +			uint32_t collation_id = parts[key_parts_count].coll_id;
> > +			free(parts);
> > +			return luaL_error(L, "Unknown collation_id: %d",
> > +					  collation_id);
> > +		}
> > +
> > +		/* Set parts[key_parts_count].sort_order. */
> > +		parts[key_parts_count].sort_order = SORT_ORDER_ASC;
> > +
> > +		++key_parts_count;
> > +	}
> > +
> > +	struct merger *merger = calloc(1, sizeof(*merger));
> > +	if (merger == NULL) {
> > +		free(parts);
> > +		throw_out_of_memory_error(L, sizeof(*merger), "merger");
> > +	}
> > +	merger->key_def = key_def_new(parts, key_parts_count);
> > +	free(parts);
> > +	if (merger->key_def == NULL) {
> > +		return luaL_error(L, "Cannot create merger->key_def");
> > +	}
> > +
> > +	merger->format = box_tuple_format_new(&merger->key_def, 1);
> > +	if (merger->format == NULL) {
> > +		box_key_def_delete(merger->key_def);
> > +		free(merger);
> > +		return luaL_error(L, "Cannot create merger->format");
> > +	}
> > +
> > +	*(struct merger **) luaL_pushcdata(L, merger_type_id) = merger;
> > +
> > +	lua_pushcfunction(L, lbox_merger_gc);
> > +	luaL_setcdatagc(L, -2);
> > +
> > +	return 1;
> > +}

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [tarantool-patches] [PATCH 0/3] lua: add key_def lua module
  2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
                   ` (3 preceding siblings ...)
  2018-12-18 12:16 ` [PATCH 0/3] Merger Alexander Turenko
@ 2019-03-22 14:24 ` Kirill Shcherbatov
  2019-03-22 16:20   ` Alexander Turenko
  4 siblings, 1 reply; 14+ messages in thread
From: Kirill Shcherbatov @ 2019-03-22 14:24 UTC (permalink / raw)
  To: tarantool-patches, Alexander Turenko; +Cc: Vladimir Davydov

Hi! Can't find the patch that we were discussed, so, I've copied It from branch by my own.

The code you implemented is very similar in many ways to the one already implemented in key_def.c.
I have one principal proposal and one advice(may be good enough):
1) at first, all map key names must follow their semantic-twin is used with create_index/alter;
    I mean field, not fieldno e.g.
    The errors also must not differ, I believe.
    https://tarantool.io/en/doc/1.10/book/box/box_space/#box-space-create-index

2) (proposal) I think you can do without the function load_key_def_set_part, that repeats]
    key_def_decode_parts code in many moments. You may encode tuple on region with
    lbox_encode_tuple_on_gc() and pass it to key_def_decode_parts(). While key_def:new is
    not performance-critical, this is better managed way to solve this problem, I think. Consider my
    RFC diff below:

diff --git a/src/box/lua/key_def.c b/src/box/lua/key_def.c
index 48d111b03..653e43817 100644
--- a/src/box/lua/key_def.c
+++ b/src/box/lua/key_def.c
@@ -38,117 +38,12 @@
 #include "box/box.h"
 #include "box/coll_id_cache.h"
 #include "lua/utils.h"
+#include "fiber.h"
+#include "box/lua/misc.h" /* lbox_encode_tuple_on_gc() */
 #include "box/tuple_format.h" /* TUPLE_INDEX_BASE */
 
 static uint32_t key_def_type_id = 0;
 
-/**
- * Set key_part_def from a table on top of a Lua stack.
- *
- * When successful return 0, otherwise return -1 and set a diag.
- */
-static int
-luaT_key_def_set_part(struct lua_State *L, struct key_part_def *part)
-{
-	/* Set part->fieldno. */
-	lua_pushstring(L, "fieldno");
-	lua_gettable(L, -2);
-	if (lua_isnil(L, -1)) {
-		diag_set(IllegalParams, "fieldno must not be nil");
-		return -1;
-	}
-	/*
-	 * Transform one-based Lua fieldno to zero-based
-	 * fieldno to use in key_def_new().
-	 */
-	part->fieldno = lua_tointeger(L, -1) - TUPLE_INDEX_BASE;
-	lua_pop(L, 1);
-
-	/* Set part->type. */
-	lua_pushstring(L, "type");
-	lua_gettable(L, -2);
-	if (lua_isnil(L, -1)) {
-		diag_set(IllegalParams, "type must not be nil");
-		return -1;
-	}
-	size_t type_len;
-	const char *type_name = lua_tolstring(L, -1, &type_len);
-	lua_pop(L, 1);
-	part->type = field_type_by_name(type_name, type_len);
-	switch (part->type) {
-	case FIELD_TYPE_ANY:
-	case FIELD_TYPE_ARRAY:
-	case FIELD_TYPE_MAP:
-		/* Tuple comparators don't support these types. */
-		diag_set(IllegalParams, "Unsupported field type: %s",
-			 type_name);
-		return -1;
-	case field_type_MAX:
-		diag_set(IllegalParams, "Unknown field type: %s", type_name);
-		return -1;
-	default:
-		/* Pass though. */
-		break;
-	}
-
-	/* Set part->is_nullable and part->nullable_action. */
-	lua_pushstring(L, "is_nullable");
-	lua_gettable(L, -2);
-	if (lua_isnil(L, -1)) {
-		part->is_nullable = false;
-		part->nullable_action = ON_CONFLICT_ACTION_DEFAULT;
-	} else {
-		part->is_nullable = lua_toboolean(L, -1);
-		part->nullable_action = ON_CONFLICT_ACTION_NONE;
-	}
-	lua_pop(L, 1);
-
-	/*
-	 * Set part->coll_id using collation_id.
-	 *
-	 * The value will be checked in key_def_new().
-	 */
-	lua_pushstring(L, "collation_id");
-	lua_gettable(L, -2);
-	if (lua_isnil(L, -1))
-		part->coll_id = COLL_NONE;
-	else
-		part->coll_id = lua_tointeger(L, -1);
-	lua_pop(L, 1);
-
-	/* Set part->coll_id using collation. */
-	lua_pushstring(L, "collation");
-	lua_gettable(L, -2);
-	if (!lua_isnil(L, -1)) {
-		/* Check for conflicting options. */
-		if (part->coll_id != COLL_NONE) {
-			diag_set(IllegalParams, "Conflicting options: "
-				 "collation_id and collation");
-			return -1;
-		}
-
-		size_t coll_name_len;
-		const char *coll_name = lua_tolstring(L, -1, &coll_name_len);
-		struct coll_id *coll_id = coll_by_name(coll_name,
-						       coll_name_len);
-		if (coll_id == NULL) {
-			diag_set(IllegalParams, "Unknown collation: \"%s\"",
-				 coll_name);
-			return -1;
-		}
-		part->coll_id = coll_id->id;
-	}
-	lua_pop(L, 1);
-
-	/* Set part->sort_order. */
-	part->sort_order = SORT_ORDER_ASC;
-
-	/* Set part->path. */
-	part->path = NULL;
-
-	return 0;
-}
-
 struct key_def *
 check_key_def(struct lua_State *L, int idx)
 {
@@ -194,33 +89,34 @@ lbox_key_def_new(struct lua_State *L)
 				  "[, collation_id = <number>]"
 				  "[, collation = <string>]}, ...}");
 
+	size_t tuple_len;
 	uint32_t part_count = lua_objlen(L, 1);
-	const ssize_t parts_size = sizeof(struct key_part_def) * part_count;
-	struct key_part_def *parts = malloc(parts_size);
-	if (parts == NULL) {
-		diag_set(OutOfMemory, parts_size, "malloc", "parts");
-		return luaT_error(L);
+	const ssize_t part_def_size = sizeof(struct key_part_def) * part_count;
+	struct key_part_def *part_def = malloc(part_def_size);
+	if (part_def == NULL) {
+		diag_set(OutOfMemory, part_def_size, "malloc", "parts");
+		goto error;
 	}
-
-	for (uint32_t i = 0; i < part_count; ++i) {
-		lua_pushinteger(L, i + 1);
-		lua_gettable(L, 1);
-		if (luaT_key_def_set_part(L, &parts[i]) != 0) {
-			free(parts);
-			return luaT_error(L);
-		}
-	}
-
-	struct key_def *key_def = key_def_new(parts, part_count);
-	free(parts);
+	const char *parts = lbox_encode_tuple_on_gc(L, 1, &tuple_len);
+	if (parts == NULL)
+		goto error;
+	(void)mp_decode_array(&parts);
+	if (key_def_decode_parts(part_def, part_count, &parts,
+				 NULL, 0, &fiber()->gc) != 0)
+		goto error;
+
+	struct key_def *key_def = key_def_new(part_def, part_count);
 	if (key_def == NULL)
-		return luaT_error(L);
+		goto error;
 
 	*(struct key_def **) luaL_pushcdata(L, key_def_type_id) = key_def;
 	lua_pushcfunction(L, lbox_key_def_gc);
 	luaL_setcdatagc(L, -2);
 
 	return 1;
+error:
+	free(part_def);
+	return luaT_error(L);
 }
 
 LUA_API int
diff --git a/test/box-tap/key_def.test.lua b/test/box-tap/key_def.test.lua
index 7e6e0e330..3e7366252 100755
--- a/test/box-tap/key_def.test.lua
+++ b/test/box-tap/key_def.test.lua
@@ -5,7 +5,7 @@ local ffi = require('ffi')
 local key_def = require('key_def')
 
 local usage_error = 'Bad params, use: key_def.new({' ..
-                    '{fieldno = fieldno, type = type' ..
+                    '{field = fieldno, type = type' ..
                     '[, is_nullable = <boolean>]' ..
                     '[, collation_id = <number>]' ..
                     '[, collation = <string>]}, ...}'
@@ -24,7 +24,7 @@ local cases = {
     {
         'Pass a field on an unknown type',
         parts = {{
-            fieldno = 2,
+            field = 2,
             type = 'unknown',
         }},
         exp_err = 'Unknown field type: unknown',
@@ -32,7 +32,7 @@ local cases = {
     {
         'Try to use collation_id before box.cfg{}',
         parts = {{
-            fieldno = 1,
+            field = 1,
             type = 'string',
             collation_id = 2,
         }},
@@ -41,7 +41,7 @@ local cases = {
     {
         'Try to use collation before box.cfg{}',
         parts = {{
-            fieldno = 1,
+            field = 1,
             type = 'string',
             collation = 'unicode_ci',
         }},
@@ -55,7 +55,7 @@ local cases = {
     {
         'Try to use both collation_id and collation',
         parts = {{
-            fieldno = 1,
+            field = 1,
             type = 'string',
             collation_id = 2,
             collation = 'unicode_ci',
@@ -65,7 +65,7 @@ local cases = {
     {
         'Unknown collation_id',
         parts = {{
-            fieldno = 1,
+            field = 1,
             type = 'string',
             collation_id = 42,
         }},
@@ -74,7 +74,7 @@ local cases = {
     {
         'Unknown collation name',
         parts = {{
-            fieldno = 1,
+            field = 1,
             type = 'string',
             collation = 'unknown',
         }},
@@ -103,8 +103,7 @@ local cases = {
     {
         'Success case; one part',
         parts = {
-            fieldno = 1,
-            type = 'string',
+            {field = 1, type = 'string'},
         },
         exp_err = nil,
     },

================================================================

Your original patch. Consider my 3 comments:

================================================================

> There are two reasons to add this module:
> 
> * Incapsulate key_def creation from a Lua table (factor it out from
>   merger's code).
> * Support comparing tuple with tuple and/or tuple with key from Lua in
>   the future.
> 
> The format of `parts` parameter in the `key_def.new(parts)` call is
> compatible with the following structures:
> 
> * box.space[...].index[...].parts;
> * net_box_conn.space[...].index[...].parts.
> 
> Needed for #3276.
> Needed for #3398.
> 
> @TarantoolBot document
> Title: Document built-in key_def lua module
> 
> Now there is only stub with the `key_def.new(parts)` function that
> returns cdata<struct key_def &>. The only way to use it for now is pass
> it to the merger.
> 
> This module will be improved in the scope of
> https://github.com/tarantool/tarantool/issues/3398
> 
> See the commit message for more info.
> ---
>  src/CMakeLists.txt            |   1 +
>  src/box/CMakeLists.txt        |   1 +
>  src/box/lua/init.c            |   3 +
>  src/box/lua/key_def.c         | 240 ++++++++++++++++++++++++++++++++++
>  src/box/lua/key_def.h         |  56 ++++++++
>  test/box-tap/key_def.test.lua | 137 +++++++++++++++++++
>  6 files changed, 438 insertions(+)
>  create mode 100644 src/box/lua/key_def.c
>  create mode 100644 src/box/lua/key_def.h
>  create mode 100755 test/box-tap/key_def.test.lua
> 
> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
> index 7c2395517..a6a18142b 100644
> --- a/src/CMakeLists.txt
> +++ b/src/CMakeLists.txt
> @@ -136,6 +136,7 @@ set(api_headers
>      ${CMAKE_SOURCE_DIR}/src/lua/string.h
>      ${CMAKE_SOURCE_DIR}/src/box/txn.h
>      ${CMAKE_SOURCE_DIR}/src/box/key_def.h
> +    ${CMAKE_SOURCE_DIR}/src/box/lua/key_def.h
>      ${CMAKE_SOURCE_DIR}/src/box/field_def.h
>      ${CMAKE_SOURCE_DIR}/src/box/tuple.h
>      ${CMAKE_SOURCE_DIR}/src/box/tuple_format.h
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index 59e91b65a..f25c21045 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -139,6 +139,7 @@ add_library(box STATIC
>      lua/net_box.c
>      lua/xlog.c
>      lua/sql.c
> +    lua/key_def.c
>      ${bin_sources})
>  
>  target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
> diff --git a/src/box/lua/init.c b/src/box/lua/init.c
> index 744b2c895..69f346414 100644
> --- a/src/box/lua/init.c
> +++ b/src/box/lua/init.c
> @@ -59,6 +59,7 @@
>  #include "box/lua/console.h"
>  #include "box/lua/tuple.h"
>  #include "box/lua/sql.h"
> +#include "box/lua/key_def.h"
>  
>  extern char session_lua[],
>  	tuple_lua[],
> @@ -312,6 +313,8 @@ box_lua_init(struct lua_State *L)
>  	lua_pop(L, 1);
>  	tarantool_lua_console_init(L);
>  	lua_pop(L, 1);
> +	luaopen_key_def(L);
> +	lua_pop(L, 1);
>  
>  	/* Load Lua extension */
>  	for (const char **s = lua_sources; *s; s += 2) {
> diff --git a/src/box/lua/key_def.c b/src/box/lua/key_def.c
> new file mode 100644
> index 000000000..48d111b03
> --- /dev/null
> +++ b/src/box/lua/key_def.c
> @@ -0,0 +1,240 @@
> +/*
> + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "box/lua/key_def.h"
> +
> +#include <lua.h>
> +#include <lauxlib.h>
> +#include "diag.h"
> +#include "box/key_def.h"
> +#include "box/box.h"
> +#include "box/coll_id_cache.h"
> +#include "lua/utils.h"
> +#include "box/tuple_format.h" /* TUPLE_INDEX_BASE */
> +
> +static uint32_t key_def_type_id = 0;
> +
> +/**
> + * Set key_part_def from a table on top of a Lua stack.
> + *
> + * When successful return 0, otherwise return -1 and set a diag.
> + */
> +static int
> +luaT_key_def_set_part(struct lua_State *L, struct key_part_def *part)
> +{
> +	/* Set part->fieldno. */
> +	lua_pushstring(L, "fieldno");
> +	lua_gettable(L, -2);
> +	if (lua_isnil(L, -1)) {
> +		diag_set(IllegalParams, "fieldno must not be nil");
> +		return -1;
> +	}
> +	/*
> +	 * Transform one-based Lua fieldno to zero-based
> +	 * fieldno to use in key_def_new().
> +	 */
> +	part->fieldno = lua_tointeger(L, -1) - TUPLE_INDEX_BASE;
> +	lua_pop(L, 1);
> +
> +	/* Set part->type. */
> +	lua_pushstring(L, "type");
> +	lua_gettable(L, -2);
> +	if (lua_isnil(L, -1)) {
> +		diag_set(IllegalParams, "type must not be nil");
> +		return -1;
> +	}
> +	size_t type_len;
> +	const char *type_name = lua_tolstring(L, -1, &type_len);
> +	lua_pop(L, 1);
> +	part->type = field_type_by_name(type_name, type_len);
> +	switch (part->type) {
> +	case FIELD_TYPE_ANY:
> +	case FIELD_TYPE_ARRAY:
> +	case FIELD_TYPE_MAP:
> +		/* Tuple comparators don't support these types. */
> +		diag_set(IllegalParams, "Unsupported field type: %s",
> +			 type_name);
> +		return -1;
> +	case field_type_MAX:
> +		diag_set(IllegalParams, "Unknown field type: %s", type_name);
> +		return -1;
> +	default:
> +		/* Pass though. */
> +		break;
> +	}
> +
> +	/* Set part->is_nullable and part->nullable_action. */
> +	lua_pushstring(L, "is_nullable");
> +	lua_gettable(L, -2);
> +	if (lua_isnil(L, -1)) {
> +		part->is_nullable = false;
> +		part->nullable_action = ON_CONFLICT_ACTION_DEFAULT;
> +	} else {
> +		part->is_nullable = lua_toboolean(L, -1);
> +		part->nullable_action = ON_CONFLICT_ACTION_NONE;
> +	}
> +	lua_pop(L, 1);
> +
> +	/*
> +	 * Set part->coll_id using collation_id.
> +	 *
> +	 * The value will be checked in key_def_new().
> +	 */
> +	lua_pushstring(L, "collation_id");
> +	lua_gettable(L, -2);
> +	if (lua_isnil(L, -1))
> +		part->coll_id = COLL_NONE;
> +	else
> +		part->coll_id = lua_tointeger(L, -1);
> +	lua_pop(L, 1);
> +
> +	/* Set part->coll_id using collation. */
> +	lua_pushstring(L, "collation");
> +	lua_gettable(L, -2);
> +	if (!lua_isnil(L, -1)) {
> +		/* Check for conflicting options. */
> +		if (part->coll_id != COLL_NONE) {
> +			diag_set(IllegalParams, "Conflicting options: "
> +				 "collation_id and collation");
> +			return -1;
> +		}
> +
> +		size_t coll_name_len;
> +		const char *coll_name = lua_tolstring(L, -1, &coll_name_len);
> +		struct coll_id *coll_id = coll_by_name(coll_name,
> +						       coll_name_len);
> +		if (coll_id == NULL) {
> +			diag_set(IllegalParams, "Unknown collation: \"%s\"",
> +				 coll_name);
> +			return -1;
> +		}
> +		part->coll_id = coll_id->id;
> +	}
> +	lua_pop(L, 1);
> +
> +	/* Set part->sort_order. */
> +	part->sort_order = SORT_ORDER_ASC;
> +
> +	/* Set part->path. */
> +	part->path = NULL;
> +
> +	return 0;
> +}
> +
> +struct key_def *
> +check_key_def(struct lua_State *L, int idx)
> +{
> +	if (lua_type(L, idx) != LUA_TCDATA)
> +		return NULL;
> +
> +	uint32_t cdata_type;
> +	struct key_def **key_def_ptr = luaL_checkcdata(L, idx, &cdata_type);
> +	if (key_def_ptr == NULL || cdata_type != key_def_type_id)
> +		return NULL;
> +	return *key_def_ptr;
> +}
> +
> +/**
> + * Free a key_def from a Lua code.
> + */
> +static int
> +lbox_key_def_gc(struct lua_State *L)
> +{
> +	struct key_def *key_def = check_key_def(L, 1);
> +	if (key_def == NULL)
> +		return 0;
> +	box_key_def_delete(key_def);
> +	return 0;
> +}
> +
> +/**
> + * Create a new key_def from a Lua table.
> + *
> + * Expected a table of key parts on the Lua stack. The format is
> + * the same as box.space.<...>.index.<...>.parts or corresponding
> + * net.box's one.
> + *
> + * Return the new key_def as cdata.
> + */
> +static int
> +lbox_key_def_new(struct lua_State *L)
> +{
> +	if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1)
> +		return luaL_error(L, "Bad params, use: key_def.new({"
> +				  "{fieldno = fieldno, type = type"
> +				  "[, is_nullable = <boolean>]"
> +				  "[, collation_id = <number>]"
> +				  "[, collation = <string>]}, ...}");
> +
> +	uint32_t part_count = lua_objlen(L, 1);
> +	const ssize_t parts_size = sizeof(struct key_part_def) * part_count;
> +	struct key_part_def *parts = malloc(parts_size);
> +	if (parts == NULL) {
> +		diag_set(OutOfMemory, parts_size, "malloc", "parts");
> +		return luaT_error(L);
> +	}
> +
> +	for (uint32_t i = 0; i < part_count; ++i) {
> +		lua_pushinteger(L, i + 1);
> +		lua_gettable(L, 1);
> +		if (luaT_key_def_set_part(L, &parts[i]) != 0) {
> +			free(parts);
> +			return luaT_error(L);
> +		}
> +	}
> +
> +	struct key_def *key_def = key_def_new(parts, part_count);
> +	free(parts);
> +	if (key_def == NULL)
> +		return luaT_error(L);
> +
> +	*(struct key_def **) luaL_pushcdata(L, key_def_type_id) = key_def;
> +	lua_pushcfunction(L, lbox_key_def_gc);
> +	luaL_setcdatagc(L, -2);
> +
> +	return 1;
> +}
> +
> +LUA_API int
> +luaopen_key_def(struct lua_State *L)
> +{
> +	luaL_cdef(L, "struct key_def;");
> +	key_def_type_id = luaL_ctypeid(L, "struct key_def&");
> +
> +	/* Export C functions to Lua. */
> +	static const struct luaL_Reg meta[] = {
> +		{"new", lbox_key_def_new},
> +		{NULL, NULL}
> +	};
> +	luaL_register_module(L, "key_def", meta);
> +
> +	return 1;
> +}
> diff --git a/src/box/lua/key_def.h b/src/box/lua/key_def.h
> new file mode 100644
> index 000000000..11cc0bfd4
> --- /dev/null
> +++ b/src/box/lua/key_def.h
> @@ -0,0 +1,56 @@
> +#ifndef TARANTOOL_BOX_LUA_KEY_DEF_H_INCLUDED
> +#define TARANTOOL_BOX_LUA_KEY_DEF_H_INCLUDED
> +/*
> + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#if defined(__cplusplus)
> +extern "C" {
> +#endif /* defined(__cplusplus) */
> +
> +struct lua_State;
> +
> +/**
> + * Extract a key_def object from a Lua stack.
> + */
> +struct key_def *
> +check_key_def(struct lua_State *L, int idx);
> +
> +/**
> + * Register the module.
> + */
> +int
> +luaopen_key_def(struct lua_State *L);
> +
> +#if defined(__cplusplus)
> +} /* extern "C" */
> +#endif /* defined(__cplusplus) */
> +
> +#endif /* TARANTOOL_BOX_LUA_KEY_DEF_H_INCLUDED */
> diff --git a/test/box-tap/key_def.test.lua b/test/box-tap/key_def.test.lua
> new file mode 100755
> index 000000000..7e6e0e330
> --- /dev/null
> +++ b/test/box-tap/key_def.test.lua
> @@ -0,0 +1,137 @@
> +#!/usr/bin/env tarantool
> +
> +local tap = require('tap')
> +local ffi = require('ffi')
> +local key_def = require('key_def')
> +
> +local usage_error = 'Bad params, use: key_def.new({' ..
> +                    '{fieldno = fieldno, type = type' ..

1. s/fieldno/field

> +                    '[, is_nullable = <boolean>]' ..
> +                    '[, collation_id = <number>]' ..
> +                    '[, collation = <string>]}, ...}'
> +
> +local function coll_not_found(fieldno, collation)
> +    if type(collation) == 'number' then
> +        return ('Wrong index options (field %d): ' ..
> +               'collation was not found by ID'):format(fieldno)
> +    end
> +
> +    return ('Unknown collation: "%s"'):format(collation)
> +end
> +
> +local cases = {
> +    -- Cases to call before box.cfg{}.
> +    {
> +        'Pass a field on an unknown type',
> +        parts = {{
> +            fieldno = 2,

2. s/fieldno/field and so on

> +            type = 'unknown',
> +        }},
> +        exp_err = 'Unknown field type: unknown',
> +    },
> +    {
> +        'Try to use collation_id before box.cfg{}',
> +        parts = {{
> +            fieldno = 1,
> +            type = 'string',
> +            collation_id = 2,
> +        }},
> +        exp_err = coll_not_found(1, 2),
> +    },
> +    {
> +        'Try to use collation before box.cfg{}',
> +        parts = {{
> +            fieldno = 1,
> +            type = 'string',
> +            collation = 'unicode_ci',

3. If you accept my proposal to reuse the existing code, you will have to do something
about the fact that the collation can now be set using the name of the pre-resolver
'collation_name' --> coll_id, which occurs in update_index_parts in schema.lua

Despite the non-zero complexity of the solution to this problem, I think this place is not
a problem in the context of this decision.

> +        }},
> +        exp_err = coll_not_found(1, 'unicode_ci'),
> +    },
> +    function()
> +        -- For collations.
> +        box.cfg{}
> +    end,
> +    -- Cases to call after box.cfg{}.
> +    {
> +        'Try to use both collation_id and collation',
> +        parts = {{
> +            fieldno = 1,
> +            type = 'string',
> +            collation_id = 2,
> +            collation = 'unicode_ci',
> +        }},
> +        exp_err = 'Conflicting options: collation_id and collation',
> +    },
> +    {
> +        'Unknown collation_id',
> +        parts = {{
> +            fieldno = 1,
> +            type = 'string',
> +            collation_id = 42,
> +        }},
> +        exp_err = coll_not_found(1, 42),
> +    },
> +    {
> +        'Unknown collation name',
> +        parts = {{
> +            fieldno = 1,
> +            type = 'string',
> +            collation = 'unknown',
> +        }},
> +        exp_err = 'Unknown collation: "unknown"',
> +    },
> +    {
> +        'Bad parts parameter type',
> +        parts = 1,
> +        exp_err = usage_error,
> +    },
> +    {
> +        'No parameters',
> +        params = {},
> +        exp_err = usage_error,
> +    },
> +    {
> +        'Two parameters',
> +        params = {{}, {}},
> +        exp_err = usage_error,
> +    },
> +    {
> +        'Success case; zero parts',
> +        parts = {},
> +        exp_err = nil,
> +    },
> +    {
> +        'Success case; one part',
> +        parts = {
> +            fieldno = 1,
> +            type = 'string',
> +        },
> +        exp_err = nil,
> +    },
> +}
> +
> +local test = tap.test('key_def')
> +
> +test:plan(#cases - 1)
> +for _, case in ipairs(cases) do
> +    if type(case) == 'function' then
> +        case()
> +    else
> +        local ok, res
> +        if case.params then
> +            ok, res = pcall(key_def.new, unpack(case.params))
> +        else
> +            ok, res = pcall(key_def.new, case.parts)
> +        end
> +        if case.exp_err == nil then
> +            ok = ok and type(res) == 'cdata' and
> +                ffi.istype('struct key_def', res)
> +            test:ok(ok, case[1])
> +        else
> +            local err = tostring(res) -- cdata -> string
> +            test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
> +        end
> +    end
> +end
> +
> +os.exit(test:check() and 0 or 1)
> -- 
> 2.21.0

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [tarantool-patches] [PATCH 0/3] lua: add key_def lua module
  2019-03-22 14:24 ` [tarantool-patches] [PATCH 0/3] lua: add key_def lua module Kirill Shcherbatov
@ 2019-03-22 16:20   ` Alexander Turenko
  0 siblings, 0 replies; 14+ messages in thread
From: Alexander Turenko @ 2019-03-22 16:20 UTC (permalink / raw)
  To: Kirill Shcherbatov; +Cc: tarantool-patches, Vladimir Davydov

Thank you for the review!

Answers are below.

WBR, Alexander Turenko.

On Fri, Mar 22, 2019 at 05:24:33PM +0300, Kirill Shcherbatov wrote:
> Hi! Can't find the patch that we were discussed, so, I've copied It from branch by my own.
> 
> The code you implemented is very similar in many ways to the one already implemented in key_def.c.
> I have one principal proposal and one advice(may be good enough):
> 1) at first, all map key names must follow their semantic-twin is used with create_index/alter;
>     I mean field, not fieldno e.g.
>     The errors also must not differ, I believe.
>     https://tarantool.io/en/doc/1.10/book/box/box_space/#box-space-create-index

Sadly, create_index/alter format differs from one that is used in
box.space.<...>.index.<...>.parts. I think we definitely should support
the latter one, because typical use case of the key_def module is when
you already have an index and want to use its parts to create a Lua
key_def instance.

> 
> 2) (proposal) I think you can do without the function load_key_def_set_part, that repeats]
>     key_def_decode_parts code in many moments. You may encode tuple on region with
>     lbox_encode_tuple_on_gc() and pass it to key_def_decode_parts(). While key_def:new is
>     not performance-critical, this is better managed way to solve this problem, I think. Consider my
>     RFC diff below:
> 

It would be good to do it in that way (it recuces code duplication), but
I have two possible problems in the mind:

* The formats are different: field vs fieldno.
* Support net.box's collation_id (instead of collation). Maybe this
  problem will gone after fixing #3941.

^ permalink raw reply	[flat|nested] 14+ messages in thread

end of thread, other threads:[~2019-03-22 16:20 UTC | newest]

Thread overview: 14+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-12-16 20:17 [PATCH 0/3] Merger Alexander Turenko
2018-12-16 20:17 ` [PATCH 1/3] Add luaT_iscallable with support of cdata metatype Alexander Turenko
2018-12-26 18:35   ` Vladimir Davydov
2018-12-28  1:46     ` Alexander Turenko
2018-12-28  8:00       ` Vladimir Davydov
2018-12-16 20:17 ` [PATCH 2/3] Add module to ease using Lua iterators from C Alexander Turenko
2018-12-26 18:45   ` Vladimir Davydov
2018-12-31  6:43     ` Alexander Turenko
2018-12-16 20:17 ` [PATCH 3/3] Add merger for tuple streams Alexander Turenko
2018-12-26 20:11   ` Vladimir Davydov
2019-01-09 21:36     ` Alexander Turenko
2018-12-18 12:16 ` [PATCH 0/3] Merger Alexander Turenko
2019-03-22 14:24 ` [tarantool-patches] [PATCH 0/3] lua: add key_def lua module Kirill Shcherbatov
2019-03-22 16:20   ` Alexander Turenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox