From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
From: Alexander Turenko
Subject: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
Date: Wed, 10 Apr 2019 18:21:25 +0300
Message-Id: <9af4b8f1311537ef696d71a1b09bc1721bde8ef0.1554906327.git.alexander.turenko@tarantool.org>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
To: Vladimir Davydov
Cc: Alexander Turenko , tarantool-patches@freelists.org
List-ID:

Fixes #3276.

@TarantoolBot document
Title: Merger for tuple streams

The main concept of the merger is a source. It is an object that provides a stream of tuples. There are four types of sources: a tuple source, a table source, a buffer source and a merger itself.

A tuple source just returns one tuple. However, this source (as well as the table and buffer ones) supports fetching the next data chunk, so the API allows creating it from a Lua iterator: `merger.new_tuple_source(gen, param, state)`. A `gen` function should return `state, tuple` on each call and then return `nil` when no more tuples are available. Consider the example:

```lua
box.cfg({})
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})

merger = require('merger')

s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
  - [2]
  - [3]
...

s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
  - [2]
  - [3]
...
```

As we can see, a source (this is common for all sources) has `:select()` and `:pairs()` methods. The first one has two options, `buffer` and `limit`, with the same meaning as the ones in net.box `:select()`. The `:pairs()` method (or its `:ipairs()` alias) returns a luafun iterator (it is a Lua iterator, but it also provides a set of handy methods to operate in a functional style).

The same API exists to create a table and a buffer source: `merger.new_table_source(gen, param, state)` and `merger.new_buffer_source(gen, param, state)`. A `gen` function should return `state, table` or `state, buffer` on each call, and `nil` when no more data are available.
There are also helpers that are useful when all data are available at once: `merger.new_source_fromtable(tbl)` and `merger.new_source_frombuffer(buf)`.

A merger is a special kind of source, which is created from a key_def object and a set of sources. It performs a kind of merge sort: on each step it chooses the source with the minimal / maximal tuple, consumes a tuple from this source and repeats. The API to create a merger is the following:

```lua
local ctx = merger.context.new(key_def.new(<...>))
local sources = {<...>}
local merger_inst = merger.new(ctx, sources, {
    -- Ascending (false) or descending (true) order.
    -- Default is ascending.
    reverse = <boolean> or <nil>,
})
```

An instance of a merger has the same `:select()` and `:pairs()` methods as any other source.

A merger context is the part of a merger state that is immutable and can be cached across requests with the same ordering rules (typically requests to the same space).

The `key_def.new()` function takes a table of key parts as an argument in the same format as box.space.<...>.index.<...>.parts or conn.space.<...>.index.<...>.parts (where conn is a net.box connection):

