Tarantool development patches archive
* [PATCH v4 0/4] Merger
@ 2019-05-07 22:30 Alexander Turenko
  2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
                   ` (3 more replies)
  0 siblings, 4 replies; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Changes since v3:

- Renamed decode_{array,map}() to decode_{array,map}_header().
- Clarified skip_header option with an example in the docbot comment.
- Dropped merger_context both from C and Lua.
- Eliminated pointless casts after malloc().
- Renamed merger source to merge source.
- Renamed <object>s_count to <object>_count.
- A buffer source now checks input msgpack (uses mp_check()).
- Fixed source chunk validation, added test cases.
- Eliminated box_* types and functions where possible.
- Added merge_source_next() helper.
- Made base source functions static inline.
- Moved sources setting into the merger constructor.
- Removed dead code from merge_source_less().
- Create a source with refs == 1.
- Don't reallocate tuples.
- Rebased on top of the new master.
- Naming, comments and other minor changes suggested by Vladimir.
- Updated examples repo and usage in graphql.

More information about these changes can be found in the review of the
3rd version of the patchset:

https://www.freelists.org/post/tarantool-patches/PATCH-v3-47-lua-add-nonrecursive-msgpack-decoding-functions,6
https://www.freelists.org/post/tarantool-patches/PATCH-v3-57-netbox-add-skip-header-option-to-use-with-buffer,4
https://www.freelists.org/post/tarantool-patches/PATCH-v3-67-Add-merger-for-tuples-streams-C-part,7
https://www.freelists.org/post/tarantool-patches/PATCH-v3-77-Add-merger-for-tuple-streams-Lua-part,7

More documentation and usage examples:
https://github.com/Totktonada/tarantool-merger-examples

issue: https://github.com/tarantool/tarantool/issues/3276
branch: https://github.com/tarantool/tarantool/tree/Totktonada/gh-3276-on-board-merger

Alexander Turenko (4):
  lua: add non-recursive msgpack decoding functions
  net.box: add skip_header option to use with buffer
  Add merger for tuple streams (C part)
  Add merger for tuple streams (Lua part)

 src/box/CMakeLists.txt        |    3 +
 src/box/lua/init.c            |    7 +-
 src/box/lua/merger.c          | 1143 +++++++++++++++++++++++++++++++++
 src/box/lua/merger.h          |   47 ++
 src/box/lua/merger.lua        |   41 ++
 src/box/lua/net_box.lua       |   56 +-
 src/box/merger.c              |  355 ++++++++++
 src/box/merger.h              |  150 +++++
 src/lua/msgpack.c             |   82 +++
 test/app-tap/msgpack.test.lua |  182 +++++-
 test/box-tap/merger.test.lua  |  768 ++++++++++++++++++++++
 test/box/net.box.result       |  222 ++++++-
 test/box/net.box.test.lua     |   86 ++-
 test/unit/CMakeLists.txt      |    3 +
 test/unit/merger.result       |   71 ++
 test/unit/merger.test.c       |  285 ++++++++
 16 files changed, 3468 insertions(+), 33 deletions(-)
 create mode 100644 src/box/lua/merger.c
 create mode 100644 src/box/lua/merger.h
 create mode 100644 src/box/lua/merger.lua
 create mode 100644 src/box/merger.c
 create mode 100644 src/box/merger.h
 create mode 100755 test/box-tap/merger.test.lua
 create mode 100644 test/unit/merger.result
 create mode 100644 test/unit/merger.test.c

-- 
2.21.0


* [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions
  2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
  2019-05-13 13:52   ` Vladimir Davydov
  2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
                   ` (2 subsequent siblings)
  3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Needed for #3276.

@TarantoolBot document
Title: Non-recursive msgpack decoding functions

Contracts:

```
msgpack.decode_array_header(buf.rpos, buf:size()) -> arr_len, new_rpos
msgpack.decode_map_header(buf.rpos, buf:size()) -> map_len, new_rpos
```

These functions are intended to be used with a msgpack buffer received
from net.box. A user may want to skip the {[IPROTO_DATA_KEY] = ...}
wrapper and an array header before passing the buffer to a C function
for decoding.
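
For example, together with the skip_header net.box option (added in the
next patch of this series) the array header of a select result can be
skipped before handing the raw tuples over to a C function. A minimal
sketch, assuming a net.box connection conn and a space s:

```lua
local buffer = require('buffer')
local msgpack = require('msgpack')

local buf = buffer.ibuf()
-- Fetch the response body without the {[IPROTO_DATA_KEY] = ...} wrapper.
conn.space.s:select(nil, {buffer = buf, skip_header = true})
-- Skip the array header of the result.
local tuple_count, rpos = msgpack.decode_array_header(buf.rpos, buf:size())
buf.rpos = rpos
-- buf.rpos .. buf.wpos now holds tuple_count raw msgpack tuples.
```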

See https://github.com/tarantool/tarantool/issues/2195 for more
information about this net.box API.
---
 src/lua/msgpack.c             |  82 +++++++++++++++
 test/app-tap/msgpack.test.lua | 182 +++++++++++++++++++++++++++++++++-
 2 files changed, 263 insertions(+), 1 deletion(-)

diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index 1b1874eb2..d3cfc0e2b 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -422,6 +422,86 @@ lua_ibuf_msgpack_decode(lua_State *L)
 	return 2;
 }
 
+/**
+ * Verify and set arguments: data and size.
+ *
+ * Always return 0. In case of any failure raise a Lua error.
+ */
+static int
+verify_decode_header_args(lua_State *L, const char *func_name,
+			  const char **data_p, ptrdiff_t *size_p)
+{
+	/* Verify arguments count. */
+	if (lua_gettop(L) != 2)
+		return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+	/* Verify ptr type. */
+	uint32_t ctypeid;
+	const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+	if (ctypeid != CTID_CHAR_PTR)
+		return luaL_error(L, "%s: 'char *' expected", func_name);
+
+	/* Verify size type and value. */
+	ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+	if (size <= 0)
+		return luaL_error(L, "%s: non-positive size", func_name);
+
+	*data_p = data;
+	*size_p = size;
+
+	return 0;
+}
+
+/**
+ * msgpack.decode_array_header(buf.rpos, buf:size())
+ * -> arr_len, new_rpos
+ */
+static int
+lua_decode_array_header(lua_State *L)
+{
+	const char *func_name = "msgpack.decode_array_header";
+	const char *data;
+	ptrdiff_t size;
+	verify_decode_header_args(L, func_name, &data, &size);
+
+	if (mp_typeof(*data) != MP_ARRAY)
+		return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+	if (mp_check_array(data, data + size) > 0)
+		return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+	uint32_t len = mp_decode_array(&data);
+
+	lua_pushinteger(L, len);
+	*(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+	return 2;
+}
+
+/**
+ * msgpack.decode_map_header(buf.rpos, buf:size())
+ * -> map_len, new_rpos
+ */
+static int
+lua_decode_map_header(lua_State *L)
+{
+	const char *func_name = "msgpack.decode_map_header";
+	const char *data;
+	ptrdiff_t size;
+	verify_decode_header_args(L, func_name, &data, &size);
+
+	if (mp_typeof(*data) != MP_MAP)
+		return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+	if (mp_check_map(data, data + size) > 0)
+		return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+	uint32_t len = mp_decode_map(&data);
+
+	lua_pushinteger(L, len);
+	*(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+	return 2;
+}
+
 static int
 lua_msgpack_new(lua_State *L);
 
@@ -430,6 +510,8 @@ static const luaL_Reg msgpacklib[] = {
 	{ "decode", lua_msgpack_decode },
 	{ "decode_unchecked", lua_msgpack_decode_unchecked },
 	{ "ibuf_decode", lua_ibuf_msgpack_decode },
+	{ "decode_array_header", lua_decode_array_header },
+	{ "decode_map_header", lua_decode_map_header },
 	{ "new", lua_msgpack_new },
 	{ NULL, NULL }
 };
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..1b0bb9806 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,188 @@ local function test_misc(test, s)
     test:ok(not st and e:match("null"), "null ibuf")
 end
 
+local function test_decode_array_map_header(test, s)
+    local ffi = require('ffi')
+
+    local usage_err = 'Usage: msgpack%.decode_[^_(]+_header%(ptr, size%)'
+    local end_of_buffer_err = 'msgpack%.decode_[^_]+_header: unexpected end ' ..
+        'of buffer'
+    local non_positive_size_err = 'msgpack.decode_[^_]+_header: ' ..
+        'non%-positive size'
+
+    local decode_cases = {
+        {
+            'fixarray',
+            func = s.decode_array_header,
+            data = ffi.cast('char *', '\x94'),
+            size = 1,
+            exp_len = 4,
+            exp_rewind = 1,
+        },
+        {
+            'array 16',
+            func = s.decode_array_header,
+            data = ffi.cast('char *', '\xdc\x00\x04'),
+            size = 3,
+            exp_len = 4,
+            exp_rewind = 3,
+        },
+        {
+            'array 32',
+            func = s.decode_array_header,
+            data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+            size = 5,
+            exp_len = 4,
+            exp_rewind = 5,
+        },
+        {
+            'truncated array 16',
+            func = s.decode_array_header,
+            data = ffi.cast('char *', '\xdc\x00'),
+            size = 2,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'truncated array 32',
+            func = s.decode_array_header,
+            data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+            size = 4,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'fixmap',
+            func = s.decode_map_header,
+            data = ffi.cast('char *', '\x84'),
+            size = 1,
+            exp_len = 4,
+            exp_rewind = 1,
+        },
+        {
+            'map 16',
+            func = s.decode_map_header,
+            data = ffi.cast('char *', '\xde\x00\x04'),
+            size = 3,
+            exp_len = 4,
+            exp_rewind = 3,
+        },
+        {
+            'array 32',
+            func = s.decode_map_header,
+            data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+            size = 5,
+            exp_len = 4,
+            exp_rewind = 5,
+        },
+        {
+            'truncated map 16',
+            func = s.decode_map_header,
+            data = ffi.cast('char *', '\xde\x00'),
+            size = 2,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'truncated map 32',
+            func = s.decode_map_header,
+            data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+            size = 4,
+            exp_err = end_of_buffer_err,
+        },
+    }
+
+    local bad_api_cases = {
+        {
+            'wrong msgpack type',
+            data = ffi.cast('char *', '\xc0'),
+            size = 1,
+            exp_err = 'msgpack.decode_[^_]+_header: unexpected msgpack type',
+        },
+        {
+            'zero size buffer',
+            data = ffi.cast('char *', ''),
+            size = 0,
+            exp_err = non_positive_size_err,
+        },
+        {
+            'negative size buffer',
+            data = ffi.cast('char *', ''),
+            size = -1,
+            exp_err = non_positive_size_err,
+        },
+        {
+            'size is nil',
+            data = ffi.cast('char *', ''),
+            size = nil,
+            exp_err = 'bad argument',
+        },
+        {
+            'no arguments',
+            args = {},
+            exp_err = usage_err,
+        },
+        {
+            'one argument',
+            args = {ffi.cast('char *', '')},
+            exp_err = usage_err,
+        },
+        {
+            'data is nil',
+            args = {nil, 1},
+            exp_err = 'expected cdata as 1 argument',
+        },
+        {
+            'data is not cdata',
+            args = {1, 1},
+            exp_err = 'expected cdata as 1 argument',
+        },
+        {
+            'data with wrong cdata type',
+            args = {box.tuple.new(), 1},
+            exp_err = "msgpack.decode_[^_]+_header: 'char %*' expected",
+        },
+        {
+            'size has wrong type',
+            args = {ffi.cast('char *', ''), 'eee'},
+            exp_err = 'bad argument',
+        },
+    }
+
+    test:plan(#decode_cases + 2 * #bad_api_cases)
+
+    -- Decode cases.
+    for _, case in ipairs(decode_cases) do
+        if case.exp_err ~= nil then
+            local ok, err = pcall(case.func, case.data, case.size)
+            local description = ('bad; %s'):format(case[1])
+            test:ok(ok == false and err:match(case.exp_err), description)
+        else
+            local len, new_buf = case.func(case.data, case.size)
+            local rewind = new_buf - case.data
+            local description = ('good; %s'):format(case[1])
+            test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+                description)
+        end
+    end
+
+    -- Bad api usage cases.
+    for _, func_name in ipairs({'decode_array_header', 'decode_map_header'}) do
+        for _, case in ipairs(bad_api_cases) do
+            local ok, err
+            if case.args ~= nil then
+                local args_len = table.maxn(case.args)
+                ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+            else
+                ok, err = pcall(s[func_name], case.data, case.size)
+            end
+            local description = ('%s bad api usage; %s'):format(func_name,
+                                                                case[1])
+            test:ok(ok == false and err:match(case.exp_err), description)
+        end
+    end
+end
+
 tap.test("msgpack", function(test)
     local serializer = require('msgpack')
-    test:plan(10)
+    test:plan(11)
     test:test("unsigned", common.test_unsigned, serializer)
     test:test("signed", common.test_signed, serializer)
     test:test("double", common.test_double, serializer)
@@ -62,4 +241,5 @@ tap.test("msgpack", function(test)
     test:test("ucdata", common.test_ucdata, serializer)
     test:test("offsets", test_offsets, serializer)
     test:test("misc", test_misc, serializer)
+    test:test("decode_array_map", test_decode_array_map_header, serializer)
 end)
-- 
2.21.0


* [PATCH v4 2/4] net.box: add skip_header option to use with buffer
  2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
  2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
  2019-05-13 13:52   ` Vladimir Davydov
  2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuple streams (C part) Alexander Turenko
  2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
  3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Needed for #3276.

@TarantoolBot document
Title: net.box: skip_header option

This option instructs net.box to strip the {[IPROTO_DATA_KEY] = ...}
wrapper from a response stored in a buffer. This may be needed when the
buffer is passed to a C function that expects a specific msgpack layout.

Usage example:

```lua
local net_box = require('net.box')
local buffer = require('buffer')
local ffi = require('ffi')
local msgpack = require('msgpack')
local yaml = require('yaml')

box.cfg{listen = 3301}
box.once('load_data', function()
    box.schema.user.grant('guest', 'read,write,execute', 'universe')
    box.schema.space.create('s')
    box.space.s:create_index('pk')
    box.space.s:insert({1})
    box.space.s:insert({2})
    box.space.s:insert({3})
    box.space.s:insert({4})
end)

local function foo()
    return box.space.s:select()
end
_G.foo = foo

local conn = net_box.connect('localhost:3301')

local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- {48: [[1], [2], [3], [4]]}

local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- [[1], [2], [3], [4]]

local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- {48: [[[1], [2], [3], [4]]]}

local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- [[[1], [2], [3], [4]]]

os.exit()
```
---
 src/box/lua/net_box.lua   |  56 ++++++----
 test/box/net.box.result   | 222 +++++++++++++++++++++++++++++++++++++-
 test/box/net.box.test.lua |  86 ++++++++++++++-
 3 files changed, 333 insertions(+), 31 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index e1c4b652b..0cfb5075d 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -10,11 +10,12 @@ local urilib   = require('uri')
 local internal = require('net.box.lib')
 local trigger  = require('internal.trigger')
 
-local band          = bit.band
-local max           = math.max
-local fiber_clock   = fiber.clock
-local fiber_self    = fiber.self
-local decode        = msgpack.decode_unchecked
+local band              = bit.band
+local max               = math.max
+local fiber_clock       = fiber.clock
+local fiber_self        = fiber.self
+local decode            = msgpack.decode_unchecked
+local decode_map_header = msgpack.decode_map_header
 
 local table_new           = require('table.new')
 local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Future object.
     --
-    local function perform_async_request(buffer, method, on_push, on_push_ctx,
-                                         ...)
+    local function perform_async_request(buffer, skip_header, method, on_push,
+                                         on_push_ctx, ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
                                        reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_encoder[method](send_buf, id, ...)
         next_request_id = next_id(id)
-        -- Request in most cases has maximum 8 members:
-        -- method, buffer, id, cond, errno, response, on_push,
-        -- on_push_ctx.
-        local request = setmetatable(table_new(0, 8), request_mt)
+        -- Request in most cases has maximum 9 members:
+        -- method, buffer, skip_header, id, cond, errno, response,
+        -- on_push, on_push_ctx.
+        local request = setmetatable(table_new(0, 9), request_mt)
         request.method = method
         request.buffer = buffer
+        request.skip_header = skip_header
         request.id = id
         request.cond = fiber.cond()
         requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Response object.
     --
-    local function perform_request(timeout, buffer, method, on_push,
-                                   on_push_ctx, ...)
+    local function perform_request(timeout, buffer, skip_header, method,
+                                   on_push, on_push_ctx, ...)
         local request, err =
-            perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+            perform_async_request(buffer, skip_header, method, on_push,
+                                  on_push_ctx, ...)
         if not request then
             return nil, err
         end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
         if buffer ~= nil then
             -- Copy xrow.body to user-provided buffer
             local body_len = body_end - body_rpos
+            if request.skip_header then
+                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+                local map_len, key
+                map_len, body_rpos = decode_map_header(body_rpos, body_len)
+                assert(map_len == 1)
+                key, body_rpos = decode(body_rpos)
+                assert(key == IPROTO_DATA_KEY)
+                body_len = body_end - body_rpos
+            end
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
             body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
 
 function remote_methods:_request(method, opts, ...)
     local transport = self._transport
-    local on_push, on_push_ctx, buffer, deadline
+    local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
     -- async.
     if opts then
         buffer = opts.buffer
+        skip_header = opts.skip_header
         if opts.is_async then
             if opts.on_push or opts.on_push_ctx then
                 error('To handle pushes in an async request use future:pairs()')
             end
             local res, err =
-                transport.perform_async_request(buffer, method, table.insert,
-                                                {}, ...)
+                transport.perform_async_request(buffer, skip_header, method,
+                                                table.insert, {}, ...)
             if err then
                 box.error(err)
             end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
         transport.wait_state('active', timeout)
         timeout = deadline and max(0, deadline - fiber_clock())
     end
-    local res, err = transport.perform_request(timeout, buffer, method,
-                                               on_push, on_push_ctx, ...)
+    local res, err = transport.perform_request(timeout, buffer, skip_header,
+                                               method, on_push, on_push_ctx,
+                                               ...)
     if err then
         box.error(err)
     end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+        res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+        res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
     end
     if err then
         box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 3894f97d4..d02fed7a3 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
@@ -1598,6 +1598,18 @@ result
 ---
 - {48: [[2]]}
 ...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 ---
@@ -1610,6 +1622,21 @@ result
 ---
 - {48: [[3]]}
 ...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 ---
@@ -1633,6 +1660,29 @@ result
 ---
 - {48: [[3]]}
 ...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1645,6 +1695,18 @@ result
 ---
 - {48: []}
 ...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1657,6 +1719,18 @@ result
 ---
 - {48: []}
 ...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 ---
@@ -1669,6 +1743,18 @@ result
 ---
 - {48: [[3], [2], [1, 'hello']]}
 ...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ---
@@ -1692,6 +1778,29 @@ result
 ---
 - {48: [[1, 'hello'], [2], [3], [4]]}
 ...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1726,6 +1835,40 @@ result
 ---
 - {48: []}
 ...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1760,6 +1903,75 @@ result
 ---
 - {48: []}
 ...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+-- make several requests into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 ---
@@ -2603,7 +2815,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 c.state
@@ -3244,7 +3456,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
 ---
 ...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
@@ -3379,7 +3591,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3398,7 +3610,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index d05b8c605..bea43002d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
@@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ibuf.rpos + len == ibuf.wpos
@@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos
 len
 result
 
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+-- make several requests into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1079,7 +1155,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 c.state
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
@@ -1312,7 +1388,7 @@ finalize_long()
 --
 c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 finalize_long()
 future:wait_result(100)
@@ -1369,7 +1445,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
 
@@ -1379,7 +1455,7 @@ c:close()
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
 
-- 
2.21.0


* [PATCH v4 3/4] Add merger for tuple streams (C part)
  2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
  2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
  2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
  2019-05-13 14:49   ` Vladimir Davydov
  2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
  3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Needed for #3276.
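
For reference, a minimal consumption sketch of the API added below; it
assumes key_def, sources and source_count are prepared by the caller and
trims error handling:

```c
struct merge_source *merger = merger_new(key_def, sources, source_count,
					 /* reverse = */ false);
if (merger == NULL)
	return -1;
struct tuple *tuple;
do {
	/* A NULL format means any tuple format is acceptable. */
	if (merge_source_next(merger, NULL, &tuple) != 0) {
		merge_source_unref(merger);
		return -1;
	}
	if (tuple != NULL) {
		/* Consume the tuple, then release the reference. */
		tuple_unref(tuple);
	}
} while (tuple != NULL);
merge_source_unref(merger);
```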
---
 src/box/CMakeLists.txt   |   1 +
 src/box/merger.c         | 355 +++++++++++++++++++++++++++++++++++++++
 src/box/merger.h         | 150 +++++++++++++++++
 test/unit/CMakeLists.txt |   3 +
 test/unit/merger.result  |  71 ++++++++
 test/unit/merger.test.c  | 285 +++++++++++++++++++++++++++++++
 6 files changed, 865 insertions(+)
 create mode 100644 src/box/merger.c
 create mode 100644 src/box/merger.h
 create mode 100644 test/unit/merger.result
 create mode 100644 test/unit/merger.test.c

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 2be0d1e35..ce328fb95 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -122,6 +122,7 @@ add_library(box STATIC
     execute.c
     wal.c
     call.c
+    merger.c
     ${lua_sources}
     lua/init.c
     lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..744bba469
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "diag.h"             /* diag_set() */
+#include "box/tuple.h"        /* tuple_ref(), tuple_unref(),
+				 tuple_validate() */
+#include "box/tuple_format.h" /* box_tuple_format_new(),
+				 tuple_format_*() */
+#include "box/key_def.h"      /* key_def_*(),
+				 tuple_compare() */
+
+/* {{{ Merger */
+
+/**
+ * Holds a source to fetch next tuples and a last fetched tuple to
+ * compare the node against other nodes.
+ *
+ * The main reason why this structure is separated from a merge
+ * source is that a heap node cannot be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows encapsulating all
+ * heap-related logic inside this compilation unit, without any
+ * traces in externally visible structures.
+ */
+struct merger_heap_node {
+	/* A source of tuples. */
+	struct merge_source *source;
+	/*
+	 * A last fetched (refcounted) tuple to compare against
+	 * other nodes.
+	 */
+	struct tuple *tuple;
+	/* An anchor to make the structure a merger heap node. */
+	struct heap_node in_merger;
+};
+
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		  const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merge_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr in_merger
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, parameters of a merge process and utility fields.
+ */
+struct merger {
+	/* A merger is a source. */
+	struct merge_source base;
+	/*
+	 * Whether a merge process started.
+	 *
+	 * The merger postpones charging of heap nodes until a
+	 * first output tuple is acquired.
+	 */
+	bool started;
+	/* A key_def to compare tuples. */
+	struct key_def *key_def;
+	/* A format to acquire compatible tuples from sources. */
+	struct tuple_format *format;
+	/*
+	 * A heap of sources (of nodes that contains a source to
+	 * be exact).
+	 */
+	heap_t heap;
+	/* An array of heap nodes. */
+	uint32_t node_count;
+	struct merger_heap_node *nodes;
+	/* Ascending (false) / descending (true) order. */
+	bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Data comparing function to construct a heap of sources.
+ */
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+		  const struct merger_heap_node *right)
+{
+	assert(left->tuple != NULL);
+	assert(right->tuple != NULL);
+	struct merger *merger = container_of(heap, struct merger, heap);
+	int cmp = tuple_compare(left->tuple, right->tuple, merger->key_def);
+	return merger->reverse ? cmp >= 0 : cmp < 0;
+}
+
+/**
+ * Initialize a new merger heap node.
+ */
+static void
+merger_heap_node_create(struct merger_heap_node *node,
+			struct merge_source *source)
+{
+	node->source = source;
+	merge_source_ref(node->source);
+	node->tuple = NULL;
+	heap_node_create(&node->in_merger);
+}
+
+/**
+ * Free a merger heap node.
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+	merge_source_unref(node->source);
+	if (node->tuple != NULL)
+		tuple_unref(node->tuple);
+}
+
+/**
+ * The helper to add a new heap node to a merger heap.
+ *
+ * Return -1 on error and set a diag.
+ *
+ * Otherwise store a next tuple in node->tuple, add the node to
+ * merger->heap and return 0.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	struct tuple *tuple = NULL;
+
+	/* Acquire a next tuple. */
+	struct merge_source *source = node->source;
+	if (merge_source_next(source, merger->format, &tuple) != 0)
+		return -1;
+
+	/* Don't add an empty source to a heap. */
+	if (tuple == NULL)
+		return 0;
+
+	node->tuple = tuple;
+
+	/* Add a node to a heap. */
+	if (merger_heap_insert(&merger->heap, node) != 0) {
+		diag_set(OutOfMemory, 0, "malloc", "merger->heap");
+		return -1;
+	}
+
+	return 0;
+}
+
+/* Virtual methods declarations */
+
+static void
+merger_delete(struct merge_source *base);
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+	    struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Set sources for a merger.
+ *
+ * It is the helper for merger_new().
+ *
+ * Return 0 on success. Return -1 on error and set a diag.
+ */
+static int
+merger_set_sources(struct merger *merger, struct merge_source **sources,
+		   uint32_t source_count)
+{
+	const size_t nodes_size = sizeof(struct merger_heap_node) *
+		source_count;
+	struct merger_heap_node *nodes = malloc(nodes_size);
+	if (nodes == NULL) {
+		diag_set(OutOfMemory, nodes_size, "malloc",
+			 "merger heap nodes");
+		return -1;
+	}
+
+	for (uint32_t i = 0; i < source_count; ++i)
+		merger_heap_node_create(&nodes[i], sources[i]);
+
+	merger->node_count = source_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+	   uint32_t source_count, bool reverse)
+{
+	static struct merge_source_vtab merger_vtab = {
+		.destroy = merger_delete,
+		.next = merger_next,
+	};
+
+	struct merger *merger = malloc(sizeof(struct merger));
+	if (merger == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger), "malloc",
+			 "merger");
+		return NULL;
+	}
+
+	/*
+	 * We need to copy the key_def because it can be collected
+	 * before a merge process ends (say, by LuaJIT GC if the
+	 * key_def comes from Lua).
+	 */
+	key_def = key_def_dup(key_def);
+	if (key_def == NULL) {
+		free(merger);
+		return NULL;
+	}
+
+	struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+	if (format == NULL) {
+		key_def_delete(key_def);
+		free(merger);
+		return NULL;
+	}
+
+	merge_source_create(&merger->base, &merger_vtab);
+	merger->started = false;
+	merger->key_def = key_def;
+	merger->format = format;
+	merger_heap_create(&merger->heap);
+	merger->node_count = 0;
+	merger->nodes = NULL;
+	merger->reverse = reverse;
+
+	if (merger_set_sources(merger, sources, source_count) != 0) {
+		key_def_delete(merger->key_def);
+		tuple_format_unref(merger->format);
+		merger_heap_destroy(&merger->heap);
+		free(merger);
+		return NULL;
+	}
+
+	return &merger->base;
+}
+
+/* Virtual methods */
+
+static void
+merger_delete(struct merge_source *base)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	key_def_delete(merger->key_def);
+	tuple_format_unref(merger->format);
+	merger_heap_destroy(&merger->heap);
+
+	for (uint32_t i = 0; i < merger->node_count; ++i)
+		merger_heap_node_delete(&merger->nodes[i]);
+
+	if (merger->nodes != NULL)
+		free(merger->nodes);
+
+	free(merger);
+}
+
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+	    struct tuple **out)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/*
+	 * Fetch a first tuple for each source and add all heap
+	 * nodes to a merger heap.
+	 */
+	if (!merger->started) {
+		for (uint32_t i = 0; i < merger->node_count; ++i) {
+			struct merger_heap_node *node = &merger->nodes[i];
+			if (merger_add_heap_node(merger, node) != 0)
+				return -1;
+		}
+		merger->started = true;
+	}
+
+	/* Get a next tuple. */
+	struct merger_heap_node *node = merger_heap_top(&merger->heap);
+	if (node == NULL) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *tuple = node->tuple;
+	assert(tuple != NULL);
+
+	/* Validate the tuple. */
+	if (format != NULL && tuple_validate(format, tuple) != 0)
+		return -1;
+
+	/*
+	 * Note: An old node->tuple pointer will be written to
+	 * *out as refcounted tuple, so we don't unreference it
+	 * here.
+	 */
+	struct merge_source *source = node->source;
+	if (merge_source_next(source, merger->format, &node->tuple) != 0)
+		return -1;
+
+	/* Update a heap. */
+	if (node->tuple == NULL)
+		merger_heap_delete(&merger->heap, node);
+	else
+		merger_heap_update(&merger->heap, node);
+
+	*out = tuple;
+	return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..90ae175a9
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,150 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+
+struct merge_source;
+
+struct merge_source_vtab {
+	/**
+	 * Free a merge source.
+	 *
+	 * Don't call it directly, use merge_source_unref()
+	 * instead.
+	 */
+	void (*destroy)(struct merge_source *base);
+	/**
+	 * Get a next tuple (refcounted) from a source.
+	 *
+	 * When format is not NULL the resulting tuple will be in
+	 * a compatible format.
+	 *
+	 * When format is NULL it means that it does not matter
+	 * for a caller in which format a tuple will be.
+	 *
+	 * Return 0 when successfully fetched a tuple or NULL. In
+	 * case of an error set a diag and return -1.
+	 */
+	int (*next)(struct merge_source *base, struct tuple_format *format,
+		    struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merge source.
+ *
+ * The structure does not hold any resources.
+ */
+struct merge_source {
+	/* Source-specific methods. */
+	const struct merge_source_vtab *vtab;
+	/* Reference counter. */
+	int refs;
+};
+
+/* }}} */
+
+/* {{{ Base merge source functions */
+
+/**
+ * Increment a merge source reference counter.
+ */
+static inline void
+merge_source_ref(struct merge_source *source)
+{
+	++source->refs;
+}
+
+/**
+ * Decrement a merge source reference counter. When it has
+ * reached zero, free the source (call destroy() virtual method).
+ */
+static inline void
+merge_source_unref(struct merge_source *source)
+{
+	assert(source->refs - 1 >= 0);
+	if (--source->refs == 0)
+		source->vtab->destroy(source);
+}
+
+/**
+ * @see merge_source_vtab
+ */
+static inline int
+merge_source_next(struct merge_source *source, struct tuple_format *format,
+		  struct tuple **out)
+{
+	return source->vtab->next(source, format, out);
+}
+
+/**
+ * Initialize a base merge source structure.
+ */
+static inline void
+merge_source_create(struct merge_source *source, struct merge_source_vtab *vtab)
+{
+	source->vtab = vtab;
+	source->refs = 1;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+	   uint32_t source_count, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 2c8340800..07dcd6cf2 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
 add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
                swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
 target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..766ebce63
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+	*** test_basic ***
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (any format): tuple != NULL
+    ok 2 - array source next() (any format): skip tuple validation
+    ok 3 - array source next() (any format): check tuple size
+    ok 4 - array source next() (any format): check tuple data
+    ok 5 - array source next() (any format): tuple != NULL
+    ok 6 - array source next() (any format): skip tuple validation
+    ok 7 - array source next() (any format): check tuple size
+    ok 8 - array source next() (any format): check tuple data
+    ok 9 - array source is empty (any format)
+	*** test_array_source: done ***
+ok 1 - subtests
+    1..9
+	*** test_array_source ***
+    ok 1 - array source next() (user's format): tuple != NULL
+    ok 2 - array source next() (user's format): validate tuple
+    ok 3 - array source next() (user's format): check tuple size
+    ok 4 - array source next() (user's format): check tuple data
+    ok 5 - array source next() (user's format): tuple != NULL
+    ok 6 - array source next() (user's format): validate tuple
+    ok 7 - array source next() (user's format): check tuple size
+    ok 8 - array source next() (user's format): check tuple data
+    ok 9 - array source is empty (user's format)
+	*** test_array_source: done ***
+ok 2 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (any format): tuple != NULL
+    ok 2 - merger next() (any format): skip tuple validation
+    ok 3 - merger next() (any format): check tuple size
+    ok 4 - merger next() (any format): check tuple data
+    ok 5 - merger next() (any format): tuple != NULL
+    ok 6 - merger next() (any format): skip tuple validation
+    ok 7 - merger next() (any format): check tuple size
+    ok 8 - merger next() (any format): check tuple data
+    ok 9 - merger next() (any format): tuple != NULL
+    ok 10 - merger next() (any format): skip tuple validation
+    ok 11 - merger next() (any format): check tuple size
+    ok 12 - merger next() (any format): check tuple data
+    ok 13 - merger next() (any format): tuple != NULL
+    ok 14 - merger next() (any format): skip tuple validation
+    ok 15 - merger next() (any format): check tuple size
+    ok 16 - merger next() (any format): check tuple data
+    ok 17 - merger is empty (any format)
+	*** test_merger: done ***
+ok 3 - subtests
+    1..17
+	*** test_merger ***
+    ok 1 - merger next() (user's format): tuple != NULL
+    ok 2 - merger next() (user's format): validate tuple
+    ok 3 - merger next() (user's format): check tuple size
+    ok 4 - merger next() (user's format): check tuple data
+    ok 5 - merger next() (user's format): tuple != NULL
+    ok 6 - merger next() (user's format): validate tuple
+    ok 7 - merger next() (user's format): check tuple size
+    ok 8 - merger next() (user's format): check tuple data
+    ok 9 - merger next() (user's format): tuple != NULL
+    ok 10 - merger next() (user's format): validate tuple
+    ok 11 - merger next() (user's format): check tuple size
+    ok 12 - merger next() (user's format): check tuple data
+    ok 13 - merger next() (user's format): tuple != NULL
+    ok 14 - merger next() (user's format): validate tuple
+    ok 15 - merger next() (user's format): check tuple size
+    ok 16 - merger next() (user's format): check tuple data
+    ok 17 - merger is empty (user's format)
+	*** test_merger: done ***
+ok 4 - subtests
+	*** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..b4a989a20
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,285 @@
+#include "unit.h"              /* plan, header, footer, is, ok */
+#include "memory.h"            /* memory_init() */
+#include "fiber.h"             /* fiber_init() */
+#include "box/tuple.h"         /* tuple_init(), tuple_*(),
+				  tuple_validate() */
+#include "box/tuple_format.h"  /* tuple_format_*,
+				  box_tuple_format_new() */
+#include "box/key_def.h"       /* key_def_new(),
+				  key_def_delete() */
+#include "box/merger.h"        /* merger_*() */
+
+/* {{{ Array merge source */
+
+struct merge_source_array {
+	struct merge_source base;
+	uint32_t tuple_count;
+	struct tuple **tuples;
+	uint32_t cur;
+};
+
+/* Virtual methods declarations */
+
+static void
+merge_source_array_destroy(struct merge_source *base);
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+			struct tuple **out);
+
+/* Non-virtual methods */
+
+static struct merge_source *
+merge_source_array_new(bool even)
+{
+	static struct merge_source_vtab merge_source_array_vtab = {
+		.destroy = merge_source_array_destroy,
+		.next = merge_source_array_next,
+	};
+
+	struct merge_source_array *source = malloc(
+		sizeof(struct merge_source_array));
+	assert(source != NULL);
+
+	merge_source_create(&source->base, &merge_source_array_vtab);
+
+	uint32_t tuple_size = 2;
+	const uint32_t tuple_count = 2;
+	/* [1], [3] */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	/* [2], [4] */
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const char **data = even ? data_even : data_odd;
+	source->tuples = malloc(sizeof(struct tuple *) * tuple_count);
+	assert(source->tuples != NULL);
+	struct tuple_format *format = tuple_format_runtime;
+	for (uint32_t i = 0; i < tuple_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		source->tuples[i] = tuple_new(format, data[i], end);
+		tuple_ref(source->tuples[i]);
+	}
+	source->tuple_count = tuple_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+static void
+merge_source_array_destroy(struct merge_source *base)
+{
+	struct merge_source_array *source = container_of(base,
+		struct merge_source_array, base);
+
+	for (uint32_t i = 0; i < source->tuple_count; ++i)
+		tuple_unref(source->tuples[i]);
+
+	free(source->tuples);
+	free(source);
+}
+
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+			struct tuple **out)
+{
+	struct merge_source_array *source = container_of(base,
+		struct merge_source_array, base);
+
+	if (source->cur == source->tuple_count) {
+		*out = NULL;
+		return 0;
+	}
+
+	struct tuple *tuple = source->tuples[source->cur];
+	assert(tuple != NULL);
+
+	/*
+	 * Note: The source still stores the tuple (and will
+	 * unreference it during destroy). Here we should give a
+	 * referenced tuple (so a caller should unreference it on
+	 * its side).
+	 */
+	tuple_ref(tuple);
+
+	*out = tuple;
+	++source->cur;
+	return 0;
+}
+
+/* }}} */
+
+static struct key_part_def key_part_unsigned = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_UNSIGNED,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+static struct key_part_def key_part_integer = {
+	.fieldno = 0,
+	.type = FIELD_TYPE_INTEGER,
+	.coll_id = COLL_NONE,
+	.is_nullable = false,
+	.nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+	.sort_order = SORT_ORDER_ASC,
+	.path = NULL,
+};
+
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	return a < b ? a : b;
+}
+
+void
+check_tuple(struct tuple *tuple, struct tuple_format *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	uint32_t size;
+	const char *data = tuple_data_range(tuple, &size);
+
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (format == NULL) {
+		ok(true, "%s: skip tuple validation", case_name);
+	} else {
+		int rc = tuple_validate(format, tuple);
+		is(rc, 0, "%s: validate tuple", case_name);
+	}
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	ok(!strncmp(data, exp_data, min_u32(size, exp_data_len)),
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Check array source itself (just in case).
+ */
+int
+test_array_source(struct tuple_format *format)
+{
+	plan(9);
+	header();
+
+	/* [1], [3] */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuple_count = 2;
+	static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+
+	struct merge_source *source = merge_source_array_new(false);
+	assert(source != NULL);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"array source next() (any format)" :
+		"array source next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+		int rc = merge_source_next(source, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		tuple_unref(tuple);
+	}
+	int rc = merge_source_next(source, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merge_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+int
+test_merger(struct tuple_format *format)
+{
+	plan(17);
+	header();
+
+	/* [1], [2], [3], [4] */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuple_count = 4;
+	static const char *exp_tuples_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+
+	const uint32_t source_count = 2;
+	struct merge_source *sources[] = {
+		merge_source_array_new(false),
+		merge_source_array_new(true),
+	};
+
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 false);
+	key_def_delete(key_def);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+		int rc = merge_source_next(merger, format, &tuple);
+		(void) rc;
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		tuple_unref(tuple);
+	}
+	int rc = merge_source_next(merger, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	merge_source_unref(merger);
+	merge_source_unref(sources[0]);
+	merge_source_unref(sources[1]);
+
+	footer();
+	return check_plan();
+}
+
+int
+test_basic()
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	tuple_format_unref(format);
+
+	footer();
+	return check_plan();
+}
+
+int
+main()
+{
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	tuple_init(NULL);
+
+	int rc = test_basic();
+
+	tuple_free();
+	fiber_free();
+	memory_free();
+
+	return rc;
+}
-- 
2.21.0

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH v4 4/4] Add merger for tuple streams (Lua part)
  2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
                   ` (2 preceding siblings ...)
  2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuples streams (C part) Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
  2019-05-13 14:49   ` Vladimir Davydov
  3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches

Fixes #3276.

@TarantoolBot document
Title: Merger for tuple streams

The main concept of the merger is a source. It is an object that
provides a stream of tuples. There are four types of sources: a tuple
source, a table source, a buffer source and a merger itself.

A tuple source returns one tuple per chunk. However, this source (as
well as the table and buffer ones) supports fetching the next data
chunk, so the API allows creating it from a Lua iterator:
`merger.new_tuple_source(gen, param, state)`. A `gen` function should
return `state, tuple` on each call and then return `nil` when no more
tuples are available. Consider the example:

```lua
box.cfg({})
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})

s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
  - [2]
  - [3]
...

s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
  - [2]
  - [3]
...
```

As we can see, a source has `:select()` and `:pairs()` methods (this
is common to all source types). The former accepts two options,
`buffer` and `limit`, with the same meaning as in net.box's
`:select()`. The `:pairs()` method (or its `:ipairs()` alias) returns
a luafun iterator: a Lua iterator that also provides a set of handy
methods for operating in a functional style.
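
Below is a small sketch of these options (it reuses the space `s` and
the tuple source from the example above; console output is omitted):

```lua
-- Limit the number of resulting tuples.
s = merger.new_tuple_source(box.space.s:pairs())
s:select({limit = 2})

-- Write the result into an ibuf instead of returning a table
-- (the same msgpack format as net.box's :select() with
-- {buffer = ...}).
buffer = require('buffer')
buf = buffer.ibuf()
s = merger.new_tuple_source(box.space.s:pairs())
s:select({buffer = buf})

-- Use luafun methods on the :pairs() iterator.
s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():take(2):totable()
```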

The same API exists for creating a table and a buffer source:
`merger.new_table_source(gen, param, state)` and
`merger.new_buffer_source(gen, param, state)`. A `gen` function
should return a table or a buffer on each call.

There are also helpers that are useful when all the data is
available at once: `merger.new_source_fromtable(tbl)` and
`merger.new_source_frombuffer(buf)`.
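
A short sketch of the helpers (the buffer here is filled with
msgpackffi and holds a msgpack array of tuples, just like a net.box
`:select({buffer = ...})` response body):

```lua
local buffer = require('buffer')
local msgpackffi = require('msgpackffi')
local merger = require('merger')

-- All tuples are in a Lua table.
local source1 = merger.new_source_fromtable({{1}, {2}, {3}})

-- All tuples are in one buffer.
local buf = buffer.ibuf()
msgpackffi.internal.encode_r(buf, {{4}, {5}}, 0)
local source2 = merger.new_source_frombuffer(buf)

source1:select() -- [1], [2], [3]
source2:select() -- [4], [5]
```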

A merger is a special kind of source that is created from a key_def
object and a set of sources. It performs a merge sort: on each step
it chooses the source with the minimal / maximal tuple, consumes a
tuple from that source and repeats. The API to create a merger is
the following:

```lua
local key_def_lib = require('key_def')
local merger = require('merger')

local key_def = key_def_lib.new(<...>)
local sources = {<...>}
local merger_inst = merger.new(key_def, sources, {
    -- Ascending (false) or descending (true) order.
    -- Default is ascending.
    reverse = <boolean> or <nil>,
})
```

An instance of a merger has the same `:select()` and `:pairs()` methods
as any other source.
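
For instance, the following sketch merges two already sorted table
sources by the first unsigned field:

```lua
local key_def_lib = require('key_def')
local merger = require('merger')

local key_def = key_def_lib.new({{fieldno = 1, type = 'unsigned'}})
local sources = {
    merger.new_source_fromtable({{1, 'a'}, {3, 'c'}}),
    merger.new_source_fromtable({{2, 'b'}, {4, 'd'}}),
}
local merger_inst = merger.new(key_def, sources)
merger_inst:select()
-- [1, 'a'], [2, 'b'], [3, 'c'], [4, 'd']
```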

The `key_def_lib.new()` function takes a table of key parts as an
argument in the same format as box.space.<...>.index.<...>.parts or
conn.space.<...>.index.<...>.parts (where conn is a net.box connection):

```
local key_parts = {
    {
        fieldno = <number>,
        type = <string>,
        [ is_nullable = <boolean>, ]
        [ collation_id = <number>, ]
        [ collation = <string>, ]
    },
    ...
}
local key_def = key_def_lib.new(key_parts)
```

A key_def can be cached across requests with the same ordering rules
(typically requests to the same space).
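
A minimal caching sketch (the cache table and the space_name key are
illustrative, not part of the API):

```lua
local key_def_cache = {}

-- Reuse a key_def for all requests with the same ordering rules.
local function get_key_def(space_name, parts)
    local key_def = key_def_cache[space_name]
    if key_def == nil then
        key_def = key_def_lib.new(parts)
        key_def_cache[space_name] = key_def
    end
    return key_def
end
```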
---
 src/box/CMakeLists.txt       |    2 +
 src/box/lua/init.c           |    7 +-
 src/box/lua/merger.c         | 1143 ++++++++++++++++++++++++++++++++++
 src/box/lua/merger.h         |   47 ++
 src/box/lua/merger.lua       |   41 ++
 test/box-tap/merger.test.lua |  768 +++++++++++++++++++++++
 6 files changed, 2007 insertions(+), 1 deletion(-)
 create mode 100644 src/box/lua/merger.c
 create mode 100644 src/box/lua/merger.h
 create mode 100644 src/box/lua/merger.lua
 create mode 100755 test/box-tap/merger.test.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ce328fb95..0864c3433 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua)
 lua_source(lua_sources lua/console.lua)
 lua_source(lua_sources lua/xlog.lua)
 lua_source(lua_sources lua/key_def.lua)
+lua_source(lua_sources lua/merger.lua)
 set(bin_sources)
 bin_source(bin_sources bootstrap.snap bootstrap.h)
 
@@ -144,6 +145,7 @@ add_library(box STATIC
     lua/xlog.c
     lua/execute.c
     lua/key_def.c
+    lua/merger.c
     ${bin_sources})
 
 target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 6b8be5096..76b987b4b 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -60,6 +60,7 @@
 #include "box/lua/tuple.h"
 #include "box/lua/execute.h"
 #include "box/lua/key_def.h"
+#include "box/lua/merger.h"
 
 extern char session_lua[],
 	tuple_lua[],
@@ -70,7 +71,8 @@ extern char session_lua[],
 	feedback_daemon_lua[],
 	net_box_lua[],
 	upgrade_lua[],
-	console_lua[];
+	console_lua[],
+	merger_lua[];
 
 static const char *lua_sources[] = {
 	"box/session", session_lua,
@@ -83,6 +85,7 @@ static const char *lua_sources[] = {
 	"box/load_cfg", load_cfg_lua,
 	"box/xlog", xlog_lua,
 	"box/key_def", key_def_lua,
+	"box/merger", merger_lua,
 	NULL
 };
 
@@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L)
 	lua_pop(L, 1);
 	luaopen_key_def(L);
 	lua_pop(L, 1);
+	luaopen_merger(L);
+	lua_pop(L, 1);
 
 	/* Load Lua extension */
 	for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..a61adb389
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1143 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <lua.h>             /* lua_*() */
+#include <lauxlib.h>         /* luaL_*() */
+
+#include "fiber.h"           /* fiber() */
+#include "diag.h"            /* diag_set() */
+
+#include "box/tuple.h"       /* tuple_format_runtime,
+				tuple_*(), ... */
+
+#include "lua/error.h"       /* luaT_error() */
+#include "lua/utils.h"       /* luaL_pushcdata(),
+				luaL_iterator_*() */
+
+#include "box/lua/key_def.h" /* luaT_check_key_def() */
+#include "box/lua/tuple.h"   /* luaT_tuple_new() */
+
+#include "small/ibuf.h"      /* struct ibuf */
+#include "msgpuck.h"         /* mp_*() */
+
+#include "box/merger.h"      /* merge_source_*, merger_*() */
+
+static uint32_t CTID_STRUCT_MERGE_SOURCE_REF = 0;
+static uint32_t CTID_STRUCT_IBUF = 0;
+
+/**
+ * A type of a function to create a source from a Lua iterator on
+ * a Lua stack.
+ *
+ * Such function is to be passed to lbox_merge_source_new() as
+ * a parameter.
+ */
+typedef struct merge_source *(*luaL_merge_source_new_f)(struct lua_State *L);
+
+/* {{{ Helpers */
+
+/**
+ * Extract an ibuf object from the Lua stack.
+ */
+static struct ibuf *
+luaT_check_ibuf(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ibuf_ptr == NULL || cdata_type != CTID_STRUCT_IBUF)
+		return NULL;
+	return ibuf_ptr;
+}
+
+/**
+ * Extract a merge source from the Lua stack.
+ */
+static struct merge_source *
+luaT_check_merge_source(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merge_source **source_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (source_ptr == NULL || cdata_type != CTID_STRUCT_MERGE_SOURCE_REF)
+		return NULL;
+	return *source_ptr;
+}
+
+/**
+ * Skip an array around tuples and save its length.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+	/* Check the buffer is correct. */
+	if (buf->rpos > buf->wpos)
+		return -1;
+
+	/* Skip decoding if the buffer is empty. */
+	if (ibuf_used(buf) == 0) {
+		*len_p = 0;
+		return 0;
+	}
+
+	/* Check and skip the array around tuples. */
+	int ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+	if (ok)
+		ok = mp_check_array(buf->rpos, buf->wpos) <= 0;
+	if (ok)
+		*len_p = mp_decode_array((const char **) &buf->rpos);
+	return ok ? 0 : -1;
+}
+
+/**
+ * Encode an array around tuples.
+ */
+static void
+encode_header(struct ibuf *output_buffer, uint32_t result_len)
+{
+	ibuf_reserve(output_buffer, mp_sizeof_array(result_len));
+	output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len);
+}
+
+/**
+ * Get a tuple from a Lua stack.
+ *
+ * If a Lua table is at the specified index, create a tuple with
+ * the provided format and return it. If format is NULL, use the
+ * runtime format.
+ *
+ * If a tuple is at the specified index, validate it against the
+ * provided format (if it is not NULL) and return it.
+ *
+ * In case of an error return NULL and set a diag.
+ */
+static struct tuple *
+luaT_gettuple(struct lua_State *L, int idx, struct tuple_format *format)
+{
+	struct tuple *tuple = luaT_istuple(L, idx);
+	if (tuple == NULL) {
+		/* Create a tuple from a Lua table. */
+		if (format == NULL)
+			format = tuple_format_runtime;
+		if ((tuple = luaT_tuple_new(L, idx, format)) == NULL)
+			return NULL;
+	} else {
+		/* Validate a tuple. */
+		if (format != NULL && tuple_validate(format, tuple) != 0)
+			return NULL;
+	}
+	return tuple;
+}
+
+/* }}} */
+
+/* {{{ Create, destroy structures from Lua */
+
+/**
+ * Free a merge source from Lua code.
+ */
+static int
+lbox_merge_source_gc(struct lua_State *L)
+{
+	struct merge_source *source = luaT_check_merge_source(L, 1);
+	assert(source != NULL);
+	merge_source_unref(source);
+	return 0;
+}
+
+/**
+ * Create a new source from a Lua iterator and push it onto the
+ * Lua stack.
+ *
+ * It is the helper for lbox_merger_new_buffer_source(),
+ * lbox_merger_new_table_source() and
+ * lbox_merger_new_tuple_source().
+ */
+static int
+lbox_merge_source_new(struct lua_State *L, const char *func_name,
+		      luaL_merge_source_new_f luaL_merge_source_new)
+{
+	int top = lua_gettop(L);
+	if (top < 1 || top > 3 || !luaL_iscallable(L, 1))
+		return luaL_error(L, "Usage: %s(gen, param, state)", func_name);
+
+	/*
+	 * luaL_merge_source_new() reads exactly three top values.
+	 */
+	while (lua_gettop(L) < 3)
+		lua_pushnil(L);
+
+	struct merge_source *source = luaL_merge_source_new(L);
+	if (source == NULL)
+		return luaT_error(L);
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = source;
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error with merger.new() usage info.
+ */
+static int
+lbox_merger_new_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger.new(key_def, "
+				   "{source, source, ...}[, {"
+				   "reverse = <boolean> or <nil>}])";
+	if (param_name == NULL)
+		return luaL_error(L, "Bad params, use: %s", usage);
+	else
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+}
+
+/**
+ * Parse a second parameter of merger.new() into an array of
+ * sources.
+ *
+ * Return an array of pointers to sources and set @a
+ * source_count_ptr. In case of an error set a diag and return
+ * NULL.
+ *
+ * It is the helper for lbox_merger_new().
+ */
+static struct merge_source **
+luaT_merger_new_parse_sources(struct lua_State *L, int idx,
+			      uint32_t *source_count_ptr)
+{
+	/* Allocate sources array. */
+	uint32_t source_count = lua_objlen(L, idx);
+	const size_t sources_size = sizeof(struct merge_source *) *
+		source_count;
+	struct merge_source **sources = malloc(sources_size);
+	if (sources == NULL) {
+		diag_set(OutOfMemory, sources_size, "malloc", "sources");
+		return NULL;
+	}
+
+	/* Save all sources. */
+	for (uint32_t i = 0; i < source_count; ++i) {
+		lua_pushinteger(L, i + 1);
+		lua_gettable(L, idx);
+
+		/* Extract a source from a Lua stack. */
+		struct merge_source *source = luaT_check_merge_source(L, -1);
+		if (source == NULL) {
+			free(sources);
+			diag_set(IllegalParams,
+				 "Unknown source type at index %d", i + 1);
+			return NULL;
+		}
+		sources[i] = source;
+	}
+	lua_pop(L, source_count);
+
+	*source_count_ptr = source_count;
+	return sources;
+}
+
+/**
+ * Create a new merger and push it to a Lua stack as a merge
+ * source.
+ *
+ * Expect cdata<struct key_def>, a table of sources and
+ * (optionally) a table of options on a Lua stack.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	struct key_def *key_def;
+	int top = lua_gettop(L);
+	bool ok = (top == 2 || top == 3) &&
+		/* key_def. */
+		(key_def = luaT_check_key_def(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok)
+		return lbox_merger_new_usage(L, NULL);
+
+	/* Options. */
+	bool reverse = false;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 3)) {
+		/* Parse reverse. */
+		lua_pushstring(L, "reverse");
+		lua_gettable(L, 3);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isboolean(L, -1))
+				reverse = lua_toboolean(L, -1);
+			else
+				return lbox_merger_new_usage(L, "reverse");
+		}
+		lua_pop(L, 1);
+	}
+
+	uint32_t source_count = 0;
+	struct merge_source **sources = luaT_merger_new_parse_sources(L, 2,
+		&source_count);
+	if (sources == NULL)
+		return luaT_error(L);
+
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 reverse);
+	free(sources);
+	if (merger == NULL)
+		return luaT_error(L);
+
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = merger;
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/* }}} */
+
+/* {{{ Buffer merge source */
+
+struct merge_source_buffer {
+	struct merge_source base;
+	/*
+	 * A reference to a Lua iterator to fetch the next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+	/*
+	 * A reference to a buffer storing the current chunk of
+	 * tuples. It is needed to prevent LuaJIT from collecting
+	 * the buffer while the source considers it as the current
+	 * one.
+	 */
+	int ref;
+	/*
+	 * A buffer with a current chunk of tuples.
+	 */
+	struct ibuf *buf;
+	/*
+	 * A merger stops before the end of a buffer when it is
+	 * not the last merger in the chain.
+	 */
+	size_t remaining_tuple_count;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base);
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+			      struct tuple_format *format,
+			      struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the buffer type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_buffer_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_buffer_vtab = {
+		.destroy = luaL_merge_source_buffer_destroy,
+		.next = luaL_merge_source_buffer_next,
+	};
+
+	struct merge_source_buffer *source = malloc(
+		sizeof(struct merge_source_buffer));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_buffer),
+			 "malloc", "merge_source_buffer");
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_buffer_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+	source->ref = 0;
+	source->buf = NULL;
+	source->remaining_tuple_count = 0;
+
+	return &source->base;
+}
+
+/**
+ * Call a user-provided function to get the next data chunk (a
+ * buffer).
+ *
+ * Return 1 when a new buffer is received, 0 when the buffer
+ * iterator ends and -1 on an error (set a diag).
+ */
+static int
+luaL_merge_source_buffer_fetch(struct merge_source_buffer *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new buffer as the current chunk. */
+	if (source->ref > 0) {
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+		source->ref = 0;
+	}
+	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+	source->buf = luaT_check_ibuf(L, -1);
+	if (source->buf == NULL) {
+		diag_set(IllegalParams, "Expected <state>, <buffer>");
+		return -1;
+	}
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	lua_pop(L, nresult);
+
+	/* Update remaining_tuple_count and skip the header. */
+	if (decode_header(source->buf, &source->remaining_tuple_count) != 0) {
+		diag_set(IllegalParams, "Invalid merge source %p",
+			 &source->base);
+		return -1;
+	}
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base)
+{
+	struct merge_source_buffer *source = container_of(base,
+		struct merge_source_buffer, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+			      struct tuple_format *format,
+			      struct tuple **out)
+{
+	struct merge_source_buffer *source = container_of(base,
+		struct merge_source_buffer, base);
+
+	/*
+	 * Handle the case when all data were processed: ask for
+	 * the next chunk until a non-empty chunk is received or
+	 * the chunk iterator ends.
+	 */
+	while (source->remaining_tuple_count == 0) {
+		int rc = luaL_merge_source_buffer_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+	}
+	if (ibuf_used(source->buf) == 0) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	const char *tuple_beg = source->buf->rpos;
+	const char *tuple_end = tuple_beg;
+	if (mp_check(&tuple_end, source->buf->wpos) != 0) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	--source->remaining_tuple_count;
+	source->buf->rpos = (char *) tuple_end;
+	if (format == NULL)
+		format = tuple_format_runtime;
+	struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+	if (tuple == NULL)
+		return -1;
+
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new buffer source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_buffer_source(struct lua_State *L)
+{
+	return lbox_merge_source_new(L, "merger.new_buffer_source",
+				     luaL_merge_source_buffer_new);
+}
+
+/* }}} */
+
+/* {{{ Table merge source */
+
+struct merge_source_table {
+	struct merge_source base;
+	/*
+	 * A reference to a Lua iterator to fetch the next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+	/*
+	 * A reference to a table with a current chunk of tuples.
+	 */
+	int ref;
+	/* An index of the current tuple within the current chunk. */
+	int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_table_destroy(struct merge_source *base);
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the table type.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_table_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_table_vtab = {
+		.destroy = luaL_merge_source_table_destroy,
+		.next = luaL_merge_source_table_next,
+	};
+
+	struct merge_source_table *source = malloc(
+		sizeof(struct merge_source_table));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_table),
+			 "malloc", "merge_source_table");
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_table_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+	source->ref = 0;
+	source->next_idx = 1;
+
+	return &source->base;
+}
+
+/**
+ * Call a user-provided function to fill the source.
+ *
+ * Return 0 when the table iterator ends, 1 when a new table is
+ * received and -1 on an error (set a diag).
+ */
+static int
+luaL_merge_source_table_fetch(struct merge_source_table *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <table>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new table as the current chunk. */
+	if (source->ref > 0) {
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+		source->ref = 0;
+	}
+	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+	if (lua_istable(L, -1) == 0) {
+		diag_set(IllegalParams, "Expected <state>, <table>");
+		return -1;
+	}
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->next_idx = 1;
+	lua_pop(L, nresult);
+
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merge_source_table_destroy(struct merge_source *base)
+{
+	struct merge_source_table *source = container_of(base,
+		struct merge_source_table, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merge_source_table *source = container_of(base,
+		struct merge_source_table, base);
+
+	if (source->ref > 0) {
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+	/*
+	 * If all data were processed, try to fetch more.
+	 */
+	while (source->ref == 0 || lua_isnil(L, -1)) {
+		if (source->ref > 0)
+			lua_pop(L, 2);
+		int rc = luaL_merge_source_table_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+		/*
+		 * Retry tuple extraction when the next table is
+		 * received.
+		 */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+
+	struct tuple *tuple = luaT_gettuple(L, -1, format);
+	if (tuple == NULL)
+		return -1;
+
+	++source->next_idx;
+	lua_pop(L, 2);
+
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new table source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_table_source(struct lua_State *L)
+{
+	return lbox_merge_source_new(L, "merger.new_table_source",
+				     luaL_merge_source_table_new);
+}
+
+/* }}} */
+
+/* {{{ Tuple merge source */
+
+struct merge_source_tuple {
+	struct merge_source base;
+	/*
+	 * A reference to a Lua iterator to fetch the next chunk of
+	 * tuples.
+	 */
+	struct luaL_iterator *fetch_it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base);
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the tuple type.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_tuple_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_tuple_vtab = {
+		.destroy = luaL_merge_source_tuple_destroy,
+		.next = luaL_merge_source_tuple_next,
+	};
+
+	struct merge_source_tuple *source = malloc(
+		sizeof(struct merge_source_tuple));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_tuple),
+			 "malloc", "merge_source_tuple");
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_tuple_vtab);
+
+	source->fetch_it = luaL_iterator_new(L, 0);
+
+	return &source->base;
+}
+
+/**
+ * Call a user-provided function to fill the source.
+ *
+ * This function does not check whether a user-provided value
+ * is a tuple. A caller should check it on its side.
+ *
+ * Return 1 on success and push the resulting tuple onto the Lua
+ * stack.
+ * Return 0 when there is no more data.
+ * Return -1 on an error (set a diag).
+ */
+static int
+luaL_merge_source_tuple_fetch(struct merge_source_tuple *source,
+			       struct lua_State *L)
+{
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		diag_set(IllegalParams, "Expected <state>, <tuple>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Set a new tuple as the current chunk. */
+	lua_insert(L, -2); /* Swap state and tuple. */
+	lua_pop(L, 1); /* Pop state. */
+
+	return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base)
+{
+	struct merge_source_tuple *source = container_of(base,
+		struct merge_source_tuple, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+
+	free(source);
+}
+
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merge_source_tuple *source = container_of(base,
+		struct merge_source_tuple, base);
+
+	int rc = luaL_merge_source_tuple_fetch(source, L);
+	if (rc < 0)
+		return -1;
+	/*
+	 * Check whether a tuple appears after the fetch.
+	 */
+	if (rc == 0) {
+		*out = NULL;
+		return 0;
+	}
+
+	struct tuple *tuple = luaT_gettuple(L, -1, format);
+	if (tuple == NULL)
+		return -1;
+
+	lua_pop(L, 1);
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new tuple source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_tuple_source(struct lua_State *L)
+{
+	return lbox_merge_source_new(L, "merger.new_tuple_source",
+				     luaL_merge_source_tuple_new);
+}
+
+/* }}} */
+
+/* {{{ Merge source Lua methods */
+
+/**
+ * Iterator gen function to traverse source results.
+ *
+ * Expected a nil as the first parameter (param) and a
+ * merge_source as the second parameter (state) on a Lua stack.
+ *
+ * Push the original merge_source (as a new state) and a next
+ * tuple.
+ */
+static int
+lbox_merge_source_gen(struct lua_State *L)
+{
+	struct merge_source *source;
+	bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) &&
+		(source = luaT_check_merge_source(L, 2)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Bad params, use: lbox_merge_source_gen("
+				  "nil, merge_source)");
+
+	struct tuple *tuple;
+	if (merge_source_next(source, NULL, &tuple) != 0)
+		return luaT_error(L);
+	if (tuple == NULL) {
+		lua_pushnil(L);
+		lua_pushnil(L);
+		return 2;
+	}
+
+	/* Push merge_source, tuple. */
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = source;
+	luaT_pushtuple(L, tuple);
+
+	/*
+	 * luaT_pushtuple() references the tuple, so we
+	 * unreference it on merger's side.
+	 */
+	tuple_unref(tuple);
+
+	return 2;
+}
+
+/**
+ * Iterate over merge source results from Lua.
+ *
+ * Push three values to the Lua stack:
+ *
+ * 1. gen (lbox_merge_source_gen wrapped by fun.wrap());
+ * 2. param (nil);
+ * 3. state (merge_source).
+ */
+static int
+lbox_merge_source_ipairs(struct lua_State *L)
+{
+	struct merge_source *source;
+	bool ok = lua_gettop(L) == 1 &&
+		(source = luaT_check_merge_source(L, 1)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Usage: merge_source:ipairs()");
+	/* Stack: merge_source. */
+
+	luaL_loadstring(L, "return require('fun').wrap");
+	lua_call(L, 0, 1);
+	lua_insert(L, -2); /* Swap merge_source and wrap. */
+	/* Stack: wrap, merge_source. */
+
+	lua_pushcfunction(L, lbox_merge_source_gen);
+	lua_insert(L, -2); /* Swap merge_source and gen. */
+	/* Stack: wrap, gen, merge_source. */
+
+	/*
+	 * Push nil as an iterator param, because all needed state
+	 * is in a merge source.
+	 */
+	lua_pushnil(L);
+	/* Stack: wrap, gen, merge_source, nil. */
+
+	lua_insert(L, -2); /* Swap merge_source and nil. */
+	/* Stack: wrap, gen, nil, merge_source. */
+
+	/* Call fun.wrap(gen, nil, merge_source). */
+	lua_call(L, 3, 3);
+	return 3;
+}
+
+/**
+ * Write source results into ibuf.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+encode_result_buffer(struct lua_State *L, struct merge_source *source,
+		     struct ibuf *output_buffer, uint32_t limit)
+{
+	uint32_t result_len = 0;
+	uint32_t result_len_offset = 4;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later.
+	 */
+	encode_header(output_buffer, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (result_len < limit && (rc =
+	       merge_source_next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		uint32_t bsize = tuple->bsize;
+		ibuf_reserve(output_buffer, bsize);
+		memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
+		output_buffer->wpos += bsize;
+		result_len_offset += bsize;
+		++result_len;
+
+		/* The received tuple is not needed anymore. */
+		tuple_unref(tuple);
+	}
+
+	if (rc != 0)
+		return luaT_error(L);
+
+	/* Write the real array size. */
+	mp_store_u32(output_buffer->wpos - result_len_offset, result_len);
+
+	return 0;
+}
+
+/**
+ * Write source results into a new Lua table.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+create_result_table(struct lua_State *L, struct merge_source *source,
+		    uint32_t limit)
+{
+	/* Create result table. */
+	lua_newtable(L);
+
+	uint32_t cur = 1;
+
+	/* Fetch, merge and save tuples to the table. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (cur - 1 < limit && (rc =
+	       merge_source_next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		luaT_pushtuple(L, tuple);
+		lua_rawseti(L, -2, cur);
+		++cur;
+
+		/*
+		 * luaT_pushtuple() references the tuple, so we
+		 * unreference it on merger's side.
+		 */
+		tuple_unref(tuple);
+	}
+
+	if (rc != 0)
+		return luaT_error(L);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error with merger_inst:select() usage info.
+ */
+static int
+lbox_merge_source_select_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merge_source:select([{"
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "limit = <number> or <nil>}])";
+	if (param_name == NULL)
+		return luaL_error(L, "Bad params, use: %s", usage);
+	else
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+}
+
+/**
+ * Pull results of a merge source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expect a merge source and options (optional) on the Lua stack.
+ *
+ * Return a Lua table or nothing when a 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merge_source_select(struct lua_State *L)
+{
+	struct merge_source *source;
+	int top = lua_gettop(L);
+	bool ok = (top == 1 || top == 2) &&
+		/* Merge source. */
+		(source = luaT_check_merge_source(L, 1)) != NULL &&
+		/* Opts. */
+		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+	if (!ok)
+		return lbox_merge_source_select_usage(L, NULL);
+
+	uint32_t limit = UINT32_MAX;
+	struct ibuf *output_buffer = NULL;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 2)) {
+		/* Parse buffer. */
+		lua_pushstring(L, "buffer");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if ((output_buffer = luaT_check_ibuf(L, -1)) == NULL)
+				return lbox_merge_source_select_usage(L,
+					"buffer");
+		}
+		lua_pop(L, 1);
+
+		/* Parse limit. */
+		lua_pushstring(L, "limit");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isnumber(L, -1))
+				limit = lua_tointeger(L, -1);
+			else
+				return lbox_merge_source_select_usage(L,
+					"limit");
+		}
+		lua_pop(L, 1);
+	}
+
+	if (output_buffer == NULL)
+		return create_result_table(L, source, limit);
+	else
+		return encode_result_buffer(L, source, output_buffer, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+	luaL_cdef(L, "struct merge_source;");
+	luaL_cdef(L, "struct ibuf;");
+
+	CTID_STRUCT_MERGE_SOURCE_REF = luaL_ctypeid(L, "struct merge_source&");
+	CTID_STRUCT_IBUF = luaL_ctypeid(L, "struct ibuf");
+
+	/* Export C functions to Lua. */
+	static const struct luaL_Reg meta[] = {
+		{"new_buffer_source", lbox_merger_new_buffer_source},
+		{"new_table_source", lbox_merger_new_table_source},
+		{"new_tuple_source", lbox_merger_new_tuple_source},
+		{"new", lbox_merger_new},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "merger", meta);
+
+	/* Add internal.{select,ipairs}(). */
+	lua_newtable(L); /* merger.internal */
+	lua_pushcfunction(L, lbox_merge_source_select);
+	lua_setfield(L, -2, "select");
+	lua_pushcfunction(L, lbox_merge_source_ipairs);
+	lua_setfield(L, -2, "ipairs");
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..11abd1a25
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+local ibuf_t = ffi.typeof('struct ibuf')
+local merge_source_t = ffi.typeof('struct merge_source')
+
+-- Create a source from one buffer.
+merger.new_source_frombuffer = function(buf)
+    local func_name = 'merger.new_source_frombuffer'
+    if type(buf) ~= 'cdata' or not ffi.istype(ibuf_t, buf) then
+        error(('Usage: %s(<cdata<struct ibuf>>)'):format(func_name), 0)
+    end
+
+    return merger.new_buffer_source(fun.iter({buf}))
+end
+
+-- Create a source from one table.
+merger.new_source_fromtable = function(tbl)
+    local func_name = 'merger.new_source_fromtable'
+    if type(tbl) ~= 'table' then
+        error(('Usage: %s(<table>)'):format(func_name), 0)
+    end
+
+    return merger.new_table_source(fun.iter({tbl}))
+end
+
+local methods = {
+    ['select'] = merger.internal.select,
+    ['pairs']  = merger.internal.ipairs,
+    ['ipairs']  = merger.internal.ipairs,
+}
+
+ffi.metatype(merge_source_t, {
+    __index = function(self, key)
+        return methods[key]
+    end,
+    -- Lua 5.2 compatibility
+    __pairs = merger.internal.ipairs,
+    __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..ee9eaeaed
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,768 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def_lib = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- A chunk size for table and buffer sources. A chunk size for
+-- tuple source is always 1.
+local FETCH_BLOCK_SIZE = 10
+
+local function merger_new_usage(param)
+    local msg = 'merger.new(key_def, ' ..
+        '{source, source, ...}[, {' ..
+        'reverse = <boolean> or <nil>}])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+local function merger_select_usage(param)
+    local msg = 'merge_source:select([{' ..
+                'buffer = <cdata<struct ibuf>> or <nil>, ' ..
+                'limit = <number> or <nil>}])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+-- Get a buffer with the data encoded, without the last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+    local data = msgpackffi.encode(data)
+    data = data:sub(1, data:len() - trunc)
+    local len = data:len()
+    local buf = buffer.ibuf()
+    -- Ensure we have enough buffer to write len + trunc bytes.
+    buf:reserve(len + trunc)
+    local p = buf:alloc(len)
+    -- Ensure len bytes are followed by trunc zero bytes.
+    ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+    return buf
+end
+
+local function truncated_msgpack_source(data, trunc)
+    local buf = truncated_msgpack_buffer(data, trunc)
+    return merger.new_source_frombuffer(buf)
+end
+
+local bad_source_new_calls = {
+    {
+        'Bad fetch iterator',
+        funcs = {'new_buffer_source', 'new_table_source',
+                 'new_tuple_source'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+    },
+    {
+        'Bad chunk type',
+        funcs = {'new_source_frombuffer', 'new_source_fromtable'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+    },
+    {
+        'Bad buffer chunk',
+        funcs = {'new_source_frombuffer'},
+        params = {ffi.new('char *')},
+        exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+    },
+}
+
+local bad_chunks = {
+    {
+        'Bad buffer source chunk (not cdata)',
+        func = 'new_buffer_source',
+        chunk = 1,
+        exp_err = 'Expected <state>, <buffer>',
+    },
+    {
+        'Bad buffer source chunk (wrong ctype)',
+        func = 'new_buffer_source',
+        chunk = ffi.new('char *'),
+        exp_err = 'Expected <state>, <buffer>',
+    },
+    {
+        'Bad table source chunk',
+        func = 'new_table_source',
+        chunk = 1,
+        exp_err = 'Expected <state>, <table>',
+    },
+    {
+        'Bad tuple source chunk (not cdata)',
+        func = 'new_tuple_source',
+        chunk = 1,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad tuple source chunk (wrong ctype)',
+        func = 'new_tuple_source',
+        chunk = ffi.new('char *'),
+        exp_err = 'A tuple or a table expected, got cdata',
+    },
+}
+
+local bad_merger_new_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_new_usage(nil),
+    },
+    {
+        'Bad opts.reverse',
+        sources = {},
+        opts = {reverse = 1},
+        exp_err = merger_new_usage('reverse'),
+    },
+}
+
+local bad_merger_select_calls = {
+    {
+        'Wrong source of table type',
+        sources = {merger.new_source_fromtable({1})},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Remove the last tuple from msgpack data, but keep old
+        -- tuples array size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 2),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Remove half of the last tuple, but keep old tuple size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 1),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad opts.buffer (wrong type)',
+        sources = {},
+        opts = {buffer = 1},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.buffer (wrong cdata type)',
+        sources = {},
+        opts = {buffer = ffi.new('char *')},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.limit (wrong type)',
+        sources = {},
+        opts = {limit = 'hello'},
+        exp_err = merger_select_usage('limit'),
+    }
+}
+
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 16 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 15 constant parts
+            for i = 1, 15 do
+                tuple[i] = i
+            end
+            -- 16th part is varying
+            tuple[16] = tupleno
+            return tuple
+        end,
+        -- Reduce the tuple count to decrease the test run time.
+        tuple_count = 16,
+    },
+    -- Test null value in nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        gen_tuple = function(i)
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+local function is_unicode_ci_part(part)
+    return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+local function tuple_comparator(a, b, parts)
+    for _, part in ipairs(parts) do
+        local fieldno = part.fieldno
+        if a[fieldno] ~= b[fieldno] then
+            if a[fieldno] == nil then
+                return -1
+            end
+            if b[fieldno] == nil then
+                return 1
+            end
+            if is_unicode_ci_part(part) then
+                return utf8.casecmp(a[fieldno], b[fieldno])
+            end
+            return a[fieldno] < b[fieldno] and -1 or 1
+        end
+    end
+
+    return 0
+end
+
+local function sort_tuples(tuples, parts, opts)
+    local function tuple_comparator_wrapper(a, b)
+        local cmp = tuple_comparator(a, b, parts)
+        if cmp < 0 then
+            return not opts.reverse
+        elseif cmp > 0 then
+            return opts.reverse
+        else
+            return false
+        end
+    end
+
+    table.sort(tuples, tuple_comparator_wrapper)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+    for i = 1, #tuples do
+        local tuple = tuples[i]
+        for _, part in ipairs(parts) do
+            if is_unicode_ci_part(part) then
+                -- Workaround #3709.
+                if tuple[part.fieldno]:len() > 0 then
+                    tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+                end
+            end
+        end
+    end
+end
+
+local function fetch_source_gen(param, state)
+    local input_type = param.input_type
+    local tuples = param.tuples
+    local last_pos = state.last_pos
+    local fetch_block_size = FETCH_BLOCK_SIZE
+    -- A chunk size is always 1 for a tuple source.
+    if input_type == 'tuple' then
+        fetch_block_size = 1
+    end
+    local data = fun.iter(tuples):drop(last_pos):take(
+        fetch_block_size):totable()
+    if #data == 0 then
+        return
+    end
+    local new_state = {last_pos = last_pos + #data}
+    if input_type == 'table' then
+        return new_state, data
+    elseif input_type == 'buffer' then
+        local buf = buffer.ibuf()
+        msgpackffi.internal.encode_r(buf, data, 0)
+        return new_state, buf
+    elseif input_type == 'tuple' then
+        assert(#data <= 1)
+        if #data == 0 then return end
+        return new_state, data[1]
+    else
+        assert(false)
+    end
+end
+
+local function fetch_source_iterator(input_type, tuples)
+    local param = {
+        input_type = input_type,
+        tuples = tuples,
+    }
+    local state = {
+        last_pos = 0,
+    }
+    return fetch_source_gen, param, state
+end
+
+local function prepare_data(schema, tuple_count, source_count, opts)
+    local opts = opts or {}
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    local tuples = {}
+    local exp_result = {}
+
+    -- Ensure empty sources are empty tables rather than nil.
+    for i = 1, source_count do
+        if tuples[i] == nil then
+            tuples[i] = {}
+        end
+    end
+
+    -- Prepare N tables with tuples as input for merger.
+    for i = 1, tuple_count do
+        -- [1, source_count]
+        local guava = digest.guava(i, source_count) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if not use_table_as_tuple then
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[guava], tuple)
+    end
+
+    -- Sort tuples within each source.
+    for _, source_tuples in pairs(tuples) do
+        sort_tuples(source_tuples, schema.parts, opts)
+    end
+
+    -- Sort expected result.
+    sort_tuples(exp_result, schema.parts, opts)
+
+    -- Fill sources.
+    local sources
+    if use_fetch_source then
+        sources = {}
+        for i = 1, source_count do
+            local func = ('new_%s_source'):format(input_type)
+            sources[i] = merger[func](fetch_source_iterator(input_type,
+                tuples[i]))
+        end
+    elseif input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        sources = {}
+        for i = 1, source_count do
+            sources[i] = merger.new_source_fromtable(tuples[i])
+        end
+    elseif input_type == 'buffer' then
+        -- Imitate netbox's select with {buffer = ...}.
+        sources = {}
+        for i = 1, source_count do
+            local buf = buffer.ibuf()
+            sources[i] = merger.new_source_frombuffer(buf)
+            msgpackffi.internal.encode_r(buf, tuples[i], 0)
+        end
+    elseif input_type == 'tuple' then
+        assert(false)
+    else
+        assert(false)
+    end
+
+    return sources, exp_result
+end
+
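+-- Format test case options into a human readable suffix for TAP
+-- messages.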
+local function test_case_opts_str(opts)
+    local params = {}
+
+    if opts.input_type then
+        table.insert(params, 'input_type: ' .. opts.input_type)
+    end
+
+    if opts.output_type then
+        table.insert(params, 'output_type: ' .. opts.output_type)
+    end
+
+    if opts.reverse then
+        table.insert(params, 'reverse')
+    end
+
+    if opts.use_table_as_tuple then
+        table.insert(params, 'use_table_as_tuple')
+    end
+
+    if opts.use_fetch_source then
+        table.insert(params, 'use_fetch_source')
+    end
+
+    if next(params) == nil then
+        return ''
+    end
+
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
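+-- Perform one merge over freshly prepared sources and check that
+-- the output matches the expected order for the given options
+-- (input / output type, reverse order and so on).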
+local function run_merger(test, schema, tuple_count, source_count, opts)
+    fiber.yield()
+
+    local opts = opts or {}
+
+    -- Prepare data.
+    local sources, exp_result = prepare_data(schema, tuple_count, source_count,
+                                             opts)
+
+    -- Create a merger instance.
+    local merger_inst = merger.new(schema.key_def, sources,
+        {reverse = opts.reverse})
+
+    local res
+
+    -- Run merger and prepare output for compare.
+    if opts.output_type == 'table' then
+        -- Table output.
+        res = merger_inst:select()
+    elseif opts.output_type == 'buffer' then
+        -- Buffer output.
+        local output_buffer = buffer.ibuf()
+        merger_inst:select({buffer = output_buffer})
+        res = msgpackffi.decode(output_buffer.rpos)
+    else
+        -- Tuple output.
+        assert(opts.output_type == 'tuple')
+        res = merger_inst:pairs():totable()
+    end
+
+    -- A bit more postprocessing to compare.
+    for i = 1, #res do
+        if type(res[i]) ~= 'table' then
+            res[i] = res[i]:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate between 'A' and 'a', so
+    -- the order is arbitrary. Transform fields with the
+    -- unicode_ci collation to lower case before comparing.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    test:is_deeply(res, exp_result,
+        ('check order on %3d tuples in %4d sources%s')
+        :format(tuple_count, source_count, test_case_opts_str(opts)))
+end
+
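+-- Run run_merger() for the given schema with several source
+-- counts, skipping option combinations that make no sense.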
+local function run_case(test, schema, opts)
+    local opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuple_count = schema.tuple_count or 100
+
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    -- Skip meaningless flag combinations.
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+    if input_type == 'tuple' and not use_fetch_source then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(4)
+
+        -- Check with a small source count.
+        run_merger(test, schema, tuple_count, 1, opts)
+        run_merger(test, schema, tuple_count, 2, opts)
+        run_merger(test, schema, tuple_count, 5, opts)
+
+        -- Check with more sources than tuples.
+        run_merger(test, schema, tuple_count, 128, opts)
+    end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_source_new_calls + #bad_chunks + #bad_merger_new_calls +
+    #bad_merger_select_calls + 6 + #schemas * 48)
+
+-- For collations.
+box.cfg{}
+
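+-- Bad source constructor calls.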
+for _, case in ipairs(bad_source_new_calls) do
+    test:test(case[1], function(test)
+        local funcs = case.funcs
+        test:plan(#funcs)
+        for _, func in ipairs(funcs) do
+            local ok, err = pcall(merger[func], unpack(case.params))
+            test:ok(ok == false and err:match(case.exp_err), func)
+        end
+    end)
+end
+
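+-- A source should raise an error when its gen function returns an
+-- invalid chunk.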
+for _, case in ipairs(bad_chunks) do
+    local source = merger[case.func](function(_, state)
+        return state, case.chunk
+    end, {}, {})
+    local ok, err = pcall(function()
+        return source:pairs():take(1):totable()
+    end)
+    test:ok(ok == false and err:match(case.exp_err), case[1])
+end
+
+-- Create the key_def for the test cases below.
+local key_def = key_def_lib.new({{
+    fieldno = 1,
+    type = 'string',
+}})
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+    local ok, err = pcall(merger.new, key_def, case.sources, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Bad sources and/or opts parameters for a merger's methods.
+for _, case in ipairs(bad_merger_select_calls) do
+    local merger_inst = merger.new(key_def, case.sources)
+    local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Create a key_def for each schema.
+for _, schema in ipairs(schemas) do
+    schema.key_def = key_def_lib.new(schema.parts)
+end
+
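+-- A source can be shared between two mergers: each tuple is
+-- consumed by exactly one of them.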
+test:test('use a source in two mergers', function(test)
+    test:plan(5)
+
+    local data = {{'a'}, {'b'}, {'c'}}
+    local source = merger.new_source_fromtable(data)
+    local i1 = merger.new(key_def, {source}):pairs()
+    local i2 = merger.new(key_def, {source}):pairs()
+
+    local t1 = i1:head():totable()
+    test:is_deeply(t1, data[1], 'tuple 1 from merger 1')
+
+    local t3 = i2:head():totable()
+    test:is_deeply(t3, data[3], 'tuple 3 from merger 2')
+
+    local t2 = i1:head():totable()
+    test:is_deeply(t2, data[2], 'tuple 2 from merger 1')
+
+    test:ok(i1:is_null(), 'merger 1 ends')
+    test:ok(i2:is_null(), 'merger 2 ends')
+end)
+
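+-- Gen function that yields chunks one by one and stops at a nil
+-- chunk (a hole in param.chunks), but keeps its position in
+-- param.idx, so the next :pairs() call on the same source resumes
+-- from the following chunk. This is what makes the sources below
+-- reusable.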
+local function reusable_source_gen(param)
+    local chunks = param.chunks
+    local idx = param.idx or 1
+
+    if idx > table.maxn(chunks) then
+        return
+    end
+
+    local chunk = chunks[idx]
+    param.idx = idx + 1
+
+    if chunk == nil then
+        return
+    end
+    return box.NULL, chunk
+end
+
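+-- The sources below are expected to produce {{1}, {2}} on the
+-- first use, {{3}, {4}, {5}} on the second one and nothing
+-- afterwards.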
+local function verify_reusable_source(test, source)
+    test:plan(3)
+
+    local exp = {{1}, {2}}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '1st use')
+
+    local exp = {{3}, {4}, {5}}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '2nd use')
+
+    local exp = {}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, 'end')
+end
+
+test:test('reuse a tuple source', function(test)
+    local tuples = {{1}, {2}, nil, {3}, {4}, {5}}
+    local source = merger.new_tuple_source(reusable_source_gen,
+        {chunks = tuples})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+    local chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local source = merger.new_table_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+    local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local chunks = {}
+    for i = 1, table.maxn(chunks_tbl) do
+        if chunks_tbl[i] == nil then
+            chunks[i] = nil
+        else
+            chunks[i] = buffer.ibuf()
+            msgpackffi.internal.encode_r(chunks[i], chunks_tbl[i], 0)
+        end
+    end
+    local source = merger.new_buffer_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
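+-- select() with the limit option returns at most limit tuples for
+-- both table and buffer output.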
+test:test('use limit', function(test)
+    test:plan(6)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(key_def, {source})
+    local res = m:select({limit = 0})
+    test:is(#res, 0, 'table output with limit 0')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(key_def, {source})
+    local res = m:select({limit = 1})
+    test:is(#res, 1, 'table output with limit 1')
+    test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(key_def, {source})
+    local output_buffer = buffer.ibuf()
+    m:select({buffer = output_buffer, limit = 0})
+    local res = msgpackffi.decode(output_buffer.rpos)
+    test:is(#res, 0, 'buffer output with limit 0')
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(key_def, {source})
+    output_buffer:recycle()
+    m:select({buffer = output_buffer, limit = 1})
+    local res = msgpackffi.decode(output_buffer.rpos)
+    test:is(#res, 1, 'buffer output with limit 1')
+    test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
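+-- A merger can itself be used as a source for another merger.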
+test:test('cascade mergers', function(test)
+    test:plan(2)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m1 = merger.new(key_def, {source})
+    local m2 = merger.new(key_def, {m1})
+
+    local res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'same key_def')
+
+    local key_def_unicode = key_def_lib.new({{
+        fieldno = 1,
+        type = 'string',
+        collation = 'unicode',
+    }})
+
+    local source = merger.new_source_fromtable(data)
+    local m1 = merger.new(key_def, {source})
+    local m2 = merger.new(key_def_unicode, {m1})
+
+    local res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'different key_defs')
+end)
+
+-- Merging cases.
+for _, input_type in ipairs({'buffer', 'table', 'tuple'}) do
+    for _, output_type in ipairs({'buffer', 'table', 'tuple'}) do
+        for _, reverse in ipairs({false, true}) do
+            for _, use_table_as_tuple in ipairs({false, true}) do
+                for _, use_fetch_source in ipairs({false, true}) do
+                    for _, schema in ipairs(schemas) do
+                        run_case(test, schema, {
+                            input_type = input_type,
+                            output_type = output_type,
+                            reverse = reverse,
+                            use_table_as_tuple = use_table_as_tuple,
+                            use_fetch_source = use_fetch_source,
+                        })
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
-- 
2.21.0

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions
  2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
@ 2019-05-13 13:52   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2019-05-13 13:52 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

Pushed to master, thanks!

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v4 2/4] net.box: add skip_header option to use with buffer
  2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
@ 2019-05-13 13:52   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2019-05-13 13:52 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

Pushed to master, thanks!

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v4 3/4] Add merger for tuples streams (C part)
  2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuples streams (C part) Alexander Turenko
@ 2019-05-13 14:49   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2019-05-13 14:49 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

Pushed to master, thanks!

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [PATCH v4 4/4] Add merger for tuple streams (Lua part)
  2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
@ 2019-05-13 14:49   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2019-05-13 14:49 UTC (permalink / raw)
  To: Alexander Turenko; +Cc: tarantool-patches

Pushed to master, thanks!

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2019-05-13 14:49 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
2019-05-13 13:52   ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
2019-05-13 13:52   ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuples streams (C part) Alexander Turenko
2019-05-13 14:49   ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
2019-05-13 14:49   ` Vladimir Davydov
