* [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
2019-05-13 13:52 ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
` (2 subsequent siblings)
3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
@TarantoolBot document
Title: Non-recursive msgpack decoding functions
Contracts:
```
msgpack.decode_array_header(buf.rpos, buf:size()) -> arr_len, new_rpos
msgpack.decode_map_header(buf.rpos, buf:size()) -> map_len, new_rpos
```
These functions are intended to be used with a msgpack buffer received
from net.box. A user may want to skip the {[IPROTO_DATA_KEY] = ...}
wrapper and an array header before passing the buffer to some C function
for decoding.
See https://github.com/tarantool/tarantool/issues/2195 for more
information about this net.box API.
---
src/lua/msgpack.c | 82 +++++++++++++++
test/app-tap/msgpack.test.lua | 182 +++++++++++++++++++++++++++++++++-
2 files changed, 263 insertions(+), 1 deletion(-)
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index 1b1874eb2..d3cfc0e2b 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -422,6 +422,86 @@ lua_ibuf_msgpack_decode(lua_State *L)
return 2;
}
+/**
+ * Verify the arguments and store them in *data_p and *size_p.
+ *
+ * Always return 0. In case of any failure a Lua error is raised.
+ */
+static int
+verify_decode_header_args(lua_State *L, const char *func_name,
+ const char **data_p, ptrdiff_t *size_p)
+{
+ /* Verify arguments count. */
+ if (lua_gettop(L) != 2)
+ return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+ /* Verify ptr type. */
+ uint32_t ctypeid;
+ const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR)
+ return luaL_error(L, "%s: 'char *' expected", func_name);
+
+ /* Verify size type and value. */
+ ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+ if (size <= 0)
+ return luaL_error(L, "%s: non-positive size", func_name);
+
+ *data_p = data;
+ *size_p = size;
+
+ return 0;
+}
+
+/**
+ * msgpack.decode_array_header(buf.rpos, buf:size())
+ * -> arr_len, new_rpos
+ */
+static int
+lua_decode_array_header(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_array_header";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_header_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_ARRAY)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_array(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_array(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
+/**
+ * msgpack.decode_map_header(buf.rpos, buf:size())
+ * -> map_len, new_rpos
+ */
+static int
+lua_decode_map_header(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_map_header";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_header_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_MAP)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_map(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_map(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
static int
lua_msgpack_new(lua_State *L);
@@ -430,6 +510,8 @@ static const luaL_Reg msgpacklib[] = {
{ "decode", lua_msgpack_decode },
{ "decode_unchecked", lua_msgpack_decode_unchecked },
{ "ibuf_decode", lua_ibuf_msgpack_decode },
+ { "decode_array_header", lua_decode_array_header },
+ { "decode_map_header", lua_decode_map_header },
{ "new", lua_msgpack_new },
{ NULL, NULL }
};
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..1b0bb9806 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,188 @@ local function test_misc(test, s)
test:ok(not st and e:match("null"), "null ibuf")
end
+local function test_decode_array_map_header(test, s)
+ local ffi = require('ffi')
+
+ local usage_err = 'Usage: msgpack%.decode_[^_(]+_header%(ptr, size%)'
+ local end_of_buffer_err = 'msgpack%.decode_[^_]+_header: unexpected end ' ..
+ 'of buffer'
+ local non_positive_size_err = 'msgpack.decode_[^_]+_header: ' ..
+ 'non%-positive size'
+
+ local decode_cases = {
+ {
+ 'fixarray',
+ func = s.decode_array_header,
+ data = ffi.cast('char *', '\x94'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'array 16',
+ func = s.decode_array_header,
+ data = ffi.cast('char *', '\xdc\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_array_header,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated array 16',
+ func = s.decode_array_header,
+ data = ffi.cast('char *', '\xdc\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated array 32',
+ func = s.decode_array_header,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'fixmap',
+ func = s.decode_map_header,
+ data = ffi.cast('char *', '\x84'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'map 16',
+ func = s.decode_map_header,
+ data = ffi.cast('char *', '\xde\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'map 32',
+ func = s.decode_map_header,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated map 16',
+ func = s.decode_map_header,
+ data = ffi.cast('char *', '\xde\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated map 32',
+ func = s.decode_map_header,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ }
+
+ local bad_api_cases = {
+ {
+ 'wrong msgpack type',
+ data = ffi.cast('char *', '\xc0'),
+ size = 1,
+ exp_err = 'msgpack.decode_[^_]+_header: unexpected msgpack type',
+ },
+ {
+ 'zero size buffer',
+ data = ffi.cast('char *', ''),
+ size = 0,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'negative size buffer',
+ data = ffi.cast('char *', ''),
+ size = -1,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'size is nil',
+ data = ffi.cast('char *', ''),
+ size = nil,
+ exp_err = 'bad argument',
+ },
+ {
+ 'no arguments',
+ args = {},
+ exp_err = usage_err,
+ },
+ {
+ 'one argument',
+ args = {ffi.cast('char *', '')},
+ exp_err = usage_err,
+ },
+ {
+ 'data is nil',
+ args = {nil, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data is not cdata',
+ args = {1, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data with wrong cdata type',
+ args = {box.tuple.new(), 1},
+ exp_err = "msgpack.decode_[^_]+_header: 'char %*' expected",
+ },
+ {
+ 'size has wrong type',
+ args = {ffi.cast('char *', ''), 'eee'},
+ exp_err = 'bad argument',
+ },
+ }
+
+ test:plan(#decode_cases + 2 * #bad_api_cases)
+
+ -- Decode cases.
+ for _, case in ipairs(decode_cases) do
+ if case.exp_err ~= nil then
+ local ok, err = pcall(case.func, case.data, case.size)
+ local description = ('bad; %s'):format(case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ else
+ local len, new_buf = case.func(case.data, case.size)
+ local rewind = new_buf - case.data
+ local description = ('good; %s'):format(case[1])
+ test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+ description)
+ end
+ end
+
+ -- Bad api usage cases.
+ for _, func_name in ipairs({'decode_array_header', 'decode_map_header'}) do
+ for _, case in ipairs(bad_api_cases) do
+ local ok, err
+ if case.args ~= nil then
+ local args_len = table.maxn(case.args)
+ ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+ else
+ ok, err = pcall(s[func_name], case.data, case.size)
+ end
+ local description = ('%s bad api usage; %s'):format(func_name,
+ case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ end
+ end
+end
+
tap.test("msgpack", function(test)
local serializer = require('msgpack')
- test:plan(10)
+ test:plan(11)
test:test("unsigned", common.test_unsigned, serializer)
test:test("signed", common.test_signed, serializer)
test:test("double", common.test_double, serializer)
@@ -62,4 +241,5 @@ tap.test("msgpack", function(test)
test:test("ucdata", common.test_ucdata, serializer)
test:test("offsets", test_offsets, serializer)
test:test("misc", test_misc, serializer)
+ test:test("decode_array_map", test_decode_array_map_header, serializer)
end)
--
2.21.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [PATCH v4 2/4] net.box: add skip_header option to use with buffer
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
2019-05-13 13:52 ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuples streams (C part) Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
@TarantoolBot document
Title: net.box: skip_header option
This option instructs net.box to strip the {[IPROTO_DATA_KEY] = ...}
wrapper from a response before writing it to the user-provided buffer.
This may be needed to pass the buffer to a C function that expects
specific msgpack input.
Usage example:
```lua
local net_box = require('net.box')
local buffer = require('buffer')
local ffi = require('ffi')
local msgpack = require('msgpack')
local yaml = require('yaml')
box.cfg{listen = 3301}
box.once('load_data', function()
box.schema.user.grant('guest', 'read,write,execute', 'universe')
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})
box.space.s:insert({4})
end)
local function foo()
return box.space.s:select()
end
_G.foo = foo
local conn = net_box.connect('localhost:3301')
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- {48: [[1], [2], [3], [4]]}
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- [[1], [2], [3], [4]]
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- {48: [[[1], [2], [3], [4]]]}
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- [[[1], [2], [3], [4]]]
os.exit()
```
---
src/box/lua/net_box.lua | 56 ++++++----
test/box/net.box.result | 222 +++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 86 ++++++++++++++-
3 files changed, 333 insertions(+), 31 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index e1c4b652b..0cfb5075d 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -10,11 +10,12 @@ local urilib = require('uri')
local internal = require('net.box.lib')
local trigger = require('internal.trigger')
-local band = bit.band
-local max = math.max
-local fiber_clock = fiber.clock
-local fiber_self = fiber.self
-local decode = msgpack.decode_unchecked
+local band = bit.band
+local max = math.max
+local fiber_clock = fiber.clock
+local fiber_self = fiber.self
+local decode = msgpack.decode_unchecked
+local decode_map_header = msgpack.decode_map_header
local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, on_push, on_push_ctx,
- ...)
+ local function perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if state ~= 'active' and state ~= 'fetch_schema' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 8 members:
- -- method, buffer, id, cond, errno, response, on_push,
- -- on_push_ctx.
- local request = setmetatable(table_new(0, 8), request_mt)
+ -- Request in most cases has at most 9 members:
+ -- method, buffer, skip_header, id, cond, errno, response,
+ -- on_push, on_push_ctx.
+ local request = setmetatable(table_new(0, 9), request_mt)
request.method = method
request.buffer = buffer
+ request.skip_header = skip_header
request.id = id
request.cond = fiber.cond()
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
- local function perform_request(timeout, buffer, method, on_push,
- on_push_ctx, ...)
+ local function perform_request(timeout, buffer, skip_header, method,
+ on_push, on_push_ctx, ...)
local request, err =
- perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+ perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if not request then
return nil, err
end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
if buffer ~= nil then
-- Copy xrow.body to user-provided buffer
local body_len = body_end - body_rpos
+ if request.skip_header then
+ -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+ local map_len, key
+ map_len, body_rpos = decode_map_header(body_rpos, body_len)
+ assert(map_len == 1)
+ key, body_rpos = decode(body_rpos)
+ assert(key == IPROTO_DATA_KEY)
+ body_len = body_end - body_rpos
+ end
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
function remote_methods:_request(method, opts, ...)
local transport = self._transport
- local on_push, on_push_ctx, buffer, deadline
+ local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
-- async.
if opts then
buffer = opts.buffer
+ skip_header = opts.skip_header
if opts.is_async then
if opts.on_push or opts.on_push_ctx then
error('To handle pushes in an async request use future:pairs()')
end
local res, err =
- transport.perform_async_request(buffer, method, table.insert,
- {}, ...)
+ transport.perform_async_request(buffer, skip_header, method,
+ table.insert, {}, ...)
if err then
box.error(err)
end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local res, err = transport.perform_request(timeout, buffer, method,
- on_push, on_push_ctx, ...)
+ local res, err = transport.perform_request(timeout, buffer, skip_header,
+ method, on_push, on_push_ctx,
+ ...)
if err then
box.error(err)
end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+ res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+ res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
end
if err then
box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 3894f97d4..d02fed7a3 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -1598,6 +1598,18 @@ result
---
- {48: [[2]]}
...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
-- insert
c.space.test:insert({3}, {buffer = ibuf})
---
@@ -1610,6 +1622,21 @@ result
---
- {48: [[3]]}
...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
---
@@ -1633,6 +1660,29 @@ result
---
- {48: [[3]]}
...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1645,6 +1695,18 @@ result
---
- {48: []}
...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1657,6 +1719,18 @@ result
---
- {48: []}
...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
---
@@ -1669,6 +1743,18 @@ result
---
- {48: [[3], [2], [1, 'hello']]}
...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
-- select
len = c.space.test:select({}, {buffer = ibuf})
---
@@ -1692,6 +1778,29 @@ result
---
- {48: [[1, 'hello'], [2], [3], [4]]}
...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
---
@@ -1726,6 +1835,40 @@ result
---
- {48: []}
...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
---
@@ -1760,6 +1903,75 @@ result
---
- {48: []}
...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
@@ -2603,7 +2815,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
c.state
@@ -3244,7 +3456,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
@@ -3379,7 +3591,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3398,7 +3610,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index d05b8c605..bea43002d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- insert
c.space.test:insert({3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
len = c.space.test:select({}, {buffer = ibuf})
ibuf.rpos + len == ibuf.wpos
@@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos
len
result
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1079,7 +1155,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1312,7 +1388,7 @@ finalize_long()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
finalize_long()
future:wait_result(100)
@@ -1369,7 +1445,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
@@ -1379,7 +1455,7 @@ c:close()
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
--
2.21.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [PATCH v4 3/4] Add merger for tuples streams (C part)
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 1/4] lua: add non-recursive msgpack decoding functions Alexander Turenko
2019-05-07 22:30 ` [PATCH v4 2/4] net.box: add skip_header option to use with buffer Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
2019-05-13 14:49 ` Vladimir Davydov
2019-05-07 22:30 ` [PATCH v4 4/4] Add merger for tuple streams (Lua part) Alexander Turenko
3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
---
src/box/CMakeLists.txt | 1 +
src/box/merger.c | 355 +++++++++++++++++++++++++++++++++++++++
src/box/merger.h | 150 +++++++++++++++++
test/unit/CMakeLists.txt | 3 +
test/unit/merger.result | 71 ++++++++
test/unit/merger.test.c | 285 +++++++++++++++++++++++++++++++
6 files changed, 865 insertions(+)
create mode 100644 src/box/merger.c
create mode 100644 src/box/merger.h
create mode 100644 test/unit/merger.result
create mode 100644 test/unit/merger.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 2be0d1e35..ce328fb95 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -122,6 +122,7 @@ add_library(box STATIC
execute.c
wal.c
call.c
+ merger.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..744bba469
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "diag.h" /* diag_set() */
+#include "box/tuple.h" /* tuple_ref(), tuple_unref(),
+ tuple_validate() */
+#include "box/tuple_format.h" /* box_tuple_format_new(),
+ tuple_format_*() */
+#include "box/key_def.h" /* key_def_*(),
+ tuple_compare() */
+
+/* {{{ Merger */
+
+/**
+ * Holds a source to fetch next tuples and a last fetched tuple to
+ * compare the node against other nodes.
+ *
+ * The main reason why this structure is separated from a merge
+ * source is that a heap node can not be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows to encapsulate all heap
+ * related logic inside this compilation unit, without any traces
+ * in externally visible structures.
+ */
+struct merger_heap_node {
+ /* A source of tuples. */
+ struct merge_source *source;
+ /*
+ * A last fetched (refcounted) tuple to compare against
+ * other nodes.
+ */
+ struct tuple *tuple;
+ /* An anchor to make the structure a merger heap node. */
+ struct heap_node in_merger;
+};
+
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+ const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merge_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr in_merger
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, parameters of a merge process and utility fields.
+ */
+struct merger {
+ /* A merger is a source. */
+ struct merge_source base;
+ /*
+ * Whether a merge process started.
+ *
+ * The merger postpones charging of heap nodes until a
+ * first output tuple is acquired.
+ */
+ bool started;
+ /* A key_def to compare tuples. */
+ struct key_def *key_def;
+ /* A format to acquire compatible tuples from sources. */
+ struct tuple_format *format;
+ /*
+ * A heap of sources (of nodes that contains a source to
+ * be exact).
+ */
+ heap_t heap;
+ /* An array of heap nodes. */
+ uint32_t node_count;
+ struct merger_heap_node *nodes;
+ /* Ascending (false) / descending (true) order. */
+ bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Data comparing function to construct a heap of sources.
+ */
+static bool
+merge_source_less(const heap_t *heap, const struct merger_heap_node *left,
+ const struct merger_heap_node *right)
+{
+ assert(left->tuple != NULL);
+ assert(right->tuple != NULL);
+ struct merger *merger = container_of(heap, struct merger, heap);
+ int cmp = tuple_compare(left->tuple, right->tuple, merger->key_def);
+ return merger->reverse ? cmp >= 0 : cmp < 0;
+}
+
+/**
+ * Initialize a heap node: reference the source, no tuple yet.
+ */
+static void
+merger_heap_node_create(struct merger_heap_node *node,
+ struct merge_source *source)
+{
+ node->source = source;
+ merge_source_ref(node->source);
+ node->tuple = NULL;
+ heap_node_create(&node->in_merger);
+}
+
+/**
+ * Release a node's source and tuple (the node is not freed).
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+ merge_source_unref(node->source);
+ if (node->tuple != NULL)
+ tuple_unref(node->tuple);
+}
+
+/**
+ * Charge a heap node: fetch a next tuple from its source and,
+ * if there is one, put the node into merger->heap.
+ *
+ * Return 0 on success. An exhausted source is not an error: its
+ * node is just left out of the heap.
+ * Return -1 and set a diag when fetching a tuple or inserting
+ * the node into the heap fails.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+	struct tuple *next = NULL;
+
+	/* Ask the source for a tuple in the merger's format. */
+	if (merge_source_next(node->source, merger->format, &next) != 0)
+		return -1;
+
+	/* An exhausted source never gets into the heap. */
+	if (next == NULL)
+		return 0;
+
+	/* The node now owns the reference given by the source. */
+	node->tuple = next;
+
+	/* A failed insertion is reported as an allocation error. */
+	if (merger_heap_insert(&merger->heap, node) == 0)
+		return 0;
+
+	diag_set(OutOfMemory, 0, "malloc", "merger->heap");
+	return -1;
+}
+
+/* Virtual methods declarations */
+
+static void
+merger_delete(struct merge_source *base);
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Allocate one heap node per source and store the node array in
+ * the merger; each node takes a reference on its source.
+ *
+ * Helper for merger_new(). An empty source list is accepted.
+ * Return 0 on success. Return -1 on error and set a diag.
+ */
+static int
+merger_set_sources(struct merger *merger, struct merge_source **sources,
+		   uint32_t source_count)
+{
+	const size_t nodes_size = sizeof(struct merger_heap_node) *
+		source_count;
+	struct merger_heap_node *nodes = malloc(nodes_size);
+	/* malloc(0) may legally return NULL: that is not an OOM. */
+	if (nodes == NULL && nodes_size != 0) {
+		diag_set(OutOfMemory, nodes_size, "malloc",
+			 "merger heap nodes");
+		return -1;
+	}
+	for (uint32_t i = 0; i < source_count; ++i)
+		merger_heap_node_create(&nodes[i], sources[i]);
+
+	merger->node_count = source_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+ uint32_t source_count, bool reverse)
+{
+ static struct merge_source_vtab merger_vtab = {
+ .destroy = merger_delete,
+ .next = merger_next,
+ };
+
+ struct merger *merger = malloc(sizeof(struct merger));
+ if (merger == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger), "malloc",
+ "merger");
+ return NULL;
+ }
+
+ /*
+ * We need to copy the key_def because it can be collected
+ * before a merge process ends (say, by LuaJIT GC if the
+ * key_def comes from Lua).
+ */
+ key_def = key_def_dup(key_def);
+ if (key_def == NULL) {
+ free(merger);
+ return NULL;
+ }
+
+ struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+ if (format == NULL) {
+ key_def_delete(key_def);
+ free(merger);
+ return NULL;
+ }
+
+ merge_source_create(&merger->base, &merger_vtab);
+ merger->started = false;
+ merger->key_def = key_def;
+ merger->format = format;
+ merger_heap_create(&merger->heap);
+ merger->node_count = 0;
+ merger->nodes = NULL;
+ merger->reverse = reverse;
+
+ if (merger_set_sources(merger, sources, source_count) != 0) {
+ key_def_delete(merger->key_def);
+ tuple_format_unref(merger->format);
+ merger_heap_destroy(&merger->heap);
+ free(merger);
+ return NULL;
+ }
+
+ return &merger->base;
+}
+
+/* Virtual methods */
+
+static void
+merger_delete(struct merge_source *base)
+{
+	struct merger *self = container_of(base, struct merger, base);
+
+	key_def_delete(self->key_def);
+	tuple_format_unref(self->format);
+	merger_heap_destroy(&self->heap);
+
+	/* Release the sources and tuples held by the heap nodes. */
+	for (uint32_t i = 0; i < self->node_count; ++i)
+		merger_heap_node_delete(&self->nodes[i]);
+
+	/* free(NULL) is a no-op, so no NULL check is needed. */
+	free(self->nodes);
+	free(self);
+}
+
+static int
+merger_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out)
+{
+ struct merger *merger = container_of(base, struct merger, base);
+
+ /*
+ * Fetch a first tuple for each source and add all heap
+ * nodes to a merger heap.
+ */
+ if (!merger->started) {
+ for (uint32_t i = 0; i < merger->node_count; ++i) {
+ struct merger_heap_node *node = &merger->nodes[i];
+ if (merger_add_heap_node(merger, node) != 0)
+ return -1;
+ }
+ merger->started = true;
+ }
+
+ /* Get a next tuple. */
+ struct merger_heap_node *node = merger_heap_top(&merger->heap);
+ if (node == NULL) {
+ *out = NULL;
+ return 0;
+ }
+ struct tuple *tuple = node->tuple;
+ assert(tuple != NULL);
+
+ /* Validate the tuple. */
+ if (format != NULL && tuple_validate(format, tuple) != 0)
+ return -1;
+
+ /*
+ * Note: An old node->tuple pointer will be written to
+ * *out as refcounted tuple, so we don't unreference it
+ * here.
+ */
+ struct merge_source *source = node->source;
+ if (merge_source_next(source, merger->format, &node->tuple) != 0)
+ return -1;
+
+ /* Update a heap. */
+ if (node->tuple == NULL)
+ merger_heap_delete(&merger->heap, node);
+ else
+ merger_heap_update(&merger->heap, node);
+
+ *out = tuple;
+ return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..90ae175a9
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,150 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+
+struct merge_source;
+
+struct merge_source_vtab {
+ /**
+ * Free a merge source.
+ *
+ * Don't call it directly, use merge_source_unref()
+ * instead.
+ */
+ void (*destroy)(struct merge_source *base);
+ /**
+ * Get a next tuple (refcounted) from a source.
+ *
+ * When format is not NULL the resulting tuple will be in
+ * a compatible format.
+ *
+ * When format is NULL it means that it does not matter
+ * for a caller in which format a tuple will be.
+ *
+ * Return 0 when successfully fetched a tuple or NULL. In
+ * case of an error set a diag and return -1.
+ */
+ int (*next)(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merge source.
+ *
+ * The structure does not hold any resources.
+ */
+struct merge_source {
+ /* Source-specific methods. */
+ const struct merge_source_vtab *vtab;
+ /* Reference counter. */
+ int refs;
+};
+
+/* }}} */
+
+/* {{{ Base merge source functions */
+
+/**
+ * Increment a merge source reference counter.
+ */
+static inline void
+merge_source_ref(struct merge_source *source)
+{
+ ++source->refs;
+}
+
+/**
+ * Decrement a merge source reference counter. When it has
+ * reached zero, free the source (call destroy() virtual method).
+ */
+static inline void
+merge_source_unref(struct merge_source *source)
+{
+ assert(source->refs - 1 >= 0);
+ if (--source->refs == 0)
+ source->vtab->destroy(source);
+}
+
+/**
+ * @see merge_source_vtab
+ */
+static inline int
+merge_source_next(struct merge_source *source, struct tuple_format *format,
+ struct tuple **out)
+{
+ return source->vtab->next(source, format, out);
+}
+
+/**
+ * Initialize a base merge source structure.
+ */
+static inline void
+merge_source_create(struct merge_source *source, struct merge_source_vtab *vtab)
+{
+ source->vtab = vtab;
+ source->refs = 1;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merge_source *
+merger_new(struct key_def *key_def, struct merge_source **sources,
+ uint32_t source_count, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 2c8340800..07dcd6cf2 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..766ebce63
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+ *** test_basic ***
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (any format): tuple != NULL
+ ok 2 - array source next() (any format): skip tuple validation
+ ok 3 - array source next() (any format): check tuple size
+ ok 4 - array source next() (any format): check tuple data
+ ok 5 - array source next() (any format): tuple != NULL
+ ok 6 - array source next() (any format): skip tuple validation
+ ok 7 - array source next() (any format): check tuple size
+ ok 8 - array source next() (any format): check tuple data
+ ok 9 - array source is empty (any format)
+ *** test_array_source: done ***
+ok 1 - subtests
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (user's format): tuple != NULL
+ ok 2 - array source next() (user's format): validate tuple
+ ok 3 - array source next() (user's format): check tuple size
+ ok 4 - array source next() (user's format): check tuple data
+ ok 5 - array source next() (user's format): tuple != NULL
+ ok 6 - array source next() (user's format): validate tuple
+ ok 7 - array source next() (user's format): check tuple size
+ ok 8 - array source next() (user's format): check tuple data
+ ok 9 - array source is empty (user's format)
+ *** test_array_source: done ***
+ok 2 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (any format): tuple != NULL
+ ok 2 - merger next() (any format): skip tuple validation
+ ok 3 - merger next() (any format): check tuple size
+ ok 4 - merger next() (any format): check tuple data
+ ok 5 - merger next() (any format): tuple != NULL
+ ok 6 - merger next() (any format): skip tuple validation
+ ok 7 - merger next() (any format): check tuple size
+ ok 8 - merger next() (any format): check tuple data
+ ok 9 - merger next() (any format): tuple != NULL
+ ok 10 - merger next() (any format): skip tuple validation
+ ok 11 - merger next() (any format): check tuple size
+ ok 12 - merger next() (any format): check tuple data
+ ok 13 - merger next() (any format): tuple != NULL
+ ok 14 - merger next() (any format): skip tuple validation
+ ok 15 - merger next() (any format): check tuple size
+ ok 16 - merger next() (any format): check tuple data
+ ok 17 - merger is empty (any format)
+ *** test_merger: done ***
+ok 3 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (user's format): tuple != NULL
+ ok 2 - merger next() (user's format): validate tuple
+ ok 3 - merger next() (user's format): check tuple size
+ ok 4 - merger next() (user's format): check tuple data
+ ok 5 - merger next() (user's format): tuple != NULL
+ ok 6 - merger next() (user's format): validate tuple
+ ok 7 - merger next() (user's format): check tuple size
+ ok 8 - merger next() (user's format): check tuple data
+ ok 9 - merger next() (user's format): tuple != NULL
+ ok 10 - merger next() (user's format): validate tuple
+ ok 11 - merger next() (user's format): check tuple size
+ ok 12 - merger next() (user's format): check tuple data
+ ok 13 - merger next() (user's format): tuple != NULL
+ ok 14 - merger next() (user's format): validate tuple
+ ok 15 - merger next() (user's format): check tuple size
+ ok 16 - merger next() (user's format): check tuple data
+ ok 17 - merger is empty (user's format)
+ *** test_merger: done ***
+ok 4 - subtests
+ *** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..b4a989a20
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,285 @@
+#include "unit.h" /* plan, header, footer, is, ok */
+#include "memory.h" /* memory_init() */
+#include "fiber.h" /* fiber_init() */
+#include "box/tuple.h" /* tuple_init(), tuple_*(),
+ tuple_validate() */
+#include "box/tuple_format.h" /* tuple_format_*,
+ box_tuple_format_new() */
+#include "box/key_def.h" /* key_def_new(),
+ key_def_delete() */
+#include "box/merger.h" /* merger_*() */
+
+/* {{{ Array merge source */
+
+struct merge_source_array {
+ struct merge_source base;
+ uint32_t tuple_count;
+ struct tuple **tuples;
+ uint32_t cur;
+};
+
+/* Virtual methods declarations */
+
+static void
+merge_source_array_destroy(struct merge_source *base);
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+static struct merge_source *
+merge_source_array_new(bool even)
+{
+ static struct merge_source_vtab merge_source_array_vtab = {
+ .destroy = merge_source_array_destroy,
+ .next = merge_source_array_next,
+ };
+
+ struct merge_source_array *source = malloc(
+ sizeof(struct merge_source_array));
+ assert(source != NULL);
+
+ merge_source_create(&source->base, &merge_source_array_vtab);
+
+ uint32_t tuple_size = 2;
+ const uint32_t tuple_count = 2;
+ /* [1], [3] */
+ static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+ /* [2], [4] */
+ static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+ const char **data = even ? data_even : data_odd;
+ source->tuples = malloc(sizeof(struct tuple *) * tuple_count);
+ assert(source->tuples != NULL);
+ struct tuple_format *format = tuple_format_runtime;
+ for (uint32_t i = 0; i < tuple_count; ++i) {
+ const char *end = data[i] + tuple_size;
+ source->tuples[i] = tuple_new(format, data[i], end);
+ tuple_ref(source->tuples[i]);
+ }
+ source->tuple_count = tuple_count;
+ source->cur = 0;
+
+ return &source->base;
+}
+
+/* Virtual methods */
+
+static void
+merge_source_array_destroy(struct merge_source *base)
+{
+ struct merge_source_array *source = container_of(base,
+ struct merge_source_array, base);
+
+ for (uint32_t i = 0; i < source->tuple_count; ++i)
+ tuple_unref(source->tuples[i]);
+
+ free(source->tuples);
+ free(source);
+}
+
+static int
+merge_source_array_next(struct merge_source *base, struct tuple_format *format,
+ struct tuple **out)
+{
+ struct merge_source_array *source = container_of(base,
+ struct merge_source_array, base);
+
+ if (source->cur == source->tuple_count) {
+ *out = NULL;
+ return 0;
+ }
+
+ struct tuple *tuple = source->tuples[source->cur];
+ assert(tuple != NULL);
+
+ /*
+ * Note: The source still stores the tuple (and will
+ * unreference it during destroy). Here we should give a
+ * referenced tuple (so a caller should unreference it on
+ * its side).
+ */
+ tuple_ref(tuple);
+
+ *out = tuple;
+ ++source->cur;
+ return 0;
+}
+
+/* }}} */
+
+static struct key_part_def key_part_unsigned = {
+ .fieldno = 0,
+ .type = FIELD_TYPE_UNSIGNED,
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+static struct key_part_def key_part_integer = {
+ .fieldno = 0,
+ .type = FIELD_TYPE_INTEGER,
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+ return a < b ? a : b;
+}
+
+void
+check_tuple(struct tuple *tuple, struct tuple_format *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	uint32_t size;
+	const char *data = tuple_data_range(tuple, &size);
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (format == NULL) {
+		ok(true, "%s: skip tuple validation", case_name);
+	} else {
+		int rc = tuple_validate(format, tuple);
+		is(rc, 0, "%s: validate tuple", case_name);
+	}
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	/* Msgpack is binary: compare bytes, don't stop at a '\0'. */
+	ok(memcmp(data, exp_data, min_u32(size, exp_data_len)) == 0,
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Check array source itself (just in case).
+ */
+int
+test_array_source(struct tuple_format *format)
+{
+ plan(9);
+ header();
+
+ /* [1], [3] */
+ const uint32_t exp_tuple_size = 2;
+ const uint32_t exp_tuple_count = 2;
+ static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+
+ struct merge_source *source = merge_source_array_new(false);
+ assert(source != NULL);
+
+ struct tuple *tuple = NULL;
+ const char *msg = format == NULL ?
+ "array source next() (any format)" :
+ "array source next() (user's format)";
+ for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+ int rc = merge_source_next(source, format, &tuple);
+ (void) rc;
+ assert(rc == 0);
+ check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+ msg);
+ tuple_unref(tuple);
+ }
+ int rc = merge_source_next(source, format, &tuple);
+ (void) rc;
+ assert(rc == 0);
+ is(tuple, NULL, format == NULL ?
+ "array source is empty (any format)" :
+ "array source is empty (user's format)");
+
+ merge_source_unref(source);
+
+ footer();
+ return check_plan();
+}
+
+int
+test_merger(struct tuple_format *format)
+{
+ plan(17);
+ header();
+
+ /* [1], [2], [3], [4] */
+ const uint32_t exp_tuple_size = 2;
+ const uint32_t exp_tuple_count = 4;
+ static const char *exp_tuples_data[] = {
+ "\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+ };
+
+ const uint32_t source_count = 2;
+ struct merge_source *sources[] = {
+ merge_source_array_new(false),
+ merge_source_array_new(true),
+ };
+
+ struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+ struct merge_source *merger = merger_new(key_def, sources, source_count,
+ false);
+ key_def_delete(key_def);
+
+ struct tuple *tuple = NULL;
+ const char *msg = format == NULL ?
+ "merger next() (any format)" :
+ "merger next() (user's format)";
+ for (uint32_t i = 0; i < exp_tuple_count; ++i) {
+ int rc = merge_source_next(merger, format, &tuple);
+ (void) rc;
+ assert(rc == 0);
+ check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+ msg);
+ tuple_unref(tuple);
+ }
+ int rc = merge_source_next(merger, format, &tuple);
+ (void) rc;
+ assert(rc == 0);
+ is(tuple, NULL, format == NULL ?
+ "merger is empty (any format)" :
+ "merger is empty (user's format)");
+
+ merge_source_unref(merger);
+ merge_source_unref(sources[0]);
+ merge_source_unref(sources[1]);
+
+ footer();
+ return check_plan();
+}
+
+int
+test_basic()
+{
+ plan(4);
+ header();
+
+ struct key_def *key_def = key_def_new(&key_part_integer, 1);
+ struct tuple_format *format = box_tuple_format_new(&key_def, 1);
+ assert(format != NULL);
+
+ test_array_source(NULL);
+ test_array_source(format);
+ test_merger(NULL);
+ test_merger(format);
+
+ key_def_delete(key_def);
+ tuple_format_unref(format);
+
+ footer();
+ return check_plan();
+}
+
+int
+main()
+{
+ memory_init();
+ fiber_init(fiber_c_invoke);
+ tuple_init(NULL);
+
+ int rc = test_basic();
+
+ tuple_free();
+ fiber_free();
+ memory_free();
+
+ return rc;
+}
--
2.21.0
^ permalink raw reply [flat|nested] 9+ messages in thread
* [PATCH v4 4/4] Add merger for tuple streams (Lua part)
2019-05-07 22:30 [PATCH v4 0/4] Merger Alexander Turenko
` (2 preceding siblings ...)
2019-05-07 22:30 ` [PATCH v4 3/4] Add merger for tuples streams (C part) Alexander Turenko
@ 2019-05-07 22:30 ` Alexander Turenko
2019-05-13 14:49 ` Vladimir Davydov
3 siblings, 1 reply; 9+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:30 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Fixes #3276.
@TarantoolBot document
Title: Merger for tuple streams
The main concept of the merger is a source. It is an object that
provides a stream of tuples. There are four types of sources: a tuple
source, a table source, a buffer source and a merger itself.
A tuple source just returns one tuple. However, this source (as well as
the table and buffer ones) supports fetching the next data chunk, so the
API allows creating it from a Lua iterator:
`merger.new_tuple_source(gen, param, state)`. A `gen` function should
return `state, tuple` on each call and then return `nil` when no more
tuples are available. Consider the example:
```lua
box.cfg({})
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})
s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
- [2]
- [3]
...
s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
- [2]
- [3]
...
```
As we can see, a source (this is common to all sources) has `:select()`
and `:pairs()` methods. The first one has two options, `buffer` and
`limit`, with the same meaning as the ones in net.box `:select()`. The
`:pairs()` method (or its `:ipairs()` alias) returns a luafun iterator
(it is a Lua iterator, but it also provides a set of handy methods to
operate in a functional style).
The same API exists to create a table and a buffer source:
`merger.new_table_source(gen, param, state)` and
`merger.new_buffer_source(gen, param, state)`. A `gen` function should
return a table or a buffer on each call.
There are also helpers that are useful when all data are available at
once: `merger.new_source_fromtable(tbl)` and
`merger.new_source_frombuffer(buf)`.
A merger is a special kind of source, which is created from a key_def
object and a set of sources. It performs a kind of merge sort: on each
step it chooses the source with the minimal / maximal tuple, consumes
a tuple from this source and repeats. The API to create a merger is the
following:
```lua
local key_def_lib = require('key_def')
local merger = require('merger')
local key_def = key_def_lib.new(<...>)
local sources = {<...>}
local merger_inst = merger.new(key_def, sources, {
-- Ascending (false) or descending (true) order.
-- Default is ascending.
reverse = <boolean> or <nil>,
})
```
An instance of a merger has the same `:select()` and `:pairs()` methods
as any other source.
The `key_def_lib.new()` function takes a table of key parts as an
argument in the same format as box.space.<...>.index.<...>.parts or
conn.space.<...>.index.<...>.parts (where conn is a net.box connection):
```
local key_parts = {
{
fieldno = <number>,
type = <string>,
[ is_nullable = <boolean>, ]
[ collation_id = <number>, ]
[ collation = <string>, ]
},
...
}
local key_def = key_def_lib.new(key_parts)
```
A key_def can be cached across requests with the same ordering rules
(typically requests to the same space).
---
src/box/CMakeLists.txt | 2 +
src/box/lua/init.c | 7 +-
src/box/lua/merger.c | 1143 ++++++++++++++++++++++++++++++++++
src/box/lua/merger.h | 47 ++
src/box/lua/merger.lua | 41 ++
test/box-tap/merger.test.lua | 768 +++++++++++++++++++++++
6 files changed, 2007 insertions(+), 1 deletion(-)
create mode 100644 src/box/lua/merger.c
create mode 100644 src/box/lua/merger.h
create mode 100644 src/box/lua/merger.lua
create mode 100755 test/box-tap/merger.test.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ce328fb95..0864c3433 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua)
lua_source(lua_sources lua/console.lua)
lua_source(lua_sources lua/xlog.lua)
lua_source(lua_sources lua/key_def.lua)
+lua_source(lua_sources lua/merger.lua)
set(bin_sources)
bin_source(bin_sources bootstrap.snap bootstrap.h)
@@ -144,6 +145,7 @@ add_library(box STATIC
lua/xlog.c
lua/execute.c
lua/key_def.c
+ lua/merger.c
${bin_sources})
target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 6b8be5096..76b987b4b 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -60,6 +60,7 @@
#include "box/lua/tuple.h"
#include "box/lua/execute.h"
#include "box/lua/key_def.h"
+#include "box/lua/merger.h"
extern char session_lua[],
tuple_lua[],
@@ -70,7 +71,8 @@ extern char session_lua[],
feedback_daemon_lua[],
net_box_lua[],
upgrade_lua[],
- console_lua[];
+ console_lua[],
+ merger_lua[];
static const char *lua_sources[] = {
"box/session", session_lua,
@@ -83,6 +85,7 @@ static const char *lua_sources[] = {
"box/load_cfg", load_cfg_lua,
"box/xlog", xlog_lua,
"box/key_def", key_def_lua,
+ "box/merger", merger_lua,
NULL
};
@@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L)
lua_pop(L, 1);
luaopen_key_def(L);
lua_pop(L, 1);
+ luaopen_merger(L);
+ lua_pop(L, 1);
/* Load Lua extension */
for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..a61adb389
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1143 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <lua.h> /* lua_*() */
+#include <lauxlib.h> /* luaL_*() */
+
+#include "fiber.h" /* fiber() */
+#include "diag.h" /* diag_set() */
+
+#include "box/tuple.h" /* tuple_format_runtime,
+ tuple_*(), ... */
+
+#include "lua/error.h" /* luaT_error() */
+#include "lua/utils.h" /* luaL_pushcdata(),
+ luaL_iterator_*() */
+
+#include "box/lua/key_def.h" /* luaT_check_key_def() */
+#include "box/lua/tuple.h" /* luaT_tuple_new() */
+
+#include "small/ibuf.h" /* struct ibuf */
+#include "msgpuck.h" /* mp_*() */
+
+#include "box/merger.h" /* merge_source_*, merger_*() */
+
+static uint32_t CTID_STRUCT_MERGE_SOURCE_REF = 0;
+static uint32_t CTID_STRUCT_IBUF = 0;
+
+/**
+ * A type of a function to create a source from a Lua iterator on
+ * a Lua stack.
+ *
+ * Such function is to be passed to lbox_merge_source_new() as
+ * a parameter.
+ */
+typedef struct merge_source *(*luaL_merge_source_new_f)(struct lua_State *L);
+
+/* {{{ Helpers */
+
+/**
+ * Fetch a `struct ibuf` cdata from the Lua stack slot @a idx.
+ *
+ * Return NULL when the slot does not hold a cdata or holds a
+ * cdata whose ctype differs from CTID_STRUCT_IBUF.
+ */
+static struct ibuf *
+luaT_check_ibuf(struct lua_State *L, int idx)
+{
+	uint32_t ctypeid;
+	struct ibuf *buf;
+
+	/* Filter out non-cdata values before asking for a cdata. */
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	buf = luaL_checkcdata(L, idx, &ctypeid);
+	return (buf != NULL && ctypeid == CTID_STRUCT_IBUF) ? buf : NULL;
+}
+
+/**
+ * Extract a merge source from the Lua stack.
+ *
+ * The value at @a idx is expected to be a cdata of the
+ * `struct merge_source&` type (CTID_STRUCT_MERGE_SOURCE_REF).
+ *
+ * Return NULL when the value is not a cdata at all or is a
+ * cdata of another type. The explicit lua_type() check mirrors
+ * luaT_check_ibuf(): a non-cdata value is reported through the
+ * NULL return value, so a caller is able to produce its own
+ * usage message and to release resources it holds.
+ */
+static struct merge_source *
+luaT_check_merge_source(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct merge_source **source_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (source_ptr == NULL || cdata_type != CTID_STRUCT_MERGE_SOURCE_REF)
+		return NULL;
+	return *source_ptr;
+}
+
+/**
+ * Consume the msgpack array header that wraps tuples in @a buf
+ * and store the array length into @a len_p.
+ *
+ * An empty buffer is accepted and reported as zero tuples.
+ *
+ * Return 0 at success. Return -1 when the buffer positions are
+ * inconsistent, when the data does not start with an array or
+ * when the array header is truncated; @a len_p and the read
+ * position are left untouched in this case.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+	/* The read position must not be past the write position. */
+	if (buf->rpos > buf->wpos)
+		return -1;
+
+	/* Nothing to decode: an empty chunk. */
+	if (ibuf_used(buf) == 0) {
+		*len_p = 0;
+		return 0;
+	}
+
+	/* Only an array may wrap tuples. */
+	if (mp_typeof(*buf->rpos) != MP_ARRAY)
+		return -1;
+
+	/* A positive result means the header is incomplete. */
+	if (mp_check_array(buf->rpos, buf->wpos) > 0)
+		return -1;
+
+	*len_p = mp_decode_array((const char **) &buf->rpos);
+	return 0;
+}
+
+/**
+ * Append a msgpack array header for @a result_len elements to
+ * @a output_buffer and advance its write position.
+ */
+static void
+encode_header(struct ibuf *output_buffer, uint32_t result_len)
+{
+	size_t header_size = mp_sizeof_array(result_len);
+
+	ibuf_reserve(output_buffer, header_size);
+	char *header_end = mp_encode_array(output_buffer->wpos, result_len);
+	output_buffer->wpos = header_end;
+}
+
+/**
+ * Obtain a tuple from the Lua stack slot @a idx.
+ *
+ * When the slot already holds a tuple, it is validated against
+ * @a format (the validation is skipped when @a format is NULL)
+ * and returned as is.
+ *
+ * Otherwise a new tuple is built from the value in the slot
+ * using @a format, or the runtime format when @a format is
+ * NULL.
+ *
+ * Return NULL at an error; the diag is expected to be set by
+ * the failed callee.
+ */
+static struct tuple *
+luaT_gettuple(struct lua_State *L, int idx, struct tuple_format *format)
+{
+	struct tuple *existing = luaT_istuple(L, idx);
+
+	if (existing != NULL) {
+		/* A ready tuple: only check it fits the format. */
+		if (format != NULL && tuple_validate(format, existing) != 0)
+			return NULL;
+		return existing;
+	}
+
+	/* Not a tuple: build one from the Lua value. */
+	if (format == NULL)
+		format = tuple_format_runtime;
+	return luaT_tuple_new(L, idx, format);
+}
+
+/* }}} */
+
+/* {{{ Create, destroy structures from Lua */
+
+/**
+ * Garbage collection handler of a merge source cdata.
+ *
+ * Drops the reference to the merge source that the Lua object
+ * at the first stack slot holds.
+ */
+static int
+lbox_merge_source_gc(struct lua_State *L)
+{
+	struct merge_source *collected = luaT_check_merge_source(L, 1);
+
+	assert(collected != NULL);
+	merge_source_unref(collected);
+	return 0;
+}
+
+/**
+ * Create a new source from a Lua iterator and push it onto the
+ * Lua stack.
+ *
+ * It is the helper for lbox_merger_new_buffer_source(),
+ * lbox_merger_new_table_source() and
+ * lbox_merger_new_tuple_source().
+ *
+ * @param L                     Lua state with gen[, param[, state]]
+ *                              on the stack.
+ * @param func_name             Name to show in the usage message.
+ * @param luaL_merge_source_new Constructor of a particular source
+ *                              type.
+ *
+ * Push the new source (a cdata with a GC handler) and return 1.
+ * Raise a Lua error on bad arguments or when the constructor
+ * fails.
+ */
+static int
+lbox_merge_source_new(struct lua_State *L, const char *func_name,
+		      luaL_merge_source_new_f luaL_merge_source_new)
+{
+	int top = lua_gettop(L);
+	if (top < 1 || top > 3 || !luaL_iscallable(L, 1))
+		return luaL_error(L, "Usage: %s(gen, param, state)", func_name);
+
+	/*
+	 * luaL_merge_source_new() reads exactly three top values.
+	 */
+	while (lua_gettop(L) < 3)
+		lua_pushnil(L);
+
+	struct merge_source *source = luaL_merge_source_new(L);
+	/*
+	 * There is nothing to release when the constructor fails:
+	 * no source was created, so it must not be unreferenced.
+	 */
+	if (source == NULL)
+		return luaT_error(L);
+
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = source;
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error that describes how merger.new() should be
+ * called.
+ *
+ * When @a param_name is not NULL the message names the
+ * offending parameter, otherwise it reports bad parameters in
+ * general.
+ */
+static int
+lbox_merger_new_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger.new(key_def, "
+				   "{source, source, ...}[, {"
+				   "reverse = <boolean> or <nil>}])";
+
+	if (param_name != NULL)
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+	return luaL_error(L, "Bad params, use: %s", usage);
+}
+
+/**
+ * Parse a second parameter of merger.new() into an array of
+ * sources.
+ *
+ * Return an array of pointers to sources and set @a
+ * source_count_ptr. In case of an error set a diag and return
+ * NULL.
+ *
+ * The returned array is allocated with malloc() and must be
+ * released by a caller with free(). The Lua stack is left as it
+ * was at the call: each table element is popped right after it
+ * is examined, so the stack usage does not depend on the
+ * number of sources.
+ *
+ * It is the helper for lbox_merger_new().
+ */
+static struct merge_source **
+luaT_merger_new_parse_sources(struct lua_State *L, int idx,
+			      uint32_t *source_count_ptr)
+{
+	/* Allocate sources array. */
+	uint32_t source_count = lua_objlen(L, idx);
+	const size_t sources_size = sizeof(struct merge_source *) *
+		source_count;
+	/*
+	 * malloc(0) is allowed to return NULL, which would be
+	 * misreported as an out of memory error for an empty
+	 * table, so always request at least one slot.
+	 */
+	struct merge_source **sources = malloc(sources_size > 0 ?
+		sources_size : sizeof(struct merge_source *));
+	if (sources == NULL) {
+		diag_set(OutOfMemory, sources_size, "malloc", "sources");
+		return NULL;
+	}
+
+	/* Save all sources. */
+	for (uint32_t i = 0; i < source_count; ++i) {
+		lua_pushinteger(L, i + 1);
+		lua_gettable(L, idx);
+
+		/*
+		 * Extract a source from a Lua stack. Values that are
+		 * not cdata are rejected here explicitly, so the
+		 * array is always freed on the error path below.
+		 */
+		struct merge_source *source = NULL;
+		if (lua_type(L, -1) == LUA_TCDATA)
+			source = luaT_check_merge_source(L, -1);
+		/*
+		 * The source is still referenced by the table, so
+		 * the stack slot is not needed anymore.
+		 */
+		lua_pop(L, 1);
+
+		if (source == NULL) {
+			free(sources);
+			diag_set(IllegalParams,
+				 "Unknown source type at index %u", i + 1);
+			return NULL;
+		}
+		sources[i] = source;
+	}
+
+	*source_count_ptr = source_count;
+	return sources;
+}
+
+/**
+ * Create a new merger and push it to a Lua stack as a merge
+ * source.
+ *
+ * Expect cdata<struct key_def>, a table of sources and
+ * (optionally) a table of options on a Lua stack. The only
+ * supported option is 'reverse' (boolean or nil).
+ *
+ * Push the merger (a cdata with a GC handler) and return 1.
+ * Raise a Lua error on bad arguments or when the merger cannot
+ * be created.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	struct key_def *key_def = NULL;
+	int top = lua_gettop(L);
+	bool ok = (top == 2 || top == 3) &&
+		/* key_def. */
+		(key_def = luaT_check_key_def(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok)
+		return lbox_merger_new_usage(L, NULL);
+
+	/* Options. */
+	bool reverse = false;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 3)) {
+		/* Parse reverse. */
+		lua_pushstring(L, "reverse");
+		lua_gettable(L, 3);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isboolean(L, -1))
+				reverse = lua_toboolean(L, -1);
+			else
+				return lbox_merger_new_usage(L, "reverse");
+		}
+		lua_pop(L, 1);
+	}
+
+	uint32_t source_count = 0;
+	struct merge_source **sources = luaT_merger_new_parse_sources(L, 2,
+		&source_count);
+	if (sources == NULL)
+		return luaT_error(L);
+
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 reverse);
+	/* The array is needed only to pass sources to merger_new(). */
+	free(sources);
+	/*
+	 * There is no merger to release when merger_new() fails,
+	 * so just raise the error it has set.
+	 */
+	if (merger == NULL)
+		return luaT_error(L);
+
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = merger;
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/* }}} */
+
+/* {{{ Buffer merge source */
+
+/*
+ * A merge source that reads tuples from msgpack buffers
+ * (struct ibuf) produced chunk by chunk by a Lua iterator.
+ */
+struct merge_source_buffer {
+ /* The base structure; container_of() is used to get back. */
+ struct merge_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next chunk of
+ * tuples.
+ */
+ struct luaL_iterator *fetch_it;
+ /*
+ * A Lua registry reference to a buffer storing the
+ * current chunk of tuples. It is needed to prevent LuaJIT
+ * from collecting the buffer while the source considers
+ * it as the current one. Zero means 'no reference'.
+ */
+ int ref;
+ /*
+ * A buffer with a current chunk of tuples.
+ */
+ struct ibuf *buf;
+ /*
+ * A count of tuples of the current chunk that are not
+ * read yet. It is set from the array header of a chunk.
+ *
+ * A merger stops before end of a buffer when it is not
+ * the last merger in the chain.
+ */
+ size_t remaining_tuple_count;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base);
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+ struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Allocate and initialize a merge source of the buffer type.
+ *
+ * The fetch iterator is created from gen, param, state values
+ * that reside on the top of the Lua stack.
+ *
+ * Return NULL and set a diag when the allocation fails.
+ */
+static struct merge_source *
+luaL_merge_source_buffer_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_buffer_vtab = {
+		.destroy = luaL_merge_source_buffer_destroy,
+		.next = luaL_merge_source_buffer_next,
+	};
+	const size_t size = sizeof(struct merge_source_buffer);
+
+	struct merge_source_buffer *buffer_source = malloc(size);
+	if (buffer_source == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "merge_source_buffer");
+		return NULL;
+	}
+
+	merge_source_create(&buffer_source->base, &merge_source_buffer_vtab);
+
+	/* No chunk is fetched yet. */
+	buffer_source->fetch_it = luaL_iterator_new(L, 0);
+	buffer_source->ref = 0;
+	buffer_source->buf = NULL;
+	buffer_source->remaining_tuple_count = 0;
+
+	return &buffer_source->base;
+}
+
+/**
+ * Call a user provided function to get a next data chunk (a
+ * buffer).
+ *
+ * The iterator is called on the Lua stack of the current fiber
+ * and is expected to return two values: a new state and a
+ * buffer. The buffer is referenced in the Lua registry, the
+ * previous one (if any) is unreferenced, and the array header
+ * of the new buffer is decoded into remaining_tuple_count.
+ *
+ * Return 1 when a new buffer is received, 0 when a buffers
+ * iterator ends and -1 at error and set a diag.
+ *
+ * NOTE(review): the error returns after a successful iterator
+ * call do not pop the iterator results (and the pushed copy of
+ * the buffer) from the Lua stack -- confirm that callers unwind
+ * the stack in this case.
+ */
+static int
+luaL_merge_source_buffer_fetch(struct merge_source_buffer *source)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Set a new buffer as the current chunk. */
+ if (source->ref > 0) {
+ luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+ source->ref = 0;
+ }
+ /* With nresult == 2 the index is -1: the buffer on the top. */
+ lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+ source->buf = luaT_check_ibuf(L, -1);
+ if (source->buf == NULL) {
+ diag_set(IllegalParams, "Expected <state>, <buffer>");
+ return -1;
+ }
+ source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ /* Drop the iterator results: the state and the buffer. */
+ lua_pop(L, nresult);
+
+ /* Update remaining_tuple_count and skip the header. */
+ if (decode_header(source->buf, &source->remaining_tuple_count) != 0) {
+ diag_set(IllegalParams, "Invalid merge source %p",
+ &source->base);
+ return -1;
+ }
+ return 1;
+}
+
+/* Virtual methods */
+
+/**
+ * Release resources of a buffer source: the fetch iterator, the
+ * registry reference to the current buffer (if any) and the
+ * source structure itself.
+ */
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base)
+{
+	struct merge_source_buffer *self = container_of(base,
+		struct merge_source_buffer, base);
+
+	assert(self->fetch_it != NULL);
+	luaL_iterator_delete(self->fetch_it);
+
+	/* Zero means that no buffer is referenced. */
+	if (self->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, self->ref);
+
+	free(self);
+}
+
+/**
+ * Get a next tuple from a buffer source.
+ *
+ * Fetches next chunks until a non-empty one is received, then
+ * validates one msgpack value in the current buffer, advances
+ * the buffer read position past it and creates a tuple from
+ * it using @a format (the runtime format when it is NULL).
+ *
+ * Store the referenced tuple into @a out and return 0 at
+ * success. Store NULL and return 0 when the source is
+ * exhausted. Return -1 at an error (a diag is set by this
+ * function for a truncated buffer).
+ */
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+ struct tuple_format *format,
+ struct tuple **out)
+{
+ struct merge_source_buffer *source = container_of(base,
+ struct merge_source_buffer, base);
+
+ /*
+ * Handle the case when all data were processed: ask a
+ * next chunk until a non-empty chunk is received or a
+ * chunks iterator ends.
+ */
+ while (source->remaining_tuple_count == 0) {
+ int rc = luaL_merge_source_buffer_fetch(source);
+ if (rc < 0)
+ return -1;
+ if (rc == 0) {
+ *out = NULL;
+ return 0;
+ }
+ }
+ /* The header promised more tuples than the buffer holds. */
+ if (ibuf_used(source->buf) == 0) {
+ diag_set(IllegalParams, "Unexpected msgpack buffer end");
+ return -1;
+ }
+ const char *tuple_beg = source->buf->rpos;
+ const char *tuple_end = tuple_beg;
+ /* Validate one msgpack value; tuple_end is moved past it. */
+ if (mp_check(&tuple_end, source->buf->wpos) != 0) {
+ diag_set(IllegalParams, "Unexpected msgpack buffer end");
+ return -1;
+ }
+ /* Consume the value from the buffer. */
+ --source->remaining_tuple_count;
+ source->buf->rpos = (char *) tuple_end;
+ if (format == NULL)
+ format = tuple_format_runtime;
+ struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+ if (tuple == NULL)
+ return -1;
+
+ /* The reference is passed to a caller together with *out. */
+ tuple_ref(tuple);
+ *out = tuple;
+ return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new buffer source and push it onto the Lua stack.
+ *
+ * Lua arguments (gen, param, state) are validated by
+ * lbox_merge_source_new(); "merger.new_buffer_source" is the
+ * name shown in its usage message.
+ */
+static int
+lbox_merger_new_buffer_source(struct lua_State *L)
+{
+ return lbox_merge_source_new(L, "merger.new_buffer_source",
+ luaL_merge_source_buffer_new);
+}
+
+/* }}} */
+
+/* {{{ Table merge source */
+
+/*
+ * A merge source that reads tuples from Lua tables produced
+ * chunk by chunk by a Lua iterator.
+ */
+struct merge_source_table {
+ /* The base structure; container_of() is used to get back. */
+ struct merge_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next chunk of
+ * tuples.
+ */
+ struct luaL_iterator *fetch_it;
+ /*
+ * A Lua registry reference to a table with a current
+ * chunk of tuples. Zero means 'no reference'.
+ */
+ int ref;
+ /*
+ * A 1-based index of a next tuple to read within a
+ * current chunk.
+ */
+ int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_table_destroy(struct merge_source *base);
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+ struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Allocate and initialize a merge source of the table type.
+ *
+ * The fetch iterator is created from gen, param, state values
+ * that reside on the top of the Lua stack.
+ *
+ * Return NULL and set a diag when the allocation fails.
+ */
+static struct merge_source *
+luaL_merge_source_table_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_table_vtab = {
+		.destroy = luaL_merge_source_table_destroy,
+		.next = luaL_merge_source_table_next,
+	};
+	const size_t size = sizeof(struct merge_source_table);
+
+	struct merge_source_table *table_source = malloc(size);
+	if (table_source == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "merge_source_table");
+		return NULL;
+	}
+
+	merge_source_create(&table_source->base, &merge_source_table_vtab);
+
+	/* No chunk is fetched yet; tables are indexed from one. */
+	table_source->fetch_it = luaL_iterator_new(L, 0);
+	table_source->ref = 0;
+	table_source->next_idx = 1;
+
+	return &table_source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * The iterator is called on the Lua stack of the current fiber
+ * and is expected to return two values: a new state and a
+ * table. The table is referenced in the Lua registry, the
+ * previous one (if any) is unreferenced, and next_idx is reset
+ * to the first element.
+ *
+ * Return 0 when a tables iterator ends, 1 when a new table is
+ * received and -1 at an error (set a diag).
+ *
+ * NOTE(review): the error returns after a successful iterator
+ * call do not pop the iterator results (and the pushed copy of
+ * the table) from the Lua stack -- confirm that callers unwind
+ * the stack in this case.
+ */
+static int
+luaL_merge_source_table_fetch(struct merge_source_table *source)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <table>, got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Set a new table as the current chunk. */
+ if (source->ref > 0) {
+ luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+ source->ref = 0;
+ }
+ /* With nresult == 2 the index is -1: the table on the top. */
+ lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+ if (lua_istable(L, -1) == 0) {
+ diag_set(IllegalParams, "Expected <state>, <table>");
+ return -1;
+ }
+ source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ source->next_idx = 1;
+ /* Drop the iterator results: the state and the table. */
+ lua_pop(L, nresult);
+
+ return 1;
+}
+
+/* Virtual methods */
+
+/**
+ * Release resources of a table source: the fetch iterator, the
+ * registry reference to the current table (if any) and the
+ * source structure itself.
+ */
+static void
+luaL_merge_source_table_destroy(struct merge_source *base)
+{
+	struct merge_source_table *self = container_of(base,
+		struct merge_source_table, base);
+
+	assert(self->fetch_it != NULL);
+	luaL_iterator_delete(self->fetch_it);
+
+	/* Zero means that no table is referenced. */
+	if (self->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, self->ref);
+
+	free(self);
+}
+
+/**
+ * Get a next tuple from a table source.
+ *
+ * Reads the element at next_idx of the current table. When
+ * there is no current table or the element is nil, next tables
+ * are fetched until a table with a non-nil first element is
+ * received or the tables iterator ends.
+ *
+ * Store the referenced tuple into @a out and return 0 at
+ * success. Store NULL and return 0 when the source is
+ * exhausted. Return -1 at an error.
+ *
+ * NOTE(review): when luaT_gettuple() fails, the table and the
+ * element are left on the Lua stack -- confirm that callers
+ * unwind the stack in this case.
+ */
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+ struct tuple_format *format,
+ struct tuple **out)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ struct merge_source_table *source = container_of(base,
+ struct merge_source_table, base);
+
+ /* Push the current table and its element at next_idx. */
+ if (source->ref > 0) {
+ lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushinteger(L, source->next_idx);
+ lua_gettable(L, -2);
+ }
+ /*
+ * If all data were processed, try to fetch more.
+ */
+ while (source->ref == 0 || lua_isnil(L, -1)) {
+ /* Pop the table and the nil element pushed before. */
+ if (source->ref > 0)
+ lua_pop(L, 2);
+ int rc = luaL_merge_source_table_fetch(source);
+ if (rc < 0)
+ return -1;
+ if (rc == 0) {
+ *out = NULL;
+ return 0;
+ }
+ /*
+ * Retry tuple extracting when a next table is
+ * received.
+ */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushinteger(L, source->next_idx);
+ lua_gettable(L, -2);
+ }
+
+ struct tuple *tuple = luaT_gettuple(L, -1, format);
+ if (tuple == NULL)
+ return -1;
+
+ ++source->next_idx;
+ /* Pop the table and the element. */
+ lua_pop(L, 2);
+
+ /* The reference is passed to a caller together with *out. */
+ tuple_ref(tuple);
+ *out = tuple;
+ return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new table source and push it onto the Lua stack.
+ *
+ * Lua arguments (gen, param, state) are validated by
+ * lbox_merge_source_new(); "merger.new_table_source" is the
+ * name shown in its usage message.
+ */
+static int
+lbox_merger_new_table_source(struct lua_State *L)
+{
+ return lbox_merge_source_new(L, "merger.new_table_source",
+ luaL_merge_source_table_new);
+}
+
+/* }}} */
+
+/* {{{ Tuple merge source */
+
+/*
+ * A merge source that receives tuples one by one from a Lua
+ * iterator.
+ */
+struct merge_source_tuple {
+ /* The base structure; container_of() is used to get back. */
+ struct merge_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next tuple.
+ */
+ struct luaL_iterator *fetch_it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base);
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+ struct tuple_format *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Allocate and initialize a merge source of the tuple type.
+ *
+ * The fetch iterator is created from gen, param, state values
+ * that reside on the top of the Lua stack.
+ *
+ * Return NULL and set a diag when the allocation fails.
+ */
+static struct merge_source *
+luaL_merge_source_tuple_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_tuple_vtab = {
+		.destroy = luaL_merge_source_tuple_destroy,
+		.next = luaL_merge_source_tuple_next,
+	};
+	const size_t size = sizeof(struct merge_source_tuple);
+
+	struct merge_source_tuple *tuple_source = malloc(size);
+	if (tuple_source == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "merge_source_tuple");
+		return NULL;
+	}
+
+	merge_source_create(&tuple_source->base, &merge_source_tuple_vtab);
+	tuple_source->fetch_it = luaL_iterator_new(L, 0);
+
+	return &tuple_source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * The iterator is expected to return two values: a new state
+ * and a tuple.
+ *
+ * This function does not check whether a user-provided value
+ * is a tuple. A caller should check it on its side.
+ *
+ * Return 1 at success and leave the resulting tuple on the top
+ * of the Lua stack.
+ * Return 0 when no more data.
+ * Return -1 at error (set a diag).
+ */
+static int
+luaL_merge_source_tuple_fetch(struct merge_source_tuple *source,
+ struct lua_State *L)
+{
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <tuple> got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Leave only the received value on the stack. */
+ lua_insert(L, -2); /* Swap state and tuple. */
+ lua_pop(L, 1); /* Pop state. */
+
+ return 1;
+}
+
+/* Virtual methods */
+
+/**
+ * Release resources of a tuple source: the fetch iterator and
+ * the source structure itself.
+ */
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base)
+{
+	struct merge_source_tuple *self = container_of(base,
+		struct merge_source_tuple, base);
+
+	assert(self->fetch_it != NULL);
+	luaL_iterator_delete(self->fetch_it);
+	free(self);
+}
+
+/**
+ * Get a next tuple from a tuple source.
+ *
+ * Asks the fetch iterator for one value and converts it to a
+ * tuple with luaT_gettuple() using @a format.
+ *
+ * Store the referenced tuple into @a out and return 0 at
+ * success. Store NULL and return 0 when the iterator ends.
+ * Return -1 at an error.
+ */
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merge_source_tuple *tuple_source = container_of(base,
+		struct merge_source_tuple, base);
+
+	int fetched = luaL_merge_source_tuple_fetch(tuple_source, L);
+	if (fetched < 0)
+		return -1;
+	if (fetched == 0) {
+		/* The iterator ends: no more tuples. */
+		*out = NULL;
+		return 0;
+	}
+
+	/* The fetched value is on the top of the stack. */
+	struct tuple *result = luaT_gettuple(L, -1, format);
+	if (result == NULL)
+		return -1;
+	lua_pop(L, 1);
+
+	tuple_ref(result);
+	*out = result;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new tuple source and push it onto the Lua stack.
+ *
+ * Lua arguments (gen, param, state) are validated by
+ * lbox_merge_source_new(); "merger.new_tuple_source" is the
+ * name shown in its usage message.
+ */
+static int
+lbox_merger_new_tuple_source(struct lua_State *L)
+{
+ return lbox_merge_source_new(L, "merger.new_tuple_source",
+ luaL_merge_source_tuple_new);
+}
+
+/* }}} */
+
+/* {{{ Merge source Lua methods */
+
+/**
+ * Iterator gen function to traverse source results.
+ *
+ * Expected a nil as the first parameter (param) and a
+ * merge_source as the second parameter (state) on a Lua stack.
+ *
+ * Push the original merge_source (as a new state) and a next
+ * tuple. Push two nils when the source is exhausted. Raise a
+ * Lua error on bad arguments or when fetching a tuple fails.
+ */
+static int
+lbox_merge_source_gen(struct lua_State *L)
+{
+ struct merge_source *source;
+ /* source is assigned only when the first checks pass. */
+ bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) &&
+ (source = luaT_check_merge_source(L, 2)) != NULL;
+ if (!ok)
+ return luaL_error(L, "Bad params, use: lbox_merge_source_gen("
+ "nil, merge_source)");
+
+ struct tuple *tuple;
+ if (merge_source_next(source, NULL, &tuple) != 0)
+ return luaT_error(L);
+ /* A NULL tuple without an error means the end of data. */
+ if (tuple == NULL) {
+ lua_pushnil(L);
+ lua_pushnil(L);
+ return 2;
+ }
+
+ /*
+ * Push merge_source, tuple. The pushed cdata has no GC
+ * handler set here, so it does not own a reference to the
+ * source.
+ */
+ *(struct merge_source **)
+ luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = source;
+ luaT_pushtuple(L, tuple);
+
+ /*
+ * luaT_pushtuple() references the tuple, so we
+ * unreference it on merger's side.
+ */
+ tuple_unref(tuple);
+
+ return 2;
+}
+
+/**
+ * Iterate over merge source results from Lua.
+ *
+ * Expect exactly one argument on the Lua stack: a merge
+ * source. Raise a Lua error otherwise.
+ *
+ * Push three values to the Lua stack (the results of the
+ * fun.wrap(gen, param, state) call):
+ *
+ * 1. gen (lbox_merge_source_gen wrapped by fun.wrap());
+ * 2. param (nil);
+ * 3. state (merge_source).
+ */
+static int
+lbox_merge_source_ipairs(struct lua_State *L)
+{
+ struct merge_source *source;
+ bool ok = lua_gettop(L) == 1 &&
+ (source = luaT_check_merge_source(L, 1)) != NULL;
+ if (!ok)
+ return luaL_error(L, "Usage: merge_source:ipairs()");
+ /* Stack: merge_source. */
+
+ /* Obtain fun.wrap by evaluating a Lua chunk. */
+ luaL_loadstring(L, "return require('fun').wrap");
+ lua_call(L, 0, 1);
+ lua_insert(L, -2); /* Swap merge_source and wrap. */
+ /* Stack: wrap, merge_source. */
+
+ lua_pushcfunction(L, lbox_merge_source_gen);
+ lua_insert(L, -2); /* Swap merge_source and gen. */
+ /* Stack: wrap, gen, merge_source. */
+
+ /*
+ * Push nil as an iterator param, because all needed state
+ * is in a merge source.
+ */
+ lua_pushnil(L);
+ /* Stack: wrap, gen, merge_source, nil. */
+
+ lua_insert(L, -2); /* Swap merge_source and nil. */
+ /* Stack: wrap, gen, nil, merge_source. */
+
+ /* Call fun.wrap(gen, nil, merge_source). */
+ lua_call(L, 3, 3);
+ return 3;
+}
+
+/**
+ * Write source results into ibuf.
+ *
+ * Encodes a msgpack array header followed by raw data of at
+ * most @a limit tuples pulled from @a source. The header is
+ * written with the maximal length first to reserve the widest
+ * encoding and is patched with the real tuples count at the
+ * end.
+ *
+ * Return 0 (no Lua values are pushed) at success. Raise a Lua
+ * error when the source fails or when the buffer cannot be
+ * enlarged.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+encode_result_buffer(struct lua_State *L, struct merge_source *source,
+		     struct ibuf *output_buffer, uint32_t limit)
+{
+	uint32_t result_len = 0;
+	/*
+	 * A distance from the write position back to the 32 bit
+	 * length field of the array header. It grows with each
+	 * written tuple, so it is size_t to not wrap around on
+	 * large results.
+	 */
+	size_t result_len_offset = 4;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later. encode_header() does not report
+	 * an allocation failure, so ensure the space beforehand.
+	 */
+	const size_t header_size = mp_sizeof_array(UINT32_MAX);
+	if (ibuf_reserve(output_buffer, header_size) == NULL) {
+		diag_set(OutOfMemory, header_size, "ibuf_reserve",
+			 "output_buffer");
+		return luaT_error(L);
+	}
+	encode_header(output_buffer, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (result_len < limit && (rc =
+	       merge_source_next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		uint32_t bsize = tuple->bsize;
+		/* Never copy into a buffer that failed to grow. */
+		if (ibuf_reserve(output_buffer, bsize) == NULL) {
+			diag_set(OutOfMemory, bsize, "ibuf_reserve",
+				 "output_buffer");
+			tuple_unref(tuple);
+			rc = -1;
+			break;
+		}
+		memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
+		output_buffer->wpos += bsize;
+		result_len_offset += bsize;
+		++result_len;
+
+		/* The received tuple is not more needed. */
+		tuple_unref(tuple);
+	}
+
+	if (rc != 0)
+		return luaT_error(L);
+
+	/* Write the real array size. */
+	mp_store_u32(output_buffer->wpos - result_len_offset, result_len);
+
+	return 0;
+}
+
+/**
+ * Collect source results into a new Lua table.
+ *
+ * At most @a limit tuples are pulled from @a source and stored
+ * at consecutive indexes starting from one.
+ *
+ * Leave the table on the Lua stack and return 1. Raise a Lua
+ * error when the source fails.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+create_result_table(struct lua_State *L, struct merge_source *source,
+		    uint32_t limit)
+{
+	uint32_t count = 0;
+
+	lua_newtable(L);
+
+	while (count < limit) {
+		struct tuple *tuple;
+		if (merge_source_next(source, NULL, &tuple) != 0)
+			return luaT_error(L);
+		/* The source is exhausted. */
+		if (tuple == NULL)
+			break;
+
+		luaT_pushtuple(L, tuple);
+		lua_rawseti(L, -2, count + 1);
+		++count;
+
+		/*
+		 * The table holds the tuple after luaT_pushtuple(),
+		 * so the reference received from the source is
+		 * released.
+		 */
+		tuple_unref(tuple);
+	}
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error that describes how merge_source:select()
+ * should be called.
+ *
+ * When @a param_name is not NULL the message names the
+ * offending parameter, otherwise it reports bad parameters in
+ * general.
+ */
+static int
+lbox_merge_source_select_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merge_source:select([{"
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "limit = <number> or <nil>}])";
+
+	if (param_name != NULL)
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+	return luaL_error(L, "Bad params, use: %s", usage);
+}
+
+/**
+ * Pull results of a merge source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expected a merge source and options (optional) on a Lua stack.
+ * Supported options:
+ *
+ * - buffer: cdata<struct ibuf> to write tuples into;
+ * - limit: a maximal count of tuples to pull (no limit other
+ *   than UINT32_MAX by default).
+ *
+ * Return a Lua table or nothing when a 'buffer' option is
+ * provided. Raise a Lua error with usage info on bad params.
+ */
+static int
+lbox_merge_source_select(struct lua_State *L)
+{
+ struct merge_source *source;
+ int top = lua_gettop(L);
+ /* source is assigned only when the arguments count fits. */
+ bool ok = (top == 1 || top == 2) &&
+ /* Merge source. */
+ (source = luaT_check_merge_source(L, 1)) != NULL &&
+ /* Opts. */
+ (lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+ if (!ok)
+ return lbox_merge_source_select_usage(L, NULL);
+
+ uint32_t limit = UINT32_MAX;
+ struct ibuf *output_buffer = NULL;
+
+ /* Parse options. */
+ if (!lua_isnoneornil(L, 2)) {
+ /* Parse buffer. */
+ lua_pushstring(L, "buffer");
+ lua_gettable(L, 2);
+ if (!lua_isnil(L, -1)) {
+ if ((output_buffer = luaT_check_ibuf(L, -1)) == NULL)
+ return lbox_merge_source_select_usage(L,
+ "buffer");
+ }
+ lua_pop(L, 1);
+
+ /*
+ * Parse limit.
+ *
+ * NOTE(review): the value is converted with
+ * lua_tointeger() and stored into uint32_t without a
+ * range check, so a negative or too large number is
+ * not rejected here -- confirm it is intended.
+ */
+ lua_pushstring(L, "limit");
+ lua_gettable(L, 2);
+ if (!lua_isnil(L, -1)) {
+ if (lua_isnumber(L, -1))
+ limit = lua_tointeger(L, -1);
+ else
+ return lbox_merge_source_select_usage(L,
+ "limit");
+ }
+ lua_pop(L, 1);
+ }
+
+ /* No buffer: return a table; otherwise fill the buffer. */
+ if (output_buffer == NULL)
+ return create_result_table(L, source, limit);
+ else
+ return encode_result_buffer(L, source, output_buffer, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ *
+ * Declares the used C types for LuaJIT FFI, caches their ctype
+ * ids, registers the public functions as the "merger" module
+ * and adds the 'internal' table with select() and ipairs().
+ *
+ * Return 1 (presumably the module table is left on the Lua
+ * stack by luaL_register_module() -- TODO confirm).
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+ luaL_cdef(L, "struct merge_source;");
+ luaL_cdef(L, "struct ibuf;");
+
+ /* The ids are used to verify cdata arguments. */
+ CTID_STRUCT_MERGE_SOURCE_REF = luaL_ctypeid(L, "struct merge_source&");
+ CTID_STRUCT_IBUF = luaL_ctypeid(L, "struct ibuf");
+
+ /* Export C functions to Lua. */
+ static const struct luaL_Reg meta[] = {
+ {"new_buffer_source", lbox_merger_new_buffer_source},
+ {"new_table_source", lbox_merger_new_table_source},
+ {"new_tuple_source", lbox_merger_new_tuple_source},
+ {"new", lbox_merger_new},
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "merger", meta);
+
+ /* Add internal.{select,ipairs}(). */
+ lua_newtable(L); /* merger.internal */
+ lua_pushcfunction(L, lbox_merge_source_select);
+ lua_setfield(L, -2, "select");
+ lua_pushcfunction(L, lbox_merge_source_ipairs);
+ lua_setfield(L, -2, "ipairs");
+ /* Store the table into the value below it on the stack. */
+ lua_setfield(L, -2, "internal");
+
+ return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+/**
+ * Register the merger Lua module in the given Lua state.
+ */
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..11abd1a25
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+local ibuf_t = ffi.typeof('struct ibuf')
+local merge_source_t = ffi.typeof('struct merge_source')
+
+-- Wrap a single buffer (cdata<struct ibuf>) into a buffer merge
+-- source. Raise a usage error for any other argument.
+merger.new_source_frombuffer = function(buf)
+    local is_ibuf = type(buf) == 'cdata' and ffi.istype(ibuf_t, buf)
+    if not is_ibuf then
+        local func_name = 'merger.new_source_frombuffer'
+        error(('Usage: %s(<cdata<struct ibuf>>)'):format(func_name), 0)
+    end
+
+    -- The source fetches the only chunk: the given buffer.
+    return merger.new_buffer_source(fun.iter({buf}))
+end
+
+-- Wrap a single Lua table into a table merge source. Raise a
+-- usage error for any other argument.
+merger.new_source_fromtable = function(tbl)
+    local is_table = type(tbl) == 'table'
+    if not is_table then
+        local func_name = 'merger.new_source_fromtable'
+        error(('Usage: %s(<table>)'):format(func_name), 0)
+    end
+
+    -- The source fetches the only chunk: the given table.
+    return merger.new_table_source(fun.iter({tbl}))
+end
+
+-- Methods available on a merge source object. Both 'pairs' and
+-- 'ipairs' are served by the same internal function.
+local methods = {
+ ['select'] = merger.internal.select,
+ ['pairs'] = merger.internal.ipairs,
+ ['ipairs'] = merger.internal.ipairs,
+}
+
+-- Attach the methods to the merge source ctype; an unknown key
+-- yields nil.
+ffi.metatype(merge_source_t, {
+ __index = function(self, key)
+ return methods[key]
+ end,
+ -- Lua 5.2 compatibility
+ __pairs = merger.internal.ipairs,
+ __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..ee9eaeaed
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,768 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def_lib = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- A chunk size for table and buffer sources. A chunk size for
+-- tuple source is always 1.
+local FETCH_BLOCK_SIZE = 10
+
+-- Expected merger.new() usage error; param names the bad option.
+local function merger_new_usage(param)
+    local usage = 'merger.new(key_def, ' ..
+        '{source, source, ...}[, {' ..
+        'reverse = <boolean> or <nil>}])'
+    if param then
+        return ('Bad param "%s", use: %s'):format(param, usage)
+    end
+    return ('Bad params, use: %s'):format(usage)
+end
+
+-- Expected merge_source:select() usage error; param names the bad option.
+local function merger_select_usage(param)
+    local usage = 'merge_source:select([{' ..
+        'buffer = <cdata<struct ibuf>> or <nil>, ' ..
+        'limit = <number> or <nil>}])'
+    if param then
+        return ('Bad param "%s", use: %s'):format(param, usage)
+    end
+    return ('Bad params, use: %s'):format(usage)
+end
+
+-- Return an ibuf holding msgpack-encoded data minus its last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+    local data = msgpackffi.encode(data)
+    data = data:sub(1, data:len() - trunc)
+    local len = data:len()
+    local buf = buffer.ibuf()
+    -- Reserve room for the len + trunc bytes written by ffi.copy() below.
+    buf:reserve(len + trunc)
+    local p = buf:alloc(len)
+    -- Copy the len data bytes followed by trunc zero bytes.
+    ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+    return buf
+end
+
+local function truncated_msgpack_source(data, trunc)
+ local buf = truncated_msgpack_buffer(data, trunc)
+ return merger.new_source_frombuffer(buf)
+end
+
+local bad_source_new_calls = {
+ {
+ 'Bad fetch iterator',
+ funcs = {'new_buffer_source', 'new_table_source',
+ 'new_tuple_source'},
+ params = {1},
+ exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+ },
+ {
+ 'Bad chunk type',
+ funcs = {'new_source_frombuffer', 'new_source_fromtable'},
+ params = {1},
+ exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+ },
+ {
+ 'Bad buffer chunk',
+ funcs = {'new_source_frombuffer'},
+ params = {ffi.new('char *')},
+ exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+ },
+}
+
+local bad_chunks = {
+ {
+ 'Bad buffer source chunk (not cdata)',
+ func = 'new_buffer_source',
+ chunk = 1,
+ exp_err = 'Expected <state>, <buffer>',
+ },
+ {
+ 'Bad buffer source chunk (wrong ctype)',
+ func = 'new_buffer_source',
+ chunk = ffi.new('char *'),
+ exp_err = 'Expected <state>, <buffer>',
+ },
+ {
+ 'Bad table source chunk',
+ func = 'new_table_source',
+ chunk = 1,
+ exp_err = 'Expected <state>, <table>',
+ },
+ {
+ 'Bad tuple source chunk (not cdata)',
+ func = 'new_tuple_source',
+ chunk = 1,
+ exp_err = 'A tuple or a table expected, got number',
+ },
+ {
+ 'Bad tuple source chunk (wrong ctype)',
+ func = 'new_tuple_source',
+ chunk = ffi.new('char *'),
+ exp_err = 'A tuple or a table expected, got cdata',
+ },
+}
+
+local bad_merger_new_calls = {
+ {
+ 'Bad opts',
+ sources = {},
+ opts = 1,
+ exp_err = merger_new_usage(nil),
+ },
+ {
+ 'Bad opts.reverse',
+ sources = {},
+ opts = {reverse = 1},
+ exp_err = merger_new_usage('reverse'),
+ },
+}
+
+local bad_merger_select_calls = {
+ {
+ 'Wrong source of table type',
+ sources = {merger.new_source_fromtable({1})},
+ opts = nil,
+ exp_err = 'A tuple or a table expected, got number',
+ },
+ {
+ 'Bad msgpack source: wrong length of the tuples array',
+ -- Remove the last tuple from msgpack data, but keep old
+ -- tuples array size.
+ sources = {
+ truncated_msgpack_source({{''}, {''}, {''}}, 2),
+ },
+ opts = {},
+ exp_err = 'Unexpected msgpack buffer end',
+ },
+ {
+ 'Bad msgpack source: wrong length of a tuple',
+ -- Remove half of the last tuple, but keep old tuple size.
+ sources = {
+ truncated_msgpack_source({{''}, {''}, {''}}, 1),
+ },
+ opts = {},
+ exp_err = 'Unexpected msgpack buffer end',
+ },
+ {
+ 'Bad opts.buffer (wrong type)',
+ sources = {},
+ opts = {buffer = 1},
+ exp_err = merger_select_usage('buffer'),
+ },
+ {
+ 'Bad opts.buffer (wrong cdata type)',
+ sources = {},
+ opts = {buffer = ffi.new('char *')},
+ exp_err = merger_select_usage('buffer'),
+ },
+ {
+ 'Bad opts.limit (wrong type)',
+ sources = {},
+ opts = {limit = 'hello'},
+ exp_err = merger_select_usage('limit'),
+ }
+}
+
+local schemas = {
+ {
+ name = 'small_unsigned',
+ parts = {
+ {
+ fieldno = 2,
+ type = 'unsigned',
+ }
+ },
+ gen_tuple = function(tupleno)
+ return {'id_' .. tostring(tupleno), tupleno}
+ end,
+ },
+ -- Test with N-1 equal parts and Nth different.
+ {
+ name = 'many_parts',
+ parts = (function()
+ local parts = {}
+ for i = 1, 16 do
+ parts[i] = {
+ fieldno = i,
+ type = 'unsigned',
+ }
+ end
+ return parts
+ end)(),
+ gen_tuple = function(tupleno)
+ local tuple = {}
+ -- 15 constant parts
+ for i = 1, 15 do
+ tuple[i] = i
+ end
+ -- 16th part is varying
+ tuple[16] = tupleno
+ return tuple
+ end,
+ -- reduce tuple count to decrease test run time
+ tuple_count = 16,
+ },
+ -- Test null value in nullable field of an index.
+ {
+ name = 'nullable',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'unsigned',
+ },
+ {
+ fieldno = 2,
+ type = 'string',
+ is_nullable = true,
+ },
+ },
+        gen_tuple = function(i)
+            -- Odd tuples carry a string, even ones carry NULL.
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            end
+            return {0, box.NULL}
+        end,
+ },
+ -- Test index part with 'collation_id' option (as in net.box's
+ -- response).
+ {
+ name = 'collation_id',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'string',
+ collation_id = 2, -- unicode_ci
+ },
+ },
+ gen_tuple = function(i)
+ local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+ if i <= #letters then
+ return {letters[i]}
+ else
+ return {''}
+ end
+ end,
+ },
+ -- Test index part with 'collation' option (as in local index
+ -- parts).
+ {
+ name = 'collation',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'string',
+ collation = 'unicode_ci',
+ },
+ },
+ gen_tuple = function(i)
+ local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+ if i <= #letters then
+ return {letters[i]}
+ else
+ return {''}
+ end
+ end,
+ },
+}
+
+local function is_unicode_ci_part(part)
+ return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+-- Compare tuples a and b by the given key parts: the first field
+-- that differs decides the result, otherwise 0 is returned.
+local function tuple_comparator(a, b, parts)
+    for _, part in ipairs(parts) do
+        local lhs, rhs = a[part.fieldno], b[part.fieldno]
+        if lhs ~= rhs then
+            -- A nil field sorts before any other value.
+            if lhs == nil then
+                return -1
+            elseif rhs == nil then
+                return 1
+            elseif is_unicode_ci_part(part) then
+                return utf8.casecmp(lhs, rhs)
+            end
+            return lhs < rhs and -1 or 1
+        end
+    end
+    return 0
+end
+
+-- Sort tuples in place by the key parts; opts.reverse inverts
+-- the order.
+local function sort_tuples(tuples, parts, opts)
+    table.sort(tuples, function(a, b)
+        local cmp = tuple_comparator(a, b, parts)
+        if cmp == 0 then
+            return false
+        end
+        if cmp < 0 then
+            return not opts.reverse
+        end
+        return opts.reverse
+    end)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+ for i = 1, #tuples do
+ local tuple = tuples[i]
+ for _, part in ipairs(parts) do
+ if is_unicode_ci_part(part) then
+ -- Workaround #3709.
+ if tuple[part.fieldno]:len() > 0 then
+ tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+ end
+ end
+ end
+ end
+end
+
+-- Fetch source generator: return a new state and the next chunk
+-- of param.tuples (as param.input_type), or nothing at the end.
+local function fetch_source_gen(param, state)
+    local input_type = param.input_type
+    local last_pos = state.last_pos
+    -- A chunk size is always 1 for a tuple source.
+    local fetch_block_size = input_type == 'tuple' and 1 or
+        FETCH_BLOCK_SIZE
+    local data = fun.iter(param.tuples):drop(last_pos):take(
+        fetch_block_size):totable()
+    -- No tuples left: return nothing to stop the iteration.
+    if #data == 0 then
+        return
+    end
+    local new_state = {last_pos = last_pos + #data}
+    if input_type == 'table' then
+        return new_state, data
+    elseif input_type == 'buffer' then
+        local buf = buffer.ibuf()
+        msgpackffi.internal.encode_r(buf, data, 0)
+        return new_state, buf
+    elseif input_type == 'tuple' then
+        -- data is non-empty here and the chunk size is 1.
+        assert(#data == 1)
+        return new_state, data[1]
+    else
+        assert(false)
+    end
+end
+
+local function fetch_source_iterator(input_type, tuples)
+ local param = {
+ input_type = input_type,
+ tuples = tuples,
+ }
+ local state = {
+ last_pos = 0,
+ }
+ return fetch_source_gen, param, state
+end
+
+local function prepare_data(schema, tuple_count, source_count, opts)
+ local opts = opts or {}
+ local input_type = opts.input_type
+ local use_table_as_tuple = opts.use_table_as_tuple
+ local use_fetch_source = opts.use_fetch_source
+
+ local tuples = {}
+ local exp_result = {}
+
+ -- Ensure empty sources are empty table and not nil.
+ for i = 1, source_count do
+ if tuples[i] == nil then
+ tuples[i] = {}
+ end
+ end
+
+ -- Prepare N tables with tuples as input for merger.
+ for i = 1, tuple_count do
+ -- [1, source_count]
+ local guava = digest.guava(i, source_count) + 1
+ local tuple = schema.gen_tuple(i)
+ table.insert(exp_result, tuple)
+ if not use_table_as_tuple then
+ assert(input_type ~= 'buffer')
+ tuple = box.tuple.new(tuple)
+ end
+ table.insert(tuples[guava], tuple)
+ end
+
+ -- Sort tuples within each source.
+ for _, source_tuples in pairs(tuples) do
+ sort_tuples(source_tuples, schema.parts, opts)
+ end
+
+ -- Sort expected result.
+ sort_tuples(exp_result, schema.parts, opts)
+
+ -- Fill sources.
+ local sources
+ if use_fetch_source then
+ sources = {}
+ for i = 1, source_count do
+ local func = ('new_%s_source'):format(input_type)
+ sources[i] = merger[func](fetch_source_iterator(input_type,
+ tuples[i]))
+ end
+ elseif input_type == 'table' then
+ -- Imitate netbox's select w/o {buffer = ...}.
+ sources = {}
+ for i = 1, source_count do
+ sources[i] = merger.new_source_fromtable(tuples[i])
+ end
+ elseif input_type == 'buffer' then
+ -- Imitate netbox's select with {buffer = ...}.
+ sources = {}
+ for i = 1, source_count do
+ local buf = buffer.ibuf()
+ sources[i] = merger.new_source_frombuffer(buf)
+ msgpackffi.internal.encode_r(buf, tuples[i], 0)
+ end
+ elseif input_type == 'tuple' then
+ assert(false)
+ else
+ assert(false)
+ end
+
+ return sources, exp_result
+end
+
+local function test_case_opts_str(opts)
+ local params = {}
+
+ if opts.input_type then
+ table.insert(params, 'input_type: ' .. opts.input_type)
+ end
+
+ if opts.output_type then
+ table.insert(params, 'output_type: ' .. opts.output_type)
+ end
+
+ if opts.reverse then
+ table.insert(params, 'reverse')
+ end
+
+ if opts.use_table_as_tuple then
+ table.insert(params, 'use_table_as_tuple')
+ end
+
+ if opts.use_fetch_source then
+ table.insert(params, 'use_fetch_source')
+ end
+
+ if next(params) == nil then
+ return ''
+ end
+
+ return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuple_count, source_count, opts)
+ fiber.yield()
+
+ local opts = opts or {}
+
+ -- Prepare data.
+ local sources, exp_result = prepare_data(schema, tuple_count, source_count,
+ opts)
+
+ -- Create a merger instance.
+ local merger_inst = merger.new(schema.key_def, sources,
+ {reverse = opts.reverse})
+
+ local res
+
+ -- Run merger and prepare output for compare.
+ if opts.output_type == 'table' then
+ -- Table output.
+ res = merger_inst:select()
+ elseif opts.output_type == 'buffer' then
+ -- Buffer output.
+ local output_buffer = buffer.ibuf()
+ merger_inst:select({buffer = output_buffer})
+ res = msgpackffi.decode(output_buffer.rpos)
+ else
+ -- Tuple output.
+ assert(opts.output_type == 'tuple')
+ res = merger_inst:pairs():totable()
+ end
+
+ -- A bit more postprocessing to compare.
+ for i = 1, #res do
+ if type(res[i]) ~= 'table' then
+ res[i] = res[i]:totable()
+ end
+ end
+
+ -- unicode_ci does not differentiate btw 'A' and 'a', so the
+ -- order is arbitrary. We transform fields with unicode_ci
+ -- collation in parts to lower case before comparing.
+ lowercase_unicode_ci_fields(res, schema.parts)
+ lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+ test:is_deeply(res, exp_result,
+ ('check order on %3d tuples in %4d sources%s')
+ :format(tuple_count, source_count, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+    local opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuple_count = schema.tuple_count or 100
+
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    -- Skip meaningless flag combinations.
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+    if input_type == 'tuple' and not use_fetch_source then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(4)
+
+        -- Check with a small source count.
+        run_merger(test, schema, tuple_count, 1, opts)
+        run_merger(test, schema, tuple_count, 2, opts)
+        run_merger(test, schema, tuple_count, 5, opts)
+
+        -- Check with more sources than tuples.
+        run_merger(test, schema, tuple_count, 128, opts)
+    end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_source_new_calls + #bad_chunks + #bad_merger_new_calls +
+ #bad_merger_select_calls + 6 + #schemas * 48)
+
+-- For collations.
+box.cfg{}
+
+for _, case in ipairs(bad_source_new_calls) do
+ test:test(case[1], function(test)
+ local funcs = case.funcs
+ test:plan(#funcs)
+ for _, func in ipairs(funcs) do
+ local ok, err = pcall(merger[func], unpack(case.params))
+ test:ok(ok == false and err:match(case.exp_err), func)
+ end
+ end)
+end
+
+for _, case in ipairs(bad_chunks) do
+ local source = merger[case.func](function(_, state)
+ return state, case.chunk
+ end, {}, {})
+ local ok, err = pcall(function()
+ return source:pairs():take(1):totable()
+ end)
+ test:ok(ok == false and err:match(case.exp_err), case[1])
+end
+
+-- Create the key_def for the test cases below.
+local key_def = key_def_lib.new({{
+ fieldno = 1,
+ type = 'string',
+}})
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+ local ok, err = pcall(merger.new, key_def, case.sources, case.opts)
+ err = tostring(err) -- cdata -> string
+ test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Bad source or/and opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_select_calls) do
+ local merger_inst = merger.new(key_def, case.sources)
+ local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+ err = tostring(err) -- cdata -> string
+ test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Create a key_def for each schema.
+for _, schema in ipairs(schemas) do
+ schema.key_def = key_def_lib.new(schema.parts)
+end
+
+test:test('use a source in two mergers', function(test)
+ test:plan(5)
+
+ local data = {{'a'}, {'b'}, {'c'}}
+ local source = merger.new_source_fromtable(data)
+ local i1 = merger.new(key_def, {source}):pairs()
+ local i2 = merger.new(key_def, {source}):pairs()
+
+ local t1 = i1:head():totable()
+ test:is_deeply(t1, data[1], 'tuple 1 from merger 1')
+
+ local t3 = i2:head():totable()
+ test:is_deeply(t3, data[3], 'tuple 3 from merger 2')
+
+ local t2 = i1:head():totable()
+ test:is_deeply(t2, data[2], 'tuple 2 from merger 1')
+
+ test:ok(i1:is_null(), 'merger 1 ends')
+ test:ok(i2:is_null(), 'merger 2 ends')
+end)
+
+local function reusable_source_gen(param)
+ local chunks = param.chunks
+ local idx = param.idx or 1
+
+ if idx > table.maxn(chunks) then
+ return
+ end
+
+ local chunk = chunks[idx]
+ param.idx = idx + 1
+
+ if chunk == nil then
+ return
+ end
+ return box.NULL, chunk
+end
+
+local function verify_reusable_source(test, source)
+ test:plan(3)
+
+ local exp = {{1}, {2}}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, '1st use')
+
+ local exp = {{3}, {4}, {5}}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, '2nd use')
+
+ local exp = {}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, 'end')
+end
+
+test:test('reuse a tuple source', function(test)
+ local tuples = {{1}, {2}, nil, {3}, {4}, {5}}
+ local source = merger.new_tuple_source(reusable_source_gen,
+ {chunks = tuples})
+ verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+ local chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+ local source = merger.new_table_source(reusable_source_gen,
+ {chunks = chunks})
+ verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+ local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+ local chunks = {}
+ for i = 1, table.maxn(chunks_tbl) do
+ if chunks_tbl[i] == nil then
+ chunks[i] = nil
+ else
+ chunks[i] = buffer.ibuf()
+ msgpackffi.internal.encode_r(chunks[i], chunks_tbl[i], 0)
+ end
+ end
+ local source = merger.new_buffer_source(reusable_source_gen,
+ {chunks = chunks})
+ verify_reusable_source(test, source)
+end)
+
+test:test('use limit', function(test)
+ test:plan(6)
+
+ local data = {{'a'}, {'b'}}
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(key_def, {source})
+ local res = m:select({limit = 0})
+ test:is(#res, 0, 'table output with limit 0')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(key_def, {source})
+ local res = m:select({limit = 1})
+ test:is(#res, 1, 'table output with limit 1')
+ test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(key_def, {source})
+ local output_buffer = buffer.ibuf()
+ m:select({buffer = output_buffer, limit = 0})
+ local res = msgpackffi.decode(output_buffer.rpos)
+ test:is(#res, 0, 'buffer output with limit 0')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(key_def, {source})
+ output_buffer:recycle()
+ m:select({buffer = output_buffer, limit = 1})
+ local res = msgpackffi.decode(output_buffer.rpos)
+ test:is(#res, 1, 'buffer output with limit 1')
+ test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
+test:test('cascade mergers', function(test)
+ test:plan(2)
+
+ local data = {{'a'}, {'b'}}
+
+ local source = merger.new_source_fromtable(data)
+ local m1 = merger.new(key_def, {source})
+ local m2 = merger.new(key_def, {m1})
+
+ local res = m2:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, data, 'same key_def')
+
+ local key_def_unicode = key_def_lib.new({{
+ fieldno = 1,
+ type = 'string',
+ collation = 'unicode',
+ }})
+
+ local source = merger.new_source_fromtable(data)
+ local m1 = merger.new(key_def, {source})
+ local m2 = merger.new(key_def_unicode, {m1})
+
+ local res = m2:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, data, 'different key_defs')
+end)
+
+-- Merging cases.
+for _, input_type in ipairs({'buffer', 'table', 'tuple'}) do
+ for _, output_type in ipairs({'buffer', 'table', 'tuple'}) do
+ for _, reverse in ipairs({false, true}) do
+ for _, use_table_as_tuple in ipairs({false, true}) do
+ for _, use_fetch_source in ipairs({false, true}) do
+ for _, schema in ipairs(schemas) do
+ run_case(test, schema, {
+ input_type = input_type,
+ output_type = output_type,
+ reverse = reverse,
+ use_table_as_tuple = use_table_as_tuple,
+ use_fetch_source = use_fetch_source,
+ })
+ end
+ end
+ end
+ end
+ end
+end
+
+os.exit(test:check() and 0 or 1)
--
2.21.0
^ permalink raw reply [flat|nested] 9+ messages in thread