```
local key_parts = {
    {
        fieldno = <number>,
        type = <string>,
        [ is_nullable = <boolean>, ]
        [ collation_id = <number>, ]
        [ collation = <string>, ]
    },
    ...
} local key_def_inst = key_def.new(key_parts) ``` --- src/box/CMakeLists.txt | 2 + src/box/lua/init.c | 7 +- src/box/lua/merger.c | 1184 ++++++++++++++++++++++++++++++++++ src/box/lua/merger.h | 47 ++ src/box/lua/merger.lua | 41 ++ test/box-tap/merger.test.lua | 725 +++++++++++++++++++++ 6 files changed, 2005 insertions(+), 1 deletion(-) create mode 100644 src/box/lua/merger.c create mode 100644 src/box/lua/merger.h create mode 100644 src/box/lua/merger.lua create mode 100755 test/box-tap/merger.test.lua diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index d1251c326..491e3d160 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua) lua_source(lua_sources lua/console.lua) lua_source(lua_sources lua/xlog.lua) lua_source(lua_sources lua/key_def.lua) +lua_source(lua_sources lua/merger.lua) set(bin_sources) bin_source(bin_sources bootstrap.snap bootstrap.h) @@ -143,6 +144,7 @@ add_library(box STATIC lua/xlog.c lua/execute.c lua/key_def.c + lua/merger.c ${bin_sources}) target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble diff --git a/src/box/lua/init.c b/src/box/lua/init.c index 6b8be5096..76b987b4b 100644 --- a/src/box/lua/init.c +++ b/src/box/lua/init.c @@ -60,6 +60,7 @@ #include "box/lua/tuple.h" #include "box/lua/execute.h" #include "box/lua/key_def.h" +#include "box/lua/merger.h" extern char session_lua[], tuple_lua[], @@ -70,7 +71,8 @@ extern char session_lua[], feedback_daemon_lua[], net_box_lua[], upgrade_lua[], - console_lua[]; + console_lua[], + merger_lua[]; static const char *lua_sources[] = { "box/session", session_lua, @@ -83,6 +85,7 @@ static const char *lua_sources[] = { "box/load_cfg", load_cfg_lua, "box/xlog", xlog_lua, "box/key_def", key_def_lua, + "box/merger", merger_lua, NULL }; @@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L) lua_pop(L, 1); luaopen_key_def(L); lua_pop(L, 1); + luaopen_merger(L); + lua_pop(L, 1); /* Load Lua extension */ 
for (const char **s = lua_sources; *s; s += 2) { diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c new file mode 100644 index 000000000..ebe60bc8d --- /dev/null +++ b/src/box/lua/merger.c @@ -0,0 +1,1184 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +#include "box/lua/merger.h" + +#include +#include +#include +#include +#include + +#include /* lua_*() */ +#include /* luaL_*() */ + +#include "fiber.h" /* fiber() */ +#include "diag.h" /* diag_set() */ + +#include "box/tuple.h" /* box_tuple_*() */ + +#include "lua/error.h" /* luaT_error() */ +#include "lua/utils.h" /* luaL_pushcdata(), + luaL_iterator_*() */ + +#include "box/lua/key_def.h" /* check_key_def() */ +#include "box/lua/tuple.h" /* luaT_tuple_new() */ + +#include "small/ibuf.h" /* struct ibuf */ +#include "msgpuck.h" /* mp_*() */ + +#include "box/merger.h" /* merger_*() */ + +static uint32_t merger_source_type_id = 0; +static uint32_t merger_context_type_id = 0; +static uint32_t ibuf_type_id = 0; + +/** + * A type of a function to create a source from a Lua iterator on + * a Lua stack. + * + * Such function is to be passed to lbox_merger_source_new() as + * a parameter. + */ +typedef struct merger_source *(*luaL_merger_source_new_f)(struct lua_State *L); + +/* {{{ Helpers */ + +/** + * Extract an ibuf object from the Lua stack. + */ +static struct ibuf * +luaT_check_ibuf(struct lua_State *L, int idx) +{ + if (lua_type(L, idx) != LUA_TCDATA) + return NULL; + + uint32_t cdata_type; + struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type); + if (ibuf_ptr == NULL || cdata_type != ibuf_type_id) + return NULL; + return ibuf_ptr; +} + +/** + * Extract a merger source from the Lua stack. + */ +static struct merger_source * +luaT_check_merger_source(struct lua_State *L, int idx) +{ + uint32_t cdata_type; + struct merger_source **source_ptr = luaL_checkcdata(L, idx, + &cdata_type); + if (source_ptr == NULL || cdata_type != merger_source_type_id) + return NULL; + return *source_ptr; +} + +/** + * Extract a merger context from the Lua stack. 
+ */ +static struct merger_context * +luaT_check_merger_context(struct lua_State *L, int idx) +{ + uint32_t cdata_type; + struct merger_context **ctx_ptr = luaL_checkcdata(L, idx, &cdata_type); + if (ctx_ptr == NULL || cdata_type != merger_context_type_id) + return NULL; + return *ctx_ptr; +} + +/** + * Skip an array around tuples and save its length. + */ +static int +decode_header(struct ibuf *buf, size_t *len_p) +{ + /* Check the buffer is correct. */ + if (buf->rpos > buf->wpos) + return -1; + + /* Skip decoding if the buffer is empty. */ + if (ibuf_used(buf) == 0) { + *len_p = 0; + return 0; + } + + /* Check and skip the array around tuples. */ + int ok = mp_typeof(*buf->rpos) == MP_ARRAY; + if (ok) + ok = mp_check_array(buf->rpos, buf->wpos) <= 0; + if (ok) + *len_p = mp_decode_array((const char **) &buf->rpos); + return ok ? 0 : -1; +} + +/** + * Encode an array around tuples. + */ +static void +encode_header(struct ibuf *obuf, uint32_t result_len) +{ + ibuf_reserve(obuf, mp_sizeof_array(result_len)); + obuf->wpos = mp_encode_array(obuf->wpos, result_len); +} + +/* }}} */ + +/* {{{ Create, delete structures from Lua */ + +/** + * Free a merger source from a Lua code. + */ +static int +lbox_merger_source_gc(struct lua_State *L) +{ + struct merger_source *source; + if ((source = luaT_check_merger_source(L, 1)) == NULL) + return 0; + merger_source_unref(source); + return 0; +} + +/** + * Free a merger context from a Lua code. + */ +static int +lbox_merger_context_gc(struct lua_State *L) +{ + struct merger_context *ctx; + if ((ctx = luaT_check_merger_context(L, 1)) == NULL) + return 0; + merger_context_unref(ctx); + return 0; +} + +/** + * Create a new source from a Lua iterator and push it onto the + * Lua stack. + * + * It is the helper for lbox_merger_new_buffer_source(), + * lbox_merger_new_table_source() and + * lbox_merger_new_tuple_source(). 
+ */ +static int +lbox_merger_source_new(struct lua_State *L, const char *func_name, + luaL_merger_source_new_f luaL_merger_source_new) +{ + int top = lua_gettop(L); + if (top < 1 || top > 3 || !luaL_iscallable(L, 1)) + return luaL_error(L, "Usage: %s(gen, param, state)", func_name); + + /* + * luaL_merger_source_new() reads exactly three top + * values. + */ + while (lua_gettop(L) < 3) + lua_pushnil(L); + + struct merger_source *source = luaL_merger_source_new(L); + if (source == NULL) + return luaT_error(L); + merger_source_ref(source); + *(struct merger_source **) luaL_pushcdata(L, merger_source_type_id) = + source; + lua_pushcfunction(L, lbox_merger_source_gc); + luaL_setcdatagc(L, -2); + + return 1; +} + +/** + * Create a new merger context and push it to a Lua stack. + * + * Expect cdata on a Lua stack. + */ +static int +lbox_merger_context_new(struct lua_State *L) +{ + struct key_def *key_def; + if (lua_gettop(L) != 1 || (key_def = check_key_def(L, 1)) == NULL) + return luaL_error(L, "Usage: merger.context.new(key_def)"); + + struct merger_context *ctx = merger_context_new(key_def); + if (ctx == NULL) + return luaT_error(L); + + merger_context_ref(ctx); + *(struct merger_context **) luaL_pushcdata(L, merger_context_type_id) = + ctx; + lua_pushcfunction(L, lbox_merger_context_gc); + luaL_setcdatagc(L, -2); + + return 1; +} + +/** + * Raise a Lua error with merger.new() usage info. + */ +static int +lbox_merger_new_usage(struct lua_State *L, const char *param_name) +{ + static const char *usage = "merger.new(merger_context, " + "{source, source, ...}[, {" + "reverse = or }])"; + if (param_name == NULL) + return luaL_error(L, "Bad params, use: %s", usage); + else + return luaL_error(L, "Bad param \"%s\", use: %s", param_name, + usage); +} + +/** + * Parse a second parameter of merger.new() into an array of + * sources. + * + * Return an array of pointers to sources and set @a + * sources_count_ptr. In case of an error set a diag and return + * NULL. 
+ * + * It is the helper for lbox_merger_new(). + */ +static struct merger_source ** +luaT_merger_new_parse_sources(struct lua_State *L, int idx, + uint32_t *sources_count_ptr) +{ + /* Allocate sources array. */ + uint32_t sources_count = lua_objlen(L, idx); + const ssize_t sources_size = + sources_count * sizeof(struct merger_source *); + struct merger_source **sources = + (struct merger_source **) malloc(sources_size); + if (sources == NULL) { + diag_set(OutOfMemory, sources_size, "malloc", "sources"); + return NULL; + } + + /* Save all sources. */ + for (uint32_t i = 0; i < sources_count; ++i) { + lua_pushinteger(L, i + 1); + lua_gettable(L, idx); + + /* Extract a source from a Lua stack. */ + struct merger_source *source = luaT_check_merger_source(L, -1); + if (source == NULL) { + free(sources); + diag_set(IllegalParams, + "Unknown source type at index %d", i + 1); + return NULL; + } + sources[i] = source; + } + lua_pop(L, sources_count); + + *sources_count_ptr = sources_count; + return sources; +} + +/** + * Create a new merger and push it to a Lua stack as a merger + * source. + * + * Expect cdata, a table of sources and + * (optionally) a table of options on a Lua stack. + */ +static int +lbox_merger_new(struct lua_State *L) +{ + struct merger_context *ctx; + int top = lua_gettop(L); + bool ok = (top == 2 || top == 3) && + /* Merger context. */ + (ctx = luaT_check_merger_context(L, 1)) != NULL && + /* Sources. */ + lua_istable(L, 2) == 1 && + /* Opts. */ + (lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1); + if (!ok) + return lbox_merger_new_usage(L, NULL); + + /* Options. */ + bool reverse = false; + + /* Parse options. */ + if (!lua_isnoneornil(L, 3)) { + /* Parse reverse. 
*/ + lua_pushstring(L, "reverse"); + lua_gettable(L, 3); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) + reverse = lua_toboolean(L, -1); + else + return lbox_merger_new_usage(L, "reverse"); + } + lua_pop(L, 1); + } + + struct merger_source *merger = merger_new(ctx); + if (merger == NULL) + return luaT_error(L); + + uint32_t sources_count = 0; + struct merger_source **sources = luaT_merger_new_parse_sources(L, 2, + &sources_count); + if (sources == NULL) { + merger->vtab->delete(merger); + return luaT_error(L); + } + + if (merger_set_sources(merger, sources, sources_count) != 0) { + free(sources); + merger->vtab->delete(merger); + return luaT_error(L); + } + free(sources); + merger_set_reverse(merger, reverse); + + merger_source_ref(merger); + *(struct merger_source **) + luaL_pushcdata(L, merger_source_type_id) = merger; + lua_pushcfunction(L, lbox_merger_source_gc); + luaL_setcdatagc(L, -2); + + return 1; +} + +/* }}} */ + +/* {{{ Buffer merger source */ + +struct merger_source_buffer { + struct merger_source base; + /* + * A reference to a Lua iterator to fetch a next chunk of + * tuples. + */ + struct luaL_iterator *fetch_it; + /* + * A reference a buffer with a current chunk of tuples. + * It is needed to prevent LuaJIT from collecting the + * buffer while the source consider it as the current + * one. + */ + int ref; + /* + * A buffer with a current chunk of tuples. + */ + struct ibuf *buf; + /* + * A merger stops before end of a buffer when it is not + * the last merger in the chain. + */ + size_t remaining_tuples_cnt; +}; + +/* Virtual methods declarations */ + +static void +luaL_merger_source_buffer_delete(struct merger_source *base); +static int +luaL_merger_source_buffer_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out); + +/* Non-virtual methods */ + +/** + * Create a new merger source of the buffer type. + * + * Reads gen, param, state from the top of a Lua stack. 
+ * + * In case of an error it returns NULL and sets a diag. + */ +static struct merger_source * +luaL_merger_source_buffer_new(struct lua_State *L) +{ + static struct merger_source_vtab merger_source_buffer_vtab = { + .delete = luaL_merger_source_buffer_delete, + .next = luaL_merger_source_buffer_next, + }; + + struct merger_source_buffer *source = (struct merger_source_buffer *) + malloc(sizeof(struct merger_source_buffer)); + if (source == NULL) { + diag_set(OutOfMemory, sizeof(struct merger_source_buffer), + "malloc", "merger_source_buffer"); + return NULL; + } + + merger_source_new(&source->base, &merger_source_buffer_vtab); + + source->fetch_it = luaL_iterator_new(L, 0); + source->ref = 0; + source->buf = NULL; + source->remaining_tuples_cnt = 0; + + return &source->base; +} + +/** + * Call a user provided function to get a next data chunk (a + * buffer). + * + * Return 1 when a new buffer is received, 0 when a buffers + * iterator ends and -1 at error and set a diag. + */ +static int +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source) +{ + struct lua_State *L = fiber()->storage.lua.stack; + int nresult = luaL_iterator_next(L, source->fetch_it); + + /* Handle a Lua error in a gen function. */ + if (nresult == -1) + return -1; + + /* No more data: do nothing. */ + if (nresult == 0) + return 0; + + /* Handle incorrect results count. */ + if (nresult != 2) { + diag_set(IllegalParams, "Expected , , got %d " + "return values", nresult); + return -1; + } + + /* Set a new buffer as the current chunk. */ + if (source->ref > 0) + luaL_unref(L, LUA_REGISTRYINDEX, source->ref); + lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */ + source->ref = luaL_ref(L, LUA_REGISTRYINDEX); + source->buf = luaT_check_ibuf(L, -1); + assert(source->buf != NULL); + lua_pop(L, nresult); + + /* Update remaining_tuples_cnt and skip the header. 
*/ + if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) { + diag_set(IllegalParams, "Invalid merger source %p", + &source->base); + return -1; + } + return 1; +} + +/* Virtual methods */ + +static void +luaL_merger_source_buffer_delete(struct merger_source *base) +{ + struct merger_source_buffer *source = container_of(base, + struct merger_source_buffer, base); + + assert(source->fetch_it != NULL); + luaL_iterator_delete(source->fetch_it); + if (source->ref > 0) + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref); + + free(source); +} + +static int +luaL_merger_source_buffer_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out) +{ + struct merger_source_buffer *source = container_of(base, + struct merger_source_buffer, base); + + /* + * Handle the case when all data were processed: ask a + * next chunk until a non-empty chunk is received or a + * chunks iterator ends. + */ + while (source->remaining_tuples_cnt == 0) { + int rc = luaL_merger_source_buffer_fetch(source); + if (rc < 0) + return -1; + if (rc == 0) { + *out = NULL; + return 0; + } + } + if (ibuf_used(source->buf) == 0) { + diag_set(IllegalParams, "Unexpected msgpack buffer end"); + return -1; + } + const char *tuple_beg = source->buf->rpos; + const char *tuple_end = tuple_beg; + /* + * mp_next() is faster then mp_check(), but can read bytes + * outside of the buffer and so can cause segmentation + * faults or an incorrect result. + * + * We check buffer boundaries after the mp_next() call and + * throw an error when the boundaries are violated, but it + * does not save us from possible segmentation faults. + * + * It is in a user responsibility to provide valid + * msgpack. 
+ */ + mp_next(&tuple_end); + --source->remaining_tuples_cnt; + if (tuple_end > source->buf->wpos) { + diag_set(IllegalParams, "Unexpected msgpack buffer end"); + return -1; + } + source->buf->rpos = (char *) tuple_end; + if (format == NULL) + format = box_tuple_format_default(); + struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end); + if (tuple == NULL) + return -1; + + box_tuple_ref(tuple); + *out = tuple; + return 0; +} + +/* Lua functions */ + +/** + * Create a new buffer source and push it onto the Lua stack. + */ +static int +lbox_merger_new_buffer_source(struct lua_State *L) +{ + return lbox_merger_source_new(L, "merger.new_buffer_source", + luaL_merger_source_buffer_new); +} + +/* }}} */ + +/* {{{ Table merger source */ + +struct merger_source_table { + struct merger_source base; + /* + * A reference to a Lua iterator to fetch a next chunk of + * tuples. + */ + struct luaL_iterator *fetch_it; + /* + * A reference to a table with a current chunk of tuples. + */ + int ref; + /* An index of current tuples within a current chunk. */ + int next_idx; +}; + +/* Virtual methods declarations */ + +static void +luaL_merger_source_table_delete(struct merger_source *base); +static int +luaL_merger_source_table_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out); + +/* Non-virtual methods */ + +/** + * Create a new merger source of the table type. + * + * In case of an error it returns NULL and set a diag. 
+ */ +static struct merger_source * +luaL_merger_source_table_new(struct lua_State *L) +{ + static struct merger_source_vtab merger_source_table_vtab = { + .delete = luaL_merger_source_table_delete, + .next = luaL_merger_source_table_next, + }; + + struct merger_source_table *source = (struct merger_source_table *) + malloc(sizeof(struct merger_source_table)); + if (source == NULL) { + diag_set(OutOfMemory, sizeof(struct merger_source_table), + "malloc", "merger_source_table"); + return NULL; + } + + merger_source_new(&source->base, &merger_source_table_vtab); + + source->fetch_it = luaL_iterator_new(L, 0); + source->ref = 0; + source->next_idx = 1; + + return &source->base; +} + +/** + * Call a user provided function to fill the source. + * + * Return 0 when a tables iterator ends, 1 when a new table is + * received and -1 at an error (set a diag). + */ +static int +luaL_merger_source_table_fetch(struct merger_source_table *source) +{ + struct lua_State *L = fiber()->storage.lua.stack; + int nresult = luaL_iterator_next(L, source->fetch_it); + + /* Handle a Lua error in a gen function. */ + if (nresult == -1) + return -1; + + /* No more data: do nothing. */ + if (nresult == 0) + return 0; + + /* Handle incorrect results count. */ + if (nresult != 2) { + diag_set(IllegalParams, "Expected , , got %d " + "return values", nresult); + return -1; + } + + /* Set a new table as the current chunk. */ + if (source->ref > 0) + luaL_unref(L, LUA_REGISTRYINDEX, source->ref); + lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). 
*/ + source->ref = luaL_ref(L, LUA_REGISTRYINDEX); + source->next_idx = 1; + lua_pop(L, nresult); + + return 1; +} + +/* Virtual methods */ + +static void +luaL_merger_source_table_delete(struct merger_source *base) +{ + struct merger_source_table *source = container_of(base, + struct merger_source_table, base); + + assert(source->fetch_it != NULL); + luaL_iterator_delete(source->fetch_it); + if (source->ref > 0) + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref); + + free(source); +} + +static int +luaL_merger_source_table_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out) +{ + struct lua_State *L = fiber()->storage.lua.stack; + struct merger_source_table *source = container_of(base, + struct merger_source_table, base); + + if (source->ref > 0) { + lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref); + lua_pushinteger(L, source->next_idx); + lua_gettable(L, -2); + } + /* + * If all data were processed, try to fetch more. + */ + while (source->ref == 0 || lua_isnil(L, -1)) { + if (source->ref > 0) + lua_pop(L, 2); + int rc = luaL_merger_source_table_fetch(source); + if (rc < 0) + return -1; + if (rc == 0) { + *out = NULL; + return 0; + } + /* + * Retry tuple extracting when a next table is + * received. + */ + lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref); + lua_pushinteger(L, source->next_idx); + lua_gettable(L, -2); + } + if (format == NULL) + format = box_tuple_format_default(); + struct tuple *tuple = luaT_tuple_new(L, -1, format); + if (tuple == NULL) + return -1; + ++source->next_idx; + lua_pop(L, 2); + + box_tuple_ref(tuple); + *out = tuple; + return 0; +} + +/* Lua functions */ + +/** + * Create a new table source and push it onto the Lua stack. 
+ */ +static int +lbox_merger_new_table_source(struct lua_State *L) +{ + return lbox_merger_source_new(L, "merger.new_table_source", + luaL_merger_source_table_new); +} + +/* }}} */ + +/* {{{ Tuple merger source */ + +struct merger_source_tuple { + struct merger_source base; + /* + * A reference to a Lua iterator to fetch a next chunk of + * tuples. + */ + struct luaL_iterator *fetch_it; +}; + +/* Virtual methods declarations */ + +static void +luaL_merger_source_tuple_delete(struct merger_source *base); +static int +luaL_merger_source_tuple_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out); + +/* Non-virtual methods */ + +/** + * Create a new merger source of the tuple type. + * + * In case of an error it returns NULL and set a diag. + */ +static struct merger_source * +luaL_merger_source_tuple_new(struct lua_State *L) +{ + static struct merger_source_vtab merger_source_tuple_vtab = { + .delete = luaL_merger_source_tuple_delete, + .next = luaL_merger_source_tuple_next, + }; + + struct merger_source_tuple *source = + (struct merger_source_tuple *) malloc( + sizeof(struct merger_source_tuple)); + if (source == NULL) { + diag_set(OutOfMemory, sizeof(struct merger_source_tuple), + "malloc", "merger_source_tuple"); + return NULL; + } + + merger_source_new(&source->base, &merger_source_tuple_vtab); + + source->fetch_it = luaL_iterator_new(L, 0); + + return &source->base; +} + +/** + * Call a user provided function to fill the source. + * + * Return 1 at success and push a resulting tuple to a the Lua + * stack. + * Return 0 when no more data. + * Return -1 at error (set a diag). + */ +static int +luaL_merger_source_tuple_fetch(struct merger_source_tuple *source, + struct lua_State *L) +{ + int nresult = luaL_iterator_next(L, source->fetch_it); + + /* Handle a Lua error in a gen function. */ + if (nresult == -1) + return -1; + + /* No more data: do nothing. */ + if (nresult == 0) + return 0; + + /* Handle incorrect results count. 
*/ + if (nresult != 2) { + diag_set(IllegalParams, "Expected , got %d " + "return values", nresult); + return -1; + } + + /* Set a new tuple as the current chunk. */ + lua_insert(L, -2); /* Swap state and tuple. */ + lua_pop(L, 1); /* Pop state. */ + + return 1; +} + +/* Virtual methods */ + +static void +luaL_merger_source_tuple_delete(struct merger_source *base) +{ + struct merger_source_tuple *source = container_of(base, + struct merger_source_tuple, base); + + assert(source->fetch_it != NULL); + luaL_iterator_delete(source->fetch_it); + + free(source); +} + +static int +luaL_merger_source_tuple_next(struct merger_source *base, + box_tuple_format_t *format, + struct tuple **out) +{ + struct lua_State *L = fiber()->storage.lua.stack; + struct merger_source_tuple *source = container_of(base, + struct merger_source_tuple, base); + + int rc = luaL_merger_source_tuple_fetch(source, L); + if (rc < 0) + return -1; + /* + * Check whether a tuple appears after the fetch. + */ + if (rc == 0) { + *out = NULL; + return 0; + } + + if (format == NULL) + format = box_tuple_format_default(); + struct tuple *tuple = luaT_tuple_new(L, -1, format); + if (tuple == NULL) + return -1; + lua_pop(L, 1); + + box_tuple_ref(tuple); + *out = tuple; + return 0; +} + +/* Lua functions */ + +/** + * Create a new tuple source and push it onto the Lua stack. + */ +static int +lbox_merger_new_tuple_source(struct lua_State *L) +{ + return lbox_merger_source_new(L, "merger.new_tuple_source", + luaL_merger_source_tuple_new); +} + +/* }}} */ + +/* {{{ Merger source Lua methods */ + +/** + * Iterator gen function to traverse source results. + * + * Expected a nil as the first parameter (param) and a + * merger_source as the second parameter (state) on a Lua stack. + * + * Push the original merger_source (as a new state) and a next + * tuple. 
+ */ +static int +lbox_merger_source_gen(struct lua_State *L) +{ + struct merger_source *source; + bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) && + (source = luaT_check_merger_source(L, 2)) != NULL; + if (!ok) + return luaL_error(L, "Bad params, use: lbox_merger_source_gen(" + "nil, merger_source)"); + + struct tuple *tuple; + if (source->vtab->next(source, NULL, &tuple) != 0) + return luaT_error(L); + if (tuple == NULL) { + lua_pushnil(L); + lua_pushnil(L); + return 2; + } + + /* Push merger_source, tuple. */ + *(struct merger_source **) + luaL_pushcdata(L, merger_source_type_id) = source; + luaT_pushtuple(L, tuple); + + /* + * luaT_pushtuple() references the tuple, so we + * unreference it on merger's side. + */ + box_tuple_unref(tuple); + + return 2; +} + +/** + * Iterate over merger source results from Lua. + * + * Push three values to the Lua stack: + * + * 1. gen (lbox_merger_source_gen wrapped by fun.wrap()); + * 2. param (nil); + * 3. state (merger_source). + */ +static int +lbox_merger_source_ipairs(struct lua_State *L) +{ + struct merger_source *source; + bool ok = lua_gettop(L) == 1 && + (source = luaT_check_merger_source(L, 1)) != NULL; + if (!ok) + return luaL_error(L, "Usage: merger_source:ipairs()"); + /* Stack: merger_source. */ + + luaL_loadstring(L, "return require('fun').wrap"); + lua_call(L, 0, 1); + lua_insert(L, -2); /* Swap merger_source and wrap. */ + /* Stack: wrap, merger_source. */ + + lua_pushcfunction(L, lbox_merger_source_gen); + lua_insert(L, -2); /* Swap merger_source and gen. */ + /* Stack: wrap, gen, merger_source. */ + + /* + * Push nil as an iterator param, because all needed state + * is in a merger source. + */ + lua_pushnil(L); + /* Stack: wrap, gen, merger_source, nil. */ + + lua_insert(L, -2); /* Swap merger_source and nil. */ + /* Stack: wrap, gen, nil, merger_source. */ + + /* Call fun.wrap(gen, nil, merger_source). */ + lua_call(L, 3, 3); + return 3; +} + +/** + * Write source results into ibuf. 
+ * + * It is the helper for lbox_merger_source_select(). + */ +static int +encode_result_buffer(struct lua_State *L, struct merger_source *source, + struct ibuf *obuf, uint32_t limit) +{ + uint32_t result_len = 0; + uint32_t result_len_offset = 4; + + /* + * Reserve maximum size for the array around resulting + * tuples to set it later. + */ + encode_header(obuf, UINT32_MAX); + + /* Fetch, merge and copy tuples to the buffer. */ + struct tuple *tuple; + int rc = 0; + while (result_len < limit && (rc = + source->vtab->next(source, NULL, &tuple)) == 0 && + tuple != NULL) { + uint32_t bsize = tuple->bsize; + ibuf_reserve(obuf, bsize); + memcpy(obuf->wpos, tuple_data(tuple), bsize); + obuf->wpos += bsize; + result_len_offset += bsize; + ++result_len; + + /* The received tuple is not more needed. */ + box_tuple_unref(tuple); + } + + if (rc != 0) + return luaT_error(L); + + /* Write the real array size. */ + mp_store_u32(obuf->wpos - result_len_offset, result_len); + + return 0; +} + +/** + * Write source results into a new Lua table. + * + * It is the helper for lbox_merger_source_select(). + */ +static int +create_result_table(struct lua_State *L, struct merger_source *source, + uint32_t limit) +{ + /* Create result table. */ + lua_newtable(L); + + uint32_t cur = 1; + + /* Fetch, merge and save tuples to the table. */ + struct tuple *tuple; + int rc = 0; + while (cur - 1 < limit && (rc = + source->vtab->next(source, NULL, &tuple)) == 0 && + tuple != NULL) { + luaT_pushtuple(L, tuple); + lua_rawseti(L, -2, cur); + ++cur; + + /* + * luaT_pushtuple() references the tuple, so we + * unreference it on merger's side. + */ + box_tuple_unref(tuple); + } + + if (rc != 0) + return luaT_error(L); + + return 1; +} + +/** + * Raise a Lua error with merger_inst:select() usage info. 
+ */
+static int
+lbox_merger_source_select_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger_source:select([{"
+				   "buffer = > or , "
+				   "limit = or }])";
+	if (param_name == NULL)
+		return luaL_error(L, "Bad params, use: %s", usage);
+	else
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+}
+
+/**
+ * Pull results of a merger source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expected a merger source and options (optional) on a Lua stack.
+ * Options: 'buffer' (an ibuf to append a msgpack array to) and
+ * 'limit' (a non-negative number; values above UINT32_MAX are
+ * clamped).
+ *
+ * Return a Lua table or nothing when a 'buffer' option is
+ * provided.
+ */
+static int
+lbox_merger_source_select(struct lua_State *L)
+{
+	struct merger_source *source;
+	int top = lua_gettop(L);
+	bool ok = (top == 1 || top == 2) &&
+		/* Merger source. */
+		(source = luaT_check_merger_source(L, 1)) != NULL &&
+		/* Opts. */
+		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+	if (!ok)
+		return lbox_merger_source_select_usage(L, NULL);
+
+	/* No limit by default. */
+	uint32_t limit = 0xFFFFFFFF;
+	struct ibuf *obuf = NULL;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 2)) {
+		/* Parse buffer. */
+		lua_pushstring(L, "buffer");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if ((obuf = luaT_check_ibuf(L, -1)) == NULL)
+				return lbox_merger_source_select_usage(L,
+					"buffer");
+		}
+		lua_pop(L, 1);
+
+		/* Parse limit. */
+		lua_pushstring(L, "limit");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if (!lua_isnumber(L, -1))
+				return lbox_merger_source_select_usage(L,
+					"limit");
+			/*
+			 * Reject negative values and NaN instead of
+			 * letting an integer conversion wrap them
+			 * around uint32_t, and clamp values that do
+			 * not fit into uint32_t (2^32 used to become
+			 * 0).
+			 */
+			double limit_num = lua_tonumber(L, -1);
+			if (!(limit_num >= 0))
+				return lbox_merger_source_select_usage(L,
+					"limit");
+			limit = limit_num >= UINT32_MAX ? UINT32_MAX :
+				(uint32_t) limit_num;
+		}
+		lua_pop(L, 1);
+	}
+
+	if (obuf == NULL)
+		return create_result_table(L, source, limit);
+	else
+		return encode_result_buffer(L, source, obuf, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+	luaL_cdef(L, "struct merger_source;");
+	luaL_cdef(L, "struct merger_context;");
+	luaL_cdef(L, "struct ibuf;");
+
+	merger_source_type_id = luaL_ctypeid(L, "struct merger_source&");
+	merger_context_type_id = luaL_ctypeid(L, "struct merger_context&");
+	ibuf_type_id = luaL_ctypeid(L, "struct ibuf");
+
+	/* Export C functions to Lua. */
+	static const struct luaL_Reg meta[] = {
+		{"new_buffer_source", lbox_merger_new_buffer_source},
+		{"new_table_source", lbox_merger_new_table_source},
+		{"new_tuple_source", lbox_merger_new_tuple_source},
+		{"new", lbox_merger_new},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "merger", meta);
+
+	/* Add context.new(). */
+	lua_newtable(L); /* merger.context */
+	lua_pushcfunction(L, lbox_merger_context_new);
+	lua_setfield(L, -2, "new");
+	lua_setfield(L, -2, "context");
+
+	/* Add internal.{select,ipairs}(). */
+	lua_newtable(L); /* merger.internal */
+	lua_pushcfunction(L, lbox_merger_source_select);
+	lua_setfield(L, -2, "select");
+	lua_pushcfunction(L, lbox_merger_source_ipairs);
+	lua_setfield(L, -2, "ipairs");
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..9bf7cc019
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+local ibuf_t = ffi.typeof('struct ibuf')
+local merger_source_t = ffi.typeof('struct merger_source')
+
+-- Create a source from one buffer (a single-chunk buffer source).
+merger.new_source_frombuffer = function(buf)
+    local func_name = 'merger.new_source_frombuffer'
+    if type(buf) ~= 'cdata' or not ffi.istype(ibuf_t, buf) then
+        error(('Usage: %s(<cdata<struct ibuf>>)'):format(func_name), 0)
+    end
+
+    return merger.new_buffer_source(fun.iter({buf}))
+end
+
+-- Create a source from one table (a single-chunk table source).
+merger.new_source_fromtable = function(tbl)
+    local func_name = 'merger.new_source_fromtable'
+    if type(tbl) ~= 'table' then
+        error(('Usage: %s(<table>)'):format(func_name), 0)
+    end
+
+    return merger.new_table_source(fun.iter({tbl}))
+end
+
+local methods = {
+    ['select'] = merger.internal.select,
+    ['pairs'] = merger.internal.ipairs,
+    ['ipairs'] = merger.internal.ipairs,
+}
+
+ffi.metatype(merger_source_t, {
+    __index = function(self, key)
+        return methods[key]
+    end,
+    -- Lua 5.2 compatibility
+    __pairs = merger.internal.ipairs,
+    __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..1412c6126
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,725 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- A chunk size for table and buffer sources. A chunk size for
+-- tuple source is always 1.
+local FETCH_BLOCK_SIZE = 10
+
+local function merger_new_usage(param)
+    local msg = 'merger.new(merger_context, ' ..
+        '{source, source, ...}[, {' ..
+        'reverse = or }])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+local function merger_select_usage(param)
+    local msg = 'merger_source:select([{' ..
+        'buffer = > or , ' ..
+        'limit = or }])'
+    if not param then
+        return ('Bad params, use: %s'):format(msg)
+    else
+        return ('Bad param "%s", use: %s'):format(param, msg)
+    end
+end
+
+-- Get buffer with data encoded without last 'trunc' bytes.
+local function truncated_msgpack_buffer(data, trunc)
+    data = msgpackffi.encode(data)
+    data = data:sub(1, data:len() - trunc)
+    local len = data:len()
+    local buf = buffer.ibuf()
+    -- Ensure we have enough buffer to write len + trunc bytes.
+    buf:reserve(len + trunc)
+    local p = buf:alloc(len)
+    -- Ensure len bytes follows with trunc zero bytes.
+    ffi.copy(p, data .. string.rep('\0', trunc), len + trunc)
+    return buf
+end
+
+local function truncated_msgpack_source(data, trunc)
+    local buf = truncated_msgpack_buffer(data, trunc)
+    return merger.new_source_frombuffer(buf)
+end
+
+local bad_source_new_calls = {
+    {
+        'Bad fetch iterator',
+        funcs = {'new_buffer_source', 'new_table_source',
+                 'new_tuple_source'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+    },
+    {
+        'Bad chunk type',
+        funcs = {'new_source_frombuffer', 'new_source_fromtable'},
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+    },
+    {
+        'Bad buffer chunk',
+        funcs = {'new_source_frombuffer'},
+        params = {ffi.new('char *')},
+        exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+    },
+}
+
+local bad_merger_new_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_new_usage(nil),
+    },
+    {
+        'Bad opts.reverse',
+        sources = {},
+        opts = {reverse = 1},
+        exp_err = merger_new_usage('reverse'),
+    },
+}
+
+local bad_merger_select_calls = {
+    {
+        'Wrong source of table type',
+        sources = {merger.new_source_fromtable({1})},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Remove the last tuple from msgpack data, but keep old
+        -- tuples array size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 2),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Remove half of the last tuple, but keep old tuple size.
+        sources = {
+            truncated_msgpack_source({{''}, {''}, {''}}, 1),
+        },
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad opts.buffer (wrong type)',
+        sources = {},
+        opts = {buffer = 1},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.buffer (wrong cdata type)',
+        sources = {},
+        opts = {buffer = ffi.new('char *')},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.limit (wrong type)',
+        sources = {},
+        opts = {limit = 'hello'},
+        exp_err = merger_select_usage('limit'),
+    }
+}
+
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 16 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 15 constant parts
+            for i = 1, 15 do
+                tuple[i] = i
+            end
+            -- 16th part is varying
+            tuple[16] = tupleno
+            return tuple
+        end,
+        -- reduce tuples count to decrease test run time
+        tuples_cnt = 16,
+    },
+    -- Test null and non-null values in a nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        gen_tuple = function(i)
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+local function is_unicode_ci_part(part)
+    return part.collation_id == 2 or part.collation == 'unicode_ci'
+end
+
+local function tuple_comparator(a, b, parts)
+    for _, part in ipairs(parts) do
+        local fieldno = part.fieldno
+        if a[fieldno] ~= b[fieldno] then
+            if a[fieldno] == nil then
+                return -1
+            end
+            if b[fieldno] == nil then
+                return 1
+            end
+            if is_unicode_ci_part(part) then
+                return utf8.casecmp(a[fieldno], b[fieldno])
+            end
+            return a[fieldno] < b[fieldno] and -1 or 1
+        end
+    end
+
+    return 0
+end
+
+local function sort_tuples(tuples, parts, opts)
+    local function tuple_comparator_wrapper(a, b)
+        local cmp = tuple_comparator(a, b, parts)
+        if cmp < 0 then
+            return not opts.reverse
+        elseif cmp > 0 then
+            return opts.reverse
+        else
+            return false
+        end
+    end
+
+    table.sort(tuples, tuple_comparator_wrapper)
+end
+
+local function lowercase_unicode_ci_fields(tuples, parts)
+    for i = 1, #tuples do
+        local tuple = tuples[i]
+        for _, part in ipairs(parts) do
+            if is_unicode_ci_part(part) then
+                -- Workaround #3709.
+                if tuple[part.fieldno]:len() > 0 then
+                    tuple[part.fieldno] = utf8.lower(tuple[part.fieldno])
+                end
+            end
+        end
+    end
+end
+
+local function fetch_source_gen(param, state)
+    local input_type = param.input_type
+    local tuples = param.tuples
+    local last_pos = state.last_pos
+    local fetch_block_size = FETCH_BLOCK_SIZE
+    -- A chunk size is always 1 for a tuple source.
+    if input_type == 'tuple' then
+        fetch_block_size = 1
+    end
+    local data = fun.iter(tuples):drop(last_pos):take(
+        fetch_block_size):totable()
+    if #data == 0 then
+        return
+    end
+    local new_state = {last_pos = last_pos + #data}
+    if input_type == 'table' then
+        return new_state, data
+    elseif input_type == 'buffer' then
+        local buf = buffer.ibuf()
+        msgpackffi.internal.encode_r(buf, data, 0)
+        return new_state, buf
+    elseif input_type == 'tuple' then
+        assert(#data <= 1)
+        if #data == 0 then return end
+        return new_state, data[1]
+    else
+        assert(false)
+    end
+end
+
+local function fetch_source_iterator(input_type, tuples)
+    local param = {
+        input_type = input_type,
+        tuples = tuples,
+    }
+    local state = {
+        last_pos = 0,
+    }
+    return fetch_source_gen, param, state
+end
+
+local function prepare_data(schema, tuples_cnt, sources_cnt, opts)
+    opts = opts or {}
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    local tuples = {}
+    local exp_result = {}
+
+    -- Ensure empty sources are empty table and not nil.
+    for i = 1, sources_cnt do
+        if tuples[i] == nil then
+            tuples[i] = {}
+        end
+    end
+
+    -- Prepare N tables with tuples as input for merger.
+    for i = 1, tuples_cnt do
+        -- [1, sources_cnt]
+        local guava = digest.guava(i, sources_cnt) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if not use_table_as_tuple then
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[guava], tuple)
+    end
+
+    -- Sort tuples within each source.
+    for _, source_tuples in pairs(tuples) do
+        sort_tuples(source_tuples, schema.parts, opts)
+    end
+
+    -- Sort expected result.
+    sort_tuples(exp_result, schema.parts, opts)
+
+    -- Fill sources.
+    local sources
+    if use_fetch_source then
+        sources = {}
+        for i = 1, sources_cnt do
+            local func = ('new_%s_source'):format(input_type)
+            sources[i] = merger[func](fetch_source_iterator(input_type,
+                tuples[i]))
+        end
+    elseif input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        sources = {}
+        for i = 1, sources_cnt do
+            sources[i] = merger.new_source_fromtable(tuples[i])
+        end
+    elseif input_type == 'buffer' then
+        -- Imitate netbox's select with {buffer = ...}.
+        sources = {}
+        for i = 1, sources_cnt do
+            local buf = buffer.ibuf()
+            sources[i] = merger.new_source_frombuffer(buf)
+            msgpackffi.internal.encode_r(buf, tuples[i], 0)
+        end
+    elseif input_type == 'tuple' then
+        assert(false)
+    else
+        assert(false)
+    end
+
+    return sources, exp_result
+end
+
+local function test_case_opts_str(opts)
+    local params = {}
+
+    if opts.input_type then
+        table.insert(params, 'input_type: ' .. opts.input_type)
+    end
+
+    if opts.output_type then
+        table.insert(params, 'output_type: ' .. opts.output_type)
+    end
+
+    if opts.reverse then
+        table.insert(params, 'reverse')
+    end
+
+    if opts.use_table_as_tuple then
+        table.insert(params, 'use_table_as_tuple')
+    end
+
+    if opts.use_fetch_source then
+        table.insert(params, 'use_fetch_source')
+    end
+
+    if next(params) == nil then
+        return ''
+    end
+
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
+local function run_merger(test, schema, tuples_cnt, sources_cnt, opts)
+    fiber.yield()
+
+    opts = opts or {}
+
+    -- Prepare data.
+    local sources, exp_result = prepare_data(schema, tuples_cnt, sources_cnt,
+                                             opts)
+
+    -- Create a merger instance.
+    local merger_inst = merger.new(schema.merger_context, sources,
+        {reverse = opts.reverse})
+
+    local res
+
+    -- Run merger and prepare output for compare.
+    if opts.output_type == 'table' then
+        -- Table output.
+        res = merger_inst:select()
+    elseif opts.output_type == 'buffer' then
+        -- Buffer output.
+        local obuf = buffer.ibuf()
+        merger_inst:select({buffer = obuf})
+        res = msgpackffi.decode(obuf.rpos)
+    else
+        -- Tuple output.
+        assert(opts.output_type == 'tuple')
+        res = merger_inst:pairs():totable()
+    end
+
+    -- A bit more postprocessing to compare.
+    for i = 1, #res do
+        if type(res[i]) ~= 'table' then
+            res[i] = res[i]:totable()
+        end
+    end
+
+    -- unicode_ci does not differentiate btw 'A' and 'a', so the
+    -- order is arbitrary. We transform fields with unicode_ci
+    -- collation in parts to lower case before comparing.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    test:is_deeply(res, exp_result,
+        ('check order on %3d tuples in %4d sources%s')
+        :format(tuples_cnt, sources_cnt, test_case_opts_str(opts)))
+end
+
+local function run_case(test, schema, opts)
+    opts = opts or {}
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuples_cnt = schema.tuples_cnt or 100
+
+    local input_type = opts.input_type
+    local use_table_as_tuple = opts.use_table_as_tuple
+    local use_fetch_source = opts.use_fetch_source
+
+    -- Skip meaningless flags combinations.
+    if input_type == 'buffer' and not use_table_as_tuple then
+        return
+    end
+    if input_type == 'tuple' and not use_fetch_source then
+        return
+    end
+
+    test:test(case_name, function(test)
+        test:plan(4)
+
+        -- Check with small buffers count.
+        run_merger(test, schema, tuples_cnt, 1, opts)
+        run_merger(test, schema, tuples_cnt, 2, opts)
+        run_merger(test, schema, tuples_cnt, 5, opts)
+
+        -- Check more buffers than tuples count.
+        run_merger(test, schema, tuples_cnt, 128, opts)
+    end)
+end
+
+local test = tap.test('merger')
+test:plan(#bad_source_new_calls + #bad_merger_new_calls +
+    #bad_merger_select_calls + 6 + #schemas * 48)
+
+-- For collations.
+box.cfg{}
+
+for _, case in ipairs(bad_source_new_calls) do
+    test:test(case[1], function(test)
+        local funcs = case.funcs
+        test:plan(#funcs)
+        for _, func in ipairs(funcs) do
+            local ok, err = pcall(merger[func], unpack(case.params))
+            test:ok(not ok and tostring(err):match(case.exp_err) ~= nil, func)
+        end
+    end)
+end
+
+-- Create the merger context for the test cases below.
+local ctx = merger.context.new(key_def.new({{
+    fieldno = 1,
+    type = 'string',
+}}))
+
+-- Bad merger.new() calls.
+for _, case in ipairs(bad_merger_new_calls) do
+    local ok, err = pcall(merger.new, ctx, case.sources, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Bad source or/and opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_select_calls) do
+    local merger_inst = merger.new(ctx, case.sources)
+    local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+    err = tostring(err) -- cdata -> string
+    test:is_deeply({ok, err}, {false, case.exp_err}, case[1])
+end
+
+-- Create a merger context for each schema.
+for _, schema in ipairs(schemas) do
+    schema.merger_context = merger.context.new(key_def.new(schema.parts))
+end
+
+test:test('use a source in two mergers', function(test)
+    test:plan(5)
+
+    local data = {{'a'}, {'b'}, {'c'}}
+    local source = merger.new_source_fromtable(data)
+    local i1 = merger.new(ctx, {source}):pairs()
+    local i2 = merger.new(ctx, {source}):pairs()
+
+    local t1 = i1:head():totable()
+    test:is_deeply(t1, data[1], 'tuple 1 from merger 1')
+
+    local t3 = i2:head():totable()
+    test:is_deeply(t3, data[3], 'tuple 3 from merger 2')
+
+    local t2 = i1:head():totable()
+    test:is_deeply(t2, data[2], 'tuple 2 from merger 1')
+
+    test:ok(i1:is_null(), 'merger 1 ends')
+    test:ok(i2:is_null(), 'merger 2 ends')
+end)
+
+local function reusable_source_gen(param)
+    local chunks = param.chunks
+    local idx = param.idx or 1
+
+    if idx > table.maxn(chunks) then
+        return
+    end
+
+    local chunk = chunks[idx]
+    param.idx = idx + 1
+
+    if chunk == nil then -- A hole ends the current pass.
+        return
+    end
+    return box.NULL, chunk
+end
+
+local function verify_reusable_source(test, source)
+    test:plan(3)
+
+    local exp = {{1}, {2}}
+    local res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '1st use')
+
+    exp = {{3}, {4}, {5}}
+    res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, '2nd use')
+
+    exp = {}
+    res = source:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, exp, 'end')
+end
+
+test:test('reuse a tuple source', function(test)
+    local tuples = {{1}, {2}, nil, {3}, {4}, {5}}
+    local source = merger.new_tuple_source(reusable_source_gen,
+        {chunks = tuples})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+    local chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local source = merger.new_table_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+    local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+    local chunks = {}
+    for i = 1, table.maxn(chunks_tbl) do
+        if chunks_tbl[i] == nil then
+            chunks[i] = nil
+        else
+            chunks[i] = buffer.ibuf()
+            msgpackffi.internal.encode_r(chunks[i], chunks_tbl[i], 0)
+        end
+    end
+    local source = merger.new_buffer_source(reusable_source_gen,
+        {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('use limit', function(test)
+    test:plan(6)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m = merger.new(ctx, {source})
+    local res = m:select({limit = 0})
+    test:is(#res, 0, 'table output with limit 0')
+
+    source = merger.new_source_fromtable(data)
+    m = merger.new(ctx, {source})
+    res = m:select({limit = 1})
+    test:is(#res, 1, 'table output with limit 1')
+    test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+    source = merger.new_source_fromtable(data)
+    m = merger.new(ctx, {source})
+    local obuf = buffer.ibuf()
+    m:select({buffer = obuf, limit = 0})
+    res = msgpackffi.decode(obuf.rpos)
+    test:is(#res, 0, 'buffer output with limit 0')
+
+    source = merger.new_source_fromtable(data)
+    m = merger.new(ctx, {source})
+    obuf:recycle()
+    m:select({buffer = obuf, limit = 1})
+    res = msgpackffi.decode(obuf.rpos)
+    test:is(#res, 1, 'buffer output with limit 1')
+    test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
+test:test('cascade mergers', function(test)
+    test:plan(2)
+
+    local data = {{'a'}, {'b'}}
+
+    local source = merger.new_source_fromtable(data)
+    local m1 = merger.new(ctx, {source})
+    local m2 = merger.new(ctx, {m1})
+
+    local res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'same context')
+
+    local ctx_unicode = merger.context.new(key_def.new({{
+        fieldno = 1,
+        type = 'string',
+        collation = 'unicode',
+    }}))
+
+    source = merger.new_source_fromtable(data)
+    m1 = merger.new(ctx, {source})
+    m2 = merger.new(ctx_unicode, {m1})
+
+    res = m2:pairs():map(box.tuple.totable):totable()
+    test:is_deeply(res, data, 'different contexts')
+end)
+
+-- Merging cases: all option combinations for every schema.
+for _, input_type in ipairs({'buffer', 'table', 'tuple'}) do
+    for _, output_type in ipairs({'buffer', 'table', 'tuple'}) do
+        for _, reverse in ipairs({false, true}) do
+            for _, use_table_as_tuple in ipairs({false, true}) do
+                for _, use_fetch_source in ipairs({false, true}) do
+                    for _, schema in ipairs(schemas) do
+                        run_case(test, schema, {
+                            input_type = input_type,
+                            output_type = output_type,
+                            reverse = reverse,
+                            use_table_as_tuple = use_table_as_tuple,
+                            use_fetch_source = use_fetch_source,
+                        })
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
-- 
2.20.1