* [PATCH v3 1/7] Add luaL_iscallable with support of cdata metatype
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-18 17:30 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 12:45 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 2/7] Add functions to ease using Lua iterators from C Alexander Turenko
` (5 subsequent siblings)
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
---
extra/exports | 1 +
src/lua/utils.c | 43 ++++++++++++++++
src/lua/utils.h | 10 ++++
test/app-tap/module_api.c | 10 ++++
test/app-tap/module_api.test.lua | 85 +++++++++++++++++++++++++++++++-
5 files changed, 147 insertions(+), 2 deletions(-)
diff --git a/extra/exports b/extra/exports
index 21fa9bf00..4f41a17b3 100644
--- a/extra/exports
+++ b/extra/exports
@@ -136,6 +136,7 @@ luaT_call
luaT_cpcall
luaT_state
luaT_tolstring
+luaL_iscallable
box_txn
box_txn_begin
box_txn_commit
diff --git a/src/lua/utils.c b/src/lua/utils.c
index a4fdf8985..83bc1695e 100644
--- a/src/lua/utils.c
+++ b/src/lua/utils.c
@@ -1016,6 +1016,49 @@ luaT_tolstring(lua_State *L, int idx, size_t *len)
return lua_tolstring(L, -1, len);
}
+/* Based on ffi_meta___call() from luajit/src/lib_ffi.c. */
+static int
+luaL_cdata_iscallable(lua_State *L, int idx)
+{
+ /* Convert a relative (negative) stack index to an absolute one. */
+ if (idx < 0)
+ idx = lua_gettop(L) + idx + 1;
+
+ /* Get cdata from the stack. */
+ assert(lua_type(L, idx) == LUA_TCDATA);
+ GCcdata *cd = cdataV(L->base + idx - 1);
+
+ CTState *cts = ctype_cts(L);
+ CTypeID id = cd->ctypeid;
+ CType *ct = ctype_raw(cts, id);
+ if (ctype_isptr(ct->info))
+ id = ctype_cid(ct->info);
+
+ /* Get ctype metamethod. */
+ cTValue *tv = lj_ctype_meta(cts, id, MM_call);
+
+ return tv != NULL;
+}
+
+int
+luaL_iscallable(lua_State *L, int idx)
+{
+ /* Whether it is function. */
+ int res = lua_isfunction(L, idx);
+ if (res == 1)
+ return 1;
+
+ /* Whether it is cdata with metatype with __call field. */
+ if (lua_type(L, idx) == LUA_TCDATA)
+ return luaL_cdata_iscallable(L, idx);
+
+ /* Whether it has metatable with __call field. */
+ res = luaL_getmetafield(L, idx, "__call");
+ if (res == 1)
+ lua_pop(L, 1); /* Pop __call value. */
+ return res;
+}
+
lua_State *
luaT_state(void)
{
diff --git a/src/lua/utils.h b/src/lua/utils.h
index df4d50e1d..cf8c8d09d 100644
--- a/src/lua/utils.h
+++ b/src/lua/utils.h
@@ -449,6 +449,16 @@ luaT_state(void);
LUA_API const char *
luaT_tolstring(lua_State *L, int idx, size_t *ssize);
+/**
+ * Check whether a Lua object is a function or has a
+ * metatable/metatype with a __call field.
+ *
+ * Note: It does not check the type of the __call
+ * metatable/metatype field.
+ */
+LUA_API int
+luaL_iscallable(lua_State *L, int idx);
+
/** \endcond public */
/**
diff --git a/test/app-tap/module_api.c b/test/app-tap/module_api.c
index 4abe1af48..b81a98056 100644
--- a/test/app-tap/module_api.c
+++ b/test/app-tap/module_api.c
@@ -440,6 +440,15 @@ test_tostring(lua_State *L)
return 1;
}
+static int
+test_iscallable(lua_State *L)
+{
+ int exp = lua_toboolean(L, 2);
+ int res = luaL_iscallable(L, 1);
+ lua_pushboolean(L, res == exp);
+ return 1;
+}
+
LUA_API int
luaopen_module_api(lua_State *L)
{
@@ -467,6 +476,7 @@ luaopen_module_api(lua_State *L)
{"test_cpcall", test_cpcall},
{"test_state", test_state},
{"test_tostring", test_tostring},
+ {"iscallable", test_iscallable},
{NULL, NULL}
};
luaL_register(L, "module_api", lib);
diff --git a/test/app-tap/module_api.test.lua b/test/app-tap/module_api.test.lua
index f93257236..a6658cc61 100755
--- a/test/app-tap/module_api.test.lua
+++ b/test/app-tap/module_api.test.lua
@@ -3,7 +3,9 @@
local fio = require('fio')
box.cfg{log = "tarantool.log"}
-build_path = os.getenv("BUILDDIR")
+-- Use BUILDDIR passed from test-run or cwd when run w/o
+-- test-run to find test/app-tap/module_api.{so,dylib}.
+build_path = os.getenv("BUILDDIR") or '.'
package.cpath = fio.pathjoin(build_path, 'test/app-tap/?.so' ) .. ';' ..
fio.pathjoin(build_path, 'test/app-tap/?.dylib') .. ';' ..
package.cpath
@@ -36,8 +38,86 @@ local function test_pushcdata(test, module)
test:is(gc_counter, 1, 'pushcdata gc')
end
+local function test_iscallable(test, module)
+ local ffi = require('ffi')
+
+ ffi.cdef([[
+ struct cdata_1 { int foo; };
+ struct cdata_2 { int foo; };
+ ]])
+
+ local cdata_1 = ffi.new('struct cdata_1')
+ local cdata_1_ref = ffi.new('struct cdata_1 &')
+ local cdata_2 = ffi.new('struct cdata_2')
+ local cdata_2_ref = ffi.new('struct cdata_2 &')
+
+ local nop = function() end
+
+ ffi.metatype('struct cdata_2', {
+ __call = nop,
+ })
+
+ local cases = {
+ {
+ obj = nop,
+ exp = true,
+ description = 'function',
+ },
+ {
+ obj = nil,
+ exp = false,
+ description = 'nil',
+ },
+ {
+ obj = 1,
+ exp = false,
+ description = 'number',
+ },
+ {
+ obj = {},
+ exp = false,
+ description = 'table without metatable',
+ },
+ {
+ obj = setmetatable({}, {}),
+ exp = false,
+ description = 'table without __call metatable field',
+ },
+ {
+ obj = setmetatable({}, {__call = nop}),
+ exp = true,
+ description = 'table with __call metatable field'
+ },
+ {
+ obj = cdata_1,
+ exp = false,
+ description = 'cdata without __call metatable field',
+ },
+ {
+ obj = cdata_1_ref,
+ exp = false,
+ description = 'cdata reference without __call metatable field',
+ },
+ {
+ obj = cdata_2,
+ exp = true,
+ description = 'cdata with __call metatable field',
+ },
+ {
+ obj = cdata_2_ref,
+ exp = true,
+ description = 'cdata reference with __call metatable field',
+ },
+ }
+
+ test:plan(#cases)
+ for _, case in ipairs(cases) do
+ test:ok(module.iscallable(case.obj, case.exp), case.description)
+ end
+end
+
local test = require('tap').test("module_api", function(test)
- test:plan(23)
+ test:plan(24)
local status, module = pcall(require, 'module_api')
test:is(status, true, "module")
test:ok(status, "module is loaded")
@@ -62,6 +142,7 @@ local test = require('tap').test("module_api", function(test)
test:like(msg, 'luaT_error', 'luaT_error')
test:test("pushcdata", test_pushcdata, module)
+ test:test("iscallable", test_iscallable, module)
space:drop()
end)
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 2/7] Add functions to ease using Lua iterators from C
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
2019-04-10 15:21 ` [PATCH v3 1/7] Add luaL_iscallable with support of cdata metatype Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-18 17:31 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 12:46 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple Alexander Turenko
` (4 subsequent siblings)
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
---
src/lua/utils.c | 92 +++++++++++++++
src/lua/utils.h | 37 ++++++
test/unit/CMakeLists.txt | 4 +
test/unit/luaL_iterator.c | 208 +++++++++++++++++++++++++++++++++
test/unit/luaL_iterator.result | 89 ++++++++++++++
5 files changed, 430 insertions(+)
create mode 100644 test/unit/luaL_iterator.c
create mode 100644 test/unit/luaL_iterator.result
diff --git a/src/lua/utils.c b/src/lua/utils.c
index 83bc1695e..192912ab8 100644
--- a/src/lua/utils.c
+++ b/src/lua/utils.c
@@ -1065,6 +1065,98 @@ luaT_state(void)
return tarantool_L;
}
+/* {{{ Helper functions to interact with a Lua iterator from C */
+
+struct luaL_iterator {
+ int gen;
+ int param;
+ int state;
+};
+
+struct luaL_iterator *
+luaL_iterator_new(lua_State *L, int idx)
+{
+ struct luaL_iterator *it = malloc(sizeof(struct luaL_iterator));
+ if (it == NULL) {
+ diag_set(OutOfMemory, sizeof(struct luaL_iterator),
+ "malloc", "luaL_iterator");
+ return NULL;
+ }
+
+ if (idx == 0) {
+ /* gen, param, state are on top of a Lua stack. */
+ lua_pushvalue(L, -3); /* Popped by luaL_ref(). */
+ it->gen = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_pushvalue(L, -2); /* Popped by luaL_ref(). */
+ it->param = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
+ it->state = luaL_ref(L, LUA_REGISTRYINDEX);
+ } else {
+ /*
+ * {gen, param, state} table is at idx in a Lua
+ * stack.
+ */
+ lua_rawgeti(L, idx, 1); /* Popped by luaL_ref(). */
+ it->gen = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_rawgeti(L, idx, 2); /* Popped by luaL_ref(). */
+ it->param = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_rawgeti(L, idx, 3); /* Popped by luaL_ref(). */
+ it->state = luaL_ref(L, LUA_REGISTRYINDEX);
+ }
+
+ return it;
+}
+
+int
+luaL_iterator_next(lua_State *L, struct luaL_iterator *it)
+{
+ int frame_start = lua_gettop(L);
+
+ /* Call gen(param, state). */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, it->gen);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, it->param);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, it->state);
+ if (luaT_call(L, 2, LUA_MULTRET) != 0) {
+ /*
+ * Pop garbage from the call (a gen function
+ * likely will not leave the stack balanced when it
+ * raises an error), then pop the returned error.
+ */
+ lua_settop(L, frame_start);
+ return -1;
+ }
+ int nresults = lua_gettop(L) - frame_start;
+
+ /*
+ * A gen() function can signal the end of iteration either
+ * by returning nil or by returning no values at all.
+ *
+ * In LuaJIT the pairs() generator returns nil, but the
+ * ipairs() one returns nothing when the iteration ends.
+ */
+ if (nresults == 0 || lua_isnil(L, frame_start + 1)) {
+ lua_settop(L, frame_start);
+ return 0;
+ }
+
+ /* Save the first result to it->state. */
+ luaL_unref(L, LUA_REGISTRYINDEX, it->state);
+ lua_pushvalue(L, frame_start + 1); /* Popped by luaL_ref(). */
+ it->state = luaL_ref(L, LUA_REGISTRYINDEX);
+
+ return nresults;
+}
+
+void luaL_iterator_delete(struct luaL_iterator *it)
+{
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, it->gen);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, it->param);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, it->state);
+ free(it);
+}
+
+/* }}} */
+
int
tarantool_lua_utils_init(struct lua_State *L)
{
diff --git a/src/lua/utils.h b/src/lua/utils.h
index cf8c8d09d..a22492227 100644
--- a/src/lua/utils.h
+++ b/src/lua/utils.h
@@ -536,6 +536,43 @@ luaL_checkfinite(struct lua_State *L, struct luaL_serializer *cfg,
luaL_error(L, "number must not be NaN or Inf");
}
+/* {{{ Helper functions to interact with a Lua iterator from C */
+
+/**
+ * Holds iterator state (references to Lua objects).
+ */
+struct luaL_iterator;
+
+/**
+ * Create a Lua iterator from a gen, param, state triplet.
+ *
+ * If idx == 0, then three top stack values are used as the
+ * triplet. Note: they are not popped.
+ *
+ * Otherwise idx is a Lua stack index that points to a
+ * {gen, param, state} table.
+ */
+struct luaL_iterator *
+luaL_iterator_new(lua_State *L, int idx);
+
+/**
+ * Move iterator to the next value. Push values returned by
+ * gen(param, state).
+ *
+ * Return count of pushed values. Zero means no more results
+ * available. In case of a Lua error in a gen function return -1
+ * and set a diag.
+ */
+int
+luaL_iterator_next(lua_State *L, struct luaL_iterator *it);
+
+/**
+ * Free all resources held by the iterator.
+ */
+void luaL_iterator_delete(struct luaL_iterator *it);
+
+/* }}} */
+
int
tarantool_lua_utils_init(struct lua_State *L);
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 2bcb6e0a8..64739ab09 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -144,6 +144,10 @@ add_executable(luaT_tuple_new.test luaT_tuple_new.c)
target_link_libraries(luaT_tuple_new.test unit box server core misc
${CURL_LIBRARIES} ${LIBYAML_LIBRARIES} ${READLINE_LIBRARIES}
${ICU_LIBRARIES} ${LUAJIT_LIBRARIES})
+add_executable(luaL_iterator.test luaL_iterator.c)
+target_link_libraries(luaL_iterator.test unit server coll core misc
+ ${CURL_LIBRARIES} ${LIBYAML_LIBRARIES} ${READLINE_LIBRARIES}
+ ${ICU_LIBRARIES} ${LUAJIT_LIBRARIES} dl)
add_executable(say.test say.c)
target_link_libraries(say.test core unit)
diff --git a/test/unit/luaL_iterator.c b/test/unit/luaL_iterator.c
new file mode 100644
index 000000000..8d25a0062
--- /dev/null
+++ b/test/unit/luaL_iterator.c
@@ -0,0 +1,208 @@
+#include <lua.h> /* lua_*() */
+#include <lauxlib.h> /* luaL_*() */
+#include <lualib.h> /* luaL_openlibs() */
+#include "unit.h" /* plan, header, footer, is */
+#include "memory.h" /* memory_init() */
+#include "fiber.h" /* fiber_init() */
+#include "diag.h" /* struct error, diag_*() */
+#include "exception.h" /* type_LuajitError */
+#include "lua/utils.h" /* luaL_iterator_*() */
+#include "lua/error.h" /* tarantool_lua_error_init() */
+
+extern char fun_lua[];
+
+int
+main()
+{
+ struct {
+ /* A string to output with a test case. */
+ const char *description;
+ /* A string with Lua code to push an iterator. */
+ const char *init;
+ /*
+ * How many values are pushed by the Lua code
+ * above.
+ */
+ int init_retvals;
+ /*
+ * Start values from this number to distinguish
+ * them from their ordinals.
+ */
+ int first_value;
+ /*
+ * Lua stack index where {gen, param, state} is
+ * placed or zero.
+ */
+ int idx;
+ /* How many values are in the iterator. */
+ int iterations;
+ /* Expected error (if any). */
+ const char *exp_err;
+ } cases[] = {
+ {
+ .description = "pairs, zero idx",
+ .init = "return pairs({42})",
+ .init_retvals = 3,
+ .first_value = 42,
+ .idx = 0,
+ .iterations = 1,
+ .exp_err = NULL,
+ },
+ {
+ .description = "ipairs, zero idx",
+ .init = "return ipairs({42, 43, 44})",
+ .init_retvals = 3,
+ .first_value = 42,
+ .idx = 0,
+ .iterations = 3,
+ .exp_err = NULL,
+ },
+ {
+ .description = "luafun iterator, zero idx",
+ .init = "return fun.wrap(ipairs({42, 43, 44}))",
+ .init_retvals = 3,
+ .first_value = 42,
+ .idx = 0,
+ .iterations = 3,
+ .exp_err = NULL,
+ },
+ {
+ .description = "pairs, from table",
+ .init = "return {pairs({42})}",
+ .init_retvals = 1,
+ .first_value = 42,
+ .idx = -1,
+ .iterations = 1,
+ .exp_err = NULL,
+ },
+ {
+ .description = "ipairs, from table",
+ .init = "return {ipairs({42, 43, 44})}",
+ .init_retvals = 1,
+ .first_value = 42,
+ .idx = -1,
+ .iterations = 3,
+ .exp_err = NULL,
+ },
+ {
+ .description = "luafun iterator, from table",
+ .init = "return {fun.wrap(ipairs({42, 43, 44}))}",
+ .init_retvals = 1,
+ .first_value = 42,
+ .idx = -1,
+ .iterations = 3,
+ .exp_err = NULL,
+ },
+ {
+ .description = "lua error",
+ .init = "return error, 'I am the error', 0",
+ .init_retvals = 3,
+ .first_value = 0,
+ .idx = 0,
+ .iterations = 0,
+ .exp_err = "I am the error",
+ },
+ };
+
+ int cases_cnt = (int) (sizeof(cases) / sizeof(cases[0]));
+ /*
+ * * 4 checks per iteration.
+ * * 3 checks of a stack size.
+ * * 1 check that the values end (for success cases).
+ * * 1 check for an iterator error (for error cases).
+ * * 1 check for an error type (for error cases).
+ * * 1 check for an error message (for error cases).
+ */
+ int planned = 0;
+ for (int i = 0; i < cases_cnt; ++i) {
+ planned += cases[i].iterations * 4 + 4;
+ if (cases[i].exp_err != NULL)
+ planned += 2;
+ }
+
+ plan(planned);
+ header();
+
+ struct lua_State *L = luaL_newstate();
+ luaL_openlibs(L);
+ tarantool_L = L;
+
+ memory_init();
+ fiber_init(fiber_c_invoke);
+ tarantool_lua_error_init(L);
+
+ /*
+ * Check that everything works fine in a thread (a fiber)
+ * other than the main one.
+ */
+ L = lua_newthread(L);
+
+ /*
+ * Expose luafun.
+ *
+ * Don't register it in package.loaded for simplicity.
+ */
+ luaL_loadstring(L, fun_lua);
+ lua_call(L, 0, 1);
+ lua_setglobal(L, "fun");
+
+ for (int i = 0; i < cases_cnt; ++i) {
+ const char *description = cases[i].description;
+ int top = lua_gettop(L);
+
+ /* Push an iterator to the Lua stack. */
+ luaL_loadstring(L, cases[i].init);
+ lua_call(L, 0, cases[i].init_retvals);
+
+ /* Create the luaL_iterator structure. */
+ struct luaL_iterator *it = luaL_iterator_new(L, cases[i].idx);
+ lua_pop(L, cases[i].init_retvals);
+
+ /* Check stack size. */
+ is(lua_gettop(L) - top, 0, "%s: stack size", description);
+
+ /* Iterate over values and check them. */
+ for (int j = 0; j < cases[i].iterations; ++j) {
+ int top = lua_gettop(L);
+ int rc = luaL_iterator_next(L, it);
+ is(rc, 2, "%s: iter %d: gen() retval count",
+ description, j);
+ is(luaL_checkinteger(L, -2), j + 1,
+ "%s: iter %d: gen() 1st retval",
+ description, j);
+ is(luaL_checkinteger(L, -1), j + cases[i].first_value,
+ "%s: iter %d: gen() 2nd retval",
+ description, j);
+ lua_pop(L, 2);
+ is(lua_gettop(L) - top, 0, "%s: iter: %d: stack size",
+ description, j);
+ }
+
+ if (cases[i].exp_err == NULL) {
+ /* Check the iterator ends when expected. */
+ int rc = luaL_iterator_next(L, it);
+ is(rc, 0, "%s: iterator ends", description);
+ } else {
+ /* Check expected error. */
+ int rc = luaL_iterator_next(L, it);
+ is(rc, -1, "%s: iterator error", description);
+ struct error *e = diag_last_error(diag_get());
+ is(e->type, &type_LuajitError, "%s: check error type",
+ description);
+ ok(!strcmp(e->errmsg, cases[i].exp_err),
+ "%s: check error message", description);
+ }
+
+ /* Check stack size. */
+ is(lua_gettop(L) - top, 0, "%s: stack size", description);
+
+ /* Free the luaL_iterator structure. */
+ luaL_iterator_delete(it);
+
+ /* Check stack size. */
+ is(lua_gettop(L) - top, 0, "%s: stack size", description);
+ }
+
+ footer();
+ return check_plan();
+}
diff --git a/test/unit/luaL_iterator.result b/test/unit/luaL_iterator.result
new file mode 100644
index 000000000..2472eedcf
--- /dev/null
+++ b/test/unit/luaL_iterator.result
@@ -0,0 +1,89 @@
+1..86
+ *** main ***
+ok 1 - pairs, zero idx: stack size
+ok 2 - pairs, zero idx: iter 0: gen() retval count
+ok 3 - pairs, zero idx: iter 0: gen() 1st retval
+ok 4 - pairs, zero idx: iter 0: gen() 2nd retval
+ok 5 - pairs, zero idx: iter: 0: stack size
+ok 6 - pairs, zero idx: iterator ends
+ok 7 - pairs, zero idx: stack size
+ok 8 - pairs, zero idx: stack size
+ok 9 - ipairs, zero idx: stack size
+ok 10 - ipairs, zero idx: iter 0: gen() retval count
+ok 11 - ipairs, zero idx: iter 0: gen() 1st retval
+ok 12 - ipairs, zero idx: iter 0: gen() 2nd retval
+ok 13 - ipairs, zero idx: iter: 0: stack size
+ok 14 - ipairs, zero idx: iter 1: gen() retval count
+ok 15 - ipairs, zero idx: iter 1: gen() 1st retval
+ok 16 - ipairs, zero idx: iter 1: gen() 2nd retval
+ok 17 - ipairs, zero idx: iter: 1: stack size
+ok 18 - ipairs, zero idx: iter 2: gen() retval count
+ok 19 - ipairs, zero idx: iter 2: gen() 1st retval
+ok 20 - ipairs, zero idx: iter 2: gen() 2nd retval
+ok 21 - ipairs, zero idx: iter: 2: stack size
+ok 22 - ipairs, zero idx: iterator ends
+ok 23 - ipairs, zero idx: stack size
+ok 24 - ipairs, zero idx: stack size
+ok 25 - luafun iterator, zero idx: stack size
+ok 26 - luafun iterator, zero idx: iter 0: gen() retval count
+ok 27 - luafun iterator, zero idx: iter 0: gen() 1st retval
+ok 28 - luafun iterator, zero idx: iter 0: gen() 2nd retval
+ok 29 - luafun iterator, zero idx: iter: 0: stack size
+ok 30 - luafun iterator, zero idx: iter 1: gen() retval count
+ok 31 - luafun iterator, zero idx: iter 1: gen() 1st retval
+ok 32 - luafun iterator, zero idx: iter 1: gen() 2nd retval
+ok 33 - luafun iterator, zero idx: iter: 1: stack size
+ok 34 - luafun iterator, zero idx: iter 2: gen() retval count
+ok 35 - luafun iterator, zero idx: iter 2: gen() 1st retval
+ok 36 - luafun iterator, zero idx: iter 2: gen() 2nd retval
+ok 37 - luafun iterator, zero idx: iter: 2: stack size
+ok 38 - luafun iterator, zero idx: iterator ends
+ok 39 - luafun iterator, zero idx: stack size
+ok 40 - luafun iterator, zero idx: stack size
+ok 41 - pairs, from table: stack size
+ok 42 - pairs, from table: iter 0: gen() retval count
+ok 43 - pairs, from table: iter 0: gen() 1st retval
+ok 44 - pairs, from table: iter 0: gen() 2nd retval
+ok 45 - pairs, from table: iter: 0: stack size
+ok 46 - pairs, from table: iterator ends
+ok 47 - pairs, from table: stack size
+ok 48 - pairs, from table: stack size
+ok 49 - ipairs, from table: stack size
+ok 50 - ipairs, from table: iter 0: gen() retval count
+ok 51 - ipairs, from table: iter 0: gen() 1st retval
+ok 52 - ipairs, from table: iter 0: gen() 2nd retval
+ok 53 - ipairs, from table: iter: 0: stack size
+ok 54 - ipairs, from table: iter 1: gen() retval count
+ok 55 - ipairs, from table: iter 1: gen() 1st retval
+ok 56 - ipairs, from table: iter 1: gen() 2nd retval
+ok 57 - ipairs, from table: iter: 1: stack size
+ok 58 - ipairs, from table: iter 2: gen() retval count
+ok 59 - ipairs, from table: iter 2: gen() 1st retval
+ok 60 - ipairs, from table: iter 2: gen() 2nd retval
+ok 61 - ipairs, from table: iter: 2: stack size
+ok 62 - ipairs, from table: iterator ends
+ok 63 - ipairs, from table: stack size
+ok 64 - ipairs, from table: stack size
+ok 65 - luafun iterator, from table: stack size
+ok 66 - luafun iterator, from table: iter 0: gen() retval count
+ok 67 - luafun iterator, from table: iter 0: gen() 1st retval
+ok 68 - luafun iterator, from table: iter 0: gen() 2nd retval
+ok 69 - luafun iterator, from table: iter: 0: stack size
+ok 70 - luafun iterator, from table: iter 1: gen() retval count
+ok 71 - luafun iterator, from table: iter 1: gen() 1st retval
+ok 72 - luafun iterator, from table: iter 1: gen() 2nd retval
+ok 73 - luafun iterator, from table: iter: 1: stack size
+ok 74 - luafun iterator, from table: iter 2: gen() retval count
+ok 75 - luafun iterator, from table: iter 2: gen() 1st retval
+ok 76 - luafun iterator, from table: iter 2: gen() 2nd retval
+ok 77 - luafun iterator, from table: iter: 2: stack size
+ok 78 - luafun iterator, from table: iterator ends
+ok 79 - luafun iterator, from table: stack size
+ok 80 - luafun iterator, from table: stack size
+ok 81 - lua error: stack size
+ok 82 - lua error: iterator error
+ok 83 - lua error: check error type
+ok 84 - lua error: check error message
+ok 85 - lua error: stack size
+ok 86 - lua error: stack size
+ *** main: done ***
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
2019-04-10 15:21 ` [PATCH v3 1/7] Add luaL_iscallable with support of cdata metatype Alexander Turenko
2019-04-10 15:21 ` [PATCH v3 2/7] Add functions to ease using Lua iterators from C Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-18 17:32 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 12:50 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions Alexander Turenko
` (3 subsequent siblings)
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Don't parse tuple data, just copy it.
---
src/box/lua/tuple.c | 65 ++++++++++++++++++++++++++++-----------------
1 file changed, 41 insertions(+), 24 deletions(-)
diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
index 183c3901d..c57945997 100644
--- a/src/box/lua/tuple.c
+++ b/src/box/lua/tuple.c
@@ -96,37 +96,54 @@ luaT_istuple(struct lua_State *L, int narg)
struct tuple *
luaT_tuple_new(struct lua_State *L, int idx, box_tuple_format_t *format)
{
- if (idx != 0 && !lua_istable(L, idx) && !luaT_istuple(L, idx)) {
+ struct tuple *tuple;
+
+ if (idx == 0 || lua_istable(L, idx)) {
+ struct ibuf *buf = tarantool_lua_ibuf;
+ ibuf_reset(buf);
+ struct mpstream stream;
+ mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
+ luamp_error, L);
+ if (idx == 0) {
+ /*
+ * Create the tuple from lua stack
+ * objects.
+ */
+ int argc = lua_gettop(L);
+ mpstream_encode_array(&stream, argc);
+ for (int k = 1; k <= argc; ++k) {
+ luamp_encode(L, luaL_msgpack_default, &stream,
+ k);
+ }
+ } else {
+ /* Create the tuple from a Lua table. */
+ luamp_encode_tuple(L, &tuple_serializer, &stream, idx);
+ }
+ mpstream_flush(&stream);
+ tuple = box_tuple_new(format, buf->buf,
+ buf->buf + ibuf_used(buf));
+ if (tuple == NULL)
+ return NULL;
+ ibuf_reinit(tarantool_lua_ibuf);
+ return tuple;
+ }
+
+ tuple = luaT_istuple(L, idx);
+ if (tuple == NULL) {
diag_set(IllegalParams, "A tuple or a table expected, got %s",
lua_typename(L, lua_type(L, idx)));
return NULL;
}
- struct ibuf *buf = tarantool_lua_ibuf;
- ibuf_reset(buf);
- struct mpstream stream;
- mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb,
- luamp_error, L);
- if (idx == 0) {
- /*
- * Create the tuple from lua stack
- * objects.
- */
- int argc = lua_gettop(L);
- mpstream_encode_array(&stream, argc);
- for (int k = 1; k <= argc; ++k) {
- luamp_encode(L, luaL_msgpack_default, &stream, k);
- }
- } else {
- /* Create the tuple from a Lua table. */
- luamp_encode_tuple(L, &tuple_serializer, &stream, idx);
- }
- mpstream_flush(&stream);
- struct tuple *tuple = box_tuple_new(format, buf->buf,
- buf->buf + ibuf_used(buf));
+ /*
+ * Create a new tuple with the necessary format from
+ * another tuple.
+ */
+ const char *tuple_beg = tuple_data(tuple);
+ const char *tuple_end = tuple_beg + tuple->bsize;
+ tuple = box_tuple_new(format, tuple_beg, tuple_end);
if (tuple == NULL)
return NULL;
- ibuf_reinit(tarantool_lua_ibuf);
return tuple;
}
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple
2019-04-10 15:21 ` [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple Alexander Turenko
@ 2019-04-18 17:32 ` Konstantin Osipov
2019-04-30 12:50 ` Vladimir Davydov
1 sibling, 0 replies; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-18 17:32 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Alexander Turenko
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> Don't parse tuple data, just copy it.
LGTM
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple
2019-04-10 15:21 ` [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple Alexander Turenko
2019-04-18 17:32 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-30 12:50 ` Vladimir Davydov
2019-04-30 15:07 ` Alexander Turenko
1 sibling, 1 reply; 39+ messages in thread
From: Vladimir Davydov @ 2019-04-30 12:50 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches
On Wed, Apr 10, 2019 at 06:21:21PM +0300, Alexander Turenko wrote:
> Don't parse tuple data, just copy it.
I don't understand: luamp_encode_tuple doesn't parse or decode a tuple.
It simply copies it to the stream. This patch simply eliminates an extra
memcpy, right?
Is this small optimization worth complicating the code?
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple
2019-04-30 12:50 ` Vladimir Davydov
@ 2019-04-30 15:07 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-30 15:07 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Tue, Apr 30, 2019 at 03:50:52PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:21PM +0300, Alexander Turenko wrote:
> > Don't parse tuple data, just copy it.
>
> I don't understand: luamp_encode_tuple doesn't parse or decode a tuple.
> It simply copies it to the stream. This patch simply eliminates an extra
> memcpy, right?
>
> Is this small optimization worth complicating the code?
I thought that it parsed the tuple. Now I see that it does not.
Dropped from the patchset.
WBR, Alexander Turenko.
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
` (2 preceding siblings ...)
2019-04-10 15:21 ` [PATCH v3 3/7] lua: optimize creation of a tuple from a tuple Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-18 17:35 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 13:03 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 5/7] net.box: add skip_header option to use with buffer Alexander Turenko
` (2 subsequent siblings)
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
@TarantoolBot document
Title: Non-recursive msgpack decoding functions
Contracts:
```
msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
```
These functions are intended to be used with a msgpack buffer received
from net.box. A user may want to skip the {[IPROTO_DATA_KEY] = ...} wrapper
and an array header before passing the buffer to a C function for decoding.
See https://github.com/tarantool/tarantool/issues/2195 for more
information about this net.box API.
See the merger's docbot comment for usage examples.
---
src/lua/msgpack.c | 80 +++++++++++++++
test/app-tap/msgpack.test.lua | 180 +++++++++++++++++++++++++++++++++-
2 files changed, 259 insertions(+), 1 deletion(-)
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index 1b1874eb2..f445840bf 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -422,6 +422,84 @@ lua_ibuf_msgpack_decode(lua_State *L)
return 2;
}
+/**
+ * Verify and set arguments: data and size.
+ *
+ * Always return 0. In case of any failure a Lua error is raised.
+ */
+static int
+verify_decode_args(lua_State *L, const char *func_name, const char **data_p,
+ ptrdiff_t *size_p)
+{
+ /* Verify arguments count. */
+ if (lua_gettop(L) != 2)
+ return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+ /* Verify ptr type. */
+ uint32_t ctypeid;
+ const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR)
+ return luaL_error(L, "%s: 'char *' expected", func_name);
+
+ /* Verify size type and value. */
+ ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+ if (size <= 0)
+ return luaL_error(L, "%s: non-positive size", func_name);
+
+ *data_p = data;
+ *size_p = size;
+
+ return 0;
+}
+
+/**
+ * msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
+ */
+static int
+lua_decode_array(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_array";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_ARRAY)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_array(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_array(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
+/**
+ * msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
+ */
+static int
+lua_decode_map(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_map";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_MAP)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_map(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_map(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
static int
lua_msgpack_new(lua_State *L);
@@ -430,6 +508,8 @@ static const luaL_Reg msgpacklib[] = {
{ "decode", lua_msgpack_decode },
{ "decode_unchecked", lua_msgpack_decode_unchecked },
{ "ibuf_decode", lua_ibuf_msgpack_decode },
+ { "decode_array", lua_decode_array },
+ { "decode_map", lua_decode_map },
{ "new", lua_msgpack_new },
{ NULL, NULL }
};
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..ee215dfb1 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,186 @@ local function test_misc(test, s)
test:ok(not st and e:match("null"), "null ibuf")
end
+local function test_decode_array_map(test, s)
+ local ffi = require('ffi')
+
+ local usage_err = 'Usage: msgpack%.decode_[^_(]+%(ptr, size%)'
+ local end_of_buffer_err = 'msgpack%.decode_[^_]+: unexpected end of buffer'
+ local non_positive_size_err = 'msgpack.decode_[^_]+: non%-positive size'
+
+ local decode_cases = {
+ {
+ 'fixarray',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\x94'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'fixmap',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\x84'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'map 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated map 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ }
+
+ local bad_api_cases = {
+ {
+ 'wrong msgpack type',
+ data = ffi.cast('char *', '\xc0'),
+ size = 1,
+ exp_err = 'msgpack.decode_[^_]+: unexpected msgpack type',
+ },
+ {
+ 'zero size buffer',
+ data = ffi.cast('char *', ''),
+ size = 0,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'negative size buffer',
+ data = ffi.cast('char *', ''),
+ size = -1,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'size is nil',
+ data = ffi.cast('char *', ''),
+ size = nil,
+ exp_err = 'bad argument',
+ },
+ {
+ 'no arguments',
+ args = {},
+ exp_err = usage_err,
+ },
+ {
+ 'one argument',
+ args = {ffi.cast('char *', '')},
+ exp_err = usage_err,
+ },
+ {
+ 'data is nil',
+ args = {nil, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data is not cdata',
+ args = {1, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data with wrong cdata type',
+ args = {box.tuple.new(), 1},
+ exp_err = "msgpack.decode_[^_]+: 'char %*' expected",
+ },
+ {
+ 'size has wrong type',
+ args = {ffi.cast('char *', ''), 'eee'},
+ exp_err = 'bad argument',
+ },
+ }
+
+ test:plan(#decode_cases + 2 * #bad_api_cases)
+
+ -- Decode cases.
+ for _, case in ipairs(decode_cases) do
+ if case.exp_err ~= nil then
+ local ok, err = pcall(case.func, case.data, case.size)
+ local description = ('bad; %s'):format(case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ else
+ local len, new_buf = case.func(case.data, case.size)
+ local rewind = new_buf - case.data
+ local description = ('good; %s'):format(case[1])
+ test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+ description)
+ end
+ end
+
+ -- Bad api usage cases.
+ for _, func_name in ipairs({'decode_array', 'decode_map'}) do
+ for _, case in ipairs(bad_api_cases) do
+ local ok, err
+ if case.args ~= nil then
+ local args_len = table.maxn(case.args)
+ ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+ else
+ ok, err = pcall(s[func_name], case.data, case.size)
+ end
+ local description = ('%s bad api usage; %s'):format(func_name,
+ case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ end
+ end
+end
+
tap.test("msgpack", function(test)
local serializer = require('msgpack')
- test:plan(10)
+ test:plan(11)
test:test("unsigned", common.test_unsigned, serializer)
test:test("signed", common.test_signed, serializer)
test:test("double", common.test_double, serializer)
@@ -62,4 +239,5 @@ tap.test("msgpack", function(test)
test:test("ucdata", common.test_ucdata, serializer)
test:test("offsets", test_offsets, serializer)
test:test("misc", test_misc, serializer)
+ test:test("decode_array_map", test_decode_array_map, serializer)
end)
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-10 15:21 ` [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions Alexander Turenko
@ 2019-04-18 17:35 ` Konstantin Osipov
2019-04-18 18:30 ` Alexander Turenko
2019-04-30 13:03 ` Vladimir Davydov
1 sibling, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-18 17:35 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Alexander Turenko
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> Contracts:
>
> ```
> msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
> msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
> ```
From what they do, they actually neither decode nor skip an
array/map. They decode array/map header only. I know that msgpuck
uses this naming, but it is actually confusing. Why not just
decode array/map header, but skip array/map data in these functions
and call them msgpack.skip_array(), msgpack.skip_map()
respectively?
> +
> + uint32_t len = mp_decode_array(&data);
This doesn't actually skip array members, does it.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-18 17:35 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-18 18:30 ` Alexander Turenko
2019-04-18 18:33 ` Konstantin Osipov
0 siblings, 1 reply; 39+ messages in thread
From: Alexander Turenko @ 2019-04-18 18:30 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
In short:
* The behaviour is what I need (I cannot skip data).
* The names are confusing, but I don't see better variants. These ones
are at least consistent with the others.
I posted below relevant discussions, old variants, possible variants,
usage examples and so on.
WBR, Alexander Turenko.
On Thu, Apr 18, 2019 at 08:35:28PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> > Contracts:
> >
> > ```
> > msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
> > msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
> > ```
>
> From what they do, they actually neither decode nor skip an
> array/map. They decode array/map header only. I know that msgpuck
> uses this naming, but it is actually confusing.
Yep, the names are the hardest part of the patch. They were
check_array/map(), and then were renamed to decode_array/map(). The
relevant parts of the previous discussion:
Vladimir:
> Not sure about the name either, because it doesn't just check the
> msgpack - it decodes it, but can't come up with anything substantially
> better. Maybe msgpack.decode_array?
>
> https://www.freelists.org/post/tarantool-patches/PATCH-v2-56-netbox-add-helpers-to-decode-msgpack-headers,1
Me:
> Re name: now I understand: decode_unchecked() is like mp_decode(),
> decode() is like mp_check() + mp_decode(). So it is worth renaming it
> to decode_array(). Done.
>
> https://www.freelists.org/post/tarantool-patches/PATCH-v2-56-netbox-add-helpers-to-decode-msgpack-headers,2
The words 'check', 'skip' and 'decode' are all intuitively understood as
referring to the whole array/map. The only thing we are able to do
here is to add a '_header' suffix (it would look a bit weird, IMHO).
Should I do that?
> Why not just decode array/map header, but skip array/map data in
> these functions and call them msgpack.skip_array(),
> msgpack.skip_map() respectively?
I need to skip an array/map header (and save the length), but feed the
bare data to a source constructor.
Initially I had a parameter in merger (now it would be in a source
constructor) that said how many array headers it needs to skip before
actually reading tuples. AFAIR, the option values were BUFFER_RAW,
BUFFER_SELECT, BUFFER_CALL, BUFFER_CHUNKED.
Then we decided to factor it out of merger and split it into
msgpack helpers and a net.box option.
> > +
> > + uint32_t len = mp_decode_array(&data);
>
> This doesn't actually skip array members, does it.
This is exactly what I need.
Don't know how to describe it in short. I receive [[tuple, tuple, ...],
[tuple, tuple, ...]] in a buffer and need to feed each [tuple, tuple,
...] part to merger.
The following sections of merger's examples should shed light on that I
hope:
https://github.com/Totktonada/tarantool-merger-examples/tree/695fc9511685033f4b4b22c0df537a12ddf2eaf6#preparing-buffers
https://github.com/Totktonada/tarantool-merger-examples/tree/695fc9511685033f4b4b22c0df537a12ddf2eaf6#multiplexing-requests
The usage example itself is here:
https://github.com/Totktonada/tarantool-merger-examples/blob/695fc9511685033f4b4b22c0df537a12ddf2eaf6/multiplexed_example/frontend.lua#L38-L46
WBR, Alexander Turenko.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-18 18:30 ` Alexander Turenko
@ 2019-04-18 18:33 ` Konstantin Osipov
2019-04-18 18:44 ` Alexander Turenko
0 siblings, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-18 18:33 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches, Vladimir Davydov
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/18 21:32]:
> In short:
>
> * The behaviour is what I need (I cannot skip data).
decode_array_header, decode_map_header?
> * The names are confusing, but I don't see better variants. These ones
> at least consistent with others.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-18 18:33 ` Konstantin Osipov
@ 2019-04-18 18:44 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-18 18:44 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
On Thu, Apr 18, 2019 at 09:33:29PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/18 21:32]:
> > In short:
> >
> > * The behaviour is what I need (I cannot skip data).
>
> decode_array_header, decode_map_header?
I proposed this too, further down in my email :)
Yep, it is a possible variant. It feels a bit weird to me, but at least
it eliminates the confusion. So okay.
I'll change the implementation, the examples repository and the graphql
PRs according to this naming.
WBR, Alexander Turenko.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-10 15:21 ` [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions Alexander Turenko
2019-04-18 17:35 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-30 13:03 ` Vladimir Davydov
2019-04-30 18:38 ` Alexander Turenko
1 sibling, 1 reply; 39+ messages in thread
From: Vladimir Davydov @ 2019-04-30 13:03 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches
On Wed, Apr 10, 2019 at 06:21:22PM +0300, Alexander Turenko wrote:
> Needed for #3276.
>
> @TarantoolBot document
> Title: Non-recursive msgpack decoding functions
>
> Contracts:
>
> ```
> msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
> msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
> ```
Let's rename to decode_array_header and decode_map_header, as agreed
with Kostja. Other than that looks good to me.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions
2019-04-30 13:03 ` Vladimir Davydov
@ 2019-04-30 18:38 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-30 18:38 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Tue, Apr 30, 2019 at 04:03:17PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:22PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> >
> > @TarantoolBot document
> > Title: Non-recursive msgpack decoding functions
> >
> > Contracts:
> >
> > ```
> > msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
> > msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
> > ```
>
> Let's rename to decode_array_header and decode_map_header, as agreed
> with Kostja. Other than that looks good to me.
Done. Also updated tarantool-merger-examples repo and graphql PRs.
Removed 'Consider merger's docbot comment for usage examples' from the
commit message, because the patch likely will not land into master at
the same time as merger.
Pushed to Totktonada/gh-3276-on-board-merger.
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 5/7] net.box: add skip_header option to use with buffer
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
` (3 preceding siblings ...)
2019-04-10 15:21 ` [PATCH v3 4/7] lua: add non-recursive msgpack decoding functions Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-18 17:37 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 13:16 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 6/7] Add merger for tuples streams (C part) Alexander Turenko
2019-04-10 15:21 ` [PATCH v3 7/7] Add merger for tuple streams (Lua part) Alexander Turenko
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
@TarantoolBot document
Title: net.box: skip_header option
This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
from a buffer. This may be needed to pass this buffer to some C function
when it expects some specific msgpack input.
See src/box/lua/net_box.lua for examples.
---
src/box/lua/net_box.lua | 46 +++++---
test/box/net.box.result | 222 +++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 86 ++++++++++++++-
3 files changed, 328 insertions(+), 26 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index b3139a3f5..c6ed3e138 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
local decode = msgpack.decode_unchecked
+local decode_map = msgpack.decode_map
local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, on_push, on_push_ctx,
- ...)
+ local function perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if state ~= 'active' and state ~= 'fetch_schema' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 8 members:
- -- method, buffer, id, cond, errno, response, on_push,
- -- on_push_ctx.
- local request = setmetatable(table_new(0, 8), request_mt)
+ -- Request in most cases has maximum 9 members:
+ -- method, buffer, skip_header, id, cond, errno, response,
+ -- on_push, on_push_ctx.
+ local request = setmetatable(table_new(0, 9), request_mt)
request.method = method
request.buffer = buffer
+ request.skip_header = skip_header
request.id = id
request.cond = fiber.cond()
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
- local function perform_request(timeout, buffer, method, on_push,
- on_push_ctx, ...)
+ local function perform_request(timeout, buffer, skip_header, method,
+ on_push, on_push_ctx, ...)
local request, err =
- perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+ perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if not request then
return nil, err
end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
if buffer ~= nil then
-- Copy xrow.body to user-provided buffer
local body_len = body_end - body_rpos
+ if request.skip_header then
+ -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+ local map_len, key
+ map_len, body_rpos = decode_map(body_rpos, body_len)
+ assert(map_len == 1)
+ key, body_rpos = decode(body_rpos)
+ assert(key == IPROTO_DATA_KEY)
+ body_len = body_end - body_rpos
+ end
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
function remote_methods:_request(method, opts, ...)
local transport = self._transport
- local on_push, on_push_ctx, buffer, deadline
+ local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
-- async.
if opts then
buffer = opts.buffer
+ skip_header = opts.skip_header
if opts.is_async then
if opts.on_push or opts.on_push_ctx then
error('To handle pushes in an async request use future:pairs()')
end
local res, err =
- transport.perform_async_request(buffer, method, table.insert,
- {}, ...)
+ transport.perform_async_request(buffer, skip_header, method,
+ table.insert, {}, ...)
if err then
box.error(err)
end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local res, err = transport.perform_request(timeout, buffer, method,
- on_push, on_push_ctx, ...)
+ local res, err = transport.perform_request(timeout, buffer, skip_header,
+ method, on_push, on_push_ctx,
+ ...)
if err then
box.error(err)
end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+ res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+ res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
end
if err then
box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index f71699818..8ef3de808 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -1598,6 +1598,18 @@ result
---
- {48: [[2]]}
...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
-- insert
c.space.test:insert({3}, {buffer = ibuf})
---
@@ -1610,6 +1622,21 @@ result
---
- {48: [[3]]}
...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
---
@@ -1633,6 +1660,29 @@ result
---
- {48: [[3]]}
...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1645,6 +1695,18 @@ result
---
- {48: []}
...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1657,6 +1719,18 @@ result
---
- {48: []}
...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
---
@@ -1669,6 +1743,18 @@ result
---
- {48: [[3], [2], [1, 'hello']]}
...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
-- select
len = c.space.test:select({}, {buffer = ibuf})
---
@@ -1692,6 +1778,29 @@ result
---
- {48: [[1, 'hello'], [2], [3], [4]]}
...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
---
@@ -1726,6 +1835,40 @@ result
---
- {48: []}
...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
---
@@ -1760,6 +1903,75 @@ result
---
- {48: []}
...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
@@ -2596,7 +2808,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
c.state
@@ -3237,7 +3449,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
@@ -3372,7 +3584,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3391,7 +3603,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index beb33c24f..5ff2975ac 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- insert
c.space.test:insert({3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
len = c.space.test:select({}, {buffer = ibuf})
ibuf.rpos + len == ibuf.wpos
@@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos
len
result
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1074,7 +1150,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1307,7 +1383,7 @@ finalize_long()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
finalize_long()
future:wait_result(100)
@@ -1364,7 +1440,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
@@ -1374,7 +1450,7 @@ c:close()
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 5/7] net.box: add skip_header option to use with buffer
2019-04-10 15:21 ` [PATCH v3 5/7] net.box: add skip_header option to use with buffer Alexander Turenko
@ 2019-04-18 17:37 ` Konstantin Osipov
2019-04-18 18:39 ` Alexander Turenko
2019-04-30 13:16 ` Vladimir Davydov
1 sibling, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-18 17:37 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Alexander Turenko
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> Needed for #3276.
>
> @TarantoolBot document
> Title: net.box: skip_header option
>
> This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
> from a buffer. This may be needed to pass this buffer to some C function
> when it expects some specific msgpack input.
I'm sorry I don't understand from this comment what this function
does and why. I would expect it to skip entire IPROTO_HEADER and
only return IPROTO_BODY. Does it?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 5/7] net.box: add skip_header option to use with buffer
2019-04-18 17:37 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-18 18:39 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-18 18:39 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
On Thu, Apr 18, 2019 at 08:37:33PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> > Needed for #3276.
> >
> > @TarantoolBot document
> > Title: net.box: skip_header option
> >
> > This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
> > from a buffer. This may be needed to pass this buffer to some C function
> > when it expects some specific msgpack input.
>
> I'm sorry I don't understand from this comment what this function
> does and why. I would expect it to skip entire IPROTO_HEADER and
> only return IPROTO_BODY. Does it?
net.box with the `buffer` option already skips the packet header, but
IPROTO_BODY is a single-entry map with IPROTO_DATA_KEY as the key and the
data itself as the value: {[48] = <data that I need>}.
I'm sorry that the description is confusing. It seems I should provide
more context here: give an example of an iproto packet (say, the result
of a box select), then show the part that is written to a buffer
without this option and with it.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 5/7] net.box: add skip_header option to use with buffer
2019-04-10 15:21 ` [PATCH v3 5/7] net.box: add skip_header option to use with buffer Alexander Turenko
2019-04-18 17:37 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-30 13:16 ` Vladimir Davydov
2019-04-30 18:39 ` Alexander Turenko
1 sibling, 1 reply; 39+ messages in thread
From: Vladimir Davydov @ 2019-04-30 13:16 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches
On Wed, Apr 10, 2019 at 06:21:23PM +0300, Alexander Turenko wrote:
> Needed for #3276.
>
> @TarantoolBot document
> Title: net.box: skip_header option
>
> This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
> from a buffer. This may be needed to pass this buffer to some C function
> when it expects some specific msgpack input.
>
> See src/box/lua/net_box.lua for examples.
> ---
Looks good to me, but can't push it now as it depends on the previous
patch.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 5/7] net.box: add skip_header option to use with buffer
2019-04-30 13:16 ` Vladimir Davydov
@ 2019-04-30 18:39 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-30 18:39 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
On Tue, Apr 30, 2019 at 04:16:25PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:23PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> >
> > @TarantoolBot document
> > Title: net.box: skip_header option
> >
> > This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
> > from a buffer. This may be needed to pass this buffer to some C function
> > when it expects some specific msgpack input.
> >
> > See src/box/lua/net_box.lua for examples.
> > ---
>
> Looks good to me, but can't push it now as it depends on the previous
> patch.
Konstantin found the description unclear, so I have updated the
example in [1] to show the effect of the skip_header option and put it
into the docbot comment.
Pushed to Totktonada/gh-3276-on-board-merger.
Cited the example below:
```lua
local net_box = require('net.box')
local buffer = require('buffer')
local ffi = require('ffi')
local msgpack = require('msgpack')
local yaml = require('yaml')
box.cfg{listen = 3301}
box.once('load_data', function()
box.schema.user.grant('guest', 'read,write,execute', 'universe')
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})
box.space.s:insert({4})
end)
local function foo()
return box.space.s:select()
end
_G.foo = foo
local conn = net_box.connect('localhost:3301')
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- {48: [[1], [2], [3], [4]]}
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))
-- [[1], [2], [3], [4]]
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- {48: [[[1], [2], [3], [4]]]}
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf, skip_header = true})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))
-- [[[1], [2], [3], [4]]]
os.exit()
```
[1]: https://github.com/Totktonada/tarantool-merger-examples/tree/e65141ba18ff74f9c7f68b7a782ae00f920f8929#preparing-buffers
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
` (4 preceding siblings ...)
2019-04-10 15:21 ` [PATCH v3 5/7] net.box: add skip_header option to use with buffer Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-25 11:43 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 15:34 ` Vladimir Davydov
2019-04-10 15:21 ` [PATCH v3 7/7] Add merger for tuple streams (Lua part) Alexander Turenko
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Needed for #3276.
---
src/box/CMakeLists.txt | 1 +
src/box/merger.c | 464 +++++++++++++++++++++++++++++++++++++++
src/box/merger.h | 180 +++++++++++++++
test/unit/CMakeLists.txt | 3 +
test/unit/merger.result | 71 ++++++
test/unit/merger.test.c | 301 +++++++++++++++++++++++++
6 files changed, 1020 insertions(+)
create mode 100644 src/box/merger.c
create mode 100644 src/box/merger.h
create mode 100644 test/unit/merger.result
create mode 100644 test/unit/merger.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 7fbbc7803..d1251c326 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -121,6 +121,7 @@ add_library(box STATIC
execute.c
wal.c
call.c
+ merger.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/merger.c b/src/box/merger.c
new file mode 100644
index 000000000..83f628758
--- /dev/null
+++ b/src/box/merger.c
@@ -0,0 +1,464 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "trivia/util.h" /* unlikely() */
+#include "diag.h" /* diag_set() */
+#include "say.h" /* panic() */
+#include "box/tuple.h" /* box_tuple_unref() */
+#include "box/tuple_format.h" /* box_tuple_format_*(),
+ tuple_format_id() */
+#include "box/key_def.h" /* box_key_def_*(),
+ box_tuple_compare() */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+enum { MERGER_SOURCE_REF_MAX = INT_MAX };
+enum { MERGER_CONTEXT_REF_MAX = INT_MAX };
+
+void
+merger_source_ref(struct merger_source *source)
+{
+ if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX))
+ panic("Merger source reference counter overflow");
+ ++source->refs;
+}
+
+void
+merger_source_unref(struct merger_source *source)
+{
+ assert(source->refs - 1 >= 0);
+ if (--source->refs == 0)
+ source->vtab->delete(source);
+}
+
+void
+merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
+{
+ source->vtab = vtab;
+ source->refs = 0;
+}
+
+void
+merger_context_delete(struct merger_context *ctx)
+{
+ box_key_def_delete(ctx->key_def);
+ box_tuple_format_unref(ctx->format);
+ free(ctx);
+}
+
+void
+merger_context_ref(struct merger_context *ctx)
+{
+ if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX))
+ panic("Merger context reference counter overflow");
+ ++ctx->refs;
+}
+
+void
+merger_context_unref(struct merger_context *ctx)
+{
+ assert(ctx->refs - 1 >= 0);
+ if (--ctx->refs == 0)
+ merger_context_delete(ctx);
+}
+
+struct merger_context *
+merger_context_new(const struct key_def *key_def)
+{
+ struct merger_context *ctx = (struct merger_context *) malloc(
+ sizeof(struct merger_context));
+ if (ctx == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger_context), "malloc",
+ "merger_context");
+ return NULL;
+ }
+
+ /*
+ * We need to copy key_def, because a key_def from the
+ * parameter can be collected before merger_context end
+ * of life (say, by LuaJIT GC if the key_def comes from
+ * Lua).
+ */
+ ctx->key_def = key_def_dup(key_def);
+ if (ctx->key_def == NULL) {
+ free(ctx);
+ return NULL;
+ }
+
+ ctx->format = box_tuple_format_new(&ctx->key_def, 1);
+ if (ctx->format == NULL) {
+ box_key_def_delete(ctx->key_def);
+ free(ctx);
+ return NULL;
+ }
+
+ ctx->refs = 0;
+
+ return ctx;
+}
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Holds a source to fetch next tuples and a last fetched tuple to
+ * compare the node against other nodes.
+ *
+ * The main reason why this structure is separated from a merger
+ * source is that a heap node can not be a member of several
+ * heaps.
+ *
+ * The second reason is that it allows encapsulating all heap
+ * related logic inside this compilation unit, without any traces
+ * in externally visible structures.
+ */
+struct merger_heap_node {
+ /* A source of tuples. */
+ struct merger_source *source;
+ /*
+ * A last fetched (refcounted) tuple to compare against
+ * other nodes.
+ */
+ struct tuple *tuple;
+ /* An anchor to make the structure a merger heap node. */
+ struct heap_node heap_node_anchor;
+};
+
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+ const struct merger_heap_node *right);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS merger_source_less
+#define heap_value_t struct merger_heap_node
+#define heap_value_attr heap_node_anchor
+#include "salad/heap.h"
+#undef HEAP_NAME
+#undef HEAP_LESS
+#undef heap_value_t
+#undef heap_value_attr
+
+/**
+ * Holds a heap, an immutable context, parameters of a merge
+ * process and utility fields.
+ */
+struct merger {
+ /* A merger is a source. */
+ struct merger_source base;
+ /*
+ * Whether a merge process started.
+ *
+ * The merger postpones charging of heap nodes until a
+ * first output tuple is acquired.
+ */
+ bool started;
+ /* A merger context. */
+ struct merger_context *ctx;
+ /*
+ * A heap of sources (of nodes that contains a source to
+ * be exact).
+ */
+ heap_t heap;
+ /* An array of heap nodes. */
+ uint32_t nodes_count;
+ struct merger_heap_node *nodes;
+ /* Ascending (false) / descending (true) order. */
+ bool reverse;
+};
+
+/* Helpers */
+
+/**
+ * Data comparing function to construct a heap of sources.
+ */
+static bool
+merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
+ const struct merger_heap_node *right)
+{
+ if (left->tuple == NULL && right->tuple == NULL)
+ return false;
+ if (left->tuple == NULL)
+ return false;
+ if (right->tuple == NULL)
+ return true;
+ struct merger *merger = container_of(heap, struct merger, heap);
+ int cmp = box_tuple_compare(left->tuple, right->tuple,
+ merger->ctx->key_def);
+ return merger->reverse ? cmp >= 0 : cmp < 0;
+}
+
+/**
+ * How much more memory the heap will reserve at the next grow.
+ *
+ * See HEAP(reserve)() function in lib/salad/heap.h.
+ */
+static size_t
+heap_next_grow_size(const heap_t *heap)
+{
+ heap_off_t heap_capacity_diff = heap->capacity == 0 ?
+ HEAP_INITIAL_CAPACITY : heap->capacity;
+ return heap_capacity_diff * sizeof(struct heap_node *);
+}
+
+/**
+ * Initialize a new merger heap node.
+ */
+static void
+merger_heap_node_new(struct merger_heap_node *node,
+ struct merger_source *source)
+{
+ node->source = source;
+ merger_source_ref(node->source);
+ node->tuple = NULL;
+ heap_node_create(&node->heap_node_anchor);
+}
+
+/**
+ * Free a merger heap node.
+ */
+static void
+merger_heap_node_delete(struct merger_heap_node *node)
+{
+ merger_source_unref(node->source);
+ if (node->tuple != NULL)
+ box_tuple_unref(node->tuple);
+}
+
+/**
+ * The helper to add a new heap node to a merger heap.
+ *
+ * Return -1 at an error and set a diag.
+ *
+ * Otherwise store a next tuple in node->tuple, add the node to
+ * merger->heap and return 0.
+ */
+static int
+merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
+{
+ struct tuple *tuple = NULL;
+
+ /* Acquire a next tuple. */
+ struct merger_source *source = node->source;
+ if (source->vtab->next(source, merger->ctx->format, &tuple) != 0)
+ return -1;
+
+ /* Don't add an empty source to a heap. */
+ if (tuple == NULL)
+ return 0;
+
+ node->tuple = tuple;
+
+ /* Add a node to a heap. */
+ if (merger_heap_insert(&merger->heap, node)) {
+ diag_set(OutOfMemory, heap_next_grow_size(&merger->heap),
+ "malloc", "merger->heap");
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Virtual methods declarations */
+
+static void
+merger_delete(struct merger_source *base);
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+struct merger_source *
+merger_new(struct merger_context *ctx)
+{
+ static struct merger_source_vtab merger_vtab = {
+ .delete = merger_delete,
+ .next = merger_next,
+ };
+
+ struct merger *merger = (struct merger *) malloc(sizeof(struct merger));
+ if (merger == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger), "malloc",
+ "merger");
+ return NULL;
+ }
+
+ merger_source_new(&merger->base, &merger_vtab);
+
+ merger->started = false;
+ merger->ctx = ctx;
+ merger_context_ref(merger->ctx);
+ merger_heap_create(&merger->heap);
+ merger->nodes_count = 0;
+ merger->nodes = NULL;
+ merger->reverse = false;
+
+ return &merger->base;
+}
+
+/*
+ * Attach an array of sources to a merger that has no sources yet.
+ *
+ * One heap node is allocated per source and a reference is taken
+ * on each source (see merger_heap_node_new()). The nodes are only
+ * stored here; they are added to the heap on the first
+ * merger_next() call.
+ *
+ * An empty set of sources (sources_count == 0) is valid: the
+ * merger is left without nodes and produces no tuples.
+ *
+ * Return 0 at success. Return -1 and set a diag when memory for
+ * the nodes can not be allocated.
+ */
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+		   uint32_t sources_count)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/* Ensure we don't leak old nodes. */
+	assert(merger->nodes_count == 0);
+	assert(merger->nodes == NULL);
+
+	/*
+	 * Don't call malloc() with a zero size: it is allowed to
+	 * return NULL in this case, which must not be reported as
+	 * an out-of-memory error. The merger already has
+	 * nodes == NULL and nodes_count == 0 (asserted above).
+	 */
+	if (sources_count == 0)
+		return 0;
+
+	const size_t nodes_size =
+		sources_count * sizeof(struct merger_heap_node);
+	struct merger_heap_node *nodes = (struct merger_heap_node *) malloc(
+		nodes_size);
+	if (nodes == NULL) {
+		diag_set(OutOfMemory, nodes_size, "malloc",
+			 "merger heap nodes");
+		return -1;
+	}
+
+	/* Each node takes its own reference on the source. */
+	for (uint32_t i = 0; i < sources_count; ++i)
+		merger_heap_node_new(&nodes[i], sources[i]);
+
+	merger->nodes_count = sources_count;
+	merger->nodes = nodes;
+	return 0;
+}
+
+void
+merger_set_reverse(struct merger_source *base, bool reverse)
+{
+ struct merger *merger = container_of(base, struct merger, base);
+
+ merger->reverse = reverse;
+}
+
+/* Virtual methods */
+
+static void
+merger_delete(struct merger_source *base)
+{
+ struct merger *merger = container_of(base, struct merger, base);
+
+ merger_context_unref(merger->ctx);
+ merger_heap_destroy(&merger->heap);
+
+ for (uint32_t i = 0; i < merger->nodes_count; ++i)
+ merger_heap_node_delete(&merger->nodes[i]);
+
+ if (merger->nodes != NULL)
+ free(merger->nodes);
+
+ free(merger);
+}
+
+/*
+ * Fetch the next tuple of the merged stream (the next() virtual
+ * method of a merger).
+ *
+ * On the first call a first tuple is fetched from every source
+ * and the non-empty sources are added to the heap. Each call then
+ * takes the top node of the heap, hands its tuple over to the
+ * caller and refills the node from its source.
+ *
+ * base is the merger. format is the format of the returned
+ * tuple; NULL means the merger context format is used. A
+ * refcounted tuple is stored to *out, or NULL when the heap is
+ * empty (all sources are exhausted).
+ *
+ * Return 0 at success. Return -1 at an error; *out is not written
+ * in this case and the top node keeps its stored tuple.
+ */
+static int
+merger_next(struct merger_source *base, box_tuple_format_t *format,
+	    struct tuple **out)
+{
+	struct merger *merger = container_of(base, struct merger, base);
+
+	/*
+	 * Fetch a first tuple for each source and add all heap
+	 * nodes to a merger heap.
+	 */
+	if (!merger->started) {
+		for (uint32_t i = 0; i < merger->nodes_count; ++i) {
+			struct merger_heap_node *node = &merger->nodes[i];
+			if (merger_add_heap_node(merger, node) != 0)
+				return -1;
+		}
+		merger->started = true;
+	}
+
+	/* Get a next tuple. */
+	struct merger_heap_node *node = merger_heap_top(&merger->heap);
+	if (node == NULL) {
+		*out = NULL;
+		return 0;
+	}
+	struct tuple *tuple = node->tuple;
+	assert(tuple != NULL);
+
+	/*
+	 * The tuples are stored in merger->ctx->format for
+	 * fast comparisons, but we should return tuples in a
+	 * requested format.
+	 */
+	uint32_t id_stored = tuple_format_id(merger->ctx->format);
+	assert(tuple->format_id == id_stored);
+	if (format == NULL)
+		format = merger->ctx->format;
+	uint32_t id_requested = tuple_format_id(format);
+	bool is_converted = id_stored != id_requested;
+	if (is_converted) {
+		const char *tuple_beg = tuple_data(tuple);
+		const char *tuple_end = tuple_beg + tuple->bsize;
+		tuple = box_tuple_new(format, tuple_beg, tuple_end);
+		if (tuple == NULL)
+			return -1;
+		box_tuple_ref(tuple);
+	}
+
+	/*
+	 * Fetch a replacement for the node into a local variable
+	 * and leave node->tuple intact until the fetch succeeds.
+	 * Otherwise a failed next() would leave the node (which is
+	 * still in the heap) with an already unreferenced tuple,
+	 * and merger_heap_node_delete() would unreference it once
+	 * more.
+	 */
+	struct merger_source *source = node->source;
+	struct tuple *next_tuple = NULL;
+	if (source->vtab->next(source, merger->ctx->format,
+			       &next_tuple) != 0) {
+		/* The converted copy will not be returned. */
+		if (is_converted)
+			box_tuple_unref(tuple);
+		return -1;
+	}
+
+	/*
+	 * When the tuple was converted, the stored one is not
+	 * returned, so drop the node's reference to it. Otherwise
+	 * the node's reference is handed over to the caller via
+	 * *out, so it is not unreferenced here.
+	 */
+	if (is_converted)
+		box_tuple_unref(node->tuple);
+	node->tuple = next_tuple;
+
+	/* Update a heap. */
+	if (node->tuple == NULL)
+		merger_heap_delete(&merger->heap, node);
+	else
+		merger_heap_update(&merger->heap, node);
+
+	*out = tuple;
+	return 0;
+}
+
+/* }}} */
diff --git a/src/box/merger.h b/src/box/merger.h
new file mode 100644
index 000000000..2323dd7d7
--- /dev/null
+++ b/src/box/merger.h
@@ -0,0 +1,180 @@
+#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/* {{{ Structures */
+
+struct tuple;
+struct key_def;
+struct tuple_format;
+typedef struct tuple_format box_tuple_format_t;
+
+struct merger_source;
+struct merger_context;
+
+struct merger_source_vtab {
+ /**
+ * Free a merger source.
+ *
+ * Don't call it directly, use merger_source_unref()
+ * instead.
+ */
+ void (*delete)(struct merger_source *base);
+ /**
+ * Get a next tuple (refcounted) from a source.
+ *
+ * When format is NULL it means that it does not matter
+ * for a caller in which format a tuple will be.
+ *
+ * Return 0 when successfully fetched a tuple or NULL. In
+ * case of an error set a diag and return -1.
+ */
+ int (*next)(struct merger_source *base, box_tuple_format_t *format,
+ struct tuple **out);
+};
+
+/**
+ * Base (abstract) structure to represent a merger source.
+ *
+ * The structure does not hold any resources.
+ */
+struct merger_source {
+ /* Source-specific methods. */
+ struct merger_source_vtab *vtab;
+ /* Reference counter. */
+ int refs;
+};
+
+/**
+ * Holds immutable parameters of a merger.
+ */
+struct merger_context {
+ struct key_def *key_def;
+ box_tuple_format_t *format;
+ /* Reference counter. */
+ int refs;
+};
+
+/* }}} */
+
+/* {{{ Create, delete, ref, unref a source and a context */
+
+/**
+ * Increment a merger source reference counter.
+ */
+void
+merger_source_ref(struct merger_source *source);
+
+/**
+ * Decrement a merger source reference counter. When it has
+ * reached zero, free the source (call delete() virtual method).
+ */
+void
+merger_source_unref(struct merger_source *source);
+
+/**
+ * Initialize a base merger source structure.
+ */
+void
+merger_source_new(struct merger_source *source,
+ struct merger_source_vtab *vtab);
+
+/**
+ * Free a merger context.
+ */
+void
+merger_context_delete(struct merger_context *ctx);
+
+/**
+ * Increment a merger context reference counter.
+ */
+void
+merger_context_ref(struct merger_context *ctx);
+
+/**
+ * Decrement a merger context reference counter. When it has
+ * reached zero, free the context.
+ */
+void
+merger_context_unref(struct merger_context *ctx);
+
+/**
+ * Create a new merger context.
+ *
+ * A returned merger context is NOT reference counted.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_context *
+merger_context_new(const struct key_def *key_def);
+
+/* }}} */
+
+/* {{{ Merger */
+
+/**
+ * Create a new merger w/o sources.
+ *
+ * Return NULL and set a diag in case of an error.
+ */
+struct merger_source *
+merger_new(struct merger_context *ctx);
+
+/**
+ * Set sources for a merger.
+ *
+ * Return 0 at success. Return -1 at an error and set a diag.
+ */
+int
+merger_set_sources(struct merger_source *base, struct merger_source **sources,
+ uint32_t sources_count);
+
+/**
+ * Set reverse flag for a merger.
+ */
+void
+merger_set_reverse(struct merger_source *base, bool reverse);
+
+/* }}} */
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 64739ab09..afdcbebf6 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -224,3 +224,6 @@ target_link_libraries(swim.test unit swim)
add_executable(swim_proto.test swim_proto.c swim_test_transport.c swim_test_ev.c
swim_test_utils.c ${PROJECT_SOURCE_DIR}/src/version.c)
target_link_libraries(swim_proto.test unit swim)
+
+add_executable(merger.test merger.test.c)
+target_link_libraries(merger.test unit core box)
diff --git a/test/unit/merger.result b/test/unit/merger.result
new file mode 100644
index 000000000..11399e0bc
--- /dev/null
+++ b/test/unit/merger.result
@@ -0,0 +1,71 @@
+1..4
+ *** test_basic ***
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (any format): tuple != NULL
+ ok 2 - array source next() (any format): skip tuple format id check
+ ok 3 - array source next() (any format): check tuple size
+ ok 4 - array source next() (any format): check tuple data
+ ok 5 - array source next() (any format): tuple != NULL
+ ok 6 - array source next() (any format): skip tuple format id check
+ ok 7 - array source next() (any format): check tuple size
+ ok 8 - array source next() (any format): check tuple data
+ ok 9 - array source is empty (any format)
+ *** test_array_source: done ***
+ok 1 - subtests
+ 1..9
+ *** test_array_source ***
+ ok 1 - array source next() (user's format): tuple != NULL
+ ok 2 - array source next() (user's format): check tuple format id
+ ok 3 - array source next() (user's format): check tuple size
+ ok 4 - array source next() (user's format): check tuple data
+ ok 5 - array source next() (user's format): tuple != NULL
+ ok 6 - array source next() (user's format): check tuple format id
+ ok 7 - array source next() (user's format): check tuple size
+ ok 8 - array source next() (user's format): check tuple data
+ ok 9 - array source is empty (user's format)
+ *** test_array_source: done ***
+ok 2 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (any format): tuple != NULL
+ ok 2 - merger next() (any format): skip tuple format id check
+ ok 3 - merger next() (any format): check tuple size
+ ok 4 - merger next() (any format): check tuple data
+ ok 5 - merger next() (any format): tuple != NULL
+ ok 6 - merger next() (any format): skip tuple format id check
+ ok 7 - merger next() (any format): check tuple size
+ ok 8 - merger next() (any format): check tuple data
+ ok 9 - merger next() (any format): tuple != NULL
+ ok 10 - merger next() (any format): skip tuple format id check
+ ok 11 - merger next() (any format): check tuple size
+ ok 12 - merger next() (any format): check tuple data
+ ok 13 - merger next() (any format): tuple != NULL
+ ok 14 - merger next() (any format): skip tuple format id check
+ ok 15 - merger next() (any format): check tuple size
+ ok 16 - merger next() (any format): check tuple data
+ ok 17 - merger is empty (any format)
+ *** test_merger: done ***
+ok 3 - subtests
+ 1..17
+ *** test_merger ***
+ ok 1 - merger next() (user's format): tuple != NULL
+ ok 2 - merger next() (user's format): check tuple format id
+ ok 3 - merger next() (user's format): check tuple size
+ ok 4 - merger next() (user's format): check tuple data
+ ok 5 - merger next() (user's format): tuple != NULL
+ ok 6 - merger next() (user's format): check tuple format id
+ ok 7 - merger next() (user's format): check tuple size
+ ok 8 - merger next() (user's format): check tuple data
+ ok 9 - merger next() (user's format): tuple != NULL
+ ok 10 - merger next() (user's format): check tuple format id
+ ok 11 - merger next() (user's format): check tuple size
+ ok 12 - merger next() (user's format): check tuple data
+ ok 13 - merger next() (user's format): tuple != NULL
+ ok 14 - merger next() (user's format): check tuple format id
+ ok 15 - merger next() (user's format): check tuple size
+ ok 16 - merger next() (user's format): check tuple data
+ ok 17 - merger is empty (user's format)
+ *** test_merger: done ***
+ok 4 - subtests
+ *** test_basic: done ***
diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
new file mode 100644
index 000000000..0a25d8f04
--- /dev/null
+++ b/test/unit/merger.test.c
@@ -0,0 +1,301 @@
+#include "unit.h" /* plan, header, footer, is, ok */
+#include "memory.h" /* memory_init() */
+#include "fiber.h" /* fiber_init() */
+#include "box/tuple.h" /* tuple_init(), box_tuple_*(),
+ tuple_*() */
+#include "box/tuple_format.h" /* box_tuple_format_default(),
+ tuple_format_id() */
+#include "box/key_def.h" /* key_def_new(),
+ key_def_delete() */
+#include "box/merger.h" /* merger_*() */
+
+/* {{{ Array merger source */
+
+struct merger_source_array { /* Test-only merger source that replays a fixed array of tuples. */
+ struct merger_source base; /* Embedded base; container_of() maps it back to this struct. */
+ uint32_t tuples_count; /* Number of entries in tuples. */
+ struct tuple **tuples; /* malloc'ed array of referenced tuples. */
+ uint32_t cur; /* Index of the tuple the next next() call hands out. */
+};
+
+/* Forward declarations of the vtab callbacks wired up in merger_source_array_new(). */
+
+static void
+merger_source_array_delete(struct merger_source *base);
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create an array-backed merger source for the tests.
+ *
+ * The source owns two one-field tuples created in the default
+ * format: {2}, {4} when @a even is true and {1}, {3} otherwise.
+ * Callers in this file take a reference with merger_source_ref()
+ * right after creation and release it with merger_source_unref().
+ *
+ * Allocation and tuple creation failures are treated as fatal
+ * (assert), as everywhere else in this test.
+ */
+static struct merger_source *
+merger_source_array_new(bool even)
+{
+	static struct merger_source_vtab merger_source_array_vtab = {
+		.delete = merger_source_array_delete,
+		.next = merger_source_array_next,
+	};
+
+	/* Every msgpack blob below is two bytes long. */
+	const uint32_t tuple_size = 2;
+	const uint32_t tuples_count = 2;
+	/* {1}, {3} */
+	static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
+	/* {2}, {4} */
+	static const char *data_even[] = {"\x91\x02", "\x91\x04"};
+	const char **data = even ? data_even : data_odd;
+
+	struct merger_source_array *source = malloc(sizeof(*source));
+	assert(source != NULL);
+	merger_source_new(&source->base, &merger_source_array_vtab);
+
+	source->tuples = malloc(tuples_count * sizeof(*source->tuples));
+	assert(source->tuples != NULL);
+
+	box_tuple_format_t *format = box_tuple_format_default();
+	for (uint32_t i = 0; i < tuples_count; ++i) {
+		const char *end = data[i] + tuple_size;
+		struct tuple *tuple = box_tuple_new(format, data[i], end);
+		/* Fail here rather than inside box_tuple_ref(). */
+		assert(tuple != NULL);
+		box_tuple_ref(tuple);
+		source->tuples[i] = tuple;
+	}
+	source->tuples_count = tuples_count;
+	source->cur = 0;
+
+	return &source->base;
+}
+
+/* Virtual methods */
+
+/**
+ * vtab 'delete' callback: release the references held on the
+ * stored tuples, then free the tuple array and the source itself.
+ */
+static void
+merger_source_array_delete(struct merger_source *base)
+{
+	struct merger_source_array *array_source =
+		container_of(base, struct merger_source_array, base);
+
+	struct tuple **it = array_source->tuples;
+	struct tuple **end = it + array_source->tuples_count;
+	for (; it != end; ++it)
+		box_tuple_unref(*it);
+
+	free(array_source->tuples);
+	free(array_source);
+}
+
+/**
+ * vtab 'next' callback: hand out the next stored tuple.
+ *
+ * Writes NULL to *out once every stored tuple was returned.
+ * Otherwise *out receives a referenced tuple: the stored one when
+ * the requested format matches the default format (a NULL
+ * @a format means the default one), or a new tuple built from the
+ * same data in @a format. Always returns 0.
+ */
+static int
+merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
+			 struct tuple **out)
+{
+	struct merger_source_array *array_source =
+		container_of(base, struct merger_source_array, base);
+
+	/* The source is exhausted. */
+	if (array_source->cur == array_source->tuples_count) {
+		*out = NULL;
+		return 0;
+	}
+
+	struct tuple *result = array_source->tuples[array_source->cur];
+
+	/* Stored tuples were created in the default format. */
+	box_tuple_format_t *stored_format = box_tuple_format_default();
+	uint32_t id_stored = tuple_format_id(stored_format);
+	assert(result->format_id == id_stored);
+
+	box_tuple_format_t *wanted = format != NULL ? format : stored_format;
+	if (tuple_format_id(wanted) != id_stored) {
+		/* Rebuild the tuple from its raw data in the requested format. */
+		const char *data_begin = tuple_data(result);
+		const char *data_end = data_begin + result->bsize;
+		result = box_tuple_new(wanted, data_begin, data_end);
+		assert(result != NULL);
+	}
+
+	assert(result != NULL);
+	box_tuple_ref(result);
+	*out = result;
+	array_source->cur++;
+	return 0;
+}
+
+/* }}} */
+
+static struct key_part_def key_part_unsigned = { /* Key part test_merger() builds the merger's key_def from. */
+ .fieldno = 0, /* The test tuples have a single field. */
+ .type = FIELD_TYPE_UNSIGNED,
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+static struct key_part_def key_part_integer = { /* Key part test_basic() builds the user's tuple format from. */
+ .fieldno = 0, /* The test tuples have a single field. */
+ .type = FIELD_TYPE_INTEGER, /* The only member that differs from key_part_unsigned. */
+ .coll_id = COLL_NONE,
+ .is_nullable = false,
+ .nullable_action = ON_CONFLICT_ACTION_DEFAULT,
+ .sort_order = SORT_ORDER_ASC,
+ .path = NULL,
+};
+
+/** Return the smaller of two unsigned 32-bit values. */
+uint32_t
+min_u32(uint32_t a, uint32_t b)
+{
+	if (b <= a)
+		return b;
+	return a;
+}
+
+/**
+ * Report four TAP checks for one fetched tuple: the tuple is not
+ * NULL, its format id matches @a format (skipped, but still
+ * counted, when @a format is NULL), its data size equals
+ * @a exp_data_len and its data bytes equal @a exp_data.
+ *
+ * Exactly four checks are always reported so that the callers'
+ * plans stay valid: a NULL tuple fails all of them instead of
+ * being dereferenced.
+ */
+void
+check_tuple(struct tuple *tuple, box_tuple_format_t *format,
+	    const char *exp_data, uint32_t exp_data_len, const char *case_name)
+{
+	ok(tuple != NULL, "%s: tuple != NULL", case_name);
+	if (tuple == NULL) {
+		/* Nothing to inspect: fail the remaining checks. */
+		ok(false, "%s: check tuple format id", case_name);
+		ok(false, "%s: check tuple size", case_name);
+		ok(false, "%s: check tuple data", case_name);
+		return;
+	}
+
+	if (format == NULL) {
+		ok(true, "%s: skip tuple format id check", case_name);
+	} else {
+		is(tuple->format_id, tuple_format_id(format),
+		   "%s: check tuple format id", case_name);
+	}
+
+	uint32_t size;
+	const char *data = tuple_data_range(tuple, &size);
+	is(size, exp_data_len, "%s: check tuple size", case_name);
+	/*
+	 * Tuple data is binary msgpack, so compare raw bytes:
+	 * strncmp() would stop at the first zero byte.
+	 */
+	ok(memcmp(data, exp_data, min_u32(size, exp_data_len)) == 0,
+	   "%s: check tuple data", case_name);
+}
+
+/**
+ * Sanity check of the array source on its own: iterate the odd
+ * source directly and verify it yields {1}, {3} and then NULL.
+ */
+int
+test_array_source(box_tuple_format_t *format)
+{
+	plan(9);
+	header();
+
+	/* {1}, {3} */
+	static const char *exp_tuples_data[] = {"\x91\x01", "\x91\x03"};
+	const uint32_t exp_tuples_count = 2;
+	const uint32_t exp_tuple_size = 2;
+	const bool any_format = format == NULL;
+
+	struct merger_source *source = merger_source_array_new(false);
+	assert(source != NULL);
+	merger_source_ref(source);
+
+	const char *msg = any_format ?
+		"array source next() (any format)" :
+		"array source next() (user's format)";
+	struct tuple *tuple = NULL;
+	int rc;
+	uint32_t i = 0;
+	while (i < exp_tuples_count) {
+		rc = source->vtab->next(source, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		box_tuple_unref(tuple);
+		++i;
+	}
+
+	/* One more call must report that the source is drained. */
+	rc = source->vtab->next(source, format, &tuple);
+	(void) rc;
+	assert(rc == 0);
+	is(tuple, NULL, any_format ?
+	   "array source is empty (any format)" :
+	   "array source is empty (user's format)");
+
+	merger_source_unref(source);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Merge the odd ({1}, {3}) and even ({2}, {4}) array sources in
+ * ascending order and check that the merger yields {1}, {2}, {3},
+ * {4} and then NULL.
+ *
+ * @a format is forwarded to the merger's next(); NULL means the
+ * caller accepts tuples in any format.
+ *
+ * Setup failures are fatal (assert), as in the rest of the test.
+ */
+int
+test_merger(box_tuple_format_t *format)
+{
+	plan(17);
+	header();
+
+	/* {1}, {2}, {3}, {4} */
+	const uint32_t exp_tuple_size = 2;
+	const uint32_t exp_tuples_count = 4;
+	static const char *exp_tuples_data[] = {
+		"\x91\x01", "\x91\x02", "\x91\x03", "\x91\x04",
+	};
+
+	const uint32_t sources_count = 2;
+	struct merger_source *sources[] = {
+		merger_source_array_new(false),
+		merger_source_array_new(true),
+	};
+	merger_source_ref(sources[0]);
+	merger_source_ref(sources[1]);
+
+	struct key_def *key_def = key_def_new(&key_part_unsigned, 1);
+	assert(key_def != NULL);
+	struct merger_context *ctx = merger_context_new(key_def);
+	assert(ctx != NULL);
+	merger_context_ref(ctx);
+	key_def_delete(key_def);
+	struct merger_source *merger = merger_new(ctx);
+	assert(merger != NULL);
+	int rc = merger_set_sources(merger, sources, sources_count);
+	(void) rc;
+	assert(rc == 0);
+	merger_set_reverse(merger, false);
+	merger_source_ref(merger);
+
+	struct tuple *tuple = NULL;
+	const char *msg = format == NULL ?
+		"merger next() (any format)" :
+		"merger next() (user's format)";
+	for (uint32_t i = 0; i < exp_tuples_count; ++i) {
+		rc = merger->vtab->next(merger, format, &tuple);
+		assert(rc == 0);
+		check_tuple(tuple, format, exp_tuples_data[i], exp_tuple_size,
+			    msg);
+		box_tuple_unref(tuple);
+	}
+	/* One more call must report that the merger is drained. */
+	rc = merger->vtab->next(merger, format, &tuple);
+	assert(rc == 0);
+	is(tuple, NULL, format == NULL ?
+	   "merger is empty (any format)" :
+	   "merger is empty (user's format)");
+
+	merger_source_unref(merger);
+	merger_context_unref(ctx);
+	merger_source_unref(sources[0]);
+	merger_source_unref(sources[1]);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Run the array source check and the merger check twice each:
+ * with no format requested (NULL) and with a format created here
+ * from key_part_integer.
+ */
+int
+test_basic(void)
+{
+	plan(4);
+	header();
+
+	struct key_def *key_def = key_def_new(&key_part_integer, 1);
+	/* A NULL key_def must not reach box_tuple_format_new(). */
+	assert(key_def != NULL);
+	box_tuple_format_t *format = box_tuple_format_new(&key_def, 1);
+	assert(format != NULL);
+
+	test_array_source(NULL);
+	test_array_source(format);
+	test_merger(NULL);
+	test_merger(format);
+
+	key_def_delete(key_def);
+	box_tuple_format_unref(format);
+
+	footer();
+	return check_plan();
+}
+
+/**
+ * Initialize the memory, fiber and tuple subsystems, run the
+ * tests and release the subsystems in reverse order.
+ */
+int
+main()
+{
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	tuple_init(NULL);
+
+	const int test_status = test_basic();
+
+	tuple_free();
+	fiber_free();
+	memory_free();
+
+	return test_status;
+}
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-10 15:21 ` [PATCH v3 6/7] Add merger for tuples streams (C part) Alexander Turenko
@ 2019-04-25 11:43 ` Konstantin Osipov
2019-04-25 13:32 ` Alexander Turenko
2019-04-30 15:34 ` Vladimir Davydov
1 sibling, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-25 11:43 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Alexander Turenko
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> +enum { MERGER_SOURCE_REF_MAX = INT_MAX };
Shouldn't it be merge_source rather than merger_source? Here and in the
rest of the code.
> +void
> +merger_context_delete(struct merger_context *ctx)
Please decide: it's either simply merger or merge_context.
merger_context does not make any sense to me.
What's in the context anyway? Can you have multiple contexts in a
single merge? If not, why not simply merger? Why do you need both
a merger and a context?
> +/**
> + * Holds immutable parameters of a merger.
> + */
> +struct merger_context {
> + struct key_def *key_def;
> + box_tuple_format_t *format;
> + /* Reference counter. */
> + int refs;
> +};
Uhm.. Is it a comparison/sort context, not merger context then?
Please explain why you need to ref it btw. I would simply pass it
into Lua via ffi, to avoid any references, and make it clear that
the life cycle of this object is bound to the merge window.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-25 11:43 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-25 13:32 ` Alexander Turenko
2019-04-25 13:45 ` Konstantin Osipov
0 siblings, 1 reply; 39+ messages in thread
From: Alexander Turenko @ 2019-04-25 13:32 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
On Thu, Apr 25, 2019 at 02:43:43PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> > +enum { MERGER_SOURCE_REF_MAX = INT_MAX };
>
> It's merge_source, not merger, source, no? Here and in the rest of
> the code.
Either 'merge source' or 'merger's source' works. I found it neat to have
one prefix, 'merger_', for all related structures and functions.
But okay, I'll check it with Vladimir, and if he agrees I will change those
names.
>
> > +void
> > +merger_context_delete(struct merger_context *ctx)
>
> Please decide it's either simply merger or merge_context.
> merger_context does not make any sense to me.
A merger is a special kind of source; it is a different thing.
>
> what's in the context anyway? Can you have multiple contexts in a
> single merge? If no, why not simply merger? Why do you need both,
> a merger and a context?
>
> > +/**
> > + * Holds immutable parameters of a merger.
> > + */
> > +struct merger_context {
> > + struct key_def *key_def;
> > + box_tuple_format_t *format;
> > + /* Reference counter. */
> > + int refs;
> > +};
>
> Uhm.. Is it a comparison/sort context, not merger context then?
key_def is here for comparisons, yep, but format is for creating tuples
(with the needed offsets). I like the 'merger_' prefix, because it makes
clear that it is something that a merger needs to perform its work.
The name is very general, yep. 'Comparison/sort context' would be more
specific, but not exact: it would raise a question: 'why is a format a
part of a comparison context?'
I'll discuss this point with Vladimir too.
> Please explain why you need to ref it btw. I would simply pass it
> into Lua via ffi, to avoid any references, and make it clear that
> the life cycle of this object is bound to the merge window.
So you'll need to construct a format in Lua somehow. Moreover, you'll
need to create the *right* format in Lua to make comparisons fast. That
looks like a far more complex approach.
Maybe I didn't get your idea.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-25 13:32 ` Alexander Turenko
@ 2019-04-25 13:45 ` Konstantin Osipov
2019-04-25 15:32 ` [tarantool-patches] " Alexander Turenko
0 siblings, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-25 13:45 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches, Vladimir Davydov
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/25 16:37]:
> merge source or merger's source. I found it neat to have one prefix
> 'merger_' for all related structures and functions.
What naming scheme do other libraries for map/reduce use?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-25 13:45 ` Konstantin Osipov
@ 2019-04-25 15:32 ` Alexander Turenko
2019-04-25 16:42 ` Konstantin Osipov
0 siblings, 1 reply; 39+ messages in thread
From: Alexander Turenko @ 2019-04-25 15:32 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
On Thu, Apr 25, 2019 at 04:45:39PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/25 16:37]:
> > merge source or merger's source. I found it neat to have one prefix
> > 'merger_' for all related structures and functions.
>
> What naming scheme other libraries for map/reduce use?
Our merger is more like a specific reduce phase when we look at it in the
context of a general map-reduce framework. It is hard to find an
overlap between our names and those of, say, Hadoop.
Once in the past we discussed how to better integrate a general map-reduce
API into net.box, but then decided to implement the merger first as the
concrete reduce phase w/o attempts to generalize it much.
Anyway, I see the word 'input' is used quite often in the documentation
of map-reduce libraries. Should we use it instead of 'source'? merge_input
or merger_input?
WBR, Alexander Turenko.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-25 15:32 ` [tarantool-patches] " Alexander Turenko
@ 2019-04-25 16:42 ` Konstantin Osipov
0 siblings, 0 replies; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-25 16:42 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches, Vladimir Davydov
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/25 18:36]:
> Anyway, I see the word 'input' is quite often in map-reduce libraries
> documentation. Should we use it instead of 'source'? merge_input or
> merger_input?
The problem is not with the word "source", it's with the word
"merger".
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-10 15:21 ` [PATCH v3 6/7] Add merger for tuples streams (C part) Alexander Turenko
2019-04-25 11:43 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-30 15:34 ` Vladimir Davydov
2019-05-07 22:14 ` Alexander Turenko
1 sibling, 1 reply; 39+ messages in thread
From: Vladimir Davydov @ 2019-04-30 15:34 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches
On Wed, Apr 10, 2019 at 06:21:24PM +0300, Alexander Turenko wrote:
> diff --git a/src/box/merger.c b/src/box/merger.c
> new file mode 100644
> index 000000000..83f628758
> --- /dev/null
> +++ b/src/box/merger.c
> @@ -0,0 +1,464 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "box/merger.h"
> +
> +#include <assert.h>
> +#include <stdbool.h>
> +#include <stdint.h>
> +#include <stdlib.h>
> +
> +#define HEAP_FORWARD_DECLARATION
> +#include "salad/heap.h"
> +
> +#include "trivia/util.h" /* unlikely() */
> +#include "diag.h" /* diag_set() */
> +#include "say.h" /* panic() */
> +#include "box/tuple.h" /* box_tuple_unref() */
> +#include "box/tuple_format.h" /* box_tuple_format_*(),
> + tuple_format_id() */
> +#include "box/key_def.h" /* box_key_def_*(),
> + box_tuple_compare() */
> +
> +/* {{{ Create, delete, ref, unref a source and a context */
> +
> +enum { MERGER_SOURCE_REF_MAX = INT_MAX };
> +enum { MERGER_CONTEXT_REF_MAX = INT_MAX };
> +
> +void
> +merger_source_ref(struct merger_source *source)
I tend to agree with Kostja that merger_source sounds sort of awkward,
like it's a source of mergers (cf. tuple_source - source of tuples).
Similarly, merger_context => merge_context. BTW what about including
'reverse' in merge_context?
> +{
> + if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX))
> + panic("Merger source reference counter overflow");
> + ++source->refs;
> +}
> +
> +void
> +merger_source_unref(struct merger_source *source)
> +{
> + assert(source->refs - 1 >= 0);
> + if (--source->refs == 0)
> + source->vtab->delete(source);
> +}
> +
> +void
> +merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
We typically call a constructor function _create: merger_source_create.
> +{
> + source->vtab = vtab;
> + source->refs = 0;
> +}
> +
> +void
> +merger_context_delete(struct merger_context *ctx)
> +{
> + box_key_def_delete(ctx->key_def);
> + box_tuple_format_unref(ctx->format);
I suggest avoiding box_ methods in the code - after all, those are
intended for external modules. Creating a key_def with key_def_dup and
freeing it with box_key_def_delete looks especially weird. Let's use
the underlying functions directly, shall we?
> + free(ctx);
> +}
> +
> +void
> +merger_context_ref(struct merger_context *ctx)
> +{
> + if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX))
Wow, I wouldn't care, but okay.
> + panic("Merger context reference counter overflow");
> + ++ctx->refs;
> +}
> +
> +void
> +merger_context_unref(struct merger_context *ctx)
> +{
> + assert(ctx->refs - 1 >= 0);
> + if (--ctx->refs == 0)
> + merger_context_delete(ctx);
> +}
> +
> +struct merger_context *
> +merger_context_new(const struct key_def *key_def)
> +{
> + struct merger_context *ctx = (struct merger_context *) malloc(
Useless type conversion. There are a few more places like that.
> + sizeof(struct merger_context));
> + if (ctx == NULL) {
> + diag_set(OutOfMemory, sizeof(struct merger_context), "malloc",
> + "merger_context");
> + return NULL;
> + }
> +
> + /*
> + * We need to copy key_def, because a key_def from the
> + * parameter can be collected before merger_context end
> + * of life (say, by LuaJIT GC if the key_def comes from
> + * Lua).
> + */
> + ctx->key_def = key_def_dup(key_def);
> + if (ctx->key_def == NULL) {
> + free(ctx);
> + return NULL;
> + }
> +
> + ctx->format = box_tuple_format_new(&ctx->key_def, 1);
> + if (ctx->format == NULL) {
> + box_key_def_delete(ctx->key_def);
> + free(ctx);
> + return NULL;
> + }
> +
> + ctx->refs = 0;
> +
> + return ctx;
> +}
> +
> +/* }}} */
> +
> +/* {{{ Merger */
> +
> +/**
> + * Holds a source to fetch next tuples and a last fetched tuple to
> + * compare the node against other nodes.
> + *
> + * The main reason why this structure is separated from a merger
> + * source is that a heap node can not be a member of several
> + * heaps.
> + *
> + * The second reason is that it allows to incapsulate all heap
encapsulate :)
> + * related logic inside this compilation unit, without any trails
'trails' sounds weird to me in this context. I'd say 'traces'.
> + * in externally visible structures.
Not sure that these two reasons are ironclad:
- It's pointless to add the same source to two mergers (or to the same
merger twice). I think we'd better simply proscribe it. There's a
handy heap_node_is_stray helper that could be used to detect if a
source is already used by a merger.
- merger_heap_node is pretty lightweight: it stores only pointers
and struct heap_node so all we need is include salad/heap.h into
merger.h, which seems to be okay.
At the same time, not introducing a separate anchor struct would reduce
the amount of code you have to add and hence make the code a bit easier
to follow IMO.
> + */
> +struct merger_heap_node {
> + /* A source of tuples. */
> + struct merger_source *source;
> + /*
> + * A last fetched (refcounted) tuple to compare against
> + * other nodes.
> + */
> + struct tuple *tuple;
> + /* An anchor to make the structure a merger heap node. */
> + struct heap_node heap_node_anchor;
We typically name anchors after the container that holds them: in_merger
in this case.
> +};
> +
> +static bool
> +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> + const struct merger_heap_node *right);
> +#define HEAP_NAME merger_heap
> +#define HEAP_LESS merger_source_less
> +#define heap_value_t struct merger_heap_node
> +#define heap_value_attr heap_node_anchor
> +#include "salad/heap.h"
> +#undef HEAP_NAME
> +#undef HEAP_LESS
> +#undef heap_value_t
> +#undef heap_value_attr
> +
> +/**
> + * Holds a heap, an immutable context, parameters of a merge
> + * process and utility fields.
> + */
> +struct merger {
> + /* A merger is a source. */
> + struct merger_source base;
> + /*
> + * Whether a merge process started.
> + *
> + * The merger postpones charging of heap nodes until a
> + * first output tuple is acquired.
> + */
> + bool started;
> + /* A merger context. */
> + struct merger_context *ctx;
> + /*
> + * A heap of sources (of nodes that contains a source to
> + * be exact).
> + */
> + heap_t heap;
> + /* An array of heap nodes. */
> + uint32_t nodes_count;
node_count :)
> + struct merger_heap_node *nodes;
> + /* Ascending (false) / descending (true) order. */
> + bool reverse;
> +};
> +
> +/* Helpers */
> +
> +/**
> + * Data comparing function to construct a heap of sources.
> + */
> +static bool
> +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> + const struct merger_heap_node *right)
> +{
> + if (left->tuple == NULL && right->tuple == NULL)
> + return false;
> + if (left->tuple == NULL)
> + return false;
> + if (right->tuple == NULL)
> + return true;
How can this happen? I thought we would remove a source that ended
iteration (returned NULL), no?
> + struct merger *merger = container_of(heap, struct merger, heap);
> + int cmp = box_tuple_compare(left->tuple, right->tuple,
> + merger->ctx->key_def);
> + return merger->reverse ? cmp >= 0 : cmp < 0;
> +}
> +
> +/**
> + * How much more memory the heap will reserve at the next grow.
> + *
> + * See HEAP(reserve)() function in lib/salad/heap.h.
> + */
> +static size_t
> +heap_next_grow_size(const heap_t *heap)
Please don't introduce this function so intricately depending on the
heap implementation solely for error reporting - simply pass 0 to
diag_set(OutOfMemory) - it's perfectly fine, we do that all the time.
> +{
> + heap_off_t heap_capacity_diff = heap->capacity == 0 ?
> + HEAP_INITIAL_CAPACITY : heap->capacity;
> + return heap_capacity_diff * sizeof(struct heap_node *);
> +}
> +
> +/**
> + * Initialize a new merger heap node.
> + */
> +static void
> +merger_heap_node_new(struct merger_heap_node *node,
> + struct merger_source *source)
> +{
> + node->source = source;
> + merger_source_ref(node->source);
> + node->tuple = NULL;
> + heap_node_create(&node->heap_node_anchor);
> +}
> +
> +/**
> + * Free a merger heap node.
> + */
> +static void
> +merger_heap_node_delete(struct merger_heap_node *node)
> +{
> + merger_source_unref(node->source);
> + if (node->tuple != NULL)
> + box_tuple_unref(node->tuple);
> +}
> +
> +/**
> + * The helper to add a new heap node to a merger heap.
> + *
> + * Return -1 at an error and set a diag.
> + *
> + * Otherwise store a next tuple in node->tuple, add the node to
> + * merger->heap and return 0.
> + */
> +static int
> +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
> +{
> + struct tuple *tuple = NULL;
> +
> + /* Acquire a next tuple. */
> + struct merger_source *source = node->source;
> + if (source->vtab->next(source, merger->ctx->format, &tuple) != 0)
> + return -1;
> +
> + /* Don't add an empty source to a heap. */
> + if (tuple == NULL)
> + return 0;
> +
> + node->tuple = tuple;
> +
> + /* Add a node to a heap. */
> + if (merger_heap_insert(&merger->heap, node)) {
According to our coding conventions, != 0 should be checked explicitly.
> + diag_set(OutOfMemory, heap_next_grow_size(&merger->heap),
> + "malloc", "merger->heap");
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +/* Virtual methods declarations */
> +
> +static void
> +merger_delete(struct merger_source *base);
> +static int
> +merger_next(struct merger_source *base, box_tuple_format_t *format,
> + struct tuple **out);
> +
> +/* Non-virtual methods */
> +
> +struct merger_source *
> +merger_new(struct merger_context *ctx)
> +{
> + static struct merger_source_vtab merger_vtab = {
> + .delete = merger_delete,
> + .next = merger_next,
> + };
> +
> + struct merger *merger = (struct merger *) malloc(sizeof(struct merger));
> + if (merger == NULL) {
> + diag_set(OutOfMemory, sizeof(struct merger), "malloc",
> + "merger");
> + return NULL;
> + }
> +
> + merger_source_new(&merger->base, &merger_vtab);
> +
> + merger->started = false;
> + merger->ctx = ctx;
> + merger_context_ref(merger->ctx);
> + merger_heap_create(&merger->heap);
> + merger->nodes_count = 0;
> + merger->nodes = NULL;
> + merger->reverse = false;
> +
> + return &merger->base;
> +}
> +
> +int
> +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> + uint32_t sources_count)
> +{
> + struct merger *merger = container_of(base, struct merger, base);
> +
> + /* Ensure we don't leak old nodes. */
> + assert(merger->nodes_count == 0);
> + assert(merger->nodes == NULL);
> +
> + const size_t nodes_size =
> + sources_count * sizeof(struct merger_heap_node);
> + struct merger_heap_node *nodes = (struct merger_heap_node *) malloc(
> + nodes_size);
> + if (nodes == NULL) {
> + diag_set(OutOfMemory, nodes_size, "malloc",
> + "merger heap nodes");
> + return -1;
> + }
> +
> + for (uint32_t i = 0; i < sources_count; ++i)
> + merger_heap_node_new(&nodes[i], sources[i]);
> +
> + merger->nodes_count = sources_count;
> + merger->nodes = nodes;
> + return 0;
> +}
> +
> +void
> +merger_set_reverse(struct merger_source *base, bool reverse)
> +{
> + struct merger *merger = container_of(base, struct merger, base);
> +
> + merger->reverse = reverse;
> +}
> +
> +/* Virtual methods */
> +
> +static void
> +merger_delete(struct merger_source *base)
> +{
> + struct merger *merger = container_of(base, struct merger, base);
> +
> + merger_context_unref(merger->ctx);
> + merger_heap_destroy(&merger->heap);
> +
> + for (uint32_t i = 0; i < merger->nodes_count; ++i)
> + merger_heap_node_delete(&merger->nodes[i]);
> +
> + if (merger->nodes != NULL)
> + free(merger->nodes);
> +
> + free(merger);
> +}
> +
> +static int
> +merger_next(struct merger_source *base, box_tuple_format_t *format,
> + struct tuple **out)
> +{
> + struct merger *merger = container_of(base, struct merger, base);
> +
> + /*
> + * Fetch a first tuple for each source and add all heap
> + * nodes to a merger heap.
> + */
> + if (!merger->started) {
> + for (uint32_t i = 0; i < merger->nodes_count; ++i) {
> + struct merger_heap_node *node = &merger->nodes[i];
> + if (merger_add_heap_node(merger, node) != 0)
> + return -1;
> + }
> + merger->started = true;
> + }
> +
> + /* Get a next tuple. */
> + struct merger_heap_node *node = merger_heap_top(&merger->heap);
> + if (node == NULL) {
> + *out = NULL;
> + return 0;
> + }
> + struct tuple *tuple = node->tuple;
> + assert(tuple != NULL);
> +
> + /*
> + * The tuples are stored in merger->ctx->format for
> + * fast comparisons, but we should return tuples in a
> + * requested format.
> + */
> + uint32_t id_stored = tuple_format_id(merger->ctx->format);
> + assert(tuple->format_id == id_stored);
> + if (format == NULL)
> + format = merger->ctx->format;
> + uint32_t id_requested = tuple_format_id(format);
> + if (id_stored != id_requested) {
Tuples returned by a source must be sorted. This means that you can't
pass a merger to another merger using a different key def. That is,
this code is somewhat pointless. And it looks weird to me.
I have a suspicion that all those relocations are going to cost us more
than the tuple comparisons themselves. Maybe we shouldn't relocate tuples
at all, and just validate and forward them as they are? After all, we are
heading toward having offsets of all formatted fields so if we are given
a tuple (e.g. from space.select) it's likely to have all fields indexed
already.
> + const char *tuple_beg = tuple_data(tuple);
> + const char *tuple_end = tuple_beg + tuple->bsize;
> + tuple = box_tuple_new(format, tuple_beg, tuple_end);
> + if (tuple == NULL)
> + return -1;
> + box_tuple_ref(tuple);
> + /*
> + * The node->tuple pointer will be rewritten below
> + * and in this branch it will not be returned. So
> + * we unreference it.
> + */
> + box_tuple_unref(node->tuple);
> + }
> +
> + /*
> + * Note: An old node->tuple pointer will be written to
> + * *out as refcounted tuple or is already unreferenced
> + * above, so we don't unreference it here.
> + */
> + struct merger_source *source = node->source;
> + if (source->vtab->next(source, merger->ctx->format, &node->tuple) != 0)
> + return -1;
> +
> + /* Update a heap. */
> + if (node->tuple == NULL)
> + merger_heap_delete(&merger->heap, node);
> + else
> + merger_heap_update(&merger->heap, node);
> +
> + *out = tuple;
> + return 0;
> +}
> +
> +/* }}} */
> diff --git a/src/box/merger.h b/src/box/merger.h
> new file mode 100644
> index 000000000..2323dd7d7
> --- /dev/null
> +++ b/src/box/merger.h
> @@ -0,0 +1,180 @@
> +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
> +#define TARANTOOL_BOX_MERGER_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include <stdbool.h>
> +#include <stdint.h>
> +
> +#if defined(__cplusplus)
> +extern "C" {
> +#endif /* defined(__cplusplus) */
> +
> +/* {{{ Structures */
> +
> +struct tuple;
> +struct key_def;
> +struct tuple_format;
> +typedef struct tuple_format box_tuple_format_t;
> +
> +struct merger_source;
> +struct merger_context;
> +
> +struct merger_source_vtab {
> + /**
> + * Free a merger source.
> + *
> + * Don't call it directly, use merger_source_unref()
> + * instead.
> + */
> + void (*delete)(struct merger_source *base);
'delete' is a reserved word in C++
IMO better avoid using it in a header file - it may be included in a C++
file one day.
> + /**
> + * Get a next tuple (refcounted) from a source.
> + *
> + * When format is NULL it means that it does not matter
> + * for a caller in which format a tuple will be.
> + *
> + * Return 0 when successfully fetched a tuple or NULL. In
> + * case of an error set a diag and return -1.
> + */
> + int (*next)(struct merger_source *base, box_tuple_format_t *format,
> + struct tuple **out);
Let's please add inline merge_source_next wrapper function.
It'd look neater than calling src->vtab->next directly.
Also, 'format' looks more like a parameter of a source to me, not an
argument of this function. What about moving 'format' to merge_source
struct?
> +};
> +
> +/**
> + * Base (abstract) structure to represent a merger source.
> + *
> + * The structure does not hold any resources.
> + */
> +struct merger_source {
> + /* Source-specific methods. */
> + struct merger_source_vtab *vtab;
Should be const.
> + /* Reference counter. */
> + int refs;
> +};
> +
> +/**
> + * Holds immutable parameters of a merger.
> + */
> +struct merger_context {
> + struct key_def *key_def;
> + box_tuple_format_t *format;
> + /* Reference counter. */
> + int refs;
> +};
> +
> +/* }}} */
> +
> +/* {{{ Create, delete, ref, unref a source and a context */
> +
> +/**
> + * Increment a merger source reference counter.
> + */
> +void
> +merger_source_ref(struct merger_source *source);
> +
> +/**
> + * Decrement a merger source reference counter. When it has
> + * reached zero, free the source (call delete() virtual method).
> + */
> +void
> +merger_source_unref(struct merger_source *source);
> +
> +/**
> + * Initialize a base merger source structure.
> + */
> +void
> +merger_source_new(struct merger_source *source,
> + struct merger_source_vtab *vtab);
> +
> +/**
> + * Free a merger context.
> + */
> +void
> +merger_context_delete(struct merger_context *ctx);
Why make it public? AFAICS you only need merger_context_unref outside.
> +
> +/**
> + * Increment a merger context reference counter.
> + */
> +void
> +merger_context_ref(struct merger_context *ctx);
> +
> +/**
> + * Decrement a merger context reference counter. When it has
> + * reached zero, free the context.
> + */
> +void
> +merger_context_unref(struct merger_context *ctx);
> +
> +/**
> + * Create a new merger context.
> + *
> + * A returned merger context is NOT reference counted.
It is reference counted. It just has the reference counter set to 0.
Anyway, why not set it to 1 right away - the caller will do that in
any case, no?
> + *
> + * Return NULL and set a diag in case of an error.
> + */
> +struct merger_context *
> +merger_context_new(const struct key_def *key_def);
> +
> +/* }}} */
> +
> +/* {{{ Merger */
> +
> +/**
> + * Create a new merger w/o sources.
> + *
> + * Return NULL and set a diag in case of an error.
> + */
> +struct merger_source *
> +merger_new(struct merger_context *ctx);
> +
> +/**
> + * Set sources for a merger.
> + *
> + * Return 0 at success. Return -1 at an error and set a diag.
> + */
> +int
> +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> + uint32_t sources_count);
source_count :)
> +
> +/**
> + * Set reverse flag for a merger.
> + */
> +void
> +merger_set_reverse(struct merger_source *base, bool reverse);
I'd rather pass all the arguments directly to the constructor, the way
you do it in Lua, otherwise unsettling questions might arise: what
happens if I call set_sources or set_reverse after using the merger?
can I use a merger without calling those functions?
> +
> +/* }}} */
> +
> +#if defined(__cplusplus)
> +} /* extern "C" */
> +#endif /* defined(__cplusplus) */
> +
> +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
> diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
> new file mode 100644
> index 000000000..0a25d8f04
> --- /dev/null
> +++ b/test/unit/merger.test.c
> @@ -0,0 +1,301 @@
> +#include "unit.h" /* plan, header, footer, is, ok */
> +#include "memory.h" /* memory_init() */
> +#include "fiber.h" /* fiber_init() */
> +#include "box/tuple.h" /* tuple_init(), box_tuple_*(),
> + tuple_*() */
> +#include "box/tuple_format.h" /* box_tuple_format_default(),
> + tuple_format_id() */
> +#include "box/key_def.h" /* key_def_new(),
> + key_def_delete() */
> +#include "box/merger.h" /* merger_*() */
> +
> +/* {{{ Array merger source */
> +
> +struct merger_source_array {
> + struct merger_source base;
> + uint32_t tuples_count;
> + struct tuple **tuples;
> + uint32_t cur;
> +};
> +
> +/* Virtual methods declarations */
> +
> +static void
> +merger_source_array_delete(struct merger_source *base);
> +static int
> +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
> + struct tuple **out);
> +
> +/* Non-virtual methods */
> +
> +static struct merger_source *
> +merger_source_array_new(bool even)
> +{
> + static struct merger_source_vtab merger_source_array_vtab = {
> + .delete = merger_source_array_delete,
> + .next = merger_source_array_next,
> + };
> +
> + struct merger_source_array *source =
> + (struct merger_source_array *) malloc(
> + sizeof(struct merger_source_array));
> + assert(source != NULL);
> +
> + merger_source_new(&source->base, &merger_source_array_vtab);
> +
> + uint32_t tuple_size = 2;
> + const uint32_t tuples_count = 2;
> + /* {1}, {3} */
> + static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
You might want to use mp_format here.
> + /* {2}, {4} */
> + static const char *data_even[] = {"\x91\x02", "\x91\x04"};
> + const char **data = even ? data_even : data_odd;
> + source->tuples = (struct tuple **) malloc(
> + tuples_count * sizeof(struct tuple *));
> + assert(source->tuples != NULL);
> + box_tuple_format_t *format = box_tuple_format_default();
> + for (uint32_t i = 0; i < tuples_count; ++i) {
> + const char *end = data[i] + tuple_size;
> + source->tuples[i] = box_tuple_new(format, data[i], end);
> + box_tuple_ref(source->tuples[i]);
> + }
> + source->tuples_count = tuples_count;
> + source->cur = 0;
> +
> + return &source->base;
> +}
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 6/7] Add merger for tuples streams (C part)
2019-04-30 15:34 ` Vladimir Davydov
@ 2019-05-07 22:14 ` Alexander Turenko
0 siblings, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:14 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Thanks!
I'll send the 4th version of the patchset.
WBR, Alexander Turenko.
On Tue, Apr 30, 2019 at 06:34:15PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:24PM +0300, Alexander Turenko wrote:
> > diff --git a/src/box/merger.c b/src/box/merger.c
> > new file mode 100644
> > index 000000000..83f628758
> > --- /dev/null
> > +++ b/src/box/merger.c
> > @@ -0,0 +1,464 @@
> > +/*
> > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> > + *
> > + * Redistribution and use in source and binary forms, with or
> > + * without modification, are permitted provided that the following
> > + * conditions are met:
> > + *
> > + * 1. Redistributions of source code must retain the above
> > + * copyright notice, this list of conditions and the
> > + * following disclaimer.
> > + *
> > + * 2. Redistributions in binary form must reproduce the above
> > + * copyright notice, this list of conditions and the following
> > + * disclaimer in the documentation and/or other materials
> > + * provided with the distribution.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> > + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> > + * SUCH DAMAGE.
> > + */
> > +
> > +#include "box/merger.h"
> > +
> > +#include <assert.h>
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +#include <stdlib.h>
> > +
> > +#define HEAP_FORWARD_DECLARATION
> > +#include "salad/heap.h"
> > +
> > +#include "trivia/util.h" /* unlikely() */
> > +#include "diag.h" /* diag_set() */
> > +#include "say.h" /* panic() */
> > +#include "box/tuple.h" /* box_tuple_unref() */
> > +#include "box/tuple_format.h" /* box_tuple_format_*(),
> > + tuple_format_id() */
> > +#include "box/key_def.h" /* box_key_def_*(),
> > + box_tuple_compare() */
> > +
> > +/* {{{ Create, delete, ref, unref a source and a context */
> > +
> > +enum { MERGER_SOURCE_REF_MAX = INT_MAX };
> > +enum { MERGER_CONTEXT_REF_MAX = INT_MAX };
> > +
> > +void
> > +merger_source_ref(struct merger_source *source)
>
> I tend to agree with Kostja that merger_source sounds sort of awkward,
> like it's a source of mergers (cf. tuple_source - source of tuples).
Okay. Renamed.
> Similarly, merger_context => merge_context. BTW what about including
> 'reverse' in merge_context?
The idea was to cache one context and use it to process, say, EQ and REQ
requests both.
Anyway, merger_context is gone.
>
> > +{
> > + if (unlikely(source->refs >= MERGER_SOURCE_REF_MAX))
> > + panic("Merger source reference counter overflow");
> > + ++source->refs;
> > +}
> > +
> > +void
> > +merger_source_unref(struct merger_source *source)
> > +{
> > + assert(source->refs - 1 >= 0);
> > + if (--source->refs == 0)
> > + source->vtab->delete(source);
> > +}
> > +
> > +void
> > +merger_source_new(struct merger_source *source, struct merger_source_vtab *vtab)
>
> We typically call a constructor function _create: merger_source_create.
Changed. Also changed merger_heap_node_new() to
merger_heap_node_create().
>
> > +{
> > + source->vtab = vtab;
> > + source->refs = 0;
> > +}
> > +
> > +void
> > +merger_context_delete(struct merger_context *ctx)
> > +{
> > + box_key_def_delete(ctx->key_def);
> > + box_tuple_format_unref(ctx->format);
>
> I suggest avoid using box_ methods in the code - after all those are
> intended for external modules. Creating a key_def with key_def_dup and
> freeing it with box_key_def_delete looks especially weird. Let's use
> the underlying functions directly, shall we?
I have eliminated all box_* types and functions where it can be easily
done. The only function I keep using is box_tuple_format_new(), because it
is a convenient helper to create a format from a key_def.
>
> > + free(ctx);
> > +}
> > +
> > +void
> > +merger_context_ref(struct merger_context *ctx)
> > +{
> > + if (unlikely(ctx->refs >= MERGER_CONTEXT_REF_MAX))
>
> Wow, I wouldn't care, but okay.
Removed, because I made merge_source_ref/unref static inline and don't
want to include "util/trivia.h" (for unlikely()) and "say.h" (for
panic()) in merger.h.
>
> > + panic("Merger context reference counter overflow");
> > + ++ctx->refs;
> > +}
> > +
> > +void
> > +merger_context_unref(struct merger_context *ctx)
> > +{
> > + assert(ctx->refs - 1 >= 0);
> > + if (--ctx->refs == 0)
> > + merger_context_delete(ctx);
> > +}
> > +
> > +struct merger_context *
> > +merger_context_new(const struct key_def *key_def)
> > +{
> > + struct merger_context *ctx = (struct merger_context *) malloc(
>
> Useless type conversion. There are a few more places like that.
Removed casts in all such places.
>
> > + sizeof(struct merger_context));
> > + if (ctx == NULL) {
> > + diag_set(OutOfMemory, sizeof(struct merger_context), "malloc",
> > + "merger_context");
> > + return NULL;
> > + }
> > +
> > + /*
> > + * We need to copy key_def, because a key_def from the
> > + * parameter can be collected before merger_context end
> > + * of life (say, by LuaJIT GC if the key_def comes from
> > + * Lua).
> > + */
> > + ctx->key_def = key_def_dup(key_def);
> > + if (ctx->key_def == NULL) {
> > + free(ctx);
> > + return NULL;
> > + }
> > +
> > + ctx->format = box_tuple_format_new(&ctx->key_def, 1);
> > + if (ctx->format == NULL) {
> > + box_key_def_delete(ctx->key_def);
> > + free(ctx);
> > + return NULL;
> > + }
> > +
> > + ctx->refs = 0;
> > +
> > + return ctx;
> > +}
> > +
> > +/* }}} */
> > +
> > +/* {{{ Merger */
> > +
> > +/**
> > + * Holds a source to fetch next tuples and a last fetched tuple to
> > + * compare the node against other nodes.
> > + *
> > + * The main reason why this structure is separated from a merger
> > + * source is that a heap node can not be a member of several
> > + * heaps.
> > + *
> > + * The second reason is that it allows to incapsulate all heap
>
> encapsulate :)
Yep :)
>
> > + * related logic inside this compilation unit, without any trails
>
> 'trails' sounds weird to me in this context. I'd say 'traces'.
Okay.
>
> > + * in externally visible structures.
>
> Not sure that these two reasons are ironclad:
>
> - It's pointless to add the same source to two mergers (or to the same
> merger twice). I think we'd better simply proscribe it. There's a
> handy heap_node_is_stray helper that could be used to detect if a
> source is already used by a merger.
>
> - merger_heap_node is pretty lightweight: it stores only pointers
> and struct heap_node so all we need is include salad/heap.h into
> merger.h, which seems to be okay.
>
> At the same time, not introducing a separate anchor struct would reduce
> the amount of code you have to add and hence make the code a bit easier
> to follow IMO.
We discussed it verbally with Vladimir and he said that he doesn't insist
on this.
I find it very misleading to have a next tuple for a merger in a
source. It is easier to work with the code (say, to check whether a tuple
is (un)referenced in the right way) when a merger doesn't hold its state
in a source.
>
> > + */
> > +struct merger_heap_node {
> > + /* A source of tuples. */
> > + struct merger_source *source;
> > + /*
> > + * A last fetched (refcounted) tuple to compare against
> > + * other nodes.
> > + */
> > + struct tuple *tuple;
> > + /* An anchor to make the structure a merger heap node. */
> > + struct heap_node heap_node_anchor;
>
> We typically call anchors after the accommodating container, in_merger
> in this case.
Okay.
>
> > +};
> > +
> > +static bool
> > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> > + const struct merger_heap_node *right);
> > +#define HEAP_NAME merger_heap
> > +#define HEAP_LESS merger_source_less
> > +#define heap_value_t struct merger_heap_node
> > +#define heap_value_attr heap_node_anchor
> > +#include "salad/heap.h"
> > +#undef HEAP_NAME
> > +#undef HEAP_LESS
> > +#undef heap_value_t
> > +#undef heap_value_attr
> > +
> > +/**
> > + * Holds a heap, an immutable context, parameters of a merge
> > + * process and utility fields.
> > + */
> > +struct merger {
> > + /* A merger is a source. */
> > + struct merger_source base;
> > + /*
> > + * Whether a merge process started.
> > + *
> > + * The merger postpones charging of heap nodes until a
> > + * first output tuple is acquired.
> > + */
> > + bool started;
> > + /* A merger context. */
> > + struct merger_context *ctx;
> > + /*
> > + * A heap of sources (of nodes that contains a source to
> > + * be exact).
> > + */
> > + heap_t heap;
> > + /* An array of heap nodes. */
> > + uint32_t nodes_count;
>
> node_count :)
Changed all such names.
>
> > + struct merger_heap_node *nodes;
> > + /* Ascending (false) / descending (true) order. */
> > + bool reverse;
> > +};
> > +
> > +/* Helpers */
> > +
> > +/**
> > + * Data comparing function to construct a heap of sources.
> > + */
> > +static bool
> > +merger_source_less(const heap_t *heap, const struct merger_heap_node *left,
> > + const struct merger_heap_node *right)
> > +{
> > + if (left->tuple == NULL && right->tuple == NULL)
> > + return false;
> > + if (left->tuple == NULL)
> > + return false;
> > + if (right->tuple == NULL)
> > + return true;
>
> How can this happen? I thought we would remove a source that ended
> iteration (returned NULL), no?
You are right. Replaced with asserts.
>
> > + struct merger *merger = container_of(heap, struct merger, heap);
> > + int cmp = box_tuple_compare(left->tuple, right->tuple,
> > + merger->ctx->key_def);
> > + return merger->reverse ? cmp >= 0 : cmp < 0;
> > +}
> > +
> > +/**
> > + * How much more memory the heap will reserve at the next grow.
> > + *
> > + * See HEAP(reserve)() function in lib/salad/heap.h.
> > + */
> > +static size_t
> > +heap_next_grow_size(const heap_t *heap)
>
> Please don't introduce this function so intricately depending on the
> heap implementation solely for error reporting - simply pass 0 to
> diag_set(OutOfMemory) - it's perfectly fine, we do that all the time.
Didn't know that it is okay to pass 0 here. Fixed.
>
> > +{
> > + heap_off_t heap_capacity_diff = heap->capacity == 0 ?
> > + HEAP_INITIAL_CAPACITY : heap->capacity;
> > + return heap_capacity_diff * sizeof(struct heap_node *);
> > +}
> > +
> > +/**
> > + * Initialize a new merger heap node.
> > + */
> > +static void
> > +merger_heap_node_new(struct merger_heap_node *node,
> > + struct merger_source *source)
> > +{
> > + node->source = source;
> > + merger_source_ref(node->source);
> > + node->tuple = NULL;
> > + heap_node_create(&node->heap_node_anchor);
> > +}
> > +
> > +/**
> > + * Free a merger heap node.
> > + */
> > +static void
> > +merger_heap_node_delete(struct merger_heap_node *node)
> > +{
> > + merger_source_unref(node->source);
> > + if (node->tuple != NULL)
> > + box_tuple_unref(node->tuple);
> > +}
> > +
> > +/**
> > + * The helper to add a new heap node to a merger heap.
> > + *
> > + * Return -1 at an error and set a diag.
> > + *
> > + * Otherwise store a next tuple in node->tuple, add the node to
> > + * merger->heap and return 0.
> > + */
> > +static int
> > +merger_add_heap_node(struct merger *merger, struct merger_heap_node *node)
> > +{
> > + struct tuple *tuple = NULL;
> > +
> > + /* Acquire a next tuple. */
> > + struct merger_source *source = node->source;
> > + if (source->vtab->next(source, merger->ctx->format, &tuple) != 0)
> > + return -1;
> > +
> > + /* Don't add an empty source to a heap. */
> > + if (tuple == NULL)
> > + return 0;
> > +
> > + node->tuple = tuple;
> > +
> > + /* Add a node to a heap. */
> > + if (merger_heap_insert(&merger->heap, node)) {
>
> According to our coding conventions, != 0 should be checked explicitly.
Fixed.
>
> > + diag_set(OutOfMemory, heap_next_grow_size(&merger->heap),
> > + "malloc", "merger->heap");
> > + return -1;
> > + }
> > +
> > + return 0;
> > +}
> > +
> > +/* Virtual methods declarations */
> > +
> > +static void
> > +merger_delete(struct merger_source *base);
> > +static int
> > +merger_next(struct merger_source *base, box_tuple_format_t *format,
> > + struct tuple **out);
> > +
> > +/* Non-virtual methods */
> > +
> > +struct merger_source *
> > +merger_new(struct merger_context *ctx)
> > +{
> > + static struct merger_source_vtab merger_vtab = {
> > + .delete = merger_delete,
> > + .next = merger_next,
> > + };
> > +
> > + struct merger *merger = (struct merger *) malloc(sizeof(struct merger));
> > + if (merger == NULL) {
> > + diag_set(OutOfMemory, sizeof(struct merger), "malloc",
> > + "merger");
> > + return NULL;
> > + }
> > +
> > + merger_source_new(&merger->base, &merger_vtab);
> > +
> > + merger->started = false;
> > + merger->ctx = ctx;
> > + merger_context_ref(merger->ctx);
> > + merger_heap_create(&merger->heap);
> > + merger->nodes_count = 0;
> > + merger->nodes = NULL;
> > + merger->reverse = false;
> > +
> > + return &merger->base;
> > +}
> > +
> > +int
> > +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> > + uint32_t sources_count)
> > +{
> > + struct merger *merger = container_of(base, struct merger, base);
> > +
> > + /* Ensure we don't leak old nodes. */
> > + assert(merger->nodes_count == 0);
> > + assert(merger->nodes == NULL);
> > +
> > + const size_t nodes_size =
> > + sources_count * sizeof(struct merger_heap_node);
> > + struct merger_heap_node *nodes = (struct merger_heap_node *) malloc(
> > + nodes_size);
> > + if (nodes == NULL) {
> > + diag_set(OutOfMemory, nodes_size, "malloc",
> > + "merger heap nodes");
> > + return -1;
> > + }
> > +
> > + for (uint32_t i = 0; i < sources_count; ++i)
> > + merger_heap_node_new(&nodes[i], sources[i]);
> > +
> > + merger->nodes_count = sources_count;
> > + merger->nodes = nodes;
> > + return 0;
> > +}
> > +
> > +void
> > +merger_set_reverse(struct merger_source *base, bool reverse)
> > +{
> > + struct merger *merger = container_of(base, struct merger, base);
> > +
> > + merger->reverse = reverse;
> > +}
> > +
> > +/* Virtual methods */
> > +
> > +static void
> > +merger_delete(struct merger_source *base)
> > +{
> > + struct merger *merger = container_of(base, struct merger, base);
> > +
> > + merger_context_unref(merger->ctx);
> > + merger_heap_destroy(&merger->heap);
> > +
> > + for (uint32_t i = 0; i < merger->nodes_count; ++i)
> > + merger_heap_node_delete(&merger->nodes[i]);
> > +
> > + if (merger->nodes != NULL)
> > + free(merger->nodes);
> > +
> > + free(merger);
> > +}
> > +
> > +static int
> > +merger_next(struct merger_source *base, box_tuple_format_t *format,
> > + struct tuple **out)
> > +{
> > + struct merger *merger = container_of(base, struct merger, base);
> > +
> > + /*
> > + * Fetch a first tuple for each source and add all heap
> > + * nodes to a merger heap.
> > + */
> > + if (!merger->started) {
> > + for (uint32_t i = 0; i < merger->nodes_count; ++i) {
> > + struct merger_heap_node *node = &merger->nodes[i];
> > + if (merger_add_heap_node(merger, node) != 0)
> > + return -1;
> > + }
> > + merger->started = true;
> > + }
> > +
> > + /* Get a next tuple. */
> > + struct merger_heap_node *node = merger_heap_top(&merger->heap);
> > + if (node == NULL) {
> > + *out = NULL;
> > + return 0;
> > + }
> > + struct tuple *tuple = node->tuple;
> > + assert(tuple != NULL);
> > +
> > + /*
> > + * The tuples are stored in merger->ctx->format for
> > + * fast comparisons, but we should return tuples in a
> > + * requested format.
> > + */
> > + uint32_t id_stored = tuple_format_id(merger->ctx->format);
> > + assert(tuple->format_id == id_stored);
> > + if (format == NULL)
> > + format = merger->ctx->format;
> > + uint32_t id_requested = tuple_format_id(format);
> > + if (id_stored != id_requested) {
>
> Tuples returned by a source must be sorted. This means that you can't
> pass a merger to another merger using a different key def. That is,
> this code is somewhat pointless. And it looks weird to me.
>
> I have a suspicion that all those relocations are going to cost us more
> than tuple comparisons themselves. May be, we shouldn't relocate tuples
> at all, just validate and forward them as they are? After all, we are
> heading toward having offsets of all formatted fields so if we are given
> a tuple (e.g. from space.select) it's likely to have all fields indexed
> already.
Runtime tuples use smalloc(), which should be cheaper than malloc(),
but your point looks important after merge context removal. Also I agree
that we should not recreate tuples from a local space. So I rewrote it
with tuple_validate() in merger_next() and in table and tuple sources.
>
> > + const char *tuple_beg = tuple_data(tuple);
> > + const char *tuple_end = tuple_beg + tuple->bsize;
> > + tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > + if (tuple == NULL)
> > + return -1;
> > + box_tuple_ref(tuple);
> > + /*
> > + * The node->tuple pointer will be rewritten below
> > + * and in this branch it will not be returned. So
> > + * we unreference it.
> > + */
> > + box_tuple_unref(node->tuple);
> > + }
> > +
> > + /*
> > + * Note: An old node->tuple pointer will be written to
> > + * *out as refcounted tuple or is already unreferenced
> > + * above, so we don't unreference it here.
> > + */
> > + struct merger_source *source = node->source;
> > + if (source->vtab->next(source, merger->ctx->format, &node->tuple) != 0)
> > + return -1;
> > +
> > + /* Update a heap. */
> > + if (node->tuple == NULL)
> > + merger_heap_delete(&merger->heap, node);
> > + else
> > + merger_heap_update(&merger->heap, node);
> > +
> > + *out = tuple;
> > + return 0;
> > +}
> > +
> > +/* }}} */
> > diff --git a/src/box/merger.h b/src/box/merger.h
> > new file mode 100644
> > index 000000000..2323dd7d7
> > --- /dev/null
> > +++ b/src/box/merger.h
> > @@ -0,0 +1,180 @@
> > +#ifndef TARANTOOL_BOX_MERGER_H_INCLUDED
> > +#define TARANTOOL_BOX_MERGER_H_INCLUDED
> > +/*
> > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> > + *
> > + * Redistribution and use in source and binary forms, with or
> > + * without modification, are permitted provided that the following
> > + * conditions are met:
> > + *
> > + * 1. Redistributions of source code must retain the above
> > + * copyright notice, this list of conditions and the
> > + * following disclaimer.
> > + *
> > + * 2. Redistributions in binary form must reproduce the above
> > + * copyright notice, this list of conditions and the following
> > + * disclaimer in the documentation and/or other materials
> > + * provided with the distribution.
> > + *
> > + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
> > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> > + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> > + * SUCH DAMAGE.
> > + */
> > +
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +
> > +#if defined(__cplusplus)
> > +extern "C" {
> > +#endif /* defined(__cplusplus) */
> > +
> > +/* {{{ Structures */
> > +
> > +struct tuple;
> > +struct key_def;
> > +struct tuple_format;
> > +typedef struct tuple_format box_tuple_format_t;
> > +
> > +struct merger_source;
> > +struct merger_context;
> > +
> > +struct merger_source_vtab {
> > + /**
> > + * Free a merger source.
> > + *
> > + * Don't call it directly, use merger_source_unref()
> > + * instead.
> > + */
> > + void (*delete)(struct merger_source *base);
>
> 'delete' is a reserved word in C++
>
> IMO better avoid using it in a header file - it may be included in a C++
> file one day.
Changed to 'destroy'. Also changed names of implementations of this
virtual method:
* luaL_merge_source_*_delete() -> luaL_merge_source_*_destroy();
* merge_source_array_delete() -> merge_source_array_destroy() (in the
unit test).
>
> > + /**
> > + * Get a next tuple (refcounted) from a source.
> > + *
> > + * When format is NULL it means that it does not matter
> > + * for a caller in which format a tuple will be.
> > + *
> > + * Return 0 when successfully fetched a tuple or NULL. In
> > + * case of an error set a diag and return -1.
> > + */
> > + int (*next)(struct merger_source *base, box_tuple_format_t *format,
> > + struct tuple **out);
>
> Let's please add inline merge_source_next wrapper function.
> It'd look neater than calling src->vtab->next directly.
Added.
Also made all base source functions (create, ref, unref, next) static
inline.
> Also, 'format' looks more like a parameter of a source to me, not an
> argument of this function. What about moving 'format' to merge_source
> struct?
It is a possible way to design this API. We'll need to allow a user to
set a format for a source in Lua too in this case. We discussed this a
bit with Vladimir. I proposed adding a key_def argument for our Lua
sources, but Vladimir declined this, saying that a source does not
perform tuple comparisons.
>
> > +};
> > +
> > +/**
> > + * Base (abstract) structure to represent a merger source.
> > + *
> > + * The structure does not hold any resources.
> > + */
> > +struct merger_source {
> > + /* Source-specific methods. */
> > + struct merger_source_vtab *vtab;
>
> Should be const.
Fixed.
>
> > + /* Reference counter. */
> > + int refs;
> > +};
> > +
> > +/**
> > + * Holds immutable parameters of a merger.
> > + */
> > +struct merger_context {
> > + struct key_def *key_def;
> > + box_tuple_format_t *format;
> > + /* Reference counter. */
> > + int refs;
> > +};
> > +
> > +/* }}} */
> > +
> > +/* {{{ Create, delete, ref, unref a source and a context */
> > +
> > +/**
> > + * Increment a merger source reference counter.
> > + */
> > +void
> > +merger_source_ref(struct merger_source *source);
> > +
> > +/**
> > + * Decrement a merger source reference counter. When it has
> > + * reached zero, free the source (call delete() virtual method).
> > + */
> > +void
> > +merger_source_unref(struct merger_source *source);
> > +
> > +/**
> > + * Initialize a base merger source structure.
> > + */
> > +void
> > +merger_source_new(struct merger_source *source,
> > + struct merger_source_vtab *vtab);
> > +
> > +/**
> > + * Free a merger context.
> > + */
> > +void
> > +merger_context_delete(struct merger_context *ctx);
>
> Why make it public? AFAICS you only need merger_context_unref outside.
Traces of some old code. Anyway, merger_context is gone.
>
> > +
> > +/**
> > + * Increment a merger context reference counter.
> > + */
> > +void
> > +merger_context_ref(struct merger_context *ctx);
> > +
> > +/**
> > + * Decrement a merger context reference counter. When it has
> > + * reached zero, free the context.
> > + */
> > +void
> > +merger_context_unref(struct merger_context *ctx);
> > +
> > +/**
> > + * Create a new merger context.
> > + *
> > + * A returned merger context is NOT reference counted.
>
> It is reference counted. It just has the reference counter set to 0.
> Anyway, why not set it to 1 right away - the caller will do that in
> any case, no?
merger_context is gone, but this is applicable to merge_source too.
Sounds sensible to me. I have changed merge_source_create() to set refs
== 1 and have changed ref / unref calls appropriately.
>
> > + *
> > + * Return NULL and set a diag in case of an error.
> > + */
> > +struct merger_context *
> > +merger_context_new(const struct key_def *key_def);
> > +
> > +/* }}} */
> > +
> > +/* {{{ Merger */
> > +
> > +/**
> > + * Create a new merger w/o sources.
> > + *
> > + * Return NULL and set a diag in case of an error.
> > + */
> > +struct merger_source *
> > +merger_new(struct merger_context *ctx);
> > +
> > +/**
> > + * Set sources for a merger.
> > + *
> > + * Return 0 at success. Return -1 at an error and set a diag.
> > + */
> > +int
> > +merger_set_sources(struct merger_source *base, struct merger_source **sources,
> > + uint32_t sources_count);
>
> source_count :)
Changed all such names.
>
> > +
> > +/**
> > + * Set reverse flag for a merger.
> > + */
> > +void
> > +merger_set_reverse(struct merger_source *base, bool reverse);
>
> I'd rather pass all the arguments directly to the constructor, the way
> you do it in Lua, otherwise unsettling questions might arise: what
> happens if I call set_sources or set_reverse after using the merger?
> can I use a merger without calling those functions?
Good points. Done.
>
> > +
> > +/* }}} */
> > +
> > +#if defined(__cplusplus)
> > +} /* extern "C" */
> > +#endif /* defined(__cplusplus) */
> > +
> > +#endif /* TARANTOOL_BOX_MERGER_H_INCLUDED */
> > diff --git a/test/unit/merger.test.c b/test/unit/merger.test.c
> > new file mode 100644
> > index 000000000..0a25d8f04
> > --- /dev/null
> > +++ b/test/unit/merger.test.c
> > @@ -0,0 +1,301 @@
> > +#include "unit.h" /* plan, header, footer, is, ok */
> > +#include "memory.h" /* memory_init() */
> > +#include "fiber.h" /* fiber_init() */
> > +#include "box/tuple.h" /* tuple_init(), box_tuple_*(),
> > + tuple_*() */
> > +#include "box/tuple_format.h" /* box_tuple_format_default(),
> > + tuple_format_id() */
> > +#include "box/key_def.h" /* key_def_new(),
> > + key_def_delete() */
> > +#include "box/merger.h" /* merger_*() */
> > +
> > +/* {{{ Array merger source */
> > +
> > +struct merger_source_array {
> > + struct merger_source base;
> > + uint32_t tuples_count;
> > + struct tuple **tuples;
> > + uint32_t cur;
> > +};
> > +
> > +/* Virtual methods declarations */
> > +
> > +static void
> > +merger_source_array_delete(struct merger_source *base);
> > +static int
> > +merger_source_array_next(struct merger_source *base, box_tuple_format_t *format,
> > + struct tuple **out);
> > +
> > +/* Non-virtual methods */
> > +
> > +static struct merger_source *
> > +merger_source_array_new(bool even)
> > +{
> > + static struct merger_source_vtab merger_source_array_vtab = {
> > + .delete = merger_source_array_delete,
> > + .next = merger_source_array_next,
> > + };
> > +
> > + struct merger_source_array *source =
> > + (struct merger_source_array *) malloc(
> > + sizeof(struct merger_source_array));
> > + assert(source != NULL);
> > +
> > + merger_source_new(&source->base, &merger_source_array_vtab);
> > +
> > + uint32_t tuple_size = 2;
> > + const uint32_t tuples_count = 2;
> > + /* {1}, {3} */
> > + static const char *data_odd[] = {"\x91\x01", "\x91\x03"};
>
> You might want to use mp_format here.
Good idea, but here arrays of constant strings look shorter and so I
prefer to leave it as is. Comments should clarify what the data mean.
Changed arrays in comments from Lua-style '{1}' to JSON-style '[1]'.
>
> > + /* {2}, {4} */
> > + static const char *data_even[] = {"\x91\x02", "\x91\x04"};
> > + const char **data = even ? data_even : data_odd;
> > + source->tuples = (struct tuple **) malloc(
> > + tuples_count * sizeof(struct tuple *));
> > + assert(source->tuples != NULL);
> > + box_tuple_format_t *format = box_tuple_format_default();
> > + for (uint32_t i = 0; i < tuples_count; ++i) {
> > + const char *end = data[i] + tuple_size;
> > + source->tuples[i] = box_tuple_new(format, data[i], end);
> > + box_tuple_ref(source->tuples[i]);
> > + }
> > + source->tuples_count = tuples_count;
> > + source->cur = 0;
> > +
> > + return &source->base;
> > +}
^ permalink raw reply [flat|nested] 39+ messages in thread
* [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-10 15:21 [PATCH v3 0/7] Merger Alexander Turenko
` (5 preceding siblings ...)
2019-04-10 15:21 ` [PATCH v3 6/7] Add merger for tuples streams (C part) Alexander Turenko
@ 2019-04-10 15:21 ` Alexander Turenko
2019-04-25 11:46 ` [tarantool-patches] " Konstantin Osipov
2019-04-30 17:37 ` Vladimir Davydov
6 siblings, 2 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-04-10 15:21 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Fixes #3276.
@TarantoolBot document
Title: Merger for tuple streams
The main concept of the merger is a source. It is an object that
provides a stream of tuples. There are four types of sources: a tuple
source, a table source, a buffer source and a merger itself.
A tuple source just returns one tuple. However, this source (as well as
the table and buffer ones) supports fetching the next data chunk, so the
API allows creating it from a Lua iterator:
`merger.new_tuple_source(gen, param, state)`. A `gen` function should
return `state, tuple` on each call and then return `nil` when no more
tuples are available. Consider the example:
```lua
box.cfg({})
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})
s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
- [2]
- [3]
...
s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
- [2]
- [3]
...
```
As we see, a source (this is common to all sources) has `:select()` and
`:pairs()` methods. The first one has two options: `buffer` and `limit`
with the same meaning as the ones in net.box `:select()`. The `:pairs()`
method (or its `:ipairs()` alias) returns a luafun iterator (it is a Lua
iterator, but also provides a set of handy methods to operate in a
functional style).
The same API exists to create a table and a buffer source:
`merger.new_table_source(gen, param, state)` and
`merger.new_buffer_source(gen, param, state)`. A `gen` function should
return a table or a buffer on each call.
There are also helpers that are useful when all data are available at
once: `merger.new_source_fromtable(tbl)` and
`merger.new_source_frombuffer(buf)`.
A merger is a special kind of a source, which is created from a key_def
object and a set of sources. It performs a kind of the merge sort:
chooses a source with a minimal / maximal tuple on each step, consumes
a tuple from this source and repeats. The API to create a merger is the
following:
```lua
local ctx = merger.context.new(key_def.new(<...>))
local sources = {<...>}
local merger_inst = merger.new(ctx, sources, {
-- Ascending (false) or descending (true) order.
-- Default is ascending.
reverse = <boolean> or <nil>,
})
```
An instance of a merger has the same `:select()` and `:pairs()` methods
as any other source. A merger context is a part of a merger state that
is immutable and can be cached across requests with the same ordering
rules (typically requests to the same space).
The `key_def.new()` function takes a table of key parts as an argument
in the same format as box.space.<...>.index.<...>.parts or
conn.space.<...>.index.<...>.parts (where conn is a net.box connection):
```
local key_parts = {
{
fieldno = <number>,
type = <string>,
[ is_nullable = <boolean>, ]
[ collation_id = <number>, ]
[ collation = <string>, ]
},
...
}
local key_def_inst = key_def.new(key_parts)
```
---
src/box/CMakeLists.txt | 2 +
src/box/lua/init.c | 7 +-
src/box/lua/merger.c | 1184 ++++++++++++++++++++++++++++++++++
src/box/lua/merger.h | 47 ++
src/box/lua/merger.lua | 41 ++
test/box-tap/merger.test.lua | 725 +++++++++++++++++++++
6 files changed, 2005 insertions(+), 1 deletion(-)
create mode 100644 src/box/lua/merger.c
create mode 100644 src/box/lua/merger.h
create mode 100644 src/box/lua/merger.lua
create mode 100755 test/box-tap/merger.test.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index d1251c326..491e3d160 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua)
lua_source(lua_sources lua/console.lua)
lua_source(lua_sources lua/xlog.lua)
lua_source(lua_sources lua/key_def.lua)
+lua_source(lua_sources lua/merger.lua)
set(bin_sources)
bin_source(bin_sources bootstrap.snap bootstrap.h)
@@ -143,6 +144,7 @@ add_library(box STATIC
lua/xlog.c
lua/execute.c
lua/key_def.c
+ lua/merger.c
${bin_sources})
target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 6b8be5096..76b987b4b 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -60,6 +60,7 @@
#include "box/lua/tuple.h"
#include "box/lua/execute.h"
#include "box/lua/key_def.h"
+#include "box/lua/merger.h"
extern char session_lua[],
tuple_lua[],
@@ -70,7 +71,8 @@ extern char session_lua[],
feedback_daemon_lua[],
net_box_lua[],
upgrade_lua[],
- console_lua[];
+ console_lua[],
+ merger_lua[];
static const char *lua_sources[] = {
"box/session", session_lua,
@@ -83,6 +85,7 @@ static const char *lua_sources[] = {
"box/load_cfg", load_cfg_lua,
"box/xlog", xlog_lua,
"box/key_def", key_def_lua,
+ "box/merger", merger_lua,
NULL
};
@@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L)
lua_pop(L, 1);
luaopen_key_def(L);
lua_pop(L, 1);
+ luaopen_merger(L);
+ lua_pop(L, 1);
/* Load Lua extension */
for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..ebe60bc8d
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1184 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <lua.h> /* lua_*() */
+#include <lauxlib.h> /* luaL_*() */
+
+#include "fiber.h" /* fiber() */
+#include "diag.h" /* diag_set() */
+
+#include "box/tuple.h" /* box_tuple_*() */
+
+#include "lua/error.h" /* luaT_error() */
+#include "lua/utils.h" /* luaL_pushcdata(),
+ luaL_iterator_*() */
+
+#include "box/lua/key_def.h" /* check_key_def() */
+#include "box/lua/tuple.h" /* luaT_tuple_new() */
+
+#include "small/ibuf.h" /* struct ibuf */
+#include "msgpuck.h" /* mp_*() */
+
+#include "box/merger.h" /* merger_*() */
+
+static uint32_t merger_source_type_id = 0;
+static uint32_t merger_context_type_id = 0;
+static uint32_t ibuf_type_id = 0;
+
+/**
+ * A type of a function to create a source from a Lua iterator on
+ * a Lua stack.
+ *
+ * Such function is to be passed to lbox_merger_source_new() as
+ * a parameter.
+ */
+typedef struct merger_source *(*luaL_merger_source_new_f)(struct lua_State *L);
+
+/* {{{ Helpers */
+
+/**
+ * Extract an ibuf object from the Lua stack.
+ */
+static struct ibuf *
+luaT_check_ibuf(struct lua_State *L, int idx)
+{
+ if (lua_type(L, idx) != LUA_TCDATA)
+ return NULL;
+
+ uint32_t cdata_type;
+ struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type);
+ if (ibuf_ptr == NULL || cdata_type != ibuf_type_id)
+ return NULL;
+ return ibuf_ptr;
+}
+
+/**
+ * Extract a merger source from the Lua stack.
+ */
+static struct merger_source *
+luaT_check_merger_source(struct lua_State *L, int idx)
+{
+ uint32_t cdata_type;
+ struct merger_source **source_ptr = luaL_checkcdata(L, idx,
+ &cdata_type);
+ if (source_ptr == NULL || cdata_type != merger_source_type_id)
+ return NULL;
+ return *source_ptr;
+}
+
+/**
+ * Extract a merger context from the Lua stack.
+ */
+static struct merger_context *
+luaT_check_merger_context(struct lua_State *L, int idx)
+{
+ uint32_t cdata_type;
+ struct merger_context **ctx_ptr = luaL_checkcdata(L, idx, &cdata_type);
+ if (ctx_ptr == NULL || cdata_type != merger_context_type_id)
+ return NULL;
+ return *ctx_ptr;
+}
+
+/**
+ * Skip an array around tuples and save its length.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+ /* Check the buffer is correct. */
+ if (buf->rpos > buf->wpos)
+ return -1;
+
+ /* Skip decoding if the buffer is empty. */
+ if (ibuf_used(buf) == 0) {
+ *len_p = 0;
+ return 0;
+ }
+
+ /* Check and skip the array around tuples. */
+ int ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+ if (ok)
+ ok = mp_check_array(buf->rpos, buf->wpos) <= 0;
+ if (ok)
+ *len_p = mp_decode_array((const char **) &buf->rpos);
+ return ok ? 0 : -1;
+}
+
+/**
+ * Encode an array around tuples.
+ */
+static void
+encode_header(struct ibuf *obuf, uint32_t result_len)
+{
+ ibuf_reserve(obuf, mp_sizeof_array(result_len));
+ obuf->wpos = mp_encode_array(obuf->wpos, result_len);
+}
+
+/* }}} */
+
+/* {{{ Create, delete structures from Lua */
+
+/**
+ * Free a merger source from a Lua code.
+ */
+static int
+lbox_merger_source_gc(struct lua_State *L)
+{
+ struct merger_source *source;
+ if ((source = luaT_check_merger_source(L, 1)) == NULL)
+ return 0;
+ merger_source_unref(source);
+ return 0;
+}
+
+/**
+ * Free a merger context from a Lua code.
+ */
+static int
+lbox_merger_context_gc(struct lua_State *L)
+{
+ struct merger_context *ctx;
+ if ((ctx = luaT_check_merger_context(L, 1)) == NULL)
+ return 0;
+ merger_context_unref(ctx);
+ return 0;
+}
+
+/**
+ * Create a new source from a Lua iterator and push it onto the
+ * Lua stack.
+ *
+ * It is the helper for lbox_merger_new_buffer_source(),
+ * lbox_merger_new_table_source() and
+ * lbox_merger_new_tuple_source().
+ */
+static int
+lbox_merger_source_new(struct lua_State *L, const char *func_name,
+ luaL_merger_source_new_f luaL_merger_source_new)
+{
+ int top = lua_gettop(L);
+ if (top < 1 || top > 3 || !luaL_iscallable(L, 1))
+ return luaL_error(L, "Usage: %s(gen, param, state)", func_name);
+
+ /*
+ * luaL_merger_source_new() reads exactly three top
+ * values.
+ */
+ while (lua_gettop(L) < 3)
+ lua_pushnil(L);
+
+ struct merger_source *source = luaL_merger_source_new(L);
+ if (source == NULL)
+ return luaT_error(L);
+ merger_source_ref(source);
+ *(struct merger_source **) luaL_pushcdata(L, merger_source_type_id) =
+ source;
+ lua_pushcfunction(L, lbox_merger_source_gc);
+ luaL_setcdatagc(L, -2);
+
+ return 1;
+}
+
+/**
+ * Create a new merger context and push it to a Lua stack.
+ *
+ * Expect cdata<struct key_def> on a Lua stack.
+ */
+static int
+lbox_merger_context_new(struct lua_State *L)
+{
+ struct key_def *key_def;
+ if (lua_gettop(L) != 1 || (key_def = check_key_def(L, 1)) == NULL)
+ return luaL_error(L, "Usage: merger.context.new(key_def)");
+
+ struct merger_context *ctx = merger_context_new(key_def);
+ if (ctx == NULL)
+ return luaT_error(L);
+
+ merger_context_ref(ctx);
+ *(struct merger_context **) luaL_pushcdata(L, merger_context_type_id) =
+ ctx;
+ lua_pushcfunction(L, lbox_merger_context_gc);
+ luaL_setcdatagc(L, -2);
+
+ return 1;
+}
+
+/**
+ * Raise a Lua error with merger.new() usage info.
+ */
+static int
+lbox_merger_new_usage(struct lua_State *L, const char *param_name)
+{
+ static const char *usage = "merger.new(merger_context, "
+ "{source, source, ...}[, {"
+ "reverse = <boolean> or <nil>}])";
+ if (param_name == NULL)
+ return luaL_error(L, "Bad params, use: %s", usage);
+ else
+ return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+ usage);
+}
+
+/**
+ * Parse a second parameter of merger.new() into an array of
+ * sources.
+ *
+ * Return an array of pointers to sources and set @a
+ * sources_count_ptr. In case of an error set a diag and return
+ * NULL.
+ *
+ * It is the helper for lbox_merger_new().
+ */
+static struct merger_source **
+luaT_merger_new_parse_sources(struct lua_State *L, int idx,
+ uint32_t *sources_count_ptr)
+{
+ /* Allocate sources array. */
+ uint32_t sources_count = lua_objlen(L, idx);
+ const ssize_t sources_size =
+ sources_count * sizeof(struct merger_source *);
+ struct merger_source **sources =
+ (struct merger_source **) malloc(sources_size);
+ if (sources == NULL) {
+ diag_set(OutOfMemory, sources_size, "malloc", "sources");
+ return NULL;
+ }
+
+ /* Save all sources. */
+ for (uint32_t i = 0; i < sources_count; ++i) {
+ lua_pushinteger(L, i + 1);
+ lua_gettable(L, idx);
+
+ /* Extract a source from a Lua stack. */
+ struct merger_source *source = luaT_check_merger_source(L, -1);
+ if (source == NULL) {
+ free(sources);
+ diag_set(IllegalParams,
+ "Unknown source type at index %d", i + 1);
+ return NULL;
+ }
+ sources[i] = source;
+ }
+ lua_pop(L, sources_count);
+
+ *sources_count_ptr = sources_count;
+ return sources;
+}
+
+/**
+ * Create a new merger and push it to a Lua stack as a merger
+ * source.
+ *
+ * Expect cdata<struct merger_context &>, a table of sources and
+ * (optionally) a table of options on a Lua stack.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+ struct merger_context *ctx;
+ int top = lua_gettop(L);
+ bool ok = (top == 2 || top == 3) &&
+ /* Merger context. */
+ (ctx = luaT_check_merger_context(L, 1)) != NULL &&
+ /* Sources. */
+ lua_istable(L, 2) == 1 &&
+ /* Opts. */
+ (lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+ if (!ok)
+ return lbox_merger_new_usage(L, NULL);
+
+ /* Options. */
+ bool reverse = false;
+
+ /* Parse options. */
+ if (!lua_isnoneornil(L, 3)) {
+ /* Parse reverse. */
+ lua_pushstring(L, "reverse");
+ lua_gettable(L, 3);
+ if (!lua_isnil(L, -1)) {
+ if (lua_isboolean(L, -1))
+ reverse = lua_toboolean(L, -1);
+ else
+ return lbox_merger_new_usage(L, "reverse");
+ }
+ lua_pop(L, 1);
+ }
+
+ struct merger_source *merger = merger_new(ctx);
+ if (merger == NULL)
+ return luaT_error(L);
+
+ uint32_t sources_count = 0;
+ struct merger_source **sources = luaT_merger_new_parse_sources(L, 2,
+ &sources_count);
+ if (sources == NULL) {
+ merger->vtab->delete(merger);
+ return luaT_error(L);
+ }
+
+ if (merger_set_sources(merger, sources, sources_count) != 0) {
+ free(sources);
+ merger->vtab->delete(merger);
+ return luaT_error(L);
+ }
+ free(sources);
+ merger_set_reverse(merger, reverse);
+
+ merger_source_ref(merger);
+ *(struct merger_source **)
+ luaL_pushcdata(L, merger_source_type_id) = merger;
+ lua_pushcfunction(L, lbox_merger_source_gc);
+ luaL_setcdatagc(L, -2);
+
+ return 1;
+}
+
+/* }}} */
+
+/* {{{ Buffer merger source */
+
+struct merger_source_buffer {
+ struct merger_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next chunk of
+ * tuples.
+ */
+ struct luaL_iterator *fetch_it;
+ /*
+ * A reference to a buffer with a current chunk of tuples.
+ * It is needed to prevent LuaJIT from collecting the
+ * buffer while the source considers it as the current
+ * one.
+ */
+ int ref;
+ /*
+ * A buffer with a current chunk of tuples.
+ */
+ struct ibuf *buf;
+ /*
+ * A merger stops before end of a buffer when it is not
+ * the last merger in the chain.
+ */
+ size_t remaining_tuples_cnt;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base);
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the buffer type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merger_source *
+luaL_merger_source_buffer_new(struct lua_State *L)
+{
+ static struct merger_source_vtab merger_source_buffer_vtab = {
+ .delete = luaL_merger_source_buffer_delete,
+ .next = luaL_merger_source_buffer_next,
+ };
+
+ struct merger_source_buffer *source = (struct merger_source_buffer *)
+ malloc(sizeof(struct merger_source_buffer));
+ if (source == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger_source_buffer),
+ "malloc", "merger_source_buffer");
+ return NULL;
+ }
+
+ merger_source_new(&source->base, &merger_source_buffer_vtab);
+
+ source->fetch_it = luaL_iterator_new(L, 0);
+ source->ref = 0;
+ source->buf = NULL;
+ source->remaining_tuples_cnt = 0;
+
+ return &source->base;
+}
+
+/**
+ * Call a user provided function to get a next data chunk (a
+ * buffer).
+ *
+ * Return 1 when a new buffer is received, 0 when the buffers
+ * iterator ends, and -1 on error (a diag is set).
+ */
+static int
+luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Set a new buffer as the current chunk. */
+ if (source->ref > 0)
+ luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+ source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ source->buf = luaT_check_ibuf(L, -1);
+ assert(source->buf != NULL);
+ lua_pop(L, nresult);
+
+ /* Update remaining_tuples_cnt and skip the header. */
+ if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
+ diag_set(IllegalParams, "Invalid merger source %p",
+ &source->base);
+ return -1;
+ }
+ return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base)
+{
+ struct merger_source_buffer *source = container_of(base,
+ struct merger_source_buffer, base);
+
+ assert(source->fetch_it != NULL);
+ luaL_iterator_delete(source->fetch_it);
+ if (source->ref > 0)
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+ free(source);
+}
+
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out)
+{
+ struct merger_source_buffer *source = container_of(base,
+ struct merger_source_buffer, base);
+
+ /*
+ * Handle the case when all data were processed: ask a
+ * next chunk until a non-empty chunk is received or a
+ * chunks iterator ends.
+ */
+ while (source->remaining_tuples_cnt == 0) {
+ int rc = luaL_merger_source_buffer_fetch(source);
+ if (rc < 0)
+ return -1;
+ if (rc == 0) {
+ *out = NULL;
+ return 0;
+ }
+ }
+ if (ibuf_used(source->buf) == 0) {
+ diag_set(IllegalParams, "Unexpected msgpack buffer end");
+ return -1;
+ }
+ const char *tuple_beg = source->buf->rpos;
+ const char *tuple_end = tuple_beg;
+ /*
+ * mp_next() is faster than mp_check(), but can read bytes
+ * outside of the buffer and so can cause segmentation
+ * faults or an incorrect result.
+ *
+ * We check buffer boundaries after the mp_next() call and
+ * throw an error when the boundaries are violated, but it
+ * does not save us from possible segmentation faults.
+ *
+ * It is the user's responsibility to provide valid
+ * msgpack.
+ */
+ mp_next(&tuple_end);
+ --source->remaining_tuples_cnt;
+ if (tuple_end > source->buf->wpos) {
+ diag_set(IllegalParams, "Unexpected msgpack buffer end");
+ return -1;
+ }
+ source->buf->rpos = (char *) tuple_end;
+ if (format == NULL)
+ format = box_tuple_format_default();
+ struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
+ if (tuple == NULL)
+ return -1;
+
+ box_tuple_ref(tuple);
+ *out = tuple;
+ return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new buffer source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_buffer_source(struct lua_State *L)
+{
+ return lbox_merger_source_new(L, "merger.new_buffer_source",
+ luaL_merger_source_buffer_new);
+}
+
+/* }}} */
+
+/* {{{ Table merger source */
+
+struct merger_source_table {
+ struct merger_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next chunk of
+ * tuples.
+ */
+ struct luaL_iterator *fetch_it;
+ /*
+ * A reference to a table with a current chunk of tuples.
+ */
+ int ref;
+ /* An index of the next tuple within the current chunk. */
+ int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base);
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the table type.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merger_source *
+luaL_merger_source_table_new(struct lua_State *L)
+{
+ static struct merger_source_vtab merger_source_table_vtab = {
+ .delete = luaL_merger_source_table_delete,
+ .next = luaL_merger_source_table_next,
+ };
+
+ struct merger_source_table *source = (struct merger_source_table *)
+ malloc(sizeof(struct merger_source_table));
+ if (source == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger_source_table),
+ "malloc", "merger_source_table");
+ return NULL;
+ }
+
+ merger_source_new(&source->base, &merger_source_table_vtab);
+
+ source->fetch_it = luaL_iterator_new(L, 0);
+ source->ref = 0;
+ source->next_idx = 1;
+
+ return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 0 when a tables iterator ends, 1 when a new table is
+ * received and -1 at an error (set a diag).
+ */
+static int
+luaL_merger_source_table_fetch(struct merger_source_table *source)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <table>, got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Set a new table as the current chunk. */
+ if (source->ref > 0)
+ luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
+ source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ source->next_idx = 1;
+ lua_pop(L, nresult);
+
+ return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base)
+{
+ struct merger_source_table *source = container_of(base,
+ struct merger_source_table, base);
+
+ assert(source->fetch_it != NULL);
+ luaL_iterator_delete(source->fetch_it);
+ if (source->ref > 0)
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+ free(source);
+}
+
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ struct merger_source_table *source = container_of(base,
+ struct merger_source_table, base);
+
+ if (source->ref > 0) {
+ lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushinteger(L, source->next_idx);
+ lua_gettable(L, -2);
+ }
+ /*
+ * If all data were processed, try to fetch more.
+ */
+ while (source->ref == 0 || lua_isnil(L, -1)) {
+ if (source->ref > 0)
+ lua_pop(L, 2);
+ int rc = luaL_merger_source_table_fetch(source);
+ if (rc < 0)
+ return -1;
+ if (rc == 0) {
+ *out = NULL;
+ return 0;
+ }
+ /*
+ * Retry tuple extracting when a next table is
+ * received.
+ */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+ lua_pushinteger(L, source->next_idx);
+ lua_gettable(L, -2);
+ }
+ if (format == NULL)
+ format = box_tuple_format_default();
+ struct tuple *tuple = luaT_tuple_new(L, -1, format);
+ if (tuple == NULL)
+ return -1;
+ ++source->next_idx;
+ lua_pop(L, 2);
+
+ box_tuple_ref(tuple);
+ *out = tuple;
+ return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new table source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_table_source(struct lua_State *L)
+{
+ return lbox_merger_source_new(L, "merger.new_table_source",
+ luaL_merger_source_table_new);
+}
+
+/* }}} */
+
+/* {{{ Tuple merger source */
+
+struct merger_source_tuple {
+ struct merger_source base;
+ /*
+ * A reference to a Lua iterator to fetch a next chunk of
+ * tuples.
+ */
+ struct luaL_iterator *fetch_it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_tuple_delete(struct merger_source *base);
+static int
+luaL_merger_source_tuple_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merger source of the tuple type.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merger_source *
+luaL_merger_source_tuple_new(struct lua_State *L)
+{
+ static struct merger_source_vtab merger_source_tuple_vtab = {
+ .delete = luaL_merger_source_tuple_delete,
+ .next = luaL_merger_source_tuple_next,
+ };
+
+ struct merger_source_tuple *source =
+ (struct merger_source_tuple *) malloc(
+ sizeof(struct merger_source_tuple));
+ if (source == NULL) {
+ diag_set(OutOfMemory, sizeof(struct merger_source_tuple),
+ "malloc", "merger_source_tuple");
+ return NULL;
+ }
+
+ merger_source_new(&source->base, &merger_source_tuple_vtab);
+
+ source->fetch_it = luaL_iterator_new(L, 0);
+
+ return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 1 at success and push a resulting tuple to the Lua
+ * stack.
+ * Return 0 when no more data.
+ * Return -1 at error (set a diag).
+ */
+static int
+luaL_merger_source_tuple_fetch(struct merger_source_tuple *source,
+ struct lua_State *L)
+{
+ int nresult = luaL_iterator_next(L, source->fetch_it);
+
+ /* Handle a Lua error in a gen function. */
+ if (nresult == -1)
+ return -1;
+
+ /* No more data: do nothing. */
+ if (nresult == 0)
+ return 0;
+
+ /* Handle incorrect results count. */
+ if (nresult != 2) {
+ diag_set(IllegalParams, "Expected <state>, <tuple> got %d "
+ "return values", nresult);
+ return -1;
+ }
+
+ /* Set a new tuple as the current chunk. */
+ lua_insert(L, -2); /* Swap state and tuple. */
+ lua_pop(L, 1); /* Pop state. */
+
+ return 1;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_tuple_delete(struct merger_source *base)
+{
+ struct merger_source_tuple *source = container_of(base,
+ struct merger_source_tuple, base);
+
+ assert(source->fetch_it != NULL);
+ luaL_iterator_delete(source->fetch_it);
+
+ free(source);
+}
+
+static int
+luaL_merger_source_tuple_next(struct merger_source *base,
+ box_tuple_format_t *format,
+ struct tuple **out)
+{
+ struct lua_State *L = fiber()->storage.lua.stack;
+ struct merger_source_tuple *source = container_of(base,
+ struct merger_source_tuple, base);
+
+ int rc = luaL_merger_source_tuple_fetch(source, L);
+ if (rc < 0)
+ return -1;
+ /*
+ * Check whether a tuple appears after the fetch.
+ */
+ if (rc == 0) {
+ *out = NULL;
+ return 0;
+ }
+
+ if (format == NULL)
+ format = box_tuple_format_default();
+ struct tuple *tuple = luaT_tuple_new(L, -1, format);
+ if (tuple == NULL)
+ return -1;
+ lua_pop(L, 1);
+
+ box_tuple_ref(tuple);
+ *out = tuple;
+ return 0;
+}
+
+/* Lua functions */
+
+/**
+ * Create a new tuple source and push it onto the Lua stack.
+ */
+static int
+lbox_merger_new_tuple_source(struct lua_State *L)
+{
+ return lbox_merger_source_new(L, "merger.new_tuple_source",
+ luaL_merger_source_tuple_new);
+}
+
+/* }}} */
+
+/* {{{ Merger source Lua methods */
+
+/**
+ * Iterator gen function to traverse source results.
+ *
+ * Expected a nil as the first parameter (param) and a
+ * merger_source as the second parameter (state) on a Lua stack.
+ *
+ * Push the original merger_source (as a new state) and a next
+ * tuple.
+ */
+static int
+lbox_merger_source_gen(struct lua_State *L)
+{
+ struct merger_source *source;
+ bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) &&
+ (source = luaT_check_merger_source(L, 2)) != NULL;
+ if (!ok)
+ return luaL_error(L, "Bad params, use: lbox_merger_source_gen("
+ "nil, merger_source)");
+
+ struct tuple *tuple;
+ if (source->vtab->next(source, NULL, &tuple) != 0)
+ return luaT_error(L);
+ if (tuple == NULL) {
+ lua_pushnil(L);
+ lua_pushnil(L);
+ return 2;
+ }
+
+ /* Push merger_source, tuple. */
+ *(struct merger_source **)
+ luaL_pushcdata(L, merger_source_type_id) = source;
+ luaT_pushtuple(L, tuple);
+
+ /*
+ * luaT_pushtuple() references the tuple, so we
+ * unreference it on merger's side.
+ */
+ box_tuple_unref(tuple);
+
+ return 2;
+}
+
+/**
+ * Iterate over merger source results from Lua.
+ *
+ * Push three values to the Lua stack:
+ *
+ * 1. gen (lbox_merger_source_gen wrapped by fun.wrap());
+ * 2. param (nil);
+ * 3. state (merger_source).
+ */
+static int
+lbox_merger_source_ipairs(struct lua_State *L)
+{
+ struct merger_source *source;
+ bool ok = lua_gettop(L) == 1 &&
+ (source = luaT_check_merger_source(L, 1)) != NULL;
+ if (!ok)
+ return luaL_error(L, "Usage: merger_source:ipairs()");
+ /* Stack: merger_source. */
+
+ luaL_loadstring(L, "return require('fun').wrap");
+ lua_call(L, 0, 1);
+ lua_insert(L, -2); /* Swap merger_source and wrap. */
+ /* Stack: wrap, merger_source. */
+
+ lua_pushcfunction(L, lbox_merger_source_gen);
+ lua_insert(L, -2); /* Swap merger_source and gen. */
+ /* Stack: wrap, gen, merger_source. */
+
+ /*
+ * Push nil as an iterator param, because all needed state
+ * is in a merger source.
+ */
+ lua_pushnil(L);
+ /* Stack: wrap, gen, merger_source, nil. */
+
+ lua_insert(L, -2); /* Swap merger_source and nil. */
+ /* Stack: wrap, gen, nil, merger_source. */
+
+ /* Call fun.wrap(gen, nil, merger_source). */
+ lua_call(L, 3, 3);
+ return 3;
+}
+
+/**
+ * Write source results into ibuf.
+ *
+ * It is the helper for lbox_merger_source_select().
+ */
+static int
+encode_result_buffer(struct lua_State *L, struct merger_source *source,
+ struct ibuf *obuf, uint32_t limit)
+{
+ uint32_t result_len = 0;
+ uint32_t result_len_offset = 4;
+
+ /*
+ * Reserve maximum size for the array around resulting
+ * tuples to set it later.
+ */
+ encode_header(obuf, UINT32_MAX);
+
+ /* Fetch, merge and copy tuples to the buffer. */
+ struct tuple *tuple;
+ int rc = 0;
+ while (result_len < limit && (rc =
+ source->vtab->next(source, NULL, &tuple)) == 0 &&
+ tuple != NULL) {
+ uint32_t bsize = tuple->bsize;
+ ibuf_reserve(obuf, bsize);
+ memcpy(obuf->wpos, tuple_data(tuple), bsize);
+ obuf->wpos += bsize;
+ result_len_offset += bsize;
+ ++result_len;
+
+ /* The received tuple is no longer needed. */
+ box_tuple_unref(tuple);
+ }
+
+ if (rc != 0)
+ return luaT_error(L);
+
+ /* Write the real array size. */
+ mp_store_u32(obuf->wpos - result_len_offset, result_len);
+
+ return 0;
+}
+
+/**
+ * Write source results into a new Lua table.
+ *
+ * It is the helper for lbox_merger_source_select().
+ */
+static int
+create_result_table(struct lua_State *L, struct merger_source *source,
+ uint32_t limit)
+{
+ /* Create result table. */
+ lua_newtable(L);
+
+ uint32_t cur = 1;
+
+ /* Fetch, merge and save tuples to the table. */
+ struct tuple *tuple;
+ int rc = 0;
+ while (cur - 1 < limit && (rc =
+ source->vtab->next(source, NULL, &tuple)) == 0 &&
+ tuple != NULL) {
+ luaT_pushtuple(L, tuple);
+ lua_rawseti(L, -2, cur);
+ ++cur;
+
+ /*
+ * luaT_pushtuple() references the tuple, so we
+ * unreference it on merger's side.
+ */
+ box_tuple_unref(tuple);
+ }
+
+ if (rc != 0)
+ return luaT_error(L);
+
+ return 1;
+}
+
+/**
+ * Raise a Lua error with merger_source:select() usage info.
+ */
+static int
+lbox_merger_source_select_usage(struct lua_State *L, const char *param_name)
+{
+ static const char *usage = "merger_source:select([{"
+ "buffer = <cdata<struct ibuf>> or <nil>, "
+ "limit = <number> or <nil>}])";
+ if (param_name == NULL)
+ return luaL_error(L, "Bad params, use: %s", usage);
+ else
+ return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+ usage);
+}
+
+/**
+ * Pull results of a merger source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expected a merger source and options (optional) on a Lua stack.
+ *
+ * Return a Lua table or nothing when a 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merger_source_select(struct lua_State *L)
+{
+ struct merger_source *source;
+ int top = lua_gettop(L);
+ bool ok = (top == 1 || top == 2) &&
+ /* Merger source. */
+ (source = luaT_check_merger_source(L, 1)) != NULL &&
+ /* Opts. */
+ (lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+ if (!ok)
+ return lbox_merger_source_select_usage(L, NULL);
+
+ uint32_t limit = 0xFFFFFFFF;
+ struct ibuf *obuf = NULL;
+
+ /* Parse options. */
+ if (!lua_isnoneornil(L, 2)) {
+ /* Parse buffer. */
+ lua_pushstring(L, "buffer");
+ lua_gettable(L, 2);
+ if (!lua_isnil(L, -1)) {
+ if ((obuf = luaT_check_ibuf(L, -1)) == NULL)
+ return lbox_merger_source_select_usage(L,
+ "buffer");
+ }
+ lua_pop(L, 1);
+
+ /* Parse limit. */
+ lua_pushstring(L, "limit");
+ lua_gettable(L, 2);
+ if (!lua_isnil(L, -1)) {
+ if (lua_isnumber(L, -1))
+ limit = lua_tointeger(L, -1);
+ else
+ return lbox_merger_source_select_usage(L,
+ "limit");
+ }
+ lua_pop(L, 1);
+ }
+
+ if (obuf == NULL)
+ return create_result_table(L, source, limit);
+ else
+ return encode_result_buffer(L, source, obuf, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+ luaL_cdef(L, "struct merger_source;");
+ luaL_cdef(L, "struct merger_context;");
+ luaL_cdef(L, "struct ibuf;");
+
+ merger_source_type_id = luaL_ctypeid(L, "struct merger_source&");
+ merger_context_type_id = luaL_ctypeid(L, "struct merger_context&");
+ ibuf_type_id = luaL_ctypeid(L, "struct ibuf");
+
+ /* Export C functions to Lua. */
+ static const struct luaL_Reg meta[] = {
+ {"new_buffer_source", lbox_merger_new_buffer_source},
+ {"new_table_source", lbox_merger_new_table_source},
+ {"new_tuple_source", lbox_merger_new_tuple_source},
+ {"new", lbox_merger_new},
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "merger", meta);
+
+ /* Add context.new(). */
+ lua_newtable(L); /* merger.context */
+ lua_pushcfunction(L, lbox_merger_context_new);
+ lua_setfield(L, -2, "new");
+ lua_setfield(L, -2, "context");
+
+ /* Add internal.{select,ipairs}(). */
+ lua_newtable(L); /* merger.internal */
+ lua_pushcfunction(L, lbox_merger_source_select);
+ lua_setfield(L, -2, "select");
+ lua_pushcfunction(L, lbox_merger_source_ipairs);
+ lua_setfield(L, -2, "ipairs");
+ lua_setfield(L, -2, "internal");
+
+ return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..9bf7cc019
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+local ibuf_t = ffi.typeof('struct ibuf')
+local merger_source_t = ffi.typeof('struct merger_source')
+
+-- Create a source from one buffer.
+merger.new_source_frombuffer = function(buf)
+ local usage = 'Usage: %s(<cdata<struct ibuf>>)'
+ if not (type(buf) == 'cdata' and ffi.istype(ibuf_t, buf)) then
+ error(usage:format('merger.new_source_frombuffer'), 0)
+ end
+ -- A single buffer is passed on as a one-chunk iterator.
+ return merger.new_buffer_source(fun.iter({buf}))
+end
+
+-- Create a source from one table.
+merger.new_source_fromtable = function(tbl)
+ local usage = 'Usage: %s(<table>)'
+ if type(tbl) ~= 'table' then
+ error(usage:format('merger.new_source_fromtable'), 0)
+ end
+ -- A single table is passed on as a one-chunk iterator.
+ return merger.new_table_source(fun.iter({tbl}))
+end
+
+local methods = {
+ ['select'] = merger.internal.select,
+ ['pairs'] = merger.internal.ipairs,
+ ['ipairs'] = merger.internal.ipairs,
+}
+
+ffi.metatype(merger_source_t, {
+ __index = function(self, key)
+ return methods[key]
+ end,
+ -- Lua 5.2 compatibility
+ __pairs = merger.internal.ipairs,
+ __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..1412c6126
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,725 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- A chunk size for table and buffer sources. A chunk size for
+-- tuple source is always 1.
+local FETCH_BLOCK_SIZE = 10
+
+local function merger_new_usage(param)
+ local signature = 'merger.new(merger_context, ' ..
+ '{source, source, ...}[, {' ..
+ 'reverse = <boolean> or <nil>}])'
+ if param then
+ return ('Bad param "%s", use: %s'):format(param, signature)
+ end
+ -- No particular parameter is given: build the generic message.
+ return ('Bad params, use: %s'):format(signature)
+end
+
+local function merger_select_usage(param)
+ local signature = 'merger_source:select([{' ..
+ 'buffer = <cdata<struct ibuf>> or <nil>, ' ..
+ 'limit = <number> or <nil>}])'
+ if param then
+ return ('Bad param "%s", use: %s'):format(param, signature)
+ end
+ -- No particular parameter is given: build the generic message.
+ return ('Bad params, use: %s'):format(signature)
+end
+
+-- Get buffer with data encoded without last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+ local data = msgpackffi.encode(data)
+ data = data:sub(1, data:len() - trunc)
+ local len = data:len()
+ local buf = buffer.ibuf()
+ -- Ensure we have enough buffer to write len + trunc bytes.
+ buf:reserve(len + trunc)
+ local p = buf:alloc(len)
+ -- Ensure len bytes follows with trunc zero bytes.
+ ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+ return buf
+end
+
+local function truncated_msgpack_source(data, trunc)
+ local buf = truncated_msgpack_buffer(data, trunc)
+ return merger.new_source_frombuffer(buf)
+end
+
+local bad_source_new_calls = {
+ {
+ 'Bad fetch iterator',
+ funcs = {'new_buffer_source', 'new_table_source',
+ 'new_tuple_source'},
+ params = {1},
+ exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+ },
+ {
+ 'Bad chunk type',
+ funcs = {'new_source_frombuffer', 'new_source_fromtable'},
+ params = {1},
+ exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+ },
+ {
+ 'Bad buffer chunk',
+ funcs = {'new_source_frombuffer'},
+ params = {ffi.new('char *')},
+ exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+ },
+}
+
+local bad_merger_new_calls = {
+ {
+ 'Bad opts',
+ sources = {},
+ opts = 1,
+ exp_err = merger_new_usage(nil),
+ },
+ {
+ 'Bad opts.reverse',
+ sources = {},
+ opts = {reverse = 1},
+ exp_err = merger_new_usage('reverse'),
+ },
+}
+
+local bad_merger_select_calls = {
+ {
+ 'Wrong source of table type',
+ sources = {merger.new_source_fromtable({1})},
+ opts = nil,
+ exp_err = 'A tuple or a table expected, got number',
+ },
+ {
+ 'Bad msgpack source: wrong length of the tuples array',
+ -- Remove the last tuple from msgpack data, but keep old
+ -- tuples array size.
+ sources = {
+ truncated_msgpack_source({{''}, {''}, {''}}, 2),
+ },
+ opts = {},
+ exp_err = 'Unexpected msgpack buffer end',
+ },
+ {
+ 'Bad msgpack source: wrong length of a tuple',
+ -- Remove half of the last tuple, but keep old tuple size.
+ sources = {
+ truncated_msgpack_source({{''}, {''}, {''}}, 1),
+ },
+ opts = {},
+ exp_err = 'Unexpected msgpack buffer end',
+ },
+ {
+ 'Bad opts.buffer (wrong type)',
+ sources = {},
+ opts = {buffer = 1},
+ exp_err = merger_select_usage('buffer'),
+ },
+ {
+ 'Bad opts.buffer (wrong cdata type)',
+ sources = {},
+ opts = {buffer = ffi.new('char *')},
+ exp_err = merger_select_usage('buffer'),
+ },
+ {
+ 'Bad opts.limit (wrong type)',
+ sources = {},
+ opts = {limit = 'hello'},
+ exp_err = merger_select_usage('limit'),
+ }
+}
+
+local schemas = {
+ {
+ name = 'small_unsigned',
+ parts = {
+ {
+ fieldno = 2,
+ type = 'unsigned',
+ }
+ },
+ gen_tuple = function(tupleno)
+ return {'id_' .. tostring(tupleno), tupleno}
+ end,
+ },
+ -- Test with N-1 equal parts and Nth different.
+ {
+ name = 'many_parts',
+ parts = (function()
+ local parts = {}
+ for i = 1, 16 do
+ parts[i] = {
+ fieldno = i,
+ type = 'unsigned',
+ }
+ end
+ return parts
+ end)(),
+ gen_tuple = function(tupleno)
+ local tuple = {}
+ -- 15 constant parts
+ for i = 1, 15 do
+ tuple[i] = i
+ end
+ -- 16th part is varying
+ tuple[16] = tupleno
+ return tuple
+ end,
+ -- reduce tuples count to decrease test run time
+ tuples_cnt = 16,
+ },
+ -- Test null value in nullable field of an index.
+ {
+ name = 'nullable',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'unsigned',
+ },
+ {
+ fieldno = 2,
+ type = 'string',
+ is_nullable = true,
+ },
+ },
+ gen_tuple = function(i)
+ -- Alternate string and null values in the nullable part.
+ if i % 2 == 1 then
+ return {0, tostring(i)}
+ end
+ return {0, box.NULL}
+ end,
+ },
+ -- Test index part with 'collation_id' option (as in net.box's
+ -- response).
+ {
+ name = 'collation_id',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'string',
+ collation_id = 2, -- unicode_ci
+ },
+ },
+ gen_tuple = function(i)
+ local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+ if i <= #letters then
+ return {letters[i]}
+ else
+ return {''}
+ end
+ end,
+ },
+ -- Test index part with 'collation' option (as in local index
+ -- parts).
+ {
+ name = 'collation',
+ parts = {
+ {
+ fieldno = 1,
+ type = 'string',
+ collation = 'unicode_ci',
+ },
+ },
+ gen_tuple = function(i)
+ local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+ if i <= #letters then
+ return {letters[i]}
+ else
+ return {''}
+ end
+ end,
+ },
+}
+
+local function is_unicode_ci_part(part)
+ return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+local function tuple_comparator(a, b, parts)
+ for _, part in ipairs(parts) do
+ local fieldno = part.fieldno
+ if a[fieldno] ~= b[fieldno] then
+ if a[fieldno] == nil then
+ return -1
+ end
+ if b[fieldno] == nil then
+ return 1
+ end
+ if is_unicode_ci_part(part) then
+ return utf8.casecmp(a[fieldno], b[fieldno])
+ end
+ return a[fieldno] < b[fieldno] and -1 or 1
+ end
+ end
+
+ return 0
+end
+
+local function sort_tuples(tuples, parts, opts)
+ -- Equal tuples are never "less"; for the other outcomes
+ -- opts.reverse flips the result of the comparison.
+ table.sort(tuples, function(lhs, rhs)
+ local order = tuple_comparator(lhs, rhs, parts)
+ if order == 0 then
+ return false
+ end
+ if order < 0 then
+ return not opts.reverse
+ end
+ return opts.reverse
+ end)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+ for i = 1, #tuples do
+ local tuple = tuples[i]
+ for _, part in ipairs(parts) do
+ if is_unicode_ci_part(part) then
+ -- Workaround #3709.
+ if tuple[part.fieldno]:len() > 0 then
+ tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+ end
+ end
+ end
+ end
+end
+
+-- Generator of a chunk iterator: returns a new state and the next
+-- chunk of param.tuples in the form param.input_type requires, or
+-- nothing when all the tuples were already returned.
+local function fetch_source_gen(param, state)
+ local input_type = param.input_type
+ local tuples = param.tuples
+ local last_pos = state.last_pos
+ -- A chunk size is always 1 for a tuple source.
+ local fetch_block_size = input_type == 'tuple' and 1 or FETCH_BLOCK_SIZE
+ local data = fun.iter(tuples):drop(last_pos):take(
+ fetch_block_size):totable()
+ if #data == 0 then
+ return
+ end
+ local new_state = {last_pos = last_pos + #data}
+ if input_type == 'table' then
+ return new_state, data
+ elseif input_type == 'buffer' then
+ local buf = buffer.ibuf()
+ msgpackffi.internal.encode_r(buf, data, 0)
+ return new_state, buf
+ elseif input_type == 'tuple' then
+ -- An empty chunk was handled above, so one tuple is here.
+ assert(#data <= 1)
+ return new_state, data[1]
+ else
+ assert(false)
+ end
+end
+
+local function fetch_source_iterator(input_type, tuples)
+ local param = {
+ input_type = input_type,
+ tuples = tuples,
+ }
+ local state = {
+ last_pos = 0,
+ }
+ return fetch_source_gen, param, state
+end
+
+local function prepare_data(schema, tuples_cnt, sources_cnt, opts)
+ local opts = opts or {}
+ local input_type = opts.input_type
+ local use_table_as_tuple = opts.use_table_as_tuple
+ local use_fetch_source = opts.use_fetch_source
+
+ local tuples = {}
+ local exp_result = {}
+
+ -- Ensure empty sources are empty table and not nil.
+ for i = 1, sources_cnt do
+ if tuples[i] == nil then
+ tuples[i] = {}
+ end
+ end
+
+ -- Prepare N tables with tuples as input for merger.
+ for i = 1, tuples_cnt do
+ -- [1, sources_cnt]
+ local guava = digest.guava(i, sources_cnt) + 1
+ local tuple = schema.gen_tuple(i)
+ table.insert(exp_result, tuple)
+ if not use_table_as_tuple then
+ assert(input_type ~= 'buffer')
+ tuple = box.tuple.new(tuple)
+ end
+ table.insert(tuples[guava], tuple)
+ end
+
+ -- Sort tuples within each source.
+ for _, source_tuples in pairs(tuples) do
+ sort_tuples(source_tuples, schema.parts, opts)
+ end
+
+ -- Sort expected result.
+ sort_tuples(exp_result, schema.parts, opts)
+
+ -- Fill sources.
+ local sources
+ if use_fetch_source then
+ sources = {}
+ for i = 1, sources_cnt do
+ local func = ('new_%s_source'):format(input_type)
+ sources[i] = merger[func](fetch_source_iterator(input_type,
+ tuples[i]))
+ end
+ elseif input_type == 'table' then
+ -- Imitate netbox's select w/o {buffer = ...}.
+ sources = {}
+ for i = 1, sources_cnt do
+ sources[i] = merger.new_source_fromtable(tuples[i])
+ end
+ elseif input_type == 'buffer' then
+ -- Imitate netbox's select with {buffer = ...}.
+ sources = {}
+ for i = 1, sources_cnt do
+ local buf = buffer.ibuf()
+ sources[i] = merger.new_source_frombuffer(buf)
+ msgpackffi.internal.encode_r(buf, tuples[i], 0)
+ end
+ elseif input_type == 'tuple' then
+ assert(false)
+ else
+ assert(false)
+ end
+
+ return sources, exp_result
+end
+
+local function test_case_opts_str(opts)
+ local params = {}
+
+ if opts.input_type then
+ table.insert(params, 'input_type: ' .. opts.input_type)
+ end
+
+ if opts.output_type then
+ table.insert(params, 'output_type: ' .. opts.output_type)
+ end
+
+ if opts.reverse then
+ table.insert(params, 'reverse')
+ end
+
+ if opts.use_table_as_tuple then
+ table.insert(params, 'use_table_as_tuple')
+ end
+
+ if opts.use_fetch_source then
+ table.insert(params, 'use_fetch_source')
+ end
+
+ if next(params) == nil then
+ return ''
+ end
+
+ return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+ fiber.yield()
+
+ local opts = opts or {}
+
+ -- Prepare data.
+ local sources, exp_result = prepare_data(schema, tuples_cnt, sources_cnt,
+ opts)
+
+ -- Create a merger instance.
+ local merger_inst = merger.new(schema.merger_context, sources,
+ {reverse = opts.reverse})
+
+ local res
+
+ -- Run merger and prepare output for compare.
+ if opts.output_type == 'table' then
+ -- Table output.
+ res = merger_inst:select()
+ elseif opts.output_type == 'buffer' then
+ -- Buffer output.
+ local obuf = buffer.ibuf()
+ merger_inst:select({buffer = obuf})
+ res = msgpackffi.decode(obuf.rpos)
+ else
+ -- Tuple output.
+ assert(opts.output_type == 'tuple')
+ res = merger_inst:pairs():totable()
+ end
+
+ -- A bit more postprocessing to compare.
+ for i = 1, #res do
+ if type(res[i]) ~= 'table' then
+ res[i] = res[i]:totable()
+ end
+ end
+
+ -- unicode_ci does not differentiate btw 'A' and 'a', so the
+ -- order is arbitrary. We transform fields with unicode_ci
+ -- collation in parts to lower case before comparing.
+ lowercase_unicode_ci_fields(res, schema.parts)
+ lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+ test:is_deeply(res, exp_result,
+ ('check order on %3d tuples in %4d sources%s')
+ :format(tuples_cnt, sources_cnt, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+ local opts = opts or {}
+
+ local case_name = ('testing on schema %s%s'):format(
+ schema.name, test_case_opts_str(opts))
+ local tuples_cnt = schema.tuples_cnt or 100
+
+ local input_type = opts.input_type
+ local use_table_as_tuple = opts.use_table_as_tuple
+ local use_fetch_source = opts.use_fetch_source
+
+ -- Skip meaningless flags combinations.
+ if input_type == 'buffer' and not use_table_as_tuple then
+ return
+ end
+ if input_type == 'tuple' and not use_fetch_source then
+ return
+ end
+
+ test:test(case_name, function(test)
+ test:plan(4)
+
+ -- Check with small buffers count.
+ run_merger(test, schema, tuples_cnt, 1, opts)
+ run_merger(test, schema, tuples_cnt, 2, opts)
+ run_merger(test, schema, tuples_cnt, 5, opts)
+
+ -- Check more buffers than tuples count.
+ run_merger(test, schema, tuples_cnt, 128, opts)
+ end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_source_new_calls + #bad_merger_new_calls +
+ #bad_merger_select_calls + 6 + #schemas * 48)
+
+-- For collations.
+box.cfg{}
+
+for _, case in ipairs(bad_source_new_calls) do
+ test:test(case[1], function(test)
+ local funcs = case.funcs
+ test:plan(#funcs)
+ for _, func in ipairs(funcs) do
+ local ok, err = pcall(merger[func], unpack(case.params))
+ test:ok(ok == false and err:match(case.exp_err), func)
+ end
+ end)
+end
+
+-- Create the merger context for the test cases below.
+local ctx = merger.context.new(key_def.new({{
+ fieldno = 1,
+ type = 'string',
+}}))
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+ local ok, err = pcall(merger.new, ctx, case.sources, case.opts)
+ err = tostring(err) -- cdata -> string
+ test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Bad source or/and opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_select_calls) do
+ local merger_inst = merger.new(ctx, case.sources)
+ local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+ err = tostring(err) -- cdata -> string
+ test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Create a merger context for each schema.
+for _, schema in ipairs(schemas) do
+ schema.merger_context = merger.context.new(key_def.new(schema.parts))
+end
+
+test:test('use a source in two mergers', function(test)
+ test:plan(5)
+
+ local data = {{'a'}, {'b'}, {'c'}}
+ local source = merger.new_source_fromtable(data)
+ local i1 = merger.new(ctx, {source}):pairs()
+ local i2 = merger.new(ctx, {source}):pairs()
+
+ local t1 = i1:head():totable()
+ test:is_deeply(t1, data[1], 'tuple 1 from merger 1')
+
+ local t3 = i2:head():totable()
+ test:is_deeply(t3, data[3], 'tuple 3 from merger 2')
+
+ local t2 = i1:head():totable()
+ test:is_deeply(t2, data[2], 'tuple 2 from merger 1')
+
+ test:ok(i1:is_null(), 'merger 1 ends')
+ test:ok(i2:is_null(), 'merger 2 ends')
+end)
+
+local function reusable_source_gen(param)
+ local chunks = param.chunks
+ local idx = param.idx or 1
+
+ if idx > table.maxn(chunks) then
+ return
+ end
+
+ local chunk = chunks[idx]
+ param.idx = idx + 1
+
+ if chunk == nil then
+ return
+ end
+ return box.NULL, chunk
+end
+
+local function verify_reusable_source(test, source)
+ test:plan(3)
+
+ local exp = {{1}, {2}}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, '1st use')
+
+ local exp = {{3}, {4}, {5}}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, '2nd use')
+
+ local exp = {}
+ local res = source:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, exp, 'end')
+end
+
+test:test('reuse a tuple source', function(test)
+ local tuples = {{1}, {2}, nil, {3}, {4}, {5}}
+ local source = merger.new_tuple_source(reusable_source_gen,
+ {chunks = tuples})
+ verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+ local chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+ local source = merger.new_table_source(reusable_source_gen,
+ {chunks = chunks})
+ verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+ local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+ local chunks = {}
+ for i = 1, table.maxn(chunks_tbl) do
+ if chunks_tbl[i] == nil then
+ chunks[i] = nil
+ else
+ chunks[i] = buffer.ibuf()
+ msgpackffi.internal.encode_r(chunks[i], chunks_tbl[i], 0)
+ end
+ end
+ local source = merger.new_buffer_source(reusable_source_gen,
+ {chunks = chunks})
+ verify_reusable_source(test, source)
+end)
+
+test:test('use limit', function(test)
+ test:plan(6)
+
+ local data = {{'a'}, {'b'}}
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(ctx, {source})
+ local res = m:select({limit = 0})
+ test:is(#res, 0, 'table output with limit 0')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(ctx, {source})
+ local res = m:select({limit = 1})
+ test:is(#res, 1, 'table output with limit 1')
+ test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(ctx, {source})
+ local obuf = buffer.ibuf()
+ m:select({buffer = obuf, limit = 0})
+ local res = msgpackffi.decode(obuf.rpos)
+ test:is(#res, 0, 'buffer output with limit 0')
+
+ local source = merger.new_source_fromtable(data)
+ local m = merger.new(ctx, {source})
+ obuf:recycle()
+ m:select({buffer = obuf, limit = 1})
+ local res = msgpackffi.decode(obuf.rpos)
+ test:is(#res, 1, 'buffer output with limit 1')
+ test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
+test:test('cascade mergers', function(test)
+ test:plan(2)
+
+ local data = {{'a'}, {'b'}}
+
+ local source = merger.new_source_fromtable(data)
+ local m1 = merger.new(ctx, {source})
+ local m2 = merger.new(ctx, {m1})
+
+ local res = m2:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, data, 'same context')
+
+ local ctx_unicode = merger.context.new(key_def.new({{
+ fieldno = 1,
+ type = 'string',
+ collation = 'unicode',
+ }}))
+
+ local source = merger.new_source_fromtable(data)
+ local m1 = merger.new(ctx, {source})
+ local m2 = merger.new(ctx_unicode, {m1})
+
+ local res = m2:pairs():map(box.tuple.totable):totable()
+ test:is_deeply(res, data, 'different contexts')
+end)
+
+-- Merging cases.
+for _, input_type in ipairs({'buffer', 'table', 'tuple'}) do
+ for _, output_type in ipairs({'buffer', 'table', 'tuple'}) do
+ for _, reverse in ipairs({false, true}) do
+ for _, use_table_as_tuple in ipairs({false, true}) do
+ for _, use_fetch_source in ipairs({false, true}) do
+ for _, schema in ipairs(schemas) do
+ run_case(test, schema, {
+ input_type = input_type,
+ output_type = output_type,
+ reverse = reverse,
+ use_table_as_tuple = use_table_as_tuple,
+ use_fetch_source = use_fetch_source,
+ })
+ end
+ end
+ end
+ end
+ end
+end
+
+os.exit(test:check() and 0 or 1)
--
2.20.1
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-10 15:21 ` [PATCH v3 7/7] Add merger for tuple streams (Lua part) Alexander Turenko
@ 2019-04-25 11:46 ` Konstantin Osipov
2019-04-25 12:53 ` Alexander Turenko
2019-04-30 17:37 ` Vladimir Davydov
1 sibling, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-25 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Alexander Turenko
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
>
The api is generally LGTM, one comment below:
> A merger is a special kind of a source, which is created from a key_def
> object and a set of sources. It performs a kind of the merge sort:
> chooses a source with a minimal / maximal tuple on each step, consumes
> a tuple from this source and repeats. The API to create a merger is the
> following:
>
> ```lua
> local ctx = merger.context.new(key_def.new(<...>))
> local sources = {<...>}
> local merger_inst = merger.new(ctx, sources, {
Why do you need a separate object used only to construct a
merger? Why not pass all parameters into merger.new?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-25 11:46 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-25 12:53 ` Alexander Turenko
2019-04-25 13:30 ` Konstantin Osipov
0 siblings, 1 reply; 39+ messages in thread
From: Alexander Turenko @ 2019-04-25 12:53 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladimir Davydov
On Thu, Apr 25, 2019 at 02:46:59PM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko@tarantool.org> [19/04/10 18:23]:
> >
> The api is generally LGTM, one comment below:
>
> > A merger is a special kind of a source, which is created from a key_def
> > object and a set of sources. It performs a kind of the merge sort:
> > chooses a source with a minimal / maximal tuple on each step, consumes
> > a tuple from this source and repeats. The API to create a merger is the
> > following:
> >
> > ```lua
> > local ctx = merger.context.new(key_def.new(<...>))
> > local sources = {<...>}
> > local merger_inst = merger.new(ctx, sources, {
>
> Why do you need a separate object used only to construct a
> merger? Why not pass all parameters into merger.new?
A user may want to cache key_def + format creation when a schema changes
rarely. The original merger allows it:
https://github.com/tarantool/shard/blob/180948e99148973e89f75f8e4784315e183e3fa2/shard/init.lua#L1215-L1224
Even if one doesn't cache a merger context, but runs several merges over
one data stream (see the multiplexed cases in examples) and a schema is
the same, it is worth reusing the context.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-25 12:53 ` Alexander Turenko
@ 2019-04-25 13:30 ` Konstantin Osipov
0 siblings, 0 replies; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-25 13:30 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches, Vladimir Davydov
* Alexander Turenko <alexander.turenko@tarantool.org> [19/04/25 15:57]:
> > The api is generally LGTM, one comment below:
> >
> > > A merger is a special kind of a source, which is created from a key_def
> > > object and a set of sources. It performs a kind of the merge sort:
> > > chooses a source with a minimal / maximal tuple on each step, consumes
> > > a tuple from this source and repeats. The API to create a merger is the
> > > following:
> > >
> > > ```lua
> > > local ctx = merger.context.new(key_def.new(<...>))
> > > local sources = {<...>}
> > > local merger_inst = merger.new(ctx, sources, {
> >
> > Why do you need a separate object used only to construct a
> > merger? Why not pass all parameters into merger.new?
>
> A user may want to cache key_def + format creation when a schema changes
> rarely. The original merger allows it:
> https://github.com/tarantool/shard/blob/180948e99148973e89f75f8e4784315e183e3fa2/shard/init.lua#L1215-L1224
>
> Even if one doesn't cache a merger context, but runs several merges over
> one data stream (see the multiplexed cases in examples) and a schema is
> the same, it worth to reuse the context.
I wouldn't bother with it.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-10 15:21 ` [PATCH v3 7/7] Add merger for tuple streams (Lua part) Alexander Turenko
2019-04-25 11:46 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-30 17:37 ` Vladimir Davydov
2019-04-30 21:09 ` [tarantool-patches] " Konstantin Osipov
2019-05-07 22:14 ` Alexander Turenko
1 sibling, 2 replies; 39+ messages in thread
From: Vladimir Davydov @ 2019-04-30 17:37 UTC (permalink / raw)
To: Alexander Turenko; +Cc: tarantool-patches
On Wed, Apr 10, 2019 at 06:21:25PM +0300, Alexander Turenko wrote:
> A merger is a special kind of a source, which is created from a key_def
> object and a set of sources. It performs a kind of the merge sort:
> chooses a source with a minimal / maximal tuple on each step, consumes
> a tuple from this source and repeats. The API to create a merger is the
> following:
>
> ```lua
> local ctx = merger.context.new(key_def.new(<...>))
Discussed verbally, agreed that a performance gain from using
merger.context is somewhat dubious. It's compelling to drop it
altogether and pass key_def to merger.new directly, creating
a format every time (we can reuse formats if necessary in future).
Alexander will think about it.
Other than that, the API and the patch look good to me.
See a few minor comments below.
> local sources = {<...>}
> local merger_inst = merger.new(ctx, sources, {
> -- Ascending (false) or descending (true) order.
> -- Default is ascending.
> reverse = <boolean> or <nil>,
> })
> ```
> diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
> new file mode 100644
> index 000000000..ebe60bc8d
> --- /dev/null
> +++ b/src/box/lua/merger.c
> @@ -0,0 +1,1184 @@
> +static uint32_t merger_source_type_id = 0;
> +static uint32_t merger_context_type_id = 0;
> +static uint32_t ibuf_type_id = 0;
We typically use upper case for naming variables storing Lua type ids,
e.g. CTID_STRUCT_TUPLE_REF.
> +static int
> +lbox_merger_source_gc(struct lua_State *L)
> +{
> + struct merger_source *source;
> + if ((source = luaT_check_merger_source(L, 1)) == NULL)
> + return 0;
Is it actually possible?
> + merger_source_unref(source);
> + return 0;
> +}
> +static struct merger_source **
> +luaT_merger_new_parse_sources(struct lua_State *L, int idx,
> + uint32_t *sources_count_ptr)
> +{
> + /* Allocate sources array. */
> + uint32_t sources_count = lua_objlen(L, idx);
> + const ssize_t sources_size =
> + sources_count * sizeof(struct merger_source *);
> + struct merger_source **sources =
> + (struct merger_source **) malloc(sources_size);
Pointless type conversion.
> + if (sources == NULL) {
> + diag_set(OutOfMemory, sources_size, "malloc", "sources");
> + return NULL;
> + }
> +
> + /* Save all sources. */
> + for (uint32_t i = 0; i < sources_count; ++i) {
> + lua_pushinteger(L, i + 1);
> + lua_gettable(L, idx);
> +
> + /* Extract a source from a Lua stack. */
> + struct merger_source *source = luaT_check_merger_source(L, -1);
> + if (source == NULL) {
> + free(sources);
> + diag_set(IllegalParams,
> + "Unknown source type at index %d", i + 1);
> + return NULL;
> + }
> + sources[i] = source;
> + }
> + lua_pop(L, sources_count);
> +
> + *sources_count_ptr = sources_count;
source_count
> +struct merger_source_buffer {
> + struct merger_source base;
> + /*
> + * A reference to a Lua iterator to fetch a next chunk of
> + * tuples.
> + */
> + struct luaL_iterator *fetch_it;
> + /*
> + * A reference a buffer with a current chunk of tuples.
A reference to a buffer storing the current chunk of tuples.
> + * It is needed to prevent LuaJIT from collecting the
> + * buffer while the source consider it as the current
> + * one.
> + */
> + int ref;
> + /*
> + * A buffer with a current chunk of tuples.
> + */
> + struct ibuf *buf;
> + /*
> + * A merger stops before end of a buffer when it is not
> + * the last merger in the chain.
> + */
> + size_t remaining_tuples_cnt;
Please rename to remaining_tuple_count.
> +static int
> +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
> +{
> + struct lua_State *L = fiber()->storage.lua.stack;
> + int nresult = luaL_iterator_next(L, source->fetch_it);
> +
> + /* Handle a Lua error in a gen function. */
> + if (nresult == -1)
> + return -1;
> +
> + /* No more data: do nothing. */
> + if (nresult == 0)
> + return 0;
> +
> + /* Handle incorrect results count. */
> + if (nresult != 2) {
> + diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
> + "return values", nresult);
> + return -1;
> + }
> +
> + /* Set a new buffer as the current chunk. */
> + if (source->ref > 0)
> + luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
> + lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
> + source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
> + source->buf = luaT_check_ibuf(L, -1);
> + assert(source->buf != NULL);
| tarantool> m = merger.new(c, {merger.new_buffer_source(box.space._space:pairs())})
| ---
| ...
|
| tarantool> m:select()
| luaL_merger_source_buffer_fetch: Assertion `source->buf != NULL' failed.
Please fail gracefully in this case.
> + lua_pop(L, nresult);
> +
> + /* Update remaining_tuples_cnt and skip the header. */
> + if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
> + diag_set(IllegalParams, "Invalid merger source %p",
> + &source->base);
> + return -1;
> + }
> + return 1;
> +}
> +
> +/* Virtual methods */
> +
> +static void
> +luaL_merger_source_buffer_delete(struct merger_source *base)
> +{
> + struct merger_source_buffer *source = container_of(base,
> + struct merger_source_buffer, base);
> +
> + assert(source->fetch_it != NULL);
> + luaL_iterator_delete(source->fetch_it);
> + if (source->ref > 0)
> + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
> +
> + free(source);
> +}
> +
> +static int
> +luaL_merger_source_buffer_next(struct merger_source *base,
> + box_tuple_format_t *format,
> + struct tuple **out)
> +{
> + struct merger_source_buffer *source = container_of(base,
> + struct merger_source_buffer, base);
> +
> + /*
> + * Handle the case when all data were processed: ask a
> + * next chunk until a non-empty chunk is received or a
> + * chunks iterator ends.
> + */
> + while (source->remaining_tuples_cnt == 0) {
> + int rc = luaL_merger_source_buffer_fetch(source);
> + if (rc < 0)
> + return -1;
> + if (rc == 0) {
> + *out = NULL;
> + return 0;
> + }
> + }
> + if (ibuf_used(source->buf) == 0) {
> + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> + return -1;
> + }
> + const char *tuple_beg = source->buf->rpos;
> + const char *tuple_end = tuple_beg;
> + /*
> + * mp_next() is faster then mp_check(), but can read bytes
> + * outside of the buffer and so can cause segmentation
> + * faults or an incorrect result.
> + *
> + * We check buffer boundaries after the mp_next() call and
> + * throw an error when the boundaries are violated, but it
> + * does not save us from possible segmentation faults.
> + *
> + * It is in a user responsibility to provide valid
> + * msgpack.
Ugh, I'd check the buffer with mp_check anyway. Would probably provide
an option to skip the check if required ('unchecked'). Not sure if it's
really necessary though.
> + */
> + mp_next(&tuple_end);
> + --source->remaining_tuples_cnt;
> + if (tuple_end > source->buf->wpos) {
> + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> + return -1;
> + }
> + source->buf->rpos = (char *) tuple_end;
> + if (format == NULL)
> + format = box_tuple_format_default();
I'd pass tuple_format_runtime explicitly to the ->next callback.
Special-casing NULL is kinda ugly.
> + struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
> + if (tuple == NULL)
> + return -1;
> +
> + box_tuple_ref(tuple);
> + *out = tuple;
> + return 0;
> +}
> +static int
> +encode_result_buffer(struct lua_State *L, struct merger_source *source,
> + struct ibuf *obuf, uint32_t limit)
Better rename 'obuf' to 'out' or 'buf' or something - we have struct
obuf so 'struct ibuf *obuf' looks misleading.
> +{
> + uint32_t result_len = 0;
> + uint32_t result_len_offset = 4;
> +
> + /*
> + * Reserve maximum size for the array around resulting
> + * tuples to set it later.
> + */
> + encode_header(obuf, UINT32_MAX);
> +
> + /* Fetch, merge and copy tuples to the buffer. */
> + struct tuple *tuple;
> + int rc = 0;
> + while (result_len < limit && (rc =
> + source->vtab->next(source, NULL, &tuple)) == 0 &&
> + tuple != NULL) {
> + uint32_t bsize = tuple->bsize;
> + ibuf_reserve(obuf, bsize);
> + memcpy(obuf->wpos, tuple_data(tuple), bsize);
> + obuf->wpos += bsize;
> + result_len_offset += bsize;
> + ++result_len;
> +
> + /* The received tuple is not more needed. */
> + box_tuple_unref(tuple);
> + }
> +
> + if (rc != 0)
> + return luaT_error(L);
> +
> + /* Write the real array size. */
> + mp_store_u32(obuf->wpos - result_len_offset, result_len);
> +
> + return 0;
> +}
> +static int
> +lbox_merger_source_select(struct lua_State *L)
> +{
> + struct merger_source *source;
> + int top = lua_gettop(L);
> + bool ok = (top == 1 || top == 2) &&
> + /* Merger source. */
> + (source = luaT_check_merger_source(L, 1)) != NULL &&
> + /* Opts. */
> + (lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
> + if (!ok)
> + return lbox_merger_source_select_usage(L, NULL);
> +
> + uint32_t limit = 0xFFFFFFFF;
Hmm, UINT32_MAX?
^ permalink raw reply [flat|nested] 39+ messages in thread
* [tarantool-patches] Re: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-30 17:37 ` Vladimir Davydov
@ 2019-04-30 21:09 ` Konstantin Osipov
2019-05-02 9:48 ` Vladimir Davydov
2019-05-07 22:14 ` Alexander Turenko
1 sibling, 1 reply; 39+ messages in thread
From: Konstantin Osipov @ 2019-04-30 21:09 UTC (permalink / raw)
To: tarantool-patches; +Cc: Alexander Turenko
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/04/30 20:38]:
Vova, did you see my review for merger and key_def patches? Do you
agree with it?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-30 21:09 ` [tarantool-patches] " Konstantin Osipov
@ 2019-05-02 9:48 ` Vladimir Davydov
0 siblings, 0 replies; 39+ messages in thread
From: Vladimir Davydov @ 2019-05-02 9:48 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Alexander Turenko
On Wed, May 01, 2019 at 12:09:51AM +0300, Konstantin Osipov wrote:
> Vova, did you see my review for merger and key_def patches?
Yes.
> Do you agree with it?
Yes, mostly. We agreed with Alexander that it'd be better to rename
merger_source to merge_source and get rid of merger_context. Regarding
allocating a tuple in key_def.extract_key I think it's okay, because I
doubt that this function will be called often. If somebody requests it,
we can add a buffer parameter to store an extracted key as raw msgpack,
similar to how net_box.select handles it.
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
2019-04-30 17:37 ` Vladimir Davydov
2019-04-30 21:09 ` [tarantool-patches] " Konstantin Osipov
@ 2019-05-07 22:14 ` Alexander Turenko
1 sibling, 0 replies; 39+ messages in thread
From: Alexander Turenko @ 2019-05-07 22:14 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
Thanks!
I'll send 4th version of the patchset.
WBR, Alexander Turenko.
On Tue, Apr 30, 2019 at 08:37:24PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:25PM +0300, Alexander Turenko wrote:
> > A merger is a special kind of a source, which is created from a key_def
> > object and a set of sources. It performs a kind of the merge sort:
> > chooses a source with a minimal / maximal tuple on each step, consumes
> > a tuple from this source and repeats. The API to create a merger is the
> > following:
> >
> > ```lua
> > local ctx = merger.context.new(key_def.new(<...>))
>
> Discussed verbally, agreed that a performance gain from using
> merger.context is somewhat dubious. It's compelling to drop it
> altogether and pass key_def to merger.new directly, creating
> a format every time (we can reuse formats if necessary in future).
> Alexander will think about it.
Okay. Removed.
I'll suggest caching a key_def instead of a merge context in
tarantool-merger-examples.
>
> Other than that the API and the patch looks good to me.
>
> See a few minor comments below.
>
> > local sources = {<...>}
> > local merger_inst = merger.new(ctx, sources, {
> > -- Ascending (false) or descending (true) order.
> > -- Default is ascending.
> > reverse = <boolean> or <nil>,
> > })
> > ```
>
> > diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
> > new file mode 100644
> > index 000000000..ebe60bc8d
> > --- /dev/null
> > +++ b/src/box/lua/merger.c
> > @@ -0,0 +1,1184 @@
>
> > +static uint32_t merger_source_type_id = 0;
> > +static uint32_t merger_context_type_id = 0;
> > +static uint32_t ibuf_type_id = 0;
>
> We typically use upper case for naming variables storing Lua type ids,
> e.g. CTID_STRUCT_TUPLE_REF.
Changed to CTID_STRUCT_IBUF and CTID_STRUCT_MERGE_SOURCE_REF.
>
> > +static int
> > +lbox_merger_source_gc(struct lua_State *L)
> > +{
> > + struct merger_source *source;
> > + if ((source = luaT_check_merger_source(L, 1)) == NULL)
> > + return 0;
>
> Is it actually possible?
No. It was carried over from the original shard/driver.c, but tarantool's
code doesn't perform such checks. Replaced with an assert.
>
> > + merger_source_unref(source);
> > + return 0;
> > +}
>
> > +static struct merger_source **
> > +luaT_merger_new_parse_sources(struct lua_State *L, int idx,
> > + uint32_t *sources_count_ptr)
> > +{
> > + /* Allocate sources array. */
> > + uint32_t sources_count = lua_objlen(L, idx);
> > + const ssize_t sources_size =
> > + sources_count * sizeof(struct merger_source *);
> > + struct merger_source **sources =
> > + (struct merger_source **) malloc(sources_size);
>
> Pointless type conversion.
Removed. Also changed the order from `count * sizeof(...)` to
`sizeof(...) * count` where we allocate arrays, so that the arithmetic is
done in size_t. There is no real reason, it is just the right way to do it.
Also fixed the place (in this function) where I mistakenly saved an
array size into a ssize_t (signed) variable.
>
> > + if (sources == NULL) {
> > + diag_set(OutOfMemory, sources_size, "malloc", "sources");
> > + return NULL;
> > + }
> > +
> > + /* Save all sources. */
> > + for (uint32_t i = 0; i < sources_count; ++i) {
> > + lua_pushinteger(L, i + 1);
> > + lua_gettable(L, idx);
> > +
> > + /* Extract a source from a Lua stack. */
> > + struct merger_source *source = luaT_check_merger_source(L, -1);
> > + if (source == NULL) {
> > + free(sources);
> > + diag_set(IllegalParams,
> > + "Unknown source type at index %d", i + 1);
> > + return NULL;
> > + }
> > + sources[i] = source;
> > + }
> > + lua_pop(L, sources_count);
> > +
> > + *sources_count_ptr = sources_count;
>
> source_count
Changed all *_count names in that way and also changed all *_cnt to
*_count.
>
> > +struct merger_source_buffer {
> > + struct merger_source base;
> > + /*
> > + * A reference to a Lua iterator to fetch a next chunk of
> > + * tuples.
> > + */
> > + struct luaL_iterator *fetch_it;
> > + /*
> > + * A reference a buffer with a current chunk of tuples.
>
> A reference to a buffer storing the current chunk of tuples.
I don't get why 'THE current chunk', but changed as you suggested
anyway.
>
> > + * It is needed to prevent LuaJIT from collecting the
> > + * buffer while the source consider it as the current
> > + * one.
> > + */
> > + int ref;
> > + /*
> > + * A buffer with a current chunk of tuples.
> > + */
> > + struct ibuf *buf;
> > + /*
> > + * A merger stops before end of a buffer when it is not
> > + * the last merger in the chain.
> > + */
> > + size_t remaining_tuples_cnt;
>
> Please rename to remaining_tuple_count.
Done.
>
> > +static int
> > +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
> > +{
> > + struct lua_State *L = fiber()->storage.lua.stack;
> > + int nresult = luaL_iterator_next(L, source->fetch_it);
> > +
> > + /* Handle a Lua error in a gen function. */
> > + if (nresult == -1)
> > + return -1;
> > +
> > + /* No more data: do nothing. */
> > + if (nresult == 0)
> > + return 0;
> > +
> > + /* Handle incorrect results count. */
> > + if (nresult != 2) {
> > + diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
> > + "return values", nresult);
> > + return -1;
> > + }
> > +
> > + /* Set a new buffer as the current chunk. */
> > + if (source->ref > 0)
> > + luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
> > + lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
> > + source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
> > + source->buf = luaT_check_ibuf(L, -1);
> > + assert(source->buf != NULL);
>
> | tarantool> m = merger.new(c, {merger.new_buffer_source(box.space._space:pairs())})
> | ---
> | ...
> |
> | tarantool> m:select()
> | luaL_merger_source_buffer_fetch: Assertion `source->buf != NULL' failed.
>
> Please fail gracefully in this case.
Nice catch. Fixed. Added 'bad_chunks' test cases.
>
> > + lua_pop(L, nresult);
> > +
> > + /* Update remaining_tuples_cnt and skip the header. */
> > + if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
> > + diag_set(IllegalParams, "Invalid merger source %p",
> > + &source->base);
> > + return -1;
> > + }
> > + return 1;
> > +}
> > +
> > +/* Virtual methods */
> > +
> > +static void
> > +luaL_merger_source_buffer_delete(struct merger_source *base)
> > +{
> > + struct merger_source_buffer *source = container_of(base,
> > + struct merger_source_buffer, base);
> > +
> > + assert(source->fetch_it != NULL);
> > + luaL_iterator_delete(source->fetch_it);
> > + if (source->ref > 0)
> > + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
> > +
> > + free(source);
> > +}
> > +
> > +static int
> > +luaL_merger_source_buffer_next(struct merger_source *base,
> > + box_tuple_format_t *format,
> > + struct tuple **out)
> > +{
> > + struct merger_source_buffer *source = container_of(base,
> > + struct merger_source_buffer, base);
> > +
> > + /*
> > + * Handle the case when all data were processed: ask a
> > + * next chunk until a non-empty chunk is received or a
> > + * chunks iterator ends.
> > + */
> > + while (source->remaining_tuples_cnt == 0) {
> > + int rc = luaL_merger_source_buffer_fetch(source);
> > + if (rc < 0)
> > + return -1;
> > + if (rc == 0) {
> > + *out = NULL;
> > + return 0;
> > + }
> > + }
> > + if (ibuf_used(source->buf) == 0) {
> > + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > + return -1;
> > + }
> > + const char *tuple_beg = source->buf->rpos;
> > + const char *tuple_end = tuple_beg;
> > + /*
> > + * mp_next() is faster then mp_check(), but can read bytes
> > + * outside of the buffer and so can cause segmentation
> > + * faults or an incorrect result.
> > + *
> > + * We check buffer boundaries after the mp_next() call and
> > + * throw an error when the boundaries are violated, but it
> > + * does not save us from possible segmentation faults.
> > + *
> > + * It is in a user responsibility to provide valid
> > + * msgpack.
>
> Ugh, I'd check the buffer with mp_check anyway. Would probably provide
> an option to skip the check if required ('unchecked'). Not sure if it's
> really necessary though.
Okay.
>
> > + */
> > + mp_next(&tuple_end);
> > + --source->remaining_tuples_cnt;
> > + if (tuple_end > source->buf->wpos) {
> > + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > + return -1;
> > + }
> > + source->buf->rpos = (char *) tuple_end;
> > + if (format == NULL)
> > + format = box_tuple_format_default();
>
> I'd pass tuple_format_runtime explicitly to the ->next callback.
> Special-casing NULL is kinda ugly.
A NULL value for a format has a special meaning for source->next(): the
caller does not care which format the resulting tuple will be in
(see the comment for next() in struct merge_source_vtab). It does not
always mean that a tuple will be in the runtime format: say, for a
tuple source it means that it will return tuples in their original format.
The same applies to a table source when a chunk contains tuples
(not Lua tables).
This does not matter much for a buffer source, but it is part of the
source contract in general. I'll leave it as is if you don't mind.
>
> > + struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > + if (tuple == NULL)
> > + return -1;
> > +
> > + box_tuple_ref(tuple);
> > + *out = tuple;
> > + return 0;
> > +}
>
> > +static int
> > +encode_result_buffer(struct lua_State *L, struct merger_source *source,
> > + struct ibuf *obuf, uint32_t limit)
>
> Better rename 'obuf' to 'out' or 'buf' or something - we have struct
> obuf so 'struct ibuf *obuf' looks misleading.
We use 'buf' when reading from a buffer, so I would use another name
when writing. 'out' looks more appropriate for a scalar / pointer value
that a function replaces. I'll use 'output_buffer' here and in other
such places in C and Lua if you don't mind.
>
> > +{
> > + uint32_t result_len = 0;
> > + uint32_t result_len_offset = 4;
> > +
> > + /*
> > + * Reserve maximum size for the array around resulting
> > + * tuples to set it later.
> > + */
> > + encode_header(obuf, UINT32_MAX);
> > +
> > + /* Fetch, merge and copy tuples to the buffer. */
> > + struct tuple *tuple;
> > + int rc = 0;
> > + while (result_len < limit && (rc =
> > + source->vtab->next(source, NULL, &tuple)) == 0 &&
> > + tuple != NULL) {
> > + uint32_t bsize = tuple->bsize;
> > + ibuf_reserve(obuf, bsize);
> > + memcpy(obuf->wpos, tuple_data(tuple), bsize);
> > + obuf->wpos += bsize;
> > + result_len_offset += bsize;
> > + ++result_len;
> > +
> > + /* The received tuple is not more needed. */
> > + box_tuple_unref(tuple);
> > + }
> > +
> > + if (rc != 0)
> > + return luaT_error(L);
> > +
> > + /* Write the real array size. */
> > + mp_store_u32(obuf->wpos - result_len_offset, result_len);
> > +
> > + return 0;
> > +}
>
> > +static int
> > +lbox_merger_source_select(struct lua_State *L)
> > +{
> > + struct merger_source *source;
> > + int top = lua_gettop(L);
> > + bool ok = (top == 1 || top == 2) &&
> > + /* Merger source. */
> > + (source = luaT_check_merger_source(L, 1)) != NULL &&
> > + /* Opts. */
> > + (lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
> > + if (!ok)
> > + return lbox_merger_source_select_usage(L, NULL);
> > +
> > + uint32_t limit = 0xFFFFFFFF;
>
> Hmm, UINT32_MAX?
Okay, changed.
^ permalink raw reply [flat|nested] 39+ messages in thread