From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Alexander Turenko Subject: [PATCH 3/3] Add merger for tuple streams Date: Sun, 16 Dec 2018 23:17:26 +0300 Message-Id: <29bd3bbb8544f5b0e070e6b7880dad71f709a7d9.1544989900.git.alexander.turenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: Vladimir Davydov Cc: Alexander Turenko , tarantool-patches@freelists.org List-ID: Fixes #3276. --- src/CMakeLists.txt | 2 + src/lua/init.c | 5 + src/lua/merger.c | 1643 ++++++++++++++++++++++++++++++++++ src/lua/merger.h | 39 + src/lua/merger.lua | 19 + test/app-tap/merger.test.lua | 693 ++++++++++++++ test/app-tap/suite.ini | 1 + 7 files changed, 2402 insertions(+) create mode 100644 src/lua/merger.c create mode 100644 src/lua/merger.h create mode 100644 src/lua/merger.lua create mode 100755 test/app-tap/merger.test.lua diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 04de5ad04..d2a5fc9c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,6 +47,7 @@ lua_source(lua_sources lua/trigger.lua) lua_source(lua_sources lua/table.lua) lua_source(lua_sources ../third_party/luafun/fun.lua) lua_source(lua_sources lua/httpc.lua) +lua_source(lua_sources lua/merger.lua) lua_source(lua_sources lua/iconv.lua) # LuaJIT jit.* library lua_source(lua_sources "${CMAKE_BINARY_DIR}/third_party/luajit/src/jit/bc.lua") @@ -181,6 +182,7 @@ set (server_sources lua/fio.c lua/crypto.c lua/httpc.c + lua/merger.c lua/utf8.c lua/info.c ${lua_sources} diff --git a/src/lua/init.c b/src/lua/init.c index ca4b47f3a..1eeab02b9 100644 --- a/src/lua/init.c +++ b/src/lua/init.c @@ -57,6 +57,7 @@ #include "lua/pickle.h" #include "lua/fio.h" #include "lua/httpc.h" +#include "lua/merger.h" #include "lua/utf8.h" #include "digest.h" #include @@ -89,6 +90,7 @@ extern char strict_lua[], errno_lua[], fiber_lua[], httpc_lua[], + merger_lua[], log_lua[], uri_lua[], socket_lua[], @@ -148,6 +150,7 @@ static const char *lua_modules[] = { "internal.trigger", trigger_lua, "pwd", pwd_lua, "http.client", httpc_lua, + "merger", merger_lua, "iconv", iconv_lua, /* jit.* library */ "jit.vmdef", vmdef_lua, @@ -452,6 +455,8 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv) tarantool_lua_digest_init(L); luaopen_http_client_driver(L); lua_pop(L, 1); + luaopen_merger(L); + lua_pop(L, 1); luaopen_msgpack(L); lua_pop(L, 1); luaopen_yaml(L); diff --git a/src/lua/merger.c b/src/lua/merger.c new file mode 100644 index 000000000..8caf8d47f --- /dev/null +++ b/src/lua/merger.c @@ -0,0 +1,1643 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/** + * API and basic usage + * ------------------- + * + * The following example demonstrates API of the module: + * + * ``` + * local net_box = require('net.box') + * local buffer = require('buffer') + * local merger = require('merger') + * + * -- The format of key_parts parameter is the same as + * -- `{box,conn}.space.<...>.index.<...>.parts` (where conn is + * -- net.box connection). + * local key_parts = { + * { + * fieldno = , + * type = , + * [ is_nullable = , ] + * [ collation_id = , ] + * [ collation = , ] + * }, + * ... + * } + * + * -- Create the merger instance. + * local merger_inst = merger.new(key_parts) + * + * -- Optional parameters. + * local opts = { + * -- Output buffer, only for merger_inst:select(<...>). + * [ buffer = , ] + * -- Ascending (default) or descending result order. + * [ descending = , ] + * -- Buffer encoding / decoding options are described below. + * [ decode = 'raw' / 'select' / 'call' / 'chain', ] + * [ encode = 'raw' / 'select' / 'call' / 'chain', ] + * [ encode_chain_len = , ] + * } + * + * -- Prepare buffer source. + * local conn = net_box.connect('localhost:3301') + * local buf = buffer.ibuf() + * conn.space.s:select(nil, {buffer = buf}) -- read to buffer + * + * -- We have three sources here. + * local sources = { + * buf, -- buffer source + * box.space.s:select(), -- table source + * {box.space.s:pairs()}, -- iterator source + * } + * + * -- Read the whole result at once. + * local res = merger_inst:select(sources, opts) + * + * -- Read the result tuple per tuple. + * local res = {} + * for _, tuple in merger_inst:pairs(sources, opts) do + * -- Some stop merge condition. + * if tuple[1] > MAX_VALUE then break end + * table.insert(res, tuple) + * end + * + * -- The same in the functional style. + * local function cond(tuple) + * return tuple[1] <= MAX_VALUE + * end + * local res = merger_inst:pairs(sources, opts):take(cond):totable() + * ``` + * + * The basic case of using merger is when there are M storages and + * data are partitioned (sharded) across them. A client want to + * fetch the data (tuples stream) from each storage and merge them + * into one tuple stream: + * + * ``` + * local net_box = require('net.box') + * local buffer = require('buffer') + * local merger = require('merger') + * + * -- Prepare M sources. + * local connects = { + * net_box.connect('localhost:3301'), + * net_box.connect('localhost:3302'), + * ... + * net_box.connect('localhost:<...>'), + * } + * local sources = {} + * for _, conn in ipairs(connects) do + * local buf = buffer.ibuf() + * conn.space.<...>.index.<...>:select(<...>, {buffer = buf}) + * table.insert(sources, buf) + * end + * + * -- See the 'Notes...' section below. 
+ * local key_parts = {} + * local space = connects[1].space.<...> + * local index = space.index.<...> + * for _, part in ipairs(index.parts) do + * table.insert(key_parts, part) + * end + * if not index.unique then + * for _, part in ipairs(space.index[0]) do + * table.insert(key_parts, part) + * end + * end + * + * -- Create the merger instance. + * local merger_inst = merger.new(key_parts) + * + * -- Merge. + * local res = merger_inst:select(sources) + * ``` + * + * Notes re source sorting and key parts + * ------------------------------------- + * + * The merger expects that each source tuples stream is sorted + * according to provided key parts and perform a kind of merge + * sort (choose minimal / maximal tuple across sources on each + * step). Tuples from select() from Tarantool's space are sorted + * according to key parts from index that was used. When secondary + * non-unique index is used tuples are sorted according to the key + * parts of the secondary index and, then, key parts of the + * primary index. + * + * Decoding / encoding buffers + * --------------------------- + * + * A select response has the following structure: + * `{[48] = {tuples}}`, while a call response is + * `{[48] = {{tuples}}}` (because it should support multiple + * return values). A user should specify how merger will + * operate on buffers, so merger has `decode` (how to read buffer + * sources) and `encode` (how to write to a resulting buffer) + * options. These options accept the following values: + * + * Option value | Buffer structure + * ------------------ | ---------------- + * 'raw' | tuples + * 'select' (default) | {[48] = {tuples}} + * 'call' | {[48] = {{tuples}}} + * 'chain' | {[48] = {{{tuples, ...}}}} + * + * tuples is array of tuples. 'raw' and 'chain' options are about chaining + * mergers and they are described in the following section. + * + * How to check buffer data structure myself: + * + * ``` + * #!usr/bin/env tarantool + * + * local net_box = require('net.box') + * local buffer = require('buffer') + * local ffi = require('ffi') + * local msgpack = require('msgpack') + * local yaml = require('yaml') + * + * box.cfg{listen = 3301} + * box.once('load_data', function() + * box.schema.user.grant('guest', 'read,write,execute', 'universe') + * box.schema.space.create('s') + * box.space.s:create_index('pk') + * box.space.s:insert({1}) + * box.space.s:insert({2}) + * box.space.s:insert({3}) + * box.space.s:insert({4}) + * end) + * + * local function foo() + * return box.space.s:select() + * end + * _G.foo = foo + * + * local conn = net_box.connect('localhost:3301') + * + * local buf = buffer.ibuf() + * conn.space.s:select(nil, {buffer = buf}) + * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) + * local buf_lua = msgpack.decode(buf_str) + * print('select:\n' .. yaml.encode(buf_lua)) + * + * local buf = buffer.ibuf() + * conn:call('foo', nil, {buffer = buf}) + * local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) + * local buf_lua = msgpack.decode(buf_str) + * print('call:\n' .. yaml.encode(buf_lua)) + * + * os.exit() + * ``` + * + * The `decode` option changes decoding algorithm of source + * buffers and does nothing for sources of other types. It will be + * completely ignored if there are no buffer sources. + * + * The `encode` option changes encoding algorithm of resulting + * buffer. When the option is provided, the `buffer` option should + * be provided too. When `encode` is 'chain', the + * `encode_chain_len` option is mandatory. 
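+ *
+ * A minimal sketch of these options in action (assuming
+ * `key_parts` and buffer `sources` are prepared as in the
+ * examples above, i.e. each source holds a net.box select
+ * response):
+ *
+ * ```
+ * local buffer = require('buffer')
+ * local merger = require('merger')
+ *
+ * local merger_inst = merger.new(key_parts)
+ * local obuf = buffer.ibuf()
+ * merger_inst:select(sources, {
+ *     decode = 'select', -- how to read the source buffers
+ *     buffer = obuf,     -- where to write the merged result
+ *     encode = 'select', -- wrap the output as a select response
+ * })
+ * ```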
+ * + * Chaining mergers + * ---------------- + * + * Chaining mergers is needed to process a batch select request, + * when one response (buffer) contains several results (tuple + * arrays) to merge with another responses of this kind. Reshaping + * of such results into separate buffers or lua table would lead + * to extra data copies within Lua memory and extra time consuming + * msgpack decoding, so the merger supports this case of source + * data shape natively. + * + * When the `decode` option is 'select' (or nil) or 'call' the + * merger expects a usual net.box's select / call result in each + * of source buffers. + * + * When the `decode` option is 'chain' or 'raw' the merger expects + * an array of results instead of just result. Pass 'chain' for + * the first `:select()` (or `:pairs()`) call and 'raw' for the + * following ones. It is possible (but not mandatory) to use + * different mergers for each result, just reuse the same buffers + * for consequent calls. + * + * ``` + * -- Storage script + * -- -------------- + * + * -- Return N results in a table. + * -- Each result is table of tuples. + * local function batch_select(<...>) + * local res = {} + * for i = 1, N do + * local tuples = box.space.<...>:select(<...>) + * table.insert(res, tuples) + * end + * return res + * end + * + * -- Expose to call it using net.box. + * _G.batch_select = batch_select + * + * -- Client script + * -- ------------- + * + * local net_box = require('net.box') + * local buffer = require('buffer') + * local merger = require('merger') + * + * -- Prepare M sources. + * local connects = <...> + * local sources = {} + * for _, conn in ipairs(connects) do + * local buf = buffer.ibuf() + * conn:call('batch_select', <...>, {buffer = buf}) + * table.insert(sources, buf) + * end + * + * -- Now we have M sources and each have N results. We want to + * -- merge all 1st results, all 2nd results, ..., all Nth + * -- results. + * + * local merger_inst = merger.new(<...>) + * + * local res = {} + * for i = 1, N do + * -- We use the same merger instance for each merge, but it + * -- is possible to use different ones. + * local tuples = merger_inst:select(sources, { + * decode = i == 1 and 'chain' or 'raw', + * }) + * table.insert(res, tuples) + * end + * ``` + * + * When `buffer` option is passed it is possible to write results + * of several consequent merges into that buffer in the format + * another merger can accept (see cascading mergers below for the + * idea). Set `encode` to 'chain' to encode the first result + * and to 'raw' to encode consequent results. It is necessary to + * also set `encode_chain_len`, because size of resulting array + * is not known to a merger when it writes the first result. + * + * Constraints are: + * + * - `decode` option influences only on buffer sources + * interpretation (ignored for sources of other types). + * - `encode_*` options are applicable only when `buffer` + * options is provided. + * + * Cascading mergers + * ----------------- + * + * The idea is simple: the merger output formats are the same as + * source formats, so it is possible to merge results of previous + * merges. + * + * The example below is synthetic to be simple. Real cases when + * cascading can be profitable likely involve additional layers + * of Tarantool instances between storages and clients or separate + * threads to merge blocks of each level. + * + * To be honest no one use this ability for now. It exists, + * because the same input and output formats looks as good + * property of the API. 
+ * + * ``` + * <...requires...> + * + * local sources = <...100 buffers...> + * local merger_inst = merger.new(<...>) + * + * -- We use buffer sources at 1st and 2nd merge layers, but read + * -- the final result as the table. + * + * local sources_level_2 = {} + * for i = 1, 10 do + * -- Take next 10 first level sources. + * local sources_level_1 = {} + * for j = 1, 10 do + * sources_level_1[j] = sources[(i - 1) * 10 + j] + * end + * + * -- Merge 10 sources into a second level source. + * local result_level_1 = buffer.ibuf() + * merger_inst:select(sources_level_1, {buffer = result_level_1}) + * sources_level_2[i] = result_level_1 + * end + * + * local res = merger_inst:select(sources_level_2) + * ``` + */ + +#include +#include +#include +#include + +#include +#include + +#include "lua/error.h" +#include "lua/utils.h" +#include "small/ibuf.h" +#include "msgpuck.h" +#include "mpstream.h" +#include "lua/msgpack.h" + +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" + +#include "box/iproto_constants.h" /* IPROTO_DATA */ +#include "box/field_def.h" +#include "box/key_def.h" +#include "box/schema_def.h" +#include "box/tuple.h" +#include "box/lua/tuple.h" +#include "box/box.h" +#include "box/index.h" +#include "box/coll_id_cache.h" +#include "lua/lua_iterator.h" +#include "diag.h" + +#ifndef NDEBUG +#include "say.h" +/** + * Heap insert/delete/update macros wrapped with debug prints. + */ +#define MERGER_HEAP_INSERT(heap, hnode, source) do { \ + say_debug("merger: [source %p] insert: tuple: %s", (source), \ + tuple_str((source)->tuple)); \ + merger_heap_insert((heap), (hnode)); \ +} while(0) +#define MERGER_HEAP_DELETE(heap, hnode, source) do { \ + say_debug("merger: [source %p] delete", (source)); \ + merger_heap_delete((heap), (hnode)); \ +} while(0) +#define MERGER_HEAP_UPDATE(heap, hnode, source) do { \ + say_debug("merger: [source %p] update: tuple: %s", (source), \ + tuple_str((source)->tuple)); \ + merger_heap_update((heap), (hnode)); \ +} while(0) +#else /* !defined(NDEBUG) */ +/** + * Heap insert/delete/update macros wrappers w/o debug prints. + */ +#define MERGER_HEAP_INSERT(heap, hnode, source) do { \ + merger_heap_insert((heap), (hnode)); \ +} while(0) +#define MERGER_HEAP_DELETE(heap, hnode, source) do { \ + merger_heap_delete((heap), (hnode)); \ +} while(0) +#define MERGER_HEAP_UPDATE(heap, hnode, source) do { \ + merger_heap_update((heap), (hnode)); \ +} while(0) +#endif /* !defined(NDEBUG) */ + +/** + * Helper macros to push / throw out of memory errors to Lua. + */ +#define push_out_of_memory_error(L, size, what_name) do { \ + diag_set(OutOfMemory, (size), "malloc", (what_name)); \ + luaT_pusherror(L, diag_last_error(diag_get())); \ +} while(0) +#define throw_out_of_memory_error(L, size, what_name) do { \ + diag_set(OutOfMemory, (size), "malloc", (what_name)); \ + luaT_error(L); \ + unreachable(); \ + return -1; \ +} while(0) + +#define BOX_COLLATION_NAME_INDEX 1 + +/** + * A type of data structure that holds source data. + */ +enum merger_source_type { + SOURCE_TYPE_BUFFER, + SOURCE_TYPE_TABLE, + SOURCE_TYPE_ITERATOR, + SOURCE_TYPE_NONE, +}; + +/** + * How data are encoded in a buffer. + * + * `decode` and `encode` options are parsed to values of this + * enum. + */ +enum merger_buffer_type { + BUFFER_TYPE_RAW, + BUFFER_TYPE_SELECT, + BUFFER_TYPE_CALL, + BUFFER_TYPE_CHAIN, + BUFFER_TYPE_NONE, +}; + +/** + * Hold state of a merge source. + */ +struct merger_source { + /* + * A source is the heap node. Compared by the next tuple. 
+ */ + struct heap_node hnode; + /* Union determinant. */ + enum merger_source_type type; + /* Fields specific for certaint source types. */ + union { + /* Buffer source. */ + struct { + struct ibuf *buf; + /* + * A merger stops before end of a buffer + * when it is not the last merger in the + * chain. + */ + size_t remaining_tuples_cnt; + } buf; + /* Table source. */ + struct { + int ref; + int next_idx; + } tbl; + /* Iterator source. */ + struct { + struct lua_iterator *it; + } it; + }; + /* Next tuple. */ + struct tuple *tuple; +}; + +/** + * Holds immutable parameters of a merger. + */ +struct merger { + struct key_def *key_def; + box_tuple_format_t *format; +}; + +/** + * Holds parameters of merge process, sources, result storage + * (if any), heap of sources and utility flags / counters. + */ +struct merger_iterator { + /* Heap of sources. */ + heap_t heap; + /* + * key_def is copied from merger. + * + * A merger can be collected by LuaJIT GC independently + * from a merger_iterator, so we cannot just save pointer + * to merger here and so we copy key_def from merger. + */ + struct key_def *key_def; + /* Parsed sources and decoding parameters. */ + uint32_t sources_count; + struct merger_source **sources; + enum merger_buffer_type decode; + /* Ascending / descending order. */ + int order; + /* Optional output buffer and encoding parameters. */ + struct ibuf *obuf; + enum merger_buffer_type encode; + uint32_t encode_chain_len; +}; + +static uint32_t merger_type_id = 0; +static uint32_t merger_iterator_type_id = 0; +static uint32_t ibuf_type_id = 0; + +/* Forward declarations. */ +static bool +source_less(const heap_t *heap, const struct heap_node *a, + const struct heap_node *b); +static int +lbox_merger_gc(struct lua_State *L); +static void +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it); +static int +lbox_merger_iterator_gc(struct lua_State *L); + +#define HEAP_NAME merger_heap +#define HEAP_LESS source_less +#include "salad/heap.h" + +/** + * Create the new tuple with specific format from a Lua table or a + * tuple. + * + * In case of an error push the error message to the Lua stack and + * return NULL. + */ +static struct tuple * +luaT_gettuple_with_format(struct lua_State *L, int idx, + box_tuple_format_t *format) +{ + struct tuple *tuple; + if (lua_istable(L, idx)) { + /* Based on lbox_tuple_new() code. */ + struct ibuf *buf = tarantool_lua_ibuf; + ibuf_reset(buf); + struct mpstream stream; + mpstream_init(&stream, buf, ibuf_reserve_cb, ibuf_alloc_cb, + luamp_error, L); + luamp_encode_tuple(L, luaL_msgpack_default, &stream, idx); + mpstream_flush(&stream); + tuple = box_tuple_new(format, buf->buf, + buf->buf + ibuf_used(buf)); + if (tuple == NULL) { + luaT_pusherror(L, diag_last_error(diag_get())); + return NULL; + } + ibuf_reinit(tarantool_lua_ibuf); + return tuple; + } + tuple = luaT_istuple(L, idx); + if (tuple == NULL) { + lua_pushfstring(L, "A tuple or a table expected, got %s", + lua_typename(L, lua_type(L, -1))); + return NULL; + } + /* + * Create the new tuple with the format necessary for fast + * comparisons. + */ + const char *tuple_beg = tuple_data(tuple); + const char *tuple_end = tuple_beg + tuple->bsize; + tuple = box_tuple_new(format, tuple_beg, tuple_end); + if (tuple == NULL) { + luaT_pusherror(L, diag_last_error(diag_get())); + return NULL; + } + return tuple; +} + +/** + * Data comparing function to construct heap of sources. 
+ */ +static bool +source_less(const heap_t *heap, const struct heap_node *a, + const struct heap_node *b) +{ + struct merger_source *left = container_of(a, struct merger_source, + hnode); + struct merger_source *right = container_of(b, struct merger_source, + hnode); + if (left->tuple == NULL && right->tuple == NULL) + return false; + if (left->tuple == NULL) + return false; + if (right->tuple == NULL) + return true; + struct merger_iterator *it = container_of(heap, struct merger_iterator, + heap); + return it->order * box_tuple_compare(left->tuple, right->tuple, + it->key_def) < 0; +} + +/** + * Update source->tuple of specific source. + * + * Increases the reference counter of the tuple. + * + * Return 0 when successfully fetched a tuple or NULL. In case of + * an error push an error message to the Lua stack and return 1. + */ +static int +source_fetch(struct lua_State *L, struct merger_source *source, + box_tuple_format_t *format) +{ + source->tuple = NULL; + + switch (source->type) { + case SOURCE_TYPE_BUFFER: { + if (source->buf.remaining_tuples_cnt == 0) + return 0; + --source->buf.remaining_tuples_cnt; + if (ibuf_used(source->buf.buf) == 0) { + lua_pushstring(L, "Unexpected msgpack buffer end"); + return 1; + } + const char *tuple_beg = source->buf.buf->rpos; + const char *tuple_end = tuple_beg; + /* + * mp_next() is faster then mp_check(), but can + * read bytes outside of the buffer and so can + * cause segmentation faults or incorrect result. + * + * We check buffer boundaries after the mp_next() + * call and throw an error when the boundaries are + * violated, but it does not save us from possible + * segmentation faults. + * + * It is in a user responsibility to provide valid + * msgpack. + */ + mp_next(&tuple_end); + if (tuple_end > source->buf.buf->wpos) { + lua_pushstring(L, "Unexpected msgpack buffer end"); + return 1; + } + source->buf.buf->rpos = (char *) tuple_end; + source->tuple = box_tuple_new(format, tuple_beg, tuple_end); + if (source->tuple == NULL) { + luaT_pusherror(L, diag_last_error(diag_get())); + return 1; + } + break; + } + case SOURCE_TYPE_TABLE: { + lua_rawgeti(L, LUA_REGISTRYINDEX, source->tbl.ref); + lua_pushinteger(L, source->tbl.next_idx); + lua_gettable(L, -2); + if (lua_isnil(L, -1)) { + lua_pop(L, 2); + return 0; + } + source->tuple = luaT_gettuple_with_format(L, -1, format); + if (source->tuple == NULL) + return 1; + ++source->tbl.next_idx; + lua_pop(L, 2); + break; + } + case SOURCE_TYPE_ITERATOR: { + int nresult = lua_iterator_next(L, source->it.it); + if (nresult == 0) + return 0; + source->tuple = luaT_gettuple_with_format(L, -nresult + 1, + format); + if (source->tuple == NULL) + return 1; + lua_pop(L, nresult); + break; + } + case SOURCE_TYPE_NONE: + default: + unreachable(); + } + box_tuple_ref(source->tuple); + return 0; +} + +/** + * Extract a merger object from the Lua stack. + */ +static struct merger * +check_merger(struct lua_State *L, int idx) +{ + uint32_t cdata_type; + struct merger **merger_ptr = luaL_checkcdata(L, idx, &cdata_type); + if (merger_ptr == NULL || cdata_type != merger_type_id) + return NULL; + return *merger_ptr; +} + +/** + * Extract a merger_iterator object from the Lua stack. + */ +static struct merger_iterator * +check_merger_iterator(struct lua_State *L, int idx) +{ + uint32_t cdata_type; + struct merger_iterator **it_ptr = luaL_checkcdata(L, idx, &cdata_type); + if (it_ptr == NULL || cdata_type != merger_iterator_type_id) + return NULL; + return *it_ptr; +} + +/** + * Extract an ibuf object from the Lua stack. 
+ */ +static struct ibuf * +check_ibuf(struct lua_State *L, int idx) +{ + if (lua_type(L, idx) != LUA_TCDATA) + return NULL; + + uint32_t cdata_type; + struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type); + if (ibuf_ptr == NULL || cdata_type != ibuf_type_id) + return NULL; + return ibuf_ptr; +} + +#define RPOS_P(buf) ((const char **) &(buf)->rpos) + +/** + * Skip (and check) the wrapper around tuples array (and the array + * itself). + * + * Expected different kind of wrapping depending of it->decode. + */ +static int +decode_header(struct merger_iterator *it, struct ibuf *buf, size_t *len_p) +{ + int ok = 1; + /* Decode {[IPROTO_DATA] = ...} header. */ + if (it->decode != BUFFER_TYPE_RAW) + ok = mp_typeof(*buf->rpos) == MP_MAP && + mp_decode_map(RPOS_P(buf)) == 1 && + mp_typeof(*buf->rpos) == MP_UINT && + mp_decode_uint(RPOS_P(buf)) == IPROTO_DATA; + /* Decode the array around call return values. */ + if (ok && (it->decode == BUFFER_TYPE_CALL || + it->decode == BUFFER_TYPE_CHAIN)) + ok = mp_typeof(*buf->rpos) == MP_ARRAY && + mp_decode_array(RPOS_P(buf)) > 0; + /* Decode the array around chained input. */ + if (ok && it->decode == BUFFER_TYPE_CHAIN) + ok = mp_typeof(*buf->rpos) == MP_ARRAY && + mp_decode_array(RPOS_P(buf)) > 0; + /* Decode the array around tuples to merge. */ + if (ok) + ok = mp_typeof(*buf->rpos) == MP_ARRAY; + if (ok) + *len_p = mp_decode_array(RPOS_P(buf)); + return ok; +} + +#undef RPOS_P + +/** + * Encode the wrapper around tuples array (and the array itself). + * + * The written msgpack depends on it->encode. + */ +static void +encode_header(struct merger_iterator *it, uint32_t result_len) +{ + struct ibuf *obuf = it->obuf; + + /* Encode {[IPROTO_DATA] = ...} header. */ + if (it->encode != BUFFER_TYPE_RAW) { + ibuf_reserve(obuf, mp_sizeof_map(1) + + mp_sizeof_uint(IPROTO_DATA)); + obuf->wpos = mp_encode_map(obuf->wpos, 1); + obuf->wpos = mp_encode_uint(obuf->wpos, IPROTO_DATA); + } + /* Encode the array around call return values. */ + if (it->encode == BUFFER_TYPE_CALL || it->encode == BUFFER_TYPE_CHAIN) { + ibuf_reserve(obuf, mp_sizeof_array(1)); + obuf->wpos = mp_encode_array(obuf->wpos, 1); + } + /* Encode the array around chained output. */ + if (it->encode == BUFFER_TYPE_CHAIN) { + ibuf_reserve(obuf, mp_sizeof_array(it->encode_chain_len)); + obuf->wpos = mp_encode_array(obuf->wpos, it->encode_chain_len); + } + /* Encode the array around resulting tuples. */ + ibuf_reserve(obuf, mp_sizeof_array(result_len)); + obuf->wpos = mp_encode_array(obuf->wpos, result_len); +} + +/** + * Push 'bad params' / 'bad param X' and the usage info to the Lua + * stack. + */ +static int +merger_usage(struct lua_State *L, const char *param_name) +{ + static const char *usage = "merger_inst:{ipairs,pairs,select}(" + "{source, source, ...}[, {" + "descending = or , " + "decode = 'raw' / 'select' / 'call' / " + "'chain' / , " + "buffer = > or , " + "encode = 'raw' / 'select' / 'call' / " + "'chain' / , " + "encode_chain_len = or }])"; + if (param_name == NULL) + lua_pushfstring(L, "Bad params, use: %s", usage); + else + lua_pushfstring(L, "Bad param \"%s\", use: %s", param_name, + usage); + return 1; +} + +/** + * Get a tuple from a top source, update the source, update the + * heap. + * + * The reference counter of the tuple is increased (in + * source_fetch). + * + * Return NULL when all sources are drained. 
+ */ +static struct tuple * +merger_next(struct lua_State *L, struct merger *merger, + struct merger_iterator *it) +{ + struct heap_node *hnode = merger_heap_top(&it->heap); + if (hnode == NULL) + return NULL; + + struct merger_source *source = container_of(hnode, struct merger_source, + hnode); + struct tuple *tuple = source->tuple; + assert(tuple != NULL); + if (source_fetch(L, source, merger->format) != 0) { + lua_error(L); + unreachable(); + return NULL; + } + if (source->tuple == NULL) + MERGER_HEAP_DELETE(&it->heap, hnode, source); + else + MERGER_HEAP_UPDATE(&it->heap, hnode, source); + + return tuple; +} + +/** + * Determine type of a merger source on the Lua stack. + * + * Set *buf_p to buffer when the source is valid source of buffer + * type and buf_p is not NULL. + */ +static enum merger_source_type +parse_source_type(lua_State *L, int idx, struct ibuf **buf_p) +{ + if (lua_type(L, idx) == LUA_TCDATA) { + struct ibuf *buf = check_ibuf(L, idx); + if (buf == NULL) + return SOURCE_TYPE_NONE; + if (buf_p != NULL) + *buf_p = buf; + return SOURCE_TYPE_BUFFER; + } else if (lua_istable(L, idx)) { + lua_rawgeti(L, idx, 1); + int iscallable = luaT_iscallable(L, idx); + lua_pop(L, 1); + if (iscallable) + return SOURCE_TYPE_ITERATOR; + return SOURCE_TYPE_TABLE; + } + + return SOURCE_TYPE_NONE; +} + +/** + * Parse 'decode' / 'encode' options. + */ +static enum merger_buffer_type +parse_buffer_type(lua_State *L, int idx) +{ + if (lua_isnoneornil(L, idx)) + return BUFFER_TYPE_SELECT; + + if (lua_type(L, idx) != LUA_TSTRING) + return BUFFER_TYPE_NONE; + + size_t len; + const char *param = lua_tolstring(L, idx, &len); + + if (!strncmp(param, "raw", len)) + return BUFFER_TYPE_RAW; + else if (!strncmp(param, "select", len)) + return BUFFER_TYPE_SELECT; + else if (!strncmp(param, "call", len)) + return BUFFER_TYPE_CALL; + else if (!strncmp(param, "chain", len)) + return BUFFER_TYPE_CHAIN; + + return BUFFER_TYPE_NONE; +} + +/** + * Parse optional third parameter of merger_inst:pairs() and + * merger_inst:select() into the merger_iterator structure. + * + * Returns 0 on success. In case of an error it pushes an error + * message to the Lua stack and returns 1. + */ +static int +parse_opts(struct lua_State *L, int idx, struct merger_iterator *it) +{ + /* No opts: use defaults. */ + if (lua_isnoneornil(L, idx)) + return 0; + + /* Not a table: error. */ + if (!lua_istable(L, idx)) + return merger_usage(L, NULL); + + /* Parse descending to it->order. */ + lua_pushstring(L, "descending"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) + it->order = lua_toboolean(L, -1) ? -1 : 1; + else + return merger_usage(L, "descending"); + } + lua_pop(L, 1); + + /* Parse decode to it->decode. */ + lua_pushstring(L, "decode"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + it->decode = parse_buffer_type(L, -1); + if (it->decode == BUFFER_TYPE_NONE) + return merger_usage(L, "decode"); + } + lua_pop(L, 1); + + /* Parse buffer. */ + lua_pushstring(L, "buffer"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if ((it->obuf = check_ibuf(L, -1)) == NULL) + return merger_usage(L, "buffer"); + } + lua_pop(L, 1); + + /* Parse encode to it->encode. 
*/ + lua_pushstring(L, "encode"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if (it->obuf == NULL) { + lua_pushfstring(L, "\"buffer\" option is mandatory " + "when \"encode\" is used"); + return 1; + } + it->encode = parse_buffer_type(L, -1); + if (it->encode == BUFFER_TYPE_NONE) + return merger_usage(L, "encode"); + } + lua_pop(L, 1); + + /* Parse encode_chain_len. */ + lua_pushstring(L, "encode_chain_len"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if (it->encode != BUFFER_TYPE_CHAIN) { + lua_pushfstring(L, "\"encode_chain_len\" is " + "forbidden without " + "{encode = 'chain'}"); + return 1; + } + if (lua_isnumber(L, -1)) + it->encode_chain_len = + (uint32_t) lua_tointeger(L, -1); + else + return merger_usage(L, "encode_chain_len"); + } + lua_pop(L, 1); + + /* Verify output_chain_len is provided when we + * going to use it for output buffer header + * encoding. */ + if (it->obuf != NULL && it->encode == BUFFER_TYPE_CHAIN && + it->encode_chain_len == 0) { + lua_pushfstring(L, "\"encode_chain_len\" is mandatory when " + "\"buffer\" and {encode = 'chain'} are " + "used"); + return 1; + } + + return 0; +} + +/** + * Parse sources table: second parameter pf merger_isnt:pairs() + * and merger_inst:select() into the merger_iterator structure. + * + * Note: This function should be called when options are already + * parsed (using parse_opts()). + * + * Returns 0 on success. In case of an error it pushes an error + * message to the Lua stack and returns 1. + */ +static int +parse_sources(struct lua_State *L, int idx, struct merger *merger, + struct merger_iterator *it) +{ + /* Allocate sources array. */ + uint32_t capacity = 8; + const ssize_t sources_size = capacity * sizeof(struct merger_source *); + it->sources = (struct merger_source **) malloc(sources_size); + if (it->sources == NULL) { + push_out_of_memory_error(L, sources_size, "it->sources"); + return 1; + } + + /* Fetch all sources. */ + while (true) { + lua_pushinteger(L, it->sources_count + 1); + lua_gettable(L, idx); + if (lua_isnil(L, -1)) + break; + + /* Shrink sources array if needed. */ + if (it->sources_count == capacity) { + capacity *= 2; + struct merger_source **new_sources; + const ssize_t new_sources_size = + capacity * sizeof(struct merger_source *); + new_sources = (struct merger_source **) realloc( + it->sources, new_sources_size); + if (new_sources == NULL) { + push_out_of_memory_error(L, new_sources_size, + "new_sources"); + return 1; + } + it->sources = new_sources; + } + + /* Allocate the new source. */ + it->sources[it->sources_count] = (struct merger_source *) + malloc(sizeof(struct merger_source)); + struct merger_source *current_source = + it->sources[it->sources_count]; + if (current_source == NULL) { + push_out_of_memory_error(L, + sizeof(struct merger_source), + "merger_source"); + return 1; + } + + /* + * Set type and tuple to correctly proceed in + * merger_iterator_delete() in case of any further + * error. + */ + struct ibuf *buf = NULL; + current_source->type = parse_source_type(L, -1, &buf); + current_source->tuple = NULL; + + /* + * Note: We need to increment sources count right + * after successful malloc() of the new source + * (before any further error check), because + * merger_iterator_delete() frees that amount of + * sources. + */ + ++it->sources_count; + + /* Initialize the new source. 
*/ + switch (current_source->type) { + case SOURCE_TYPE_BUFFER: + if (!decode_header(it, buf, + ¤t_source->buf.remaining_tuples_cnt)) { + lua_pushstring(L, "Invalid merge source"); + return 1; + } + current_source->buf.buf = buf; + break; + case SOURCE_TYPE_TABLE: + /* Save a table ref and a next index. */ + lua_pushvalue(L, -1); /* Popped by luaL_ref(). */ + int tbl_ref = luaL_ref(L, LUA_REGISTRYINDEX); + current_source->tbl.ref = tbl_ref; + current_source->tbl.next_idx = 1; + break; + case SOURCE_TYPE_ITERATOR: + /* Wrap and save iterator. */ + current_source->it.it = + lua_iterator_new_fromtable(L, -1); + break; + case SOURCE_TYPE_NONE: + lua_pushfstring(L, "Unknown source type at index %d", + it->sources_count); + return 1; + default: + unreachable(); + return 1; + } + if (source_fetch(L, current_source, merger->format) != 0) + return 1; + if (current_source->tuple != NULL) + MERGER_HEAP_INSERT(&it->heap, + ¤t_source->hnode, + current_source); + } + lua_pop(L, it->sources_count + 1); + + return 0; +} + +/** + * Parse sources and options on Lua stack and create the new + * merger_interator instance. + */ +static struct merger_iterator * +merger_iterator_new(struct lua_State *L) +{ + struct merger *merger; + int ok = (lua_gettop(L) == 2 || lua_gettop(L) == 3) && + /* Merger. */ + (merger = check_merger(L, 1)) != NULL && + /* Sources. */ + lua_istable(L, 2) == 1 && + /* Opts. */ + (lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1); + if (!ok) { + merger_usage(L, NULL); + lua_error(L); + unreachable(); + return NULL; + } + + struct merger_iterator *it = (struct merger_iterator *) + malloc(sizeof(struct merger_iterator)); + merger_heap_create(&it->heap); + it->key_def = key_def_dup(merger->key_def); + it->sources_count = 0; + it->sources = NULL; + it->decode = BUFFER_TYPE_NONE; + it->order = 1; + it->obuf = NULL; + it->encode = BUFFER_TYPE_NONE; + it->encode_chain_len = 0; + + if (parse_opts(L, 3, it) != 0 || parse_sources(L, 2, merger, it) != 0) { + merger_iterator_delete(L, it); + lua_error(L); + unreachable(); + return NULL; + } + + return it; +} + +/** + * Iterator gen function to traverse merger results. + * + * Expected a merger instance as the first parameter (state) and a + * merger_iterator as the second parameter (param) on the Lua + * stack. + * + * Push the merger_iterator (the new param) and the next tuple. + */ +static int +lbox_merger_iterator_gen(struct lua_State *L) +{ + struct merger *merger; + struct merger_iterator *it; + bool ok = (merger = check_merger(L, -2)) != NULL && + (it = check_merger_iterator(L, -1)) != NULL; + if (!ok) + return luaL_error(L, "Bad params, use: " + "lbox_merger_iterator_gen(merger, " + "merger_iterator)"); + + struct tuple *tuple = merger_next(L, merger, it); + if (tuple == NULL) { + lua_pushnil(L); + lua_pushnil(L); + return 2; + } + + /* Push merger_iterator, tuple. */ + *(struct merger_iterator **) + luaL_pushcdata(L, merger_iterator_type_id) = it; + luaT_pushtuple(L, tuple); + + box_tuple_unref(tuple); + return 2; +} + +/** + * Iterate over merge results from Lua. + * + * Push three values to the Lua stack: + * + * 1. gen (lbox_merger_iterator_gen wrapped by fun.wrap()); + * 2. param (merger); + * 3. state (merger_iterator). + */ +static int +lbox_merger_ipairs(struct lua_State *L) +{ + /* Create merger_iterator. */ + struct merger_iterator *it = merger_iterator_new(L); + lua_settop(L, 1); /* Pop sources, [opts]. */ + /* Stack: merger. 
*/ + + if (it->obuf != NULL) + return luaL_error(L, "\"buffer\" option is forbidden with " + "merger_inst:pairs(<...>)"); + + luaL_loadstring(L, "return require('fun').wrap"); + lua_call(L, 0, 1); + lua_insert(L, -2); /* Swap merger and wrap. */ + /* Stack: wrap, merger. */ + + lua_pushcfunction(L, lbox_merger_iterator_gen); + lua_insert(L, -2); /* Swap merger and gen. */ + /* Stack: wrap, gen, merger. */ + + *(struct merger_iterator **) + luaL_pushcdata(L, merger_iterator_type_id) = it; + lua_pushcfunction(L, lbox_merger_iterator_gc); + luaL_setcdatagc(L, -2); + /* Stack: wrap, gen, merger, merger_iterator. */ + + /* Call fun.wrap(gen, merger, merger_iterator). */ + lua_call(L, 3, 3); + return 3; +} + +/** + * Write merge results into ibuf. + */ +static void +encode_result_buffer(struct lua_State *L, struct merger *merger, + struct merger_iterator *it) +{ + struct ibuf *obuf = it->obuf; + uint32_t result_len = 0; + uint32_t result_len_offset = 4; + + /* + * Reserve maximum size for the array around resulting + * tuples to set it later. + */ + encode_header(it, UINT32_MAX); + + /* Fetch, merge and copy tuples to the buffer. */ + struct tuple *tuple; + while ((tuple = merger_next(L, merger, it)) != NULL) { + uint32_t bsize = tuple->bsize; + ibuf_reserve(obuf, bsize); + memcpy(obuf->wpos, tuple_data(tuple), bsize); + obuf->wpos += bsize; + result_len_offset += bsize; + box_tuple_unref(tuple); + ++result_len; + } + + /* Write the real array size. */ + mp_store_u32(obuf->wpos - result_len_offset, result_len); +} + +/** + * Write merge results into the new Lua table. + */ +static int +create_result_table(struct lua_State *L, struct merger *merger, + struct merger_iterator *it) +{ + /* Create result table. */ + lua_newtable(L); + + uint32_t cur = 1; + + /* Fetch, merge and save tuples to the table. */ + struct tuple *tuple; + while ((tuple = merger_next(L, merger, it)) != NULL) { + luaT_pushtuple(L, tuple); + lua_rawseti(L, -2, cur); + box_tuple_unref(tuple); + ++cur; + } + + return 1; +} + +/** + * Perform the merge. + * + * Write results into a buffer or a Lua table depending on options. + * + * Expected merger instance, sources table and options (optional) + * on the Lua stack. + * + * Return the Lua table or nothing when the 'buffer' option is + * provided. + */ +static int +lbox_merger_select(struct lua_State *L) +{ + struct merger *merger = check_merger(L, 1); + if (merger == NULL) { + merger_usage(L, NULL); + lua_error(L); + } + + struct merger_iterator *it = merger_iterator_new(L); + lua_settop(L, 0); /* Pop merger, sources, [opts]. */ + + if (it->obuf == NULL) { + return create_result_table(L, merger, it); + } else { + encode_result_buffer(L, merger, it); + return 0; + } +} + +/** + * Create the new merger instance. + * + * Expected a table of key parts on the Lua stack. + * + * Returns the new instance. 
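+ *
+ * A minimal sketch of the corresponding Lua call (the field
+ * numbers and types below are illustrative, not taken from a
+ * real schema):
+ *
+ * ```
+ * local merger = require('merger')
+ * local merger_inst = merger.new({
+ *     {fieldno = 1, type = 'unsigned'},
+ *     {fieldno = 2, type = 'string', is_nullable = true},
+ * })
+ * ```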
+ */ +static int +lbox_merger_new(struct lua_State *L) +{ + if (lua_gettop(L) != 1 || lua_istable(L, 1) != 1) + return luaL_error(L, "Bad params, use: merger.new({" + "{fieldno = fieldno, type = type" + "[, is_nullable = is_nullable" + "[, collation_id = collation_id" + "[, collation = collation]]]}, ...}"); + uint32_t key_parts_count = 0; + uint32_t capacity = 8; + + const ssize_t parts_size = sizeof(struct key_part_def) * capacity; + struct key_part_def *parts = NULL; + parts = (struct key_part_def *) malloc(parts_size); + if (parts == NULL) + throw_out_of_memory_error(L, parts_size, "parts"); + + while (true) { + lua_pushinteger(L, key_parts_count + 1); + lua_gettable(L, 1); + if (lua_isnil(L, -1)) + break; + + /* Extend parts if necessary. */ + if (key_parts_count == capacity) { + capacity *= 2; + struct key_part_def *old_parts = parts; + const ssize_t parts_size = + sizeof(struct key_part_def) * capacity; + parts = (struct key_part_def *) realloc(parts, + parts_size); + if (parts == NULL) { + free(old_parts); + throw_out_of_memory_error(L, parts_size, + "parts"); + } + } + + /* Set parts[key_parts_count].fieldno. */ + lua_pushstring(L, "fieldno"); + lua_gettable(L, -2); + if (lua_isnil(L, -1)) { + free(parts); + return luaL_error(L, "fieldno must not be nil"); + } + /* + * Transform one-based Lua fieldno to zero-based + * fieldno to use in key_def_new(). + */ + parts[key_parts_count].fieldno = lua_tointeger(L, -1) - 1; + lua_pop(L, 1); + + /* Set parts[key_parts_count].type. */ + lua_pushstring(L, "type"); + lua_gettable(L, -2); + if (lua_isnil(L, -1)) { + free(parts); + return luaL_error(L, "type must not be nil"); + } + size_t type_len; + const char *type_name = lua_tolstring(L, -1, &type_len); + lua_pop(L, 1); + parts[key_parts_count].type = field_type_by_name(type_name, + type_len); + if (parts[key_parts_count].type == field_type_MAX) { + free(parts); + return luaL_error(L, "Unknown field type: %s", + type_name); + } + + /* Set parts[key_parts_count].is_nullable. */ + lua_pushstring(L, "is_nullable"); + lua_gettable(L, -2); + if (lua_isnil(L, -1)) { + parts[key_parts_count].is_nullable = false; + parts[key_parts_count].nullable_action = + ON_CONFLICT_ACTION_DEFAULT; + } else { + parts[key_parts_count].is_nullable = + lua_toboolean(L, -1); + parts[key_parts_count].nullable_action = + ON_CONFLICT_ACTION_NONE; + } + lua_pop(L, 1); + + /* Set parts[key_parts_count].coll_id using collation_id. */ + lua_pushstring(L, "collation_id"); + lua_gettable(L, -2); + if (lua_isnil(L, -1)) + parts[key_parts_count].coll_id = COLL_NONE; + else + parts[key_parts_count].coll_id = lua_tointeger(L, -1); + lua_pop(L, 1); + + /* Set parts[key_parts_count].coll_id using collation. */ + lua_pushstring(L, "collation"); + lua_gettable(L, -2); + /* Check whether box.cfg{} was called. 
*/ + if ((parts[key_parts_count].coll_id != COLL_NONE || + !lua_isnil(L, -1)) && !box_is_configured()) { + free(parts); + return luaL_error(L, "Cannot use collations: " + "please call box.cfg{}"); + } + if (!lua_isnil(L, -1)) { + if (parts[key_parts_count].coll_id != COLL_NONE) { + free(parts); + return luaL_error( + L, "Conflicting options: collation_id " + "and collation"); + } + size_t coll_name_len; + const char *coll_name = lua_tolstring(L, -1, + &coll_name_len); + struct coll_id *coll_id = coll_by_name(coll_name, + coll_name_len); + if (coll_id == NULL) { + free(parts); + return luaL_error( + L, "Unknown collation: \"%s\"", + coll_name); + } + parts[key_parts_count].coll_id = coll_id->id; + } + lua_pop(L, 1); + + /* Check coll_id. */ + struct coll_id *coll_id = + coll_by_id(parts[key_parts_count].coll_id); + if (parts[key_parts_count].coll_id != COLL_NONE && + coll_id == NULL) { + uint32_t collation_id = parts[key_parts_count].coll_id; + free(parts); + return luaL_error(L, "Unknown collation_id: %d", + collation_id); + } + + /* Set parts[key_parts_count].sort_order. */ + parts[key_parts_count].sort_order = SORT_ORDER_ASC; + + ++key_parts_count; + } + + struct merger *merger = calloc(1, sizeof(*merger)); + if (merger == NULL) { + free(parts); + throw_out_of_memory_error(L, sizeof(*merger), "merger"); + } + merger->key_def = key_def_new(parts, key_parts_count); + free(parts); + if (merger->key_def == NULL) { + return luaL_error(L, "Cannot create merger->key_def"); + } + + merger->format = box_tuple_format_new(&merger->key_def, 1); + if (merger->format == NULL) { + box_key_def_delete(merger->key_def); + free(merger); + return luaL_error(L, "Cannot create merger->format"); + } + + *(struct merger **) luaL_pushcdata(L, merger_type_id) = merger; + + lua_pushcfunction(L, lbox_merger_gc); + luaL_setcdatagc(L, -2); + + return 1; +} + +/** + * Free the merger instance from a Lua code. + */ +static int +lbox_merger_gc(struct lua_State *L) +{ + struct merger *merger; + if ((merger = check_merger(L, 1)) == NULL) + return 0; + box_key_def_delete(merger->key_def); + box_tuple_format_unref(merger->format); + free(merger); + return 0; +} + +/** + * Free the merger iterator. + * + * We need to know Lua state here, because sources of table and + * iterator types are saved as references within the Lua state. + */ +static void +merger_iterator_delete(struct lua_State *L, struct merger_iterator *it) +{ + merger_heap_destroy(&it->heap); + box_key_def_delete(it->key_def); + + for (uint32_t i = 0; i < it->sources_count; ++i) { + assert(it->sources != NULL); + switch (it->sources[i]->type) { + case SOURCE_TYPE_BUFFER: + /* No-op. */ + break; + case SOURCE_TYPE_TABLE: + luaL_unref(L, LUA_REGISTRYINDEX, + it->sources[i]->tbl.ref); + break; + case SOURCE_TYPE_ITERATOR: + lua_iterator_free(L, it->sources[i]->it.it); + break; + case SOURCE_TYPE_NONE: + /* + * We can reach this block when + * parse_sources() find a bad source. Do + * nothing, just free the memory. + */ + break; + default: + unreachable(); + } + if (it->sources[i]->tuple != NULL) + box_tuple_unref(it->sources[i]->tuple); + free(it->sources[i]); + } + + if (it->sources != NULL) { + assert(it->sources_count > 0); + free(it->sources); + } + + free(it); +} + +/** + * Free the merger iterator from a Lua code. + */ +static int +lbox_merger_iterator_gc(struct lua_State *L) +{ + struct merger_iterator *it; + if ((it = check_merger_iterator(L, 1)) == NULL) + return 0; + merger_iterator_delete(L, it); + return 0; +} + +/** + * Register the module. 
+ */ +LUA_API int +luaopen_merger(lua_State *L) +{ + luaL_cdef(L, "struct merger;"); + luaL_cdef(L, "struct merger_iterator;"); + luaL_cdef(L, "struct ibuf;"); + merger_type_id = luaL_ctypeid(L, "struct merger&"); + merger_iterator_type_id = luaL_ctypeid(L, "struct merger_iterator&"); + ibuf_type_id = luaL_ctypeid(L, "struct ibuf"); + lua_newtable(L); + static const struct luaL_Reg meta[] = { + {"new", lbox_merger_new}, + {NULL, NULL} + }; + luaL_register_module(L, "merger", meta); + + /* Export C functions to Lua. */ + lua_newtable(L); /* merger.internal */ + lua_pushcfunction(L, lbox_merger_select); + lua_setfield(L, -2, "select"); + lua_pushcfunction(L, lbox_merger_ipairs); + lua_setfield(L, -2, "ipairs"); + lua_setfield(L, -2, "internal"); + + return 1; +} diff --git a/src/lua/merger.h b/src/lua/merger.h new file mode 100644 index 000000000..6d7ca0957 --- /dev/null +++ b/src/lua/merger.h @@ -0,0 +1,39 @@ +#ifndef TARANTOOL_LUA_MERGER_H_INCLUDED +#define TARANTOOL_LUA_MERGER_H_INCLUDED 1 +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +struct lua_State; + +int +luaopen_merger(struct lua_State *L); + +#endif /* TARANTOOL_LUA_MERGER_H_INCLUDED */ diff --git a/src/lua/merger.lua b/src/lua/merger.lua new file mode 100644 index 000000000..173cf4154 --- /dev/null +++ b/src/lua/merger.lua @@ -0,0 +1,19 @@ +local ffi = require('ffi') +local merger = require('merger') + +local merger_t = ffi.typeof('struct merger') + +local methods = { + ['select'] = merger.internal.select, + ['pairs'] = merger.internal.ipairs, + ['ipairs'] = merger.internal.ipairs, +} + +ffi.metatype(merger_t, { + __index = function(self, key) + return methods[key] + end, + -- Lua 5.2 compatibility + __pairs = merger.internal.ipairs, + __ipairs = merger.internal.ipairs, +}) diff --git a/test/app-tap/merger.test.lua b/test/app-tap/merger.test.lua new file mode 100755 index 000000000..7ec6e072c --- /dev/null +++ b/test/app-tap/merger.test.lua @@ -0,0 +1,693 @@ +#!/usr/bin/env tarantool + +local tap = require('tap') +local buffer = require('buffer') +local msgpackffi = require('msgpackffi') +local digest = require('digest') +local merger = require('merger') +local crypto = require('crypto') +local fiber = require('fiber') +local utf8 = require('utf8') +local ffi = require('ffi') + +local IPROTO_DATA = 48 + +local function merger_usage(param) + local msg = 'merger_inst:{ipairs,pairs,select}(' .. + '{source, source, ...}[, {' .. + 'descending = or , ' .. + 'decode = \'raw\' / \'select\' / \'call\' / \'chain\' / , ' .. + 'buffer = > or , ' .. + 'encode = \'raw\' / \'select\' / \'call\' / \'chain\' / , ' .. + 'encode_chain_len = or }])' + if not param then + return ('Bad params, use: %s'):format(msg) + else + return ('Bad param "%s", use: %s'):format(param, msg) + end +end + +-- Get buffer with data encoded without last 'trunc' bytes. +local function truncated_msgpack_buffer(data, trunc) + local data = msgpackffi.encode(data) + data = data:sub(1, data:len() - trunc) + local len = data:len() + local buf = buffer.ibuf() + -- Ensure we have enough buffer to write len + trunc bytes. + buf:reserve(len + trunc) + local p = buf:alloc(len) + -- Ensure len bytes follows with trunc zero bytes. + ffi.copy(p, data .. string.rep('\0', trunc), len + trunc) + return buf +end + +local bad_merger_new_calls = { + -- Cases to call before box.cfg{}. + { + 'Pass a field on an unknown type', + parts = {{ + fieldno = 2, + type = 'unknown', + }}, + exp_err = 'Unknown field type: unknown', + }, + { + 'Try to use collation_id before box.cfg{}', + parts = {{ + fieldno = 1, + type = 'string', + collation_id = 2, + }}, + exp_err = 'Cannot use collations: please call box.cfg{}', + }, + { + 'Try to use collation before box.cfg{}', + parts = {{ + fieldno = 1, + type = 'string', + collation = 'unicode_ci', + }}, + exp_err = 'Cannot use collations: please call box.cfg{}', + }, + function() + -- For collations. + box.cfg{} + end, + -- Cases to call after box.cfg{}. 
+ { + 'Try to use both collation_id and collation', + parts = {{ + fieldno = 1, + type = 'string', + collation_id = 2, + collation = 'unicode_ci', + }}, + exp_err = 'Conflicting options: collation_id and collation', + }, + { + 'Unknown collation_id', + parts = {{ + fieldno = 1, + type = 'string', + collation_id = 42, + }}, + exp_err = 'Unknown collation_id: 42', + }, + { + 'Unknown collation name', + parts = {{ + fieldno = 1, + type = 'string', + collation = 'unknown', + }}, + exp_err = 'Unknown collation: "unknown"', + }, +} + +local bad_merger_methods_calls = { + { + 'Bad opts', + sources = {}, + opts = 1, + exp_err = merger_usage(nil), + }, + { + 'Bad opts.descending', + sources = {}, + opts = {descending = 1}, + exp_err = merger_usage('descending'), + }, + { + 'Bad opts.decode', + sources = {}, + opts = {decode = 1}, + exp_err = merger_usage('decode'), + }, + { + 'Bad source', + sources = {1}, + opts = nil, + exp_err = 'Unknown source type at index 1', + }, + { + 'Bad cdata source', + sources = {ffi.new('char *')}, + opts = nil, + exp_err = 'Unknown source type at index 1', + }, + { + 'Missed encode_chain_len', + sources = {}, + opts = {buffer = buffer.ibuf(), encode = 'chain'}, + exp_err = '"encode_chain_len" is mandatory when "buffer" and ' .. + '{encode = \'chain\'} are used', + }, + { + 'Wrong source of table type', + sources = {{1}}, + opts = nil, + exp_err = 'A tuple or a table expected, got number', + }, + { + 'Use buffer with an iterator result', + sources = {}, + opts = {buffer = buffer.ibuf()}, + funcs = {'pairs', 'ipairs'}, + exp_err = '"buffer" option is forbidden with merger_inst:pairs(<...>)', + }, + { + 'Bad decode type', + sources = {}, + opts = {decode = 1}, + exp_err = merger_usage('decode'), + }, + { + 'Bad decode string', + sources = {}, + opts = {decode = 'bad value'}, + exp_err = merger_usage('decode'), + }, + { + 'A table source ignores {decode = \'chain\'}', + sources = {{{''}}}, + opts = {decode = 'chain'}, + exp_err = nil, + }, + { + 'An iterator source ignores {decode = \'chain\'}', + sources = {{pairs({})}}, + opts = {decode = 'chain'}, + exp_err = nil, + }, + { + 'Bad encode type', + sources = {}, + opts = {buffer = buffer.ibuf(), encode = 1}, + funcs = {'select'}, + exp_err = merger_usage('encode'), + }, + { + 'Bad encode string', + sources = {}, + opts = {buffer = buffer.ibuf(), encode = 'bad value'}, + funcs = {'select'}, + exp_err = merger_usage('encode'), + }, + { + -- Any encode value should lead to an error, but we check + -- only 'select' here. + 'Use "encode" without "buffer"', + sources = {}, + opts = {encode = 'select'}, + exp_err = '"buffer" option is mandatory when "encode" is used', + }, + { + 'Use "encode_chain_len" without "encode"', + sources = {}, + opts = {buffer = buffer.ibuf(), encode_chain_len = 1}, + exp_err = '"encode_chain_len" is forbidden without ' .. + '{encode = \'chain\'}', + }, + { + -- Any encode value except "chain" should lead to an + -- error, but we check only 'select' here. + 'Use "encode_chain_len" with "encode" != "chain"', + sources = {}, + opts = {buffer = buffer.ibuf(), encode = 'select', + encode_chain_len = 1}, + exp_err = '"encode_chain_len" is forbidden without ' .. 
+ '{encode = \'chain\'}', + }, + { + 'Bad encode_chain_len type', + sources = {}, + opts = {buffer = buffer.ibuf(), encode = 'chain', + encode_chain_len = 'bad value'}, + exp_err = merger_usage('encode_chain_len'), + }, + { + 'Bad msgpack source: wrong length of the tuples array', + -- Remove the last tuple from msgpack data, but keep old + -- tuples array size. + sources = { + truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 2), + }, + opts = {}, + funcs = {'select'}, + exp_err = 'Unexpected msgpack buffer end', + }, + { + 'Bad msgpack source: wrong length of a tuple', + -- Remove half of the last tuple, but keep old tuple size. + sources = { + truncated_msgpack_buffer({[IPROTO_DATA] = {{''}, {''}, {''}}}, 1), + }, + opts = {}, + funcs = {'select'}, + exp_err = 'Unexpected msgpack buffer end', + }, +} + +local schemas = { + { + name = 'small_unsigned', + parts = { + { + fieldno = 2, + type = 'unsigned', + } + }, + gen_tuple = function(tupleno) + return {'id_' .. tostring(tupleno), tupleno} + end, + }, + -- Merger allocates a memory for 8 parts by default. + -- Test that reallocation works properly. + -- Test with N-1 equal parts and Nth different. + { + name = 'many_parts', + parts = (function() + local parts = {} + for i = 1, 128 do + parts[i] = { + fieldno = i, + type = 'unsigned', + } + end + return parts + end)(), + gen_tuple = function(tupleno) + local tuple = {} + -- 127 constant parts + for i = 1, 127 do + tuple[i] = i + end + -- 128th part is varying + tuple[128] = tupleno + return tuple + end, + -- reduce tuples count to decrease test run time + tuples_cnt = 16, + }, + -- Test null value in nullable field of an index. + { + name = 'nullable', + parts = { + { + fieldno = 1, + type = 'unsigned', + }, + { + fieldno = 2, + type = 'string', + is_nullable = true, + }, + }, + gen_tuple = function(i) + if i % 1 == 1 then + return {0, tostring(i)} + else + return {0, box.NULL} + end + end, + }, + -- Test index part with 'collation_id' option (as in net.box's + -- response). + { + name = 'collation_id', + parts = { + { + fieldno = 1, + type = 'string', + collation_id = 2, -- unicode_ci + }, + }, + gen_tuple = function(i) + local letters = {'a', 'b', 'c', 'A', 'B', 'C'} + if i <= #letters then + return {letters[i]} + else + return {''} + end + end, + }, + -- Test index part with 'collation' option (as in local index + -- parts). 
+ { + name = 'collation', + parts = { + { + fieldno = 1, + type = 'string', + collation = 'unicode_ci', + }, + }, + gen_tuple = function(i) + local letters = {'a', 'b', 'c', 'A', 'B', 'C'} + if i <= #letters then + return {letters[i]} + else + return {''} + end + end, + }, +} + +local function is_unicode_ci_part(part) + return part.collation_id == 2 or part.collation == 'unicode_ci' +end + +local function sort_tuples(tuples, parts, opts) + local function tuple_comparator(a, b) + for _, part in ipairs(parts) do + local fieldno = part.fieldno + if a[fieldno] ~= b[fieldno] then + if a[fieldno] == nil then + return -1 + end + if b[fieldno] == nil then + return 1 + end + if is_unicode_ci_part(part) then + return utf8.casecmp(a[fieldno], b[fieldno]) + end + return a[fieldno] < b[fieldno] and -1 or 1 + end + end + + return 0 + end + + local function tuple_comparator_wrapper(a, b) + local cmp = tuple_comparator(a, b) + if cmp < 0 then + return not opts.descending + elseif cmp > 0 then + return opts.descending + else + return false + end + end + + table.sort(tuples, tuple_comparator_wrapper) +end + +local function lowercase_unicode_ci_fields(tuples, parts) + for i = 1, #tuples do + local tuple = tuples[i] + for _, part in ipairs(parts) do + if is_unicode_ci_part(part) then + -- Workaround #3709. + if tuple[part.fieldno]:len() > 0 then + tuple[part.fieldno] = utf8.lower(tuple[part.fieldno]) + end + end + end + end +end + +local function prepare_data(schema, tuples_cnt, sources_cnt, opts) + local opts = opts or {} + local input_type = opts.input_type + local use_table_as_tuple = opts.use_table_as_tuple + + local tuples = {} + local exp_result = {} + + -- Ensure empty sources are empty table and not nil. + for i = 1, sources_cnt do + if tuples[i] == nil then + tuples[i] = {} + end + end + + -- Prepare N tables with tuples as input for merger. + for i = 1, tuples_cnt do + -- [1, sources_cnt] + local guava = digest.guava(i, sources_cnt) + 1 + local tuple = schema.gen_tuple(i) + table.insert(exp_result, tuple) + if not use_table_as_tuple then + assert(input_type ~= 'buffer') + tuple = box.tuple.new(tuple) + end + table.insert(tuples[guava], tuple) + end + + -- Sort tuples within each source. + for _, source_tuples in pairs(tuples) do + sort_tuples(source_tuples, schema.parts, opts) + end + + -- Sort expected result. + sort_tuples(exp_result, schema.parts, opts) + + -- Fill sources. + local sources + if input_type == 'table' then + -- Imitate netbox's select w/o {buffer = ...}. + sources = tuples + elseif input_type == 'buffer' then + -- Imitate netbox's select with {buffer = ...}. + sources = {} + for i = 1, sources_cnt do + local data + if opts.decode == 'raw' then + data = tuples[i] + elseif opts.decode == nil or opts.decode == 'select' then + data = {[IPROTO_DATA] = tuples[i]} + elseif opts.decode == 'call' then + data = {[IPROTO_DATA] = {tuples[i]}} + elseif opts.decode == 'chain' then + data = {[IPROTO_DATA] = {{tuples[i]}}} + else + assert(false) + end + sources[i] = buffer.ibuf() + msgpackffi.internal.encode_r(sources[i], data, 0) + end + elseif input_type == 'iterator' then + -- Lua iterator. + sources = {} + for i = 1, sources_cnt do + sources[i] = { + -- gen (next) + next, + -- param (tuples) + tuples[i], + -- state (idx) + nil + } + end + end + + return sources, exp_result +end + +local function test_case_opts_str(opts) + local params = {} + + if opts.decode then + table.insert(params, 'decode: ' .. opts.decode) + end + + if opts.encode then + table.insert(params, 'encode: ' .. 
opts.encode)
+    end
+
+    if opts.input_type then
+        table.insert(params, 'input_type: ' .. opts.input_type)
+    end
+
+    if opts.output_type then
+        table.insert(params, 'output_type: ' .. opts.output_type)
+    end
+
+    if opts.descending then
+        table.insert(params, 'descending')
+    end
+
+    if opts.use_table_as_tuple then
+        table.insert(params, 'use_table_as_tuple')
+    end
+
+    if next(params) == nil then
+        return ''
+    end
+
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+    fiber.yield()
+
+    local opts = opts or {}
+
+    -- Prepare data.
+    local sources, exp_result =
+        prepare_data(schema, tuples_cnt, sources_cnt, opts)
+
+    -- Create a merger instance and fill options.
+    local merger_inst = merger.new(schema.parts)
+    local merger_opts = {
+        decode = opts.decode,
+        encode = opts.encode,
+        descending = opts.descending,
+    }
+    if opts.output_type == 'buffer' then
+        merger_opts.buffer = buffer.ibuf()
+    end
+    if opts.encode == 'chain' then
+        merger_opts.encode_chain_len = 1
+    end
+
+    local res
+
+    -- Run the merger and prepare its output for comparison.
+    if opts.output_type == 'table' then
+        -- Table output.
+        res = merger_inst:select(sources, merger_opts)
+    elseif opts.output_type == 'buffer' then
+        -- Buffer output.
+        merger_inst:select(sources, merger_opts)
+        local obuf = merger_opts.buffer
+        local data = msgpackffi.decode(obuf.rpos)
+
+        if opts.encode == 'raw' then
+            res = data
+        elseif opts.encode == nil or opts.encode == 'select' then
+            res = data[IPROTO_DATA]
+        elseif opts.encode == 'call' then
+            res = data[IPROTO_DATA][1]
+        elseif opts.encode == 'chain' then
+            res = data[IPROTO_DATA][1][1]
+        else
+            assert(false)
+        end
+    else
+        -- Iterator output.
+        assert(opts.output_type == 'iterator')
+        res = merger_inst:pairs(sources, merger_opts):totable()
+    end
+
+    -- A bit more postprocessing before the comparison: convert
+    -- box tuples to plain tables.
+    for i = 1, #res do
+        if type(res[i]) ~= 'table' then
+            res[i] = res[i]:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate between 'A' and 'a', so
+    -- their order is arbitrary. Transform fields whose parts use
+    -- the unicode_ci collation to lower case before comparing.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    test:is_deeply(res, exp_result,
+        ('check order on %3d tuples in %4d sources%s')
+        :format(tuples_cnt, sources_cnt, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+    local opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuples_cnt = schema.tuples_cnt or 100
+
+    local encode = opts.encode
+    local decode = opts.decode
+    local input_type = opts.input_type
+    local output_type = opts.output_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+
+    -- Skip meaningless flag combinations.
+    if input_type ~= 'buffer' and decode ~= nil then
+        return
+    end
+    if output_type ~= 'buffer' and encode ~= nil then
+        return
+    end
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(6)
+
+        -- Check with a small number of sources.
+        run_merger(test, schema, tuples_cnt, 1, opts)
+        run_merger(test, schema, tuples_cnt, 2, opts)
+        run_merger(test, schema, tuples_cnt, 3, opts)
+        run_merger(test, schema, tuples_cnt, 4, opts)
+        run_merger(test, schema, tuples_cnt, 5, opts)
+
+        -- Check with more sources than the tuples count.
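+        -- Note: with 1000 sources and at most 100 tuples the vast
+        -- majority of the sources stay empty, so this also
+        -- exercises merging of empty sources.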
+ run_merger(test, schema, tuples_cnt, 1000, opts) + end) +end + +local test = tap.test('merger') +test:plan(#bad_merger_new_calls - 1 + #bad_merger_methods_calls + + #schemas * 126) + +-- Bad merger.new() calls. +for _, case in ipairs(bad_merger_new_calls) do + if type(case) == 'function' then + case() + else + local ok, err = pcall(merger.new, case.parts) + test:is_deeply({ok, err}, {false, case.exp_err}, case[1]) + end +end + +-- Create the instance to use in testing merger's methods below. +local merger_inst = merger.new({{ + fieldno = 1, + type = 'string', +}}) + +-- Bad source or/and opts parameters for merger's methods. +for _, case in ipairs(bad_merger_methods_calls) do + test:test(case[1], function(test) + local funcs = case.funcs or {'pairs', 'ipairs', 'select'} + test:plan(#funcs) + for _, func in ipairs(funcs) do + local exp_ok = case.exp_err == nil + local ok, err = pcall(merger_inst[func], merger_inst, case.sources, + case.opts) + if ok then + err = nil + end + test:is_deeply({ok, err}, {exp_ok, case.exp_err}, func) + end + end) +end + +-- Merging cases. +for _, decode in ipairs({'nil', 'raw', 'select', 'call', 'chain'}) do + for _, encode in ipairs({'nil', 'raw', 'select', 'call', 'chain'}) do + for _, input_type in ipairs({'buffer', 'table', 'iterator'}) do + for _, output_type in ipairs({'buffer', 'table', 'iterator'}) do + for _, descending in ipairs({false, true}) do + for _, use_table_as_tuple in ipairs({false, true}) do + for _, schema in ipairs(schemas) do + decode = decode ~= 'nil' and decode or nil + encode = encode ~= 'nil' and encode or nil + run_case(test, schema, { + decode = decode, + encode = encode, + input_type = input_type, + output_type = output_type, + descending = descending, + use_table_as_tuple = use_table_as_tuple, + }) + end + end + end + end + end + end +end + +os.exit(test:check() and 0 or 1) diff --git a/test/app-tap/suite.ini b/test/app-tap/suite.ini index 86af82637..1a2abb79f 100644 --- a/test/app-tap/suite.ini +++ b/test/app-tap/suite.ini @@ -2,4 +2,5 @@ core = app description = application server tests (TAP) lua_libs = lua/require_mod.lua lua/serializer_test.lua +long_run = merger.test.lua is_parallel = True -- 2.19.2