From: Alexander Turenko <alexander.turenko@tarantool.org>
To: Vladimir Davydov <vdavydov.dev@gmail.com>
Cc: Alexander Turenko <alexander.turenko@tarantool.org>, tarantool-patches@freelists.org
Subject: [PATCH v2 6/6] Add merger for tuple streams
Date: Wed, 9 Jan 2019 23:20:14 +0300
Message-ID: <7c1abeb46357d3495c86bc5287d7dafc3c9c587b.1547064388.git.alexander.turenko@tarantool.org>
In-Reply-To: <cover.1547064388.git.alexander.turenko@tarantool.org>

Fixes #3276.

@TarantoolBot document
Title: Merger for tuple streams

## API and basic usage

The following example demonstrates the API of the module:

```lua
local msgpack = require('msgpack')
local net_box = require('net.box')
local buffer = require('buffer')
local merger = require('merger')

-- The format of key_parts parameter is the same as
-- `{box,conn}.space.<...>.index.<...>.parts` (where conn is
-- net.box connection).
local key_parts = {
    {
        fieldno = <number>,
        type = <string>,
        [ is_nullable = <boolean>, ]
        [ collation_id = <number>, ]
        [ collation = <string>, ]
    },
    ...
}

-- Create the merger context.
local ctx = merger.context.new(key_parts)

-- Optional parameters.
local opts = {
    -- Output buffer, only for merger.select(ctx, <...>).
    [ buffer = <buffer>, ]
    -- Ascending (default) or descending result order.
    [ descending = <boolean>, ]
    -- Optional callback to fetch more data.
    [ fetch_source = <function>, ]
}

-- Prepare buffer source.
local conn = net_box.connect('localhost:3301')
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf}) -- read to buffer
buf.rpos = assert(net_box.check_iproto_data(buf.rpos,
    buf.wpos - buf.rpos))

-- We have three sources here.
local sources = {
    buf,                   -- buffer source
    box.space.s:select(),  -- table source
    {box.space.s:pairs()}, -- iterator source
}

-- Read the whole result at once.
local res = merger.select(ctx, sources, opts)

-- Read the result tuple per tuple.
local res = {}
for _, tuple in merger.pairs(ctx, sources, opts) do
    -- Some stop merge condition.
    if tuple[1] > MAX_VALUE then break end
    table.insert(res, tuple)
end

-- The same in the functional style.
local function cond(tuple)
    return tuple[1] <= MAX_VALUE
end
local res = merger.pairs(ctx, sources, opts):take(cond):totable()
```

The basic use case of the merger is when there are M storages and the data are partitioned (sharded) across them. A client wants to fetch the data (a tuple stream) from each storage and merge them into one tuple stream:

```lua
local msgpack = require('msgpack')
local net_box = require('net.box')
local buffer = require('buffer')
local merger = require('merger')

-- Prepare M sources.
local net_box_opts = {reconnect_after = 0.1}
local connects = {
    net_box.connect('localhost:3301', net_box_opts),
    net_box.connect('localhost:3302', net_box_opts),
    ...
    net_box.connect('localhost:<...>', net_box_opts),
}
local sources = {}
for _, conn in ipairs(connects) do
    local buf = buffer.ibuf()
    conn.space.<...>.index.<...>:select(<...>, {buffer = buf})
    buf.rpos = assert(net_box.check_iproto_data(buf.rpos,
        buf.wpos - buf.rpos))
    table.insert(sources, buf)
end

-- See the 'Notes...' section below.
local key_parts = {}
local space = connects[1].space.<...>
local index = space.index.<...>
for _, part in ipairs(index.parts) do
    table.insert(key_parts, part)
end
if not index.unique then
    -- Append the primary index parts (index 0 is the primary one).
    for _, part in ipairs(space.index[0].parts) do
        table.insert(key_parts, part)
    end
end

-- Create the merger context.
local ctx = merger.context.new(key_parts)

-- Merge.
local res = merger.select(ctx, sources)
```

## Notes re source sorting and key parts

The merger expects each source tuple stream to be sorted according to the provided key parts and performs a kind of merge sort: on each step it chooses the minimal (or maximal, for descending order) tuple across the sources.

Tuples returned by select() from a Tarantool space are sorted according to the key parts of the index that was used. When a secondary non-unique index is used, tuples are sorted according to the key parts of the secondary index and then according to the key parts of the primary index.
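To make the 'kind of merge sort' concrete, here is a plain-Lua sketch that runs without Tarantool and does not use the `merger` module. It builds key parts for a non-unique secondary index (secondary parts, then primary parts), derives a comparator from them, and repeatedly takes the minimal (or maximal) head tuple across already-sorted sources. The helpers `make_key_parts`, `make_less` and `naive_merge` are hypothetical illustrations: the real merger works on msgpack tuples, keeps sources in a binary heap and honours field types, collations and nullability, all of which this sketch ignores.

```lua
-- Hypothetical helper: secondary index parts followed by primary
-- index parts, the order in which a non-unique secondary index
-- sorts tuples. Only `fieldno` is used by this sketch.
local function make_key_parts(secondary_parts, primary_parts, unique)
    local key_parts = {}
    for _, part in ipairs(secondary_parts) do
        table.insert(key_parts, part)
    end
    if not unique then
        for _, part in ipairs(primary_parts) do
            table.insert(key_parts, part)
        end
    end
    return key_parts
end

-- Hypothetical comparator over plain Lua arrays: compares fields in
-- key part order (no types, collations or NULL handling).
local function make_less(key_parts)
    return function(a, b)
        for _, part in ipairs(key_parts) do
            local x, y = a[part.fieldno], b[part.fieldno]
            if x ~= y then
                return x < y
            end
        end
        return false
    end
end

-- Naive k-way merge: on each step take the minimal (or, for the
-- descending order, the maximal) head tuple across all sources.
-- Each source must already be sorted in the requested order.
local function naive_merge(key_parts, sources, descending)
    local less = make_less(key_parts)
    local pos = {}
    for i = 1, #sources do
        pos[i] = 1
    end
    local res = {}
    while true do
        local best
        for i, src in ipairs(sources) do
            local t = src[pos[i]]
            if t ~= nil then
                local better
                if best == nil then
                    better = true
                elseif descending then
                    better = less(sources[best][pos[best]], t)
                else
                    better = less(t, sources[best][pos[best]])
                end
                if better then
                    best = i
                end
            end
        end
        if best == nil then
            return res
        end
        table.insert(res, sources[best][pos[best]])
        pos[best] = pos[best] + 1
    end
end

-- Non-unique secondary index on field 2, primary index on field 1.
local key_parts = make_key_parts({{fieldno = 2}}, {{fieldno = 1}}, false)
local s1 = {{1, 'a'}, {3, 'a'}, {2, 'b'}}
local s2 = {{4, 'a'}, {5, 'b'}}
local res = naive_merge(key_parts, {s1, s2})
-- res: {1, 'a'}, {3, 'a'}, {4, 'a'}, {2, 'b'}, {5, 'b'}

-- Descending order needs sources sorted in descending order too.
local desc = naive_merge({{fieldno = 1}}, {{{5}, {3}, {1}}, {{4}, {2}}}, true)
-- desc: {5}, {4}, {3}, {2}, {1}
```

A linear scan over M sources costs O(M) per output tuple; a heap, as used by the C implementation, reduces that to O(log M).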
## Preparing buffers

We'll use the symbol T below to represent an msgpack array that corresponds to a tuple.

A select response has the structure `{[48] = {T, T, ...}}`, while a call response is `{[48] = {{T, T, ...}}}` (an extra array level, because a call can return multiple values). A user should skip the extra headers and pass to the merger a buffer whose read position points to `{T, T, ...}`.

Typical headers are the following:

Cases          | Buffer structure
-------------- | ----------------------
raw data       | {T, T, ...}
net.box select | {[48] = {T, T, ...}}
net.box call   | {[48] = {{T, T, ...}}}

An example of how to skip iproto_data (`{[48] = ...}`) and array headers to obtain raw data in a buffer is [here](XXX).

XXX: link the example from 'net.box: helpers to decode msgpack headers' docbot request.

How to check the buffer data structure yourself:

```lua
#!/usr/bin/env tarantool

local net_box = require('net.box')
local buffer = require('buffer')
local ffi = require('ffi')
local msgpack = require('msgpack')
local yaml = require('yaml')

box.cfg{listen = 3301}
box.once('load_data', function()
    box.schema.user.grant('guest', 'read,write,execute', 'universe')
    box.schema.space.create('s')
    box.space.s:create_index('pk')
    box.space.s:insert({1})
    box.space.s:insert({2})
    box.space.s:insert({3})
    box.space.s:insert({4})
end)

local function foo()
    return box.space.s:select()
end
_G.foo = foo

local conn = net_box.connect('localhost:3301')

-- Dump a raw select response.
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('select:\n' .. yaml.encode(buf_lua))

-- Dump a raw call response.
local buf = buffer.ibuf()
conn:call('foo', nil, {buffer = buf})
local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos)
local buf_lua = msgpack.decode(buf_str)
print('call:\n' .. yaml.encode(buf_lua))

os.exit()
```

A user can wrap the result of a net.box call into a custom shape, so additional steps may be needed before passing a buffer to the merger as a source.
See the 'Merge multiplexed requests' section for an example.

## Chunked data transfer

The merger can ask for more data for a drained source using the `fetch_source` callback with the following signature:

```lua
fetch_source = function(source, last_tuple, processed)
    <...>
end
```

If this callback is provided, the merger invokes it when a buffer or a table source reaches its end (it is not called for an iterator source). If new data become available after the call, the merger uses them; otherwise it considers the source entirely drained.

`fetch_source` should update the provided buffer in case of a buffer source or return a new table in case of a table source. An empty buffer, a buffer with a zero tuple count, and an empty or nil table are considered stoppers: the callback will not be called for this source anymore.

`source` is a table with the following fields:

- `source.idx` is the one-based index of the source;
- `source.type` is a string: 'buffer' or 'table';
- `source.buffer` is a cdata<struct ibuf> or nil;
- `source.table` is the previous table or nil.

`last_tuple` is the last tuple fetched from the source (can be nil), `processed` is the number of tuples extracted from this source (over all previous iterations).

If no data are available in a source when the merge starts, the merger calls the callback. `last_tuple` is `nil` in this case and `processed` is 0. This allows a user to just define the `fetch_source` callback and not fill buffers / tables before the start.

When using the `is_async = true` net.box option, one can rely on the fact that net.box writes an answer without yielding: a partial result cannot be observed.

The example at the end of this section fetches data from two storages in chunks; the requests are performed from the `fetch_source` callback. The first request uses the ALL iterator and the BLOCK_SIZE limit, the subsequent ones use the GT iterator (with a key extracted from the last fetched tuple) and the same limit.
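Before the networked version, the paging rule itself can be tried out in plain Lua without a cluster. In this sketch a storage is simulated as an array of tuples sorted by field 1; `fetch_page` and `fetch_all` are hypothetical stand-ins for `select()` with `limit` and `iterator = box.index.GT` and for the repeated `fetch_source` calls, not net.box or merger API. The second run shows what happens with a non-unique key: the GT request resumes after all tuples equal to the last key, so a tuple that did not fit into the previous page is skipped.

```lua
-- Hypothetical stand-in for select(key, {iterator = GT, limit = n})
-- over a storage that is an array of tuples sorted by field 1.
-- With key == nil it behaves like the ALL iterator.
local function fetch_page(storage, key, limit)
    local page = {}
    for _, tuple in ipairs(storage) do
        if #page >= limit then
            break
        end
        if key == nil or tuple[1] > key then
            table.insert(page, tuple)
        end
    end
    return page
end

-- Emulates the fetch_source loop for one source: ask for the next
-- chunk keyed by the last fetched tuple until an empty chunk (a
-- stopper) arrives.
local function fetch_all(storage, block_size)
    local res = {}
    local last_tuple = nil
    while true do
        local key = last_tuple and last_tuple[1] or nil
        local page = fetch_page(storage, key, block_size)
        if #page == 0 then
            return res
        end
        for _, tuple in ipairs(page) do
            table.insert(res, tuple)
        end
        last_tuple = page[#page]
    end
end

local BLOCK_SIZE = 2

-- Unique key: every tuple is fetched exactly once (5 tuples).
local unique = fetch_all({{1}, {3}, {5}, {7}, {9}}, BLOCK_SIZE)

-- Non-unique key: {3, 'b'} does not fit into the first page and the
-- following GT 3 request skips it (3 tuples of 4 are fetched).
local dups = fetch_all({{1, 'a'}, {3, 'a'}, {3, 'b'}, {5, 'a'}}, BLOCK_SIZE)
```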
Note: this way of implementing a cursor / pagination works smoothly only with unique indexes. See also #3898.

More complex scenarios are possible: using futures (the `is_async = true` option of net.box methods) to fetch the next chunk while merging the current one or, say, calling a function with several return values (some of them need to be skipped manually in the callback to let the merger read tuples).

```lua
-- Storage script
-- --------------

box.cfg({<...>})
box.schema.space.create('s')
box.space.s:create_index('pk')
if instance_name == 'storage_1' then
    box.space.s:insert({1, 'one'})
    box.space.s:insert({3, 'three'})
    box.space.s:insert({5, 'five'})
    box.space.s:insert({7, 'seven'})
    box.space.s:insert({9, 'nine'})
else
    box.space.s:insert({2, 'two'})
    box.space.s:insert({4, 'four'})
    box.space.s:insert({6, 'six'})
    box.space.s:insert({8, 'eight'})
    box.space.s:insert({10, 'ten'})
end
box.schema.user.grant('guest', 'read', 'space', 's')
box.cfg({listen = <...>})

-- Client script
-- -------------

<...requires...>

local BLOCK_SIZE = 2

local function key_from_tuple(tuple, key_parts)
    local key = {}
    for _, part in ipairs(key_parts) do
        table.insert(key, tuple[part.fieldno] or box.NULL)
    end
    return key
end

local function gen_fetch_source(conns, key_parts)
    return function(source, last_tuple, _)
        local conn = conns[source.idx]
        local buf = source.buffer
        local opts = {
            limit = BLOCK_SIZE,
            buffer = buf,
        }

        -- The first request: ALL iterator + limit.
        if last_tuple == nil then
            conn.space.s:select(nil, opts)
            buf.rpos = assert(net_box.check_iproto_data(buf.rpos,
                buf.wpos - buf.rpos))
            return
        end

        -- Subsequent requests: GT iterator + limit.
        local key = key_from_tuple(last_tuple, key_parts)
        opts.iterator = box.index.GT
        conn.space.s:select(key, opts)
        buf.rpos = assert(net_box.check_iproto_data(buf.rpos,
            buf.wpos - buf.rpos))
    end
end

local conns = <...>
local buffers = <...>
local key_parts = conns[1].space.s.index.pk.parts
local ctx = merger.context.new(key_parts)
local fetch_source =
    gen_fetch_source(conns, key_parts)
local res = merger.select(ctx, buffers, {fetch_source = fetch_source})
print(yaml.encode(res))
os.exit()
```

## Merge multiplexed requests

Consider the case when the network latency between storage machines and frontend machine(s) is much larger than the time needed to process a request on the frontend. This situation is typical when a workload consists of many light requests.

In this case it can be worth 'multiplexing' different requests to storage machines within one network request. We'll consider an approach where a storage function returns many box.space.<...>:select(<...>) results instead of one.

One needs to skip the iproto_data packet header and two array headers, and then run the merger N times on the same buffers (with the same or different contexts). There are no extra data copies and no decoding of tuples into Lua memory.

```lua
-- Storage script
-- --------------

-- Return N results in a table.
-- Each result is a table of tuples.
local function batch_select(<...>)
    local res = {}
    for i = 1, N do
        local tuples = box.space.<...>:select(<...>)
        table.insert(res, tuples)
    end
    return res
end

-- Expose to call it using net.box.
_G.batch_select = batch_select

-- Client script
-- -------------

local net_box = require('net.box')
local buffer = require('buffer')
local msgpack = require('msgpack')
local merger = require('merger')

-- Prepare M sources.
local connects = <...>
local sources = {}
for _, conn in ipairs(connects) do
    local buf = buffer.ibuf()
    conn:call('batch_select', <...>, {buffer = buf})
    -- Skip {[48] = ...}.
    buf.rpos = assert(net_box.check_iproto_data(buf.rpos,
        buf.wpos - buf.rpos))
    -- Skip the array of return values (one value).
    buf.rpos = assert(msgpack.check_array(buf.rpos,
        buf.wpos - buf.rpos, 1))
    -- Skip the array of N results.
    buf.rpos = assert(msgpack.check_array(buf.rpos,
        buf.wpos - buf.rpos))
    table.insert(sources, buf)
end

-- Now we have M sources and each has N results. We want to
-- merge all 1st results, all 2nd results, ..., all Nth
-- results.
local ctx = merger.context.new(<...>)
local res = {}
for i = 1, N do
    -- We use the same merger context for each merge, but it
    -- is possible to use different ones.
    local tuples = merger.select(ctx, sources)
    table.insert(res, tuples)
end
```

The results of these N merges can be written into a buffer (using the `buffer` option), and this buffer can be used as a source in N merges of the next level (see 'Cascading mergers' below for the idea).

## Cascading mergers

The idea is simple: the merger output formats are the same as the source formats, so it is possible to merge results of previous merges.

The example below is deliberately synthetic to keep it simple. Real cases where cascading can be profitable likely involve additional layers of Tarantool instances between storages and clients, or separate threads that merge blocks of each level.

To be honest, no one uses this ability for now. It exists because having the same input and output formats looks like a good property of the API.

```lua
<...requires...>

local sources = <...100 buffers...>
local ctx = merger.context.new(<...>)

-- We use buffer sources at the 1st and 2nd merge layers, but
-- read the final result as a table.

local sources_level_2 = {}
for i = 1, 10 do
    -- Take the next 10 first level sources.
    local sources_level_1 = {}
    for j = 1, 10 do
        sources_level_1[j] = sources[(i - 1) * 10 + j]
    end

    -- Merge 10 sources into a second level source.
    local result_level_1 = buffer.ibuf()
    merger.select(ctx, sources_level_1, {buffer = result_level_1})
    sources_level_2[i] = result_level_1
end

local res = merger.select(ctx, sources_level_2)
```

---
 src/box/CMakeLists.txt       |    1 +
 src/box/lua/init.c           |    3 +
 src/box/lua/merger.c         | 1402 ++++++++++++++++++++++++++++++++++
 src/box/lua/merger.h         |   47 ++
 test/box-tap/merger.test.lua |  558 ++++++++++++++
 test/box-tap/suite.ini       |    1 +
 6 files changed, 2012 insertions(+)
 create mode 100644 src/box/lua/merger.c
 create mode 100644 src/box/lua/merger.h
 create mode 100755 test/box-tap/merger.test.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 0db093768..b6a60a618 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -140,6 +140,7 @@ add_library(box STATIC
     lua/xlog.c
     lua/sql.c
     lua/key_def.c
+    lua/merger.c
     ${bin_sources})
 
 target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 0e90f6be5..c08bcc288 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -59,6 +59,7 @@
 #include "box/lua/console.h"
 #include "box/lua/tuple.h"
 #include "box/lua/sql.h"
+#include "box/lua/merger.h"
 
 extern char session_lua[],
 	tuple_lua[],
@@ -312,6 +313,8 @@ box_lua_init(struct lua_State *L)
 	lua_pop(L, 1);
 	tarantool_lua_console_init(L);
 	lua_pop(L, 1);
+	luaopen_merger(L);
+	lua_pop(L, 1);
 
 	/* Load Lua extension */
 	for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..cac6918d4
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1402 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+
+#include <lua.h>
+#include <lauxlib.h>
+
+#include "lua/error.h"
+#include "lua/utils.h"
+#include "small/ibuf.h"
+#include "msgpuck.h"
+#include "mpstream.h"
+#include "lua/msgpack.h"
+
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+
+#include "box/field_def.h"
+#include "box/key_def.h"
+#include "box/lua/key_def.h"
+#include "box/schema_def.h"
+#include "box/tuple.h"
+#include "box/lua/tuple.h"
+#include "box/box.h"
+#include "box/index.h"
+#include "diag.h"
+
+static bool
+source_less(const heap_t *heap, const struct heap_node *a,
+	    const struct heap_node *b);
+#define HEAP_NAME merger_heap
+#define HEAP_LESS source_less
+#include "salad/heap.h"
+
+static uint32_t merger_context_type_id = 0;
+static uint32_t merger_state_type_id = 0;
+static uint32_t ibuf_type_id = 0;
+
+/* {{{ Merger structures */
+
+struct merger_source;
+struct merger_context;
+struct merger_state;
+
+struct merger_source_vtab {
+	/**
+	 * Free the merger source.
+	 *
+	 * We need to know Lua state here, because sources of
+	 * table and iterator types are saved as references within
+	 * the Lua state.
+	 */
+	void (*delete)(struct merger_source *base, struct lua_State *L);
+	/**
+	 * Update source->tuple of specific source.
+	 *
+	 * Increases the reference counter of the tuple.
+	 *
+	 * Return 0 when successfully fetched a tuple or NULL. In
+	 * case of an error push an error message to the Lua stack
+	 * and return 1.
+	 */
+	int (*next)(struct merger_source *base, box_tuple_format_t *format,
+		    const struct merger_state *state, struct lua_State *L);
+};
+
+/**
+ * Base (abstract) structure to represent a merge source state.
+ * Concrete implementations are in box/lua/merger.c.
+ */
+struct merger_source {
+	/* Source-specific methods. */
+	struct merger_source_vtab *vtab;
+	/* Ordinal number of the source. */
+	int idx;
+	/*
+	 * How many tuples were used from this source.
+	 */
+	uint32_t processed;
+	/* Next tuple. */
+	struct tuple *tuple;
+	/*
+	 * A source is the heap node. Compared by the next tuple.
+	 */
+	struct heap_node hnode;
+};
+
+/**
+ * Holds immutable parameters of a merger.
+ */
+struct merger_context {
+	struct key_def *key_def;
+	box_tuple_format_t *format;
+};
+
+/**
+ * Holds parameters of merge process, sources, result storage
+ * (if any), heap of sources and utility flags / counters.
+ */
+struct merger_state {
+	/* Heap of sources. */
+	heap_t heap;
+	/*
+	 * Copy of key_def from merger_context.
+	 *
+	 * A merger_context can be collected by LuaJIT GC
+	 * independently from a merger_state, so we need either
+	 * copy key_def or implement reference counting for
+	 * merger_context and save the pointer.
+	 *
+	 * key_def is needed in source_less(), where merger_state
+	 * is known, but merger_context is not.
+	 */
+	struct key_def *key_def;
+	/* Parsed sources. */
+	uint32_t sources_count;
+	struct merger_source **sources;
+	/* Ascending / descending order. */
+	int order;
+	/* Optional output buffer. */
+	struct ibuf *obuf;
+	/* Optional fetch_source() callback. */
+	int fetch_source_ref;
+};
+
+/* }}} */
+
+/* {{{ Helpers for source methods and merger functions */
+
+/**
+ * How much more memory the heap will reserve at the next grow.
+ *
+ * See HEAP(reserve)() function in lib/salad/heap.h.
+ */
+size_t heap_next_grow_size(const heap_t *heap)
+{
+	heap_off_t heap_capacity_diff = heap->capacity == 0 ?
+		HEAP_INITIAL_CAPACITY : heap->capacity;
+	return heap_capacity_diff * sizeof(struct heap_node *);
+}
+
+/**
+ * Extract an ibuf object from the Lua stack.
+ */
+static struct ibuf *
+check_ibuf(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct ibuf *ibuf_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ibuf_ptr == NULL || cdata_type != ibuf_type_id)
+		return NULL;
+	return ibuf_ptr;
+}
+
+/**
+ * Extract a merger context from the Lua stack.
+ */
+static struct merger_context *
+check_merger_context(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger_context **ctx_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (ctx_ptr == NULL || cdata_type != merger_context_type_id)
+		return NULL;
+	return *ctx_ptr;
+}
+
+/**
+ * Extract a merger state from the Lua stack.
+ */
+static struct merger_state *
+check_merger_state(struct lua_State *L, int idx)
+{
+	uint32_t cdata_type;
+	struct merger_state **state_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (state_ptr == NULL || cdata_type != merger_state_type_id)
+		return NULL;
+	return *state_ptr;
+}
+
+/**
+ * Skip the array around tuples and save its length.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+	/* Check the buffer is correct. */
+	if (buf->rpos > buf->wpos)
+		return 0;
+
+	/* Skip decoding if the buffer is empty. */
+	if (ibuf_used(buf) == 0) {
+		*len_p = 0;
+		return 1;
+	}
+
+	/* Check and skip the array around tuples. */
+	int ok = mp_typeof(*buf->rpos) == MP_ARRAY;
+	if (ok)
+		ok = mp_check_array(buf->rpos, buf->wpos) <= 0;
+	if (ok)
+		*len_p = mp_decode_array((const char **) &buf->rpos);
+	return ok;
+}
+
+/**
+ * Encode the array around tuples.
+ */
+static void
+encode_header(struct ibuf *obuf, uint32_t result_len)
+{
+	ibuf_reserve(obuf, mp_sizeof_array(result_len));
+	obuf->wpos = mp_encode_array(obuf->wpos, result_len);
+}
+
+/* }}} */
+
+/* {{{ Buffer merger source */
+
+struct merger_source_buffer {
+	struct merger_source base;
+	/*
+	 * The reference is needed to push the
+	 * buffer to Lua as a part of the source
+	 * table to the fetch_source callback.
+	 *
+	 * See luaL_merger_source_buffer_push().
+	 */
+	int ref;
+	struct ibuf *buf;
+	/*
+	 * A merger stops before end of a buffer
+	 * when it is not the last merger in the
+	 * chain.
+	 */
+	size_t remaining_tuples_cnt;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base,
+				 struct lua_State *L);
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+			       box_tuple_format_t *format,
+			       const struct merger_state *state,
+			       struct lua_State *L);
+
+/* Non-virtual methods */
+
+/**
+ * Create the new merger source of buffer type using content of a
+ * Lua stack.
+ *
+ * In case of an error it returns NULL and pushes the error to the
+ * Lua stack.
+ */
+static struct merger_source *
+luaL_merger_source_buffer_new(struct lua_State *L, int idx, int ordinal,
+			      struct merger_state *state)
+{
+	static struct merger_source_vtab merger_source_buffer_vtab = {
+		.delete = luaL_merger_source_buffer_delete,
+		.next = luaL_merger_source_buffer_next,
+	};
+
+	struct merger_source_buffer *source = (struct merger_source_buffer *)
+		malloc(sizeof(struct merger_source_buffer));
+
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_buffer),
+			 "malloc", "merger_source_buffer");
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return NULL;
+	}
+
+	source->base.idx = ordinal;
+	source->base.processed = 0;
+	source->base.tuple = NULL;
+	/* source->base.hnode does not need to be initialized. */
+
+	lua_pushvalue(L, idx); /* Popped by luaL_ref(). */
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->buf = check_ibuf(L, idx);
+	assert(source->buf != NULL);
+	source->remaining_tuples_cnt = 0;
+
+	/*
+	 * We decode a buffer header once at start when no fetch
+	 * callback is provided. In case when there is the
+	 * callback we should call it first: it is performed in
+	 * the source->base.vtab->next() function.
+	 *
+	 * The reason is that a user can want to skip some data
+	 * (say, a request metainformation) before proceed with
+	 * merge.
+	 */
+	if (state->fetch_source_ref <= 0) {
+		if (!decode_header(source->buf,
+				   &source->remaining_tuples_cnt)) {
+			luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+			free(source);
+			lua_pushfstring(L, "Invalid merge source %d",
+					ordinal + 1);
+			return NULL;
+		}
+	}
+
+	source->base.vtab = &merger_source_buffer_vtab;
+	return &source->base;
+}
+
+/**
+ * Push certain fields of a source to Lua.
+ */
+static int
+luaL_merger_source_buffer_push(const struct merger_source_buffer *source,
+			       struct lua_State *L)
+{
+	lua_createtable(L, 0, 3);
+
+	lua_pushinteger(L, source->base.idx + 1);
+	lua_setfield(L, -2, "idx");
+
+	lua_pushstring(L, "buffer");
+	lua_setfield(L, -2, "type");
+
+	lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+	lua_setfield(L, -2, "buffer");
+
+	return 1;
+}
+
+/**
+ * Call a user provided function to fill the source and, maybe,
+ * to skip data before tuples array.
+ *
+ * Return 0 at success and 1 at error (push the error object).
+ */
+static int
+luaL_merger_source_buffer_fetch(struct merger_source_buffer *source,
+				const struct merger_state *state,
+				struct tuple *last_tuple, struct lua_State *L)
+{
+	/* No fetch callback: do nothing. */
+	if (state->fetch_source_ref <= 0)
+		return 0;
+	/* Push fetch callback. */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, state->fetch_source_ref);
+	/* Push source, last_tuple, processed. */
+	luaL_merger_source_buffer_push(source, L);
+	if (last_tuple == NULL)
+		lua_pushnil(L);
+	else
+		luaT_pushtuple(L, last_tuple);
+	lua_pushinteger(L, source->base.processed);
+	/* Invoke the callback and process data. */
+	if (lua_pcall(L, 3, 0, 0))
+		return 1;
+	/*
+	 * Update remaining_tuples_cnt and skip the header.
+	 */
+	if (!decode_header(source->buf, &source->remaining_tuples_cnt)) {
+		lua_pushfstring(L, "Invalid merge source %d",
+				source->base.idx + 1);
+		return 1;
+	}
+	return 0;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_buffer_delete(struct merger_source *base,
+				 struct lua_State *L)
+{
+	struct merger_source_buffer *source = container_of(base,
+		struct merger_source_buffer, base);
+
+	luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+
+	if (base->tuple != NULL)
+		box_tuple_unref(base->tuple);
+
+	free(source);
+}
+
+static int
+luaL_merger_source_buffer_next(struct merger_source *base,
+			       box_tuple_format_t *format,
+			       const struct merger_state *state,
+			       struct lua_State *L)
+{
+	struct merger_source_buffer *source = container_of(base,
+		struct merger_source_buffer, base);
+
+	struct tuple *last_tuple = base->tuple;
+	base->tuple = NULL;
+
+	/*
+	 * Handle the case when all data were processed:
+	 * ask more and stop if no data arrived.
+	 */
+	if (source->remaining_tuples_cnt == 0) {
+		int rc = luaL_merger_source_buffer_fetch(source, state,
+							 last_tuple, L);
+		if (rc != 0)
+			return 1;
+		if (source->remaining_tuples_cnt == 0)
+			return 0;
+	}
+	if (ibuf_used(source->buf) == 0) {
+		lua_pushstring(L, "Unexpected msgpack buffer end");
+		return 1;
+	}
+	const char *tuple_beg = source->buf->rpos;
+	const char *tuple_end = tuple_beg;
+	/*
+	 * mp_next() is faster than mp_check(), but can
+	 * read bytes outside of the buffer and so can
+	 * cause segmentation faults or incorrect result.
+	 *
+	 * We check buffer boundaries after the mp_next()
+	 * call and throw an error when the boundaries are
+	 * violated, but it does not save us from possible
+	 * segmentation faults.
+	 *
+	 * It is the user's responsibility to provide valid
+	 * msgpack.
+	 */
+	mp_next(&tuple_end);
+	--source->remaining_tuples_cnt;
+	if (tuple_end > source->buf->wpos) {
+		lua_pushstring(L, "Unexpected msgpack buffer end");
+		return 1;
+	}
+	++base->processed;
+	source->buf->rpos = (char *) tuple_end;
+	base->tuple = box_tuple_new(format, tuple_beg, tuple_end);
+	if (base->tuple == NULL) {
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return 1;
+	}
+
+	box_tuple_ref(base->tuple);
+	return 0;
+}
+
+/* }}} */
+
+/* {{{ Table merger source */
+
+struct merger_source_table {
+	struct merger_source base;
+	int ref;
+	int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base,
+				struct lua_State *L);
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+			      box_tuple_format_t *format,
+			      const struct merger_state *state,
+			      struct lua_State *L);
+
+/* Non-virtual methods */
+
+/**
+ * Create the new merger source of table type using content of a
+ * Lua stack.
+ *
+ * In case of an error it returns NULL and pushes the error to the
+ * Lua stack.
+ */
+static struct merger_source *
+luaL_merger_source_table_new(struct lua_State *L, int idx, int ordinal,
+			     struct merger_state *state)
+{
+	(void) state;
+
+	static struct merger_source_vtab merger_source_table_vtab = {
+		.delete = luaL_merger_source_table_delete,
+		.next = luaL_merger_source_table_next,
+	};
+
+	struct merger_source_table *source = (struct merger_source_table *)
+		malloc(sizeof(struct merger_source_table));
+
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_table),
+			 "malloc", "merger_source_table");
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return NULL;
+	}
+
+	source->base.idx = ordinal;
+	source->base.processed = 0;
+	source->base.tuple = NULL;
+	/* source->base.hnode does not need to be initialized. */
+
+	lua_pushvalue(L, idx); /* Popped by luaL_ref().
+				*/
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->next_idx = 1;
+
+	source->base.vtab = &merger_source_table_vtab;
+	return &source->base;
+}
+
+/**
+ * Push certain fields of a source to Lua.
+ */
+static int
+luaL_merger_source_table_push(const struct merger_source_table *source,
+			      struct lua_State *L)
+{
+	lua_createtable(L, 0, 3);
+
+	lua_pushinteger(L, source->base.idx + 1);
+	lua_setfield(L, -2, "idx");
+
+	lua_pushstring(L, "table");
+	lua_setfield(L, -2, "type");
+
+	lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+	lua_setfield(L, -2, "table");
+
+	return 1;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 0 at success and 1 at error (push the error object).
+ */
+static int
+luaL_merger_source_table_fetch(struct merger_source_table *source,
+			       const struct merger_state *state,
+			       struct tuple *last_tuple, struct lua_State *L)
+{
+	/* No fetch callback: do nothing. */
+	if (state->fetch_source_ref <= 0)
+		return 0;
+	/* Push fetch callback. */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, state->fetch_source_ref);
+	/* Push source, last_tuple, processed. */
+	luaL_merger_source_table_push(source, L);
+	if (last_tuple == NULL)
+		lua_pushnil(L);
+	else
+		luaT_pushtuple(L, last_tuple);
+	lua_pushinteger(L, source->base.processed);
+	/* Invoke the callback and process data. */
+	if (lua_pcall(L, 3, 1, 0))
+		return 1;
+	/* No more data: do nothing. */
+	if (lua_isnil(L, -1)) {
+		lua_pop(L, 1);
+		return 0;
+	}
+	/*
+	 * Set the new table as the source.
+	 */
+	luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->next_idx = 1;
+	return 0;
+}
+
+/* Virtual methods */
+
+static void
+luaL_merger_source_table_delete(struct merger_source *base,
+				struct lua_State *L)
+{
+	struct merger_source_table *source = container_of(base,
+		struct merger_source_table, base);
+
+	luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+
+	if (base->tuple != NULL)
+		box_tuple_unref(base->tuple);
+
+	free(source);
+}
+
+static int
+luaL_merger_source_table_next(struct merger_source *base,
+			      box_tuple_format_t *format,
+			      const struct merger_state *state,
+			      struct lua_State *L)
+{
+	struct merger_source_table *source = container_of(base,
+		struct merger_source_table, base);
+
+	struct tuple *last_tuple = base->tuple;
+	base->tuple = NULL;
+
+	lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+	lua_pushinteger(L, source->next_idx);
+	lua_gettable(L, -2);
+	/*
+	 * If all data were processed, try to fetch more.
+	 */
+	if (lua_isnil(L, -1)) {
+		lua_pop(L, 2);
+		int rc = luaL_merger_source_table_fetch(source, state,
+							last_tuple, L);
+		if (rc != 0)
+			return 1;
+		/*
+		 * Retry tuple extracting after fetching
+		 * of the source.
+		 */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+		if (lua_isnil(L, -1)) {
+			lua_pop(L, 2);
+			return 0;
+		}
+	}
+	base->tuple = luaT_newtuple(L, -1, format);
+	if (base->tuple == NULL)
+		return 1;
+	++source->next_idx;
+	++base->processed;
+	lua_pop(L, 2);
+
+	box_tuple_ref(base->tuple);
+	return 0;
+}
+
+/* }}} */
+
+/* {{{ Iterator merger source */
+
+struct merger_source_iterator {
+	struct merger_source base;
+	struct luaL_iterator *it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merger_source_iterator_delete(struct merger_source *base,
+				   struct lua_State *L);
+static int
+luaL_merger_source_iterator_next(struct merger_source *base,
+				 box_tuple_format_t *format,
+				 const struct merger_state *state,
+				 struct lua_State *L);
+
+/* Non-virtual methods */
+
+/**
+ * Create the new merger source of iterator type using content of
+ * a Lua stack.
+ *
+ * In case of an error it returns NULL and pushes the error to the
+ * Lua stack.
+ */
+static struct merger_source *
+luaL_merger_source_iterator_new(struct lua_State *L, int idx, int ordinal,
+				struct merger_state *state)
+{
+	(void) state;
+
+	static struct merger_source_vtab merger_source_iterator_vtab = {
+		.delete = luaL_merger_source_iterator_delete,
+		.next = luaL_merger_source_iterator_next,
+	};
+
+	struct merger_source_iterator *source =
+		(struct merger_source_iterator *) malloc(
+		sizeof(struct merger_source_iterator));
+
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merger_source_iterator),
+			 "malloc", "merger_source_iterator");
+		luaT_pusherror(L, diag_last_error(diag_get()));
+		return NULL;
+	}
+
+	source->base.idx = ordinal;
+	source->base.processed = 0;
+	source->base.tuple = NULL;
+	/*
+	 * source->base.hnode does not need to be initialized.
*/ + + source->it = luaL_iterator_new_fromtable(L, idx); + + source->base.vtab = &merger_source_iterator_vtab; + return &source->base; +} + +/* Virtual methods */ + +static void +luaL_merger_source_iterator_delete(struct merger_source *base, + struct lua_State *L) +{ + struct merger_source_iterator *source = container_of(base, + struct merger_source_iterator, base); + + luaL_iterator_free(L, source->it); + + if (base->tuple != NULL) + box_tuple_unref(base->tuple); + + free(source); +} + +static int +luaL_merger_source_iterator_next(struct merger_source *base, + box_tuple_format_t *format, + const struct merger_state *state, + struct lua_State *L) +{ + (void) state; + + struct merger_source_iterator *source = container_of(base, + struct merger_source_iterator, base); + + base->tuple = NULL; + + int nresult = luaL_iterator_next(L, source->it); + if (nresult == 0) + return 0; + base->tuple = luaT_newtuple(L, -nresult + 1, format); + if (base->tuple == NULL) + return 1; + ++base->processed; + lua_pop(L, nresult); + + box_tuple_ref(base->tuple); + return 0; +} + +/* }}} */ + +/* {{{ Create a source using Lua stack */ + +/** + * Create the new merger source using content of a Lua stack. + * + * In case of an error it returns NULL and pushes the error to the + * Lua stack. + */ +struct merger_source * +merger_source_new(struct lua_State *L, int idx, int ordinal, + struct merger_context *ctx, struct merger_state *state) +{ + struct merger_source *base = NULL; + + /* Determine type of a merger source on the Lua stack. */ + if (lua_type(L, idx) == LUA_TCDATA) { + struct ibuf *buf = check_ibuf(L, idx); + if (buf == NULL) + goto err; + /* Create the new buffer source. */ + base = luaL_merger_source_buffer_new(L, idx, ordinal, state); + } else if (lua_istable(L, idx)) { + lua_rawgeti(L, idx, 1); + int iscallable = luaL_iscallable(L, -1); /* Check t[1]. */ + lua_pop(L, 1); + if (iscallable) { + /* Create the new iterator source.
*/ + base = luaL_merger_source_iterator_new(L, idx, ordinal, + state); + } else { + /* Create the new table source. */ + base = luaL_merger_source_table_new(L, idx, ordinal, + state); + } + } else { + goto err; + } + + if (base == NULL) + return NULL; + + /* Acquire the next tuple. */ + int rc = base->vtab->next(base, ctx->format, state, L); + if (rc) { + base->vtab->delete(base, L); + return NULL; + } + + /* Update the heap. */ + if (base->tuple != NULL) { + rc = merger_heap_insert(&state->heap, &base->hnode); + if (rc) { + base->vtab->delete(base, L); + diag_set(OutOfMemory, heap_next_grow_size(&state->heap), + "malloc", "merger heap"); + luaT_pusherror(L, diag_last_error(diag_get())); + return NULL; + } + } + + return base; + +err: + lua_pushfstring(L, "Unknown source type at index %d", ordinal + 1); + return NULL; +} + +/* }}} */ + +/* {{{ merger_context functions */ + +/** + * Free the merger context from a Lua code. + */ +static int +lbox_merger_context_gc(struct lua_State *L) +{ + struct merger_context *ctx; + if ((ctx = check_merger_context(L, 1)) == NULL) + return 0; + box_key_def_delete(ctx->key_def); + box_tuple_format_unref(ctx->format); + free(ctx); + return 0; +} + +/** + * Create the new merger context. + * + * Expected a table of key parts on the Lua stack. + * + * Returns the new instance. 
+ */ +static int +lbox_merger_context_new(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + return luaL_error(L, "Usage: merger.context.new(key_parts)"); + + struct merger_context *ctx = (struct merger_context *) malloc( + sizeof(struct merger_context)); + if (ctx == NULL) { + diag_set(OutOfMemory, sizeof(struct merger_context), "malloc", + "merger_context"); + return luaT_error(L); + } + ctx->key_def = luaT_new_key_def(L, 1); + if (ctx->key_def == NULL) { + free(ctx); + return luaL_error(L, "Cannot create key_def"); + } + + ctx->format = box_tuple_format_new(&ctx->key_def, 1); + if (ctx->format == NULL) { + box_key_def_delete(ctx->key_def); + free(ctx); + return luaL_error(L, "Cannot create format"); + } + + *(struct merger_context **) luaL_pushcdata(L, merger_context_type_id) = + ctx; + + lua_pushcfunction(L, lbox_merger_context_gc); + luaL_setcdatagc(L, -2); + + return 1; +} + +/* }}} */ + +/* {{{ merger_state functions */ + +/** + * Free the merger state. + * + * We need to know Lua state here, because sources of table and + * iterator types are saved as references within the Lua state. + */ +static void +merger_state_delete(struct lua_State *L, struct merger_state *state) +{ + merger_heap_destroy(&state->heap); + box_key_def_delete(state->key_def); + + for (uint32_t i = 0; i < state->sources_count; ++i) { + assert(state->sources != NULL); + assert(state->sources[i] != NULL); + state->sources[i]->vtab->delete(state->sources[i], L); + } + + if (state->sources != NULL) + free(state->sources); + + if (state->fetch_source_ref > 0) + luaL_unref(L, LUA_REGISTRYINDEX, state->fetch_source_ref); + + free(state); +} + +/** + * Free the merger state from a Lua code. + */ +static int +lbox_merger_state_gc(struct lua_State *L) +{ + struct merger_state *state; + if ((state = check_merger_state(L, 1)) == NULL) + return 0; + merger_state_delete(L, state); + return 0; +} + +/** + * Push 'bad params' / 'bad param X' and the usage info to the Lua + * stack. 
+ */ +static int +merger_usage(struct lua_State *L, const char *param_name) +{ + static const char *usage = "merger.{ipairs,pairs,select}(" + "merger_context, " + "{source, source, ...}[, {" + "descending = <boolean> or <nil>, " + "buffer = <cdata<struct ibuf>> or <nil>, " + "fetch_source = <function> or <nil>}])"; + if (param_name == NULL) + lua_pushfstring(L, "Bad params, use: %s", usage); + else + lua_pushfstring(L, "Bad param \"%s\", use: %s", param_name, + usage); + return 1; +} + +/** + * Parse optional third parameter of merger.pairs() and + * merger.select() into the merger_state structure. + * + * Returns 0 on success. In case of an error it pushes an error + * message to the Lua stack and returns 1. + * + * It is the helper for merger_state_new(). + */ +static int +parse_opts(struct lua_State *L, int idx, struct merger_state *state) +{ + /* No opts: use defaults. */ + if (lua_isnoneornil(L, idx)) + return 0; + + /* Not a table: error. */ + if (!lua_istable(L, idx)) + return merger_usage(L, NULL); + + /* Parse descending to state->order. */ + lua_pushstring(L, "descending"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if (lua_isboolean(L, -1)) + state->order = lua_toboolean(L, -1) ? -1 : 1; + else + return merger_usage(L, "descending"); + } + lua_pop(L, 1); + + /* Parse buffer. */ + lua_pushstring(L, "buffer"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if ((state->obuf = check_ibuf(L, -1)) == NULL) + return merger_usage(L, "buffer"); + } + lua_pop(L, 1); + + /* Parse fetch_source. */ + lua_pushstring(L, "fetch_source"); + lua_gettable(L, idx); + if (!lua_isnil(L, -1)) { + if (!luaL_iscallable(L, -1)) + return merger_usage(L, "fetch_source"); + lua_pushvalue(L, -1); /* Popped by luaL_ref(). */ + state->fetch_source_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } + lua_pop(L, 1); + + return 0; +} + +/** + * Parse sources table: second parameter of merger.pairs() + * and merger.select() into the merger_state structure. 
+ * + * Note: This function should be called when options are already + * parsed (using parse_opts()). + * + * Returns 0 on success. In case of an error it pushes an error + * message to the Lua stack and returns 1. + * + * It is the helper for merger_state_new(). + */ +static int +parse_sources(struct lua_State *L, int idx, struct merger_context *ctx, + struct merger_state *state) +{ + /* Allocate sources array. */ + uint32_t capacity = 8; + const ssize_t sources_size = capacity * sizeof(struct merger_source *); + state->sources = (struct merger_source **) malloc(sources_size); + if (state->sources == NULL) { + diag_set(OutOfMemory, sources_size, "malloc", "state->sources"); + luaT_pusherror(L, diag_last_error(diag_get())); + return 1; + } + + /* Fetch all sources. */ + while (true) { + lua_pushinteger(L, state->sources_count + 1); + lua_gettable(L, idx); + if (lua_isnil(L, -1)) + break; + + /* Grow sources array if needed. */ + if (state->sources_count == capacity) { + capacity *= 2; + struct merger_source **new_sources; + const ssize_t new_sources_size = + capacity * sizeof(struct merger_source *); + new_sources = (struct merger_source **) realloc( + state->sources, new_sources_size); + if (new_sources == NULL) { + diag_set(OutOfMemory, new_sources_size, + "realloc", "new_sources"); + luaT_pusherror(L, diag_last_error(diag_get())); + return 1; + } + state->sources = new_sources; + } + + /* Create the new source. */ + struct merger_source *source = merger_source_new(L, -1, + state->sources_count, ctx, state); + if (source == NULL) + return 1; + state->sources[state->sources_count] = source; + ++state->sources_count; + } + lua_pop(L, state->sources_count + 1); + + return 0; +} + +/** + * Parse sources and options on Lua stack and create the new + * merger_state instance. + * + * It is common code for parsing parameters for + * lbox_merger_ipairs() and lbox_merger_select().
+ */ +static struct merger_state * +merger_state_new(struct lua_State *L) +{ + struct merger_context *ctx; + int ok = (lua_gettop(L) == 2 || lua_gettop(L) == 3) && + /* Merger context. */ + (ctx = check_merger_context(L, 1)) != NULL && + /* Sources. */ + lua_istable(L, 2) == 1 && + /* Opts. */ + (lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1); + if (!ok) { + merger_usage(L, NULL); + lua_error(L); + unreachable(); + return NULL; + } + + struct merger_state *state = (struct merger_state *) + malloc(sizeof(struct merger_state)); + merger_heap_create(&state->heap); + state->key_def = key_def_dup(ctx->key_def); + state->sources_count = 0; + state->sources = NULL; + state->order = 1; + state->obuf = NULL; + state->fetch_source_ref = 0; + + if (parse_opts(L, 3, state) != 0 || + parse_sources(L, 2, ctx, state) != 0) { + merger_state_delete(L, state); + lua_error(L); + unreachable(); + return NULL; + } + + return state; +} + +/* }}} */ + +/* {{{ merger module logic */ + +/** + * Data comparing function to construct heap of sources. + */ +static bool +source_less(const heap_t *heap, const struct heap_node *a, + const struct heap_node *b) +{ + struct merger_source *left = container_of(a, struct merger_source, + hnode); + struct merger_source *right = container_of(b, struct merger_source, + hnode); + if (left->tuple == NULL && right->tuple == NULL) + return false; + if (left->tuple == NULL) + return false; + if (right->tuple == NULL) + return true; + struct merger_state *state = container_of(heap, struct merger_state, + heap); + return state->order * box_tuple_compare(left->tuple, right->tuple, + state->key_def) < 0; +} + +/** + * Get a tuple from a top source, update the source, update the + * heap. + * + * The reference counter of the tuple is increased (in + * source->vtab->next()). + * + * Return NULL when all sources are drained. 
+ */ +static struct tuple * +merger_next(struct lua_State *L, struct merger_context *ctx, + struct merger_state *state) +{ + struct heap_node *hnode = merger_heap_top(&state->heap); + if (hnode == NULL) + return NULL; + + struct merger_source *source = container_of(hnode, struct merger_source, + hnode); + struct tuple *tuple = source->tuple; + assert(tuple != NULL); + int rc = source->vtab->next(source, ctx->format, state, L); + if (rc != 0) { + lua_error(L); + unreachable(); + return NULL; + } + if (source->tuple == NULL) + merger_heap_delete(&state->heap, hnode); + else + merger_heap_update(&state->heap, hnode); + + return tuple; +} + + +/** + * Iterator gen function to traverse merger results. + * + * Expects a merger context as the first parameter (param) and a + * merger_state as the second parameter (state) on the Lua + * stack. + * + * Push the merger_state (the new state) and the next tuple. + */ +static int +lbox_merger_gen(struct lua_State *L) +{ + struct merger_context *ctx; + struct merger_state *state; + bool ok = (ctx = check_merger_context(L, -2)) != NULL && + (state = check_merger_state(L, -1)) != NULL; + if (!ok) + return luaL_error(L, "Bad params, use: " + "lbox_merger_gen(merger_context, " + "merger_state)"); + + struct tuple *tuple = merger_next(L, ctx, state); + if (tuple == NULL) { + lua_pushnil(L); + lua_pushnil(L); + return 2; + } + + /* Push merger_state, tuple. */ + *(struct merger_state **) + luaL_pushcdata(L, merger_state_type_id) = state; + luaT_pushtuple(L, tuple); + + box_tuple_unref(tuple); + return 2; +} + +/** + * Iterate over merge results from Lua. + * + * Push three values to the Lua stack: + * + * 1. gen (lbox_merger_gen wrapped by fun.wrap()); + * 2. param (merger_context); + * 3. state (merger_state). + */ +static int +lbox_merger_ipairs(struct lua_State *L) +{ + /* Create merger_state. */ + struct merger_state *state = merger_state_new(L); + lua_settop(L, 1); /* Pop sources, [opts]. */ + /* Stack: merger_context.
*/ + + if (state->obuf != NULL) + return luaL_error(L, "\"buffer\" option is forbidden with " + "merger.pairs(<...>)"); + + luaL_loadstring(L, "return require('fun').wrap"); + lua_call(L, 0, 1); + lua_insert(L, -2); /* Swap merger_context and wrap. */ + /* Stack: wrap, merger_context. */ + + lua_pushcfunction(L, lbox_merger_gen); + lua_insert(L, -2); /* Swap merger_context and gen. */ + /* Stack: wrap, gen, merger_context. */ + + *(struct merger_state **) + luaL_pushcdata(L, merger_state_type_id) = state; + lua_pushcfunction(L, lbox_merger_state_gc); + luaL_setcdatagc(L, -2); + /* Stack: wrap, gen, merger_context, merger_state. */ + + /* Call fun.wrap(gen, merger_context, merger_state). */ + lua_call(L, 3, 3); + return 3; +} + +/** + * Write merge results into ibuf. + * + * It is the helper for lbox_merger_select(). + */ +static void +encode_result_buffer(struct lua_State *L, struct merger_context *ctx, + struct merger_state *state) +{ + struct ibuf *obuf = state->obuf; + uint32_t result_len = 0; + uint32_t result_len_offset = 4; + + /* + * Reserve maximum size for the array around resulting + * tuples to set it later. + */ + encode_header(state->obuf, UINT32_MAX); + + /* Fetch, merge and copy tuples to the buffer. */ + struct tuple *tuple; + while ((tuple = merger_next(L, ctx, state)) != NULL) { + uint32_t bsize = tuple->bsize; + ibuf_reserve(obuf, bsize); + memcpy(obuf->wpos, tuple_data(tuple), bsize); + obuf->wpos += bsize; + result_len_offset += bsize; + box_tuple_unref(tuple); + ++result_len; + } + + /* Write the real array size. */ + mp_store_u32(obuf->wpos - result_len_offset, result_len); +} + +/** + * Write merge results into the new Lua table. + * + * It is the helper for lbox_merger_select(). + */ +static int +create_result_table(struct lua_State *L, struct merger_context *ctx, + struct merger_state *state) +{ + /* Create result table. */ + lua_newtable(L); + + uint32_t cur = 1; + + /* Fetch, merge and save tuples to the table. 
*/ + struct tuple *tuple; + while ((tuple = merger_next(L, ctx, state)) != NULL) { + luaT_pushtuple(L, tuple); + lua_rawseti(L, -2, cur); + box_tuple_unref(tuple); + ++cur; + } + + return 1; +} + +/** + * Perform the merge. + * + * Write results into a buffer or a Lua table depending on + * options. + * + * Expects a merger context, a sources table and options (optional) + * on the Lua stack. + * + * Return the Lua table or nothing when the 'buffer' option is + * provided. + */ +static int +lbox_merger_select(struct lua_State *L) +{ + struct merger_context *ctx = check_merger_context(L, 1); + if (ctx == NULL) { + merger_usage(L, NULL); + lua_error(L); + } + + struct merger_state *state = merger_state_new(L); + lua_settop(L, 1); /* Pop sources, [opts]; keep ctx from GC. */ + + if (state->obuf == NULL) { + create_result_table(L, ctx, state); + merger_state_delete(L, state); + return 1; + } else { + encode_result_buffer(L, ctx, state); + merger_state_delete(L, state); + return 0; + } +} + +/** + * Register the module. + */ +LUA_API int +luaopen_merger(lua_State *L) +{ + luaL_cdef(L, "struct merger_context;"); + luaL_cdef(L, "struct merger_state;"); + luaL_cdef(L, "struct ibuf;"); + + merger_context_type_id = luaL_ctypeid(L, "struct merger_context&"); + merger_state_type_id = luaL_ctypeid(L, "struct merger_state&"); + ibuf_type_id = luaL_ctypeid(L, "struct ibuf"); + + /* Export C functions to Lua. */ + static const struct luaL_Reg meta[] = { + {"select", lbox_merger_select}, + {"ipairs", lbox_merger_ipairs}, + {"pairs", lbox_merger_ipairs}, + {NULL, NULL} + }; + luaL_register_module(L, "merger", meta); + + /* Add context.new().
*/ + lua_newtable(L); /* merger.context */ + lua_pushcfunction(L, lbox_merger_context_new); + lua_setfield(L, -2, "new"); + lua_setfield(L, -2, "context"); + + return 1; +} + +/* }}} */ diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h new file mode 100644 index 000000000..e444e99e4 --- /dev/null +++ b/src/box/lua/merger.h @@ -0,0 +1,47 @@ +#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED +#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +struct lua_State; + +int +luaopen_merger(struct lua_State *L); + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */ diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua new file mode 100755 index 000000000..c6f890edf --- /dev/null +++ b/test/box-tap/merger.test.lua @@ -0,0 +1,558 @@ +#!/usr/bin/env tarantool + +local tap = require('tap') +local buffer = require('buffer') +local msgpackffi = require('msgpackffi') +local digest = require('digest') +local merger = require('merger') +local fiber = require('fiber') +local utf8 = require('utf8') +local ffi = require('ffi') +local fun = require('fun') + +local FETCH_BLOCK_SIZE = 10 + +local function merger_usage(param) + local msg = 'merger.{ipairs,pairs,select}(' .. + 'merger_context, ' .. + '{source, source, ...}[, {' .. + 'descending = <boolean> or <nil>, ' .. + 'buffer = <cdata<struct ibuf>> or <nil>, ' .. + 'fetch_source = <function> or <nil>}])' + if not param then + return ('Bad params, use: %s'):format(msg) + else + return ('Bad param "%s", use: %s'):format(param, msg) + end +end + +-- Get buffer with data encoded without last 'trunc' bytes. +local function truncated_msgpack_buffer(data, trunc) + local data = msgpackffi.encode(data) + data = data:sub(1, data:len() - trunc) + local len = data:len() + local buf = buffer.ibuf() + -- Ensure we have enough buffer to write len + trunc bytes. + buf:reserve(len + trunc) + local p = buf:alloc(len) + -- Ensure len bytes follows with trunc zero bytes. + ffi.copy(p, data .. 
string.rep('\0', trunc), len + trunc) + return buf +end + +local bad_merger_methods_calls = { + { + 'Bad opts', + sources = {}, + opts = 1, + exp_err = merger_usage(nil), + }, + { + 'Bad opts.descending', + sources = {}, + opts = {descending = 1}, + exp_err = merger_usage('descending'), + }, + { + 'Bad source', + sources = {1}, + opts = nil, + exp_err = 'Unknown source type at index 1', + }, + { + 'Bad cdata source', + sources = {ffi.new('char *')}, + opts = nil, + exp_err = 'Unknown source type at index 1', + }, + { + 'Wrong source of table type', + sources = {{1}}, + opts = nil, + exp_err = 'A tuple or a table expected, got number', + }, + { + 'Use buffer with an iterator result', + sources = {}, + opts = {buffer = buffer.ibuf()}, + funcs = {'pairs', 'ipairs'}, + exp_err = '"buffer" option is forbidden with merger.pairs(<...>)', + }, + { + 'Bad msgpack source: wrong length of the tuples array', + -- Remove the last tuple from msgpack data, but keep old + -- tuples array size. + sources = { + truncated_msgpack_buffer({{''}, {''}, {''}}, 2), + }, + opts = {}, + funcs = {'select'}, + exp_err = 'Unexpected msgpack buffer end', + }, + { + 'Bad msgpack source: wrong length of a tuple', + -- Remove half of the last tuple, but keep old tuple size. + sources = { + truncated_msgpack_buffer({{''}, {''}, {''}}, 1), + }, + opts = {}, + funcs = {'select'}, + exp_err = 'Unexpected msgpack buffer end', + }, + { + 'Bad fetch_source type', + sources = {}, + opts = {fetch_source = 1}, + exp_err = merger_usage('fetch_source'), + }, +} + +local schemas = { + { + name = 'small_unsigned', + parts = { + { + fieldno = 2, + type = 'unsigned', + } + }, + gen_tuple = function(tupleno) + return {'id_' .. tostring(tupleno), tupleno} + end, + }, + -- Merger allocates a memory for 8 parts by default. + -- Test that reallocation works properly. + -- Test with N-1 equal parts and Nth different. 
+ { + name = 'many_parts', + parts = (function() + local parts = {} + for i = 1, 128 do + parts[i] = { + fieldno = i, + type = 'unsigned', + } + end + return parts + end)(), + gen_tuple = function(tupleno) + local tuple = {} + -- 127 constant parts + for i = 1, 127 do + tuple[i] = i + end + -- 128th part is varying + tuple[128] = tupleno + return tuple + end, + -- reduce tuples count to decrease test run time + tuples_cnt = 16, + }, + -- Test null value in nullable field of an index. + { + name = 'nullable', + parts = { + { + fieldno = 1, + type = 'unsigned', + }, + { + fieldno = 2, + type = 'string', + is_nullable = true, + }, + }, + gen_tuple = function(i) + if i % 2 == 1 then + return {0, tostring(i)} + else + return {0, box.NULL} + end + end, + }, + -- Test index part with 'collation_id' option (as in net.box's + -- response). + { + name = 'collation_id', + parts = { + { + fieldno = 1, + type = 'string', + collation_id = 2, -- unicode_ci + }, + }, + gen_tuple = function(i) + local letters = {'a', 'b', 'c', 'A', 'B', 'C'} + if i <= #letters then + return {letters[i]} + else + return {''} + end + end, + }, + -- Test index part with 'collation' option (as in local index + -- parts).
+ { + name = 'collation', + parts = { + { + fieldno = 1, + type = 'string', + collation = 'unicode_ci', + }, + }, + gen_tuple = function(i) + local letters = {'a', 'b', 'c', 'A', 'B', 'C'} + if i <= #letters then + return {letters[i]} + else + return {''} + end + end, + }, +} + +local function is_unicode_ci_part(part) + return part.collation_id == 2 or part.collation == 'unicode_ci' +end + +local function tuple_comparator(a, b, parts) + for _, part in ipairs(parts) do + local fieldno = part.fieldno + if a[fieldno] ~= b[fieldno] then + if a[fieldno] == nil then + return -1 + end + if b[fieldno] == nil then + return 1 + end + if is_unicode_ci_part(part) then + return utf8.casecmp(a[fieldno], b[fieldno]) + end + return a[fieldno] < b[fieldno] and -1 or 1 + end + end + + return 0 +end + +local function sort_tuples(tuples, parts, opts) + local function tuple_comparator_wrapper(a, b) + local cmp = tuple_comparator(a, b, parts) + if cmp < 0 then + return not opts.descending + elseif cmp > 0 then + return opts.descending + else + return false + end + end + + table.sort(tuples, tuple_comparator_wrapper) +end + +local function lowercase_unicode_ci_fields(tuples, parts) + for i = 1, #tuples do + local tuple = tuples[i] + for _, part in ipairs(parts) do + if is_unicode_ci_part(part) then + -- Workaround #3709. 
+ if tuple[part.fieldno]:len() > 0 then + tuple[part.fieldno] = utf8.lower(tuple[part.fieldno]) + end + end + end + end +end + +local function gen_fetch_source(schema, tuples, opts) + local opts = opts or {} + local input_type = opts.input_type + local sources_cnt = #tuples + + local sources = {} + local last_positions = {} + for i = 1, sources_cnt do + sources[i] = input_type == 'table' and {} or buffer.ibuf() + last_positions[i] = 0 + end + + local fetch_source = function(source, last_tuple, processed) + assert(source.type == input_type) + if source.type == 'buffer' then + assert(type(source.buffer) == 'cdata') + assert(ffi.istype('struct ibuf', source.buffer)) + assert(source.table == nil) + else + assert(source.type == 'table') + assert(type(source.table) == 'table') + assert(source.buffer == nil) + end + local idx = source.idx + local last_pos = last_positions[idx] + local exp_last_tuple = tuples[idx][last_pos] + assert((last_tuple == nil and exp_last_tuple == nil) or + tuple_comparator(last_tuple, exp_last_tuple, + schema.parts) == 0) + assert(last_pos == processed) + local data = fun.iter(tuples[idx]):drop(last_pos):take( + FETCH_BLOCK_SIZE):totable() + assert(#data > 0 or processed == #tuples[idx]) + last_positions[idx] = last_pos + #data + if source.type == 'table' then + return data + elseif source.type == 'buffer' then + msgpackffi.internal.encode_r(source.buffer, data, 0) + else + assert(false) + end + end + + return sources, fetch_source +end + +local function prepare_data(schema, tuples_cnt, sources_cnt, opts) + local opts = opts or {} + local input_type = opts.input_type + local use_table_as_tuple = opts.use_table_as_tuple + local use_fetch_source = opts.use_fetch_source + + local tuples = {} + local exp_result = {} + local fetch_source + + -- Ensure empty sources are empty table and not nil. + for i = 1, sources_cnt do + if tuples[i] == nil then + tuples[i] = {} + end + end + + -- Prepare N tables with tuples as input for merger. 
+ for i = 1, tuples_cnt do + -- [1, sources_cnt] + local guava = digest.guava(i, sources_cnt) + 1 + local tuple = schema.gen_tuple(i) + table.insert(exp_result, tuple) + if not use_table_as_tuple then + assert(input_type ~= 'buffer') + tuple = box.tuple.new(tuple) + end + table.insert(tuples[guava], tuple) + end + + -- Sort tuples within each source. + for _, source_tuples in pairs(tuples) do + sort_tuples(source_tuples, schema.parts, opts) + end + + -- Sort expected result. + sort_tuples(exp_result, schema.parts, opts) + + -- Fill sources. + local sources + if input_type == 'table' then + -- Imitate netbox's select w/o {buffer = ...}. + if use_fetch_source then + sources, fetch_source = gen_fetch_source(schema, tuples, opts) + else + sources = tuples + end + elseif input_type == 'buffer' then + -- Imitate netbox's select with {buffer = ...}. + if use_fetch_source then + sources, fetch_source = gen_fetch_source(schema, tuples, opts) + else + sources = {} + for i = 1, sources_cnt do + sources[i] = buffer.ibuf() + msgpackffi.internal.encode_r(sources[i], tuples[i], 0) + end + end + elseif input_type == 'iterator' then + -- Lua iterator. + assert(not use_fetch_source) + sources = {} + for i = 1, sources_cnt do + sources[i] = { + -- gen (next) + next, + -- param (tuples) + tuples[i], + -- state (idx) + nil + } + end + end + + return sources, exp_result, fetch_source +end + +local function test_case_opts_str(opts) + local params = {} + + if opts.input_type then + table.insert(params, 'input_type: ' .. opts.input_type) + end + + if opts.output_type then + table.insert(params, 'output_type: ' .. 
opts.output_type) + end + + if opts.descending then + table.insert(params, 'descending') + end + + if opts.use_table_as_tuple then + table.insert(params, 'use_table_as_tuple') + end + + if opts.use_fetch_source then + table.insert(params, 'use_fetch_source') + end + + if next(params) == nil then + return '' + end + + return (' (%s)'):format(table.concat(params, ', ')) +end + +local function run_merger(test, schema, tuples_cnt, sources_cnt, opts) + fiber.yield() + + local opts = opts or {} + + -- Prepare data. + local sources, exp_result, fetch_source = + prepare_data(schema, tuples_cnt, sources_cnt, opts) + + -- Create a merger instance and fill options. + local ctx = merger.context.new(schema.parts) + local merger_opts = { + descending = opts.descending, + fetch_source = fetch_source, + } + if opts.output_type == 'buffer' then + merger_opts.buffer = buffer.ibuf() + end + + local res + + -- Run merger and prepare output for compare. + if opts.output_type == 'table' then + -- Table output. + res = merger.select(ctx, sources, merger_opts) + elseif opts.output_type == 'buffer' then + -- Buffer output. + merger.select(ctx, sources, merger_opts) + local obuf = merger_opts.buffer + res = msgpackffi.decode(obuf.rpos) + else + -- Iterator output. + assert(opts.output_type == 'iterator') + res = merger.pairs(ctx, sources, merger_opts):totable() + end + + -- A bit more postprocessing to compare. + for i = 1, #res do + if type(res[i]) ~= 'table' then + res[i] = res[i]:totable() + end + end + + -- unicode_ci does not differentiate btw 'A' and 'a', so the + -- order is arbitrary. We transform fields with unicode_ci + -- collation in parts to lower case before comparing. 
+ lowercase_unicode_ci_fields(res, schema.parts) + lowercase_unicode_ci_fields(exp_result, schema.parts) + + test:is_deeply(res, exp_result, + ('check order on %3d tuples in %4d sources%s') + :format(tuples_cnt, sources_cnt, test_case_opts_str(opts))) +end + +local function run_case(test, schema, opts) + local opts = opts or {} + + local case_name = ('testing on schema %s%s'):format( + schema.name, test_case_opts_str(opts)) + local tuples_cnt = schema.tuples_cnt or 100 + + local input_type = opts.input_type + local use_table_as_tuple = opts.use_table_as_tuple + local use_fetch_source = opts.use_fetch_source + + -- Skip meaningless flag combinations. + if input_type == 'buffer' and not use_table_as_tuple then + return + end + if input_type == 'iterator' and use_fetch_source then + return + end + + test:test(case_name, function(test) + test:plan(6) + + -- Check with a small number of buffers. + run_merger(test, schema, tuples_cnt, 1, opts) + run_merger(test, schema, tuples_cnt, 2, opts) + run_merger(test, schema, tuples_cnt, 3, opts) + run_merger(test, schema, tuples_cnt, 4, opts) + run_merger(test, schema, tuples_cnt, 5, opts) + + -- Check with more buffers than tuples. + run_merger(test, schema, tuples_cnt, 1000, opts) + end) +end + +local test = tap.test('merger') +test:plan(#bad_merger_methods_calls + #schemas * 48) + +-- For collations. +box.cfg{} + +-- Create the instance to use in testing merger's methods below. +local ctx = merger.context.new({{ + fieldno = 1, + type = 'string', +}}) + +-- Bad source and/or opts parameters for merger's methods.
+for _, case in ipairs(bad_merger_methods_calls) do + test:test(case[1], function(test) + local funcs = case.funcs or {'pairs', 'ipairs', 'select'} + test:plan(#funcs) + for _, func in ipairs(funcs) do + local exp_ok = case.exp_err == nil + local ok, err = pcall(merger[func], ctx, case.sources, case.opts) + if ok then + err = nil + end + test:is_deeply({ok, err}, {exp_ok, case.exp_err}, func) + end + end) +end + +-- Merging cases. +for _, input_type in ipairs({'buffer', 'table', 'iterator'}) do + for _, output_type in ipairs({'buffer', 'table', 'iterator'}) do + for _, descending in ipairs({false, true}) do + for _, use_table_as_tuple in ipairs({false, true}) do + for _, use_fetch_source in ipairs({false, true}) do + for _, schema in ipairs(schemas) do + run_case(test, schema, { + input_type = input_type, + output_type = output_type, + descending = descending, + use_table_as_tuple = use_table_as_tuple, + use_fetch_source = use_fetch_source, + }) + end + end + end + end + end +end + +os.exit(test:check() and 0 or 1) diff --git a/test/box-tap/suite.ini b/test/box-tap/suite.ini index 50dc1f435..3361102be 100644 --- a/test/box-tap/suite.ini +++ b/test/box-tap/suite.ini @@ -1,4 +1,5 @@ [default] core = app description = Database tests with #! using TAP +long_run = merger.test.lua is_parallel = True -- 2.20.1
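The merge loop in merger_next() together with source_less() is a k-way merge over a binary heap keyed by each source's current tuple: state->order flips the comparison for descending output, a source enters the heap only when it has a pending tuple, and a drained source is deleted from the heap instead of being updated. A minimal standalone sketch of the same idea in C, assuming integer keys instead of tuples and a hand-rolled array heap instead of the patch's merger_heap (struct int_source, int_source_less(), sift_down() and kway_merge() are hypothetical names used only for illustration):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical source: a sorted int array plus a read cursor. */
struct int_source {
	const int *data;
	size_t len;
	size_t pos;
};

/*
 * Heap ordering, mirroring the "order * compare < 0" test in
 * source_less(): order is 1 for ascending, -1 for descending.
 */
static int
int_source_less(const struct int_source *a, const struct int_source *b,
		int order)
{
	int x = a->data[a->pos];
	int y = b->data[b->pos];
	int cmp = (x > y) - (x < y);
	return order * cmp < 0;
}

/* Restore the heap property below position i. */
static void
sift_down(struct int_source **heap, size_t n, size_t i, int order)
{
	for (;;) {
		size_t l = 2 * i + 1, r = l + 1, m = i;
		if (l < n && int_source_less(heap[l], heap[m], order))
			m = l;
		if (r < n && int_source_less(heap[r], heap[m], order))
			m = r;
		if (m == i)
			return;
		struct int_source *tmp = heap[i];
		heap[i] = heap[m];
		heap[m] = tmp;
		i = m;
	}
}

/*
 * Merge nsrc sorted sources into out and return the number of
 * values written. heap must have room for nsrc pointers.
 */
static size_t
kway_merge(struct int_source *src, size_t nsrc, struct int_source **heap,
	   int order, int *out)
{
	assert(order == 1 || order == -1);
	size_t n = 0, written = 0;
	/* Only sources with a pending value enter the heap. */
	for (size_t i = 0; i < nsrc; ++i)
		if (src[i].pos < src[i].len)
			heap[n++] = &src[i];
	/* Build the heap bottom-up. */
	for (size_t i = n / 2; i-- > 0;)
		sift_down(heap, n, i, order);
	while (n > 0) {
		struct int_source *top = heap[0];
		out[written++] = top->data[top->pos++];
		/* Drained: drop it (cf. merger_heap_delete()). */
		if (top->pos == top->len)
			heap[0] = heap[--n];
		/* Re-sift the root (cf. merger_heap_update()). */
		sift_down(heap, n, 0, order);
	}
	return written;
}
```

Each emitted value costs O(log k) heap work for k non-empty sources, and, as in the patch, descending output only requires that every source is itself sorted in descending order.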
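encode_result_buffer() cannot know how many tuples the merge will produce, so it first writes an array header of the widest msgpack form (array32: 0xdd followed by a 4-byte big-endian count, per the MessagePack spec) and patches the count once the sources are drained. A minimal sketch of that trick in C, assuming a plain growable byte buffer in place of struct ibuf and items that are already msgpack-encoded (struct outbuf, outbuf_append(), store_u32_be() and encode_array() are hypothetical names, not part of the patch):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for struct ibuf: a growable byte array. */
struct outbuf {
	unsigned char *data;
	size_t size;
	size_t cap;
};

/* Append n bytes, growing the storage as needed; -1 on OOM. */
static int
outbuf_append(struct outbuf *b, const void *p, size_t n)
{
	if (b->size + n > b->cap) {
		size_t cap = b->cap != 0 ? b->cap : 16;
		while (cap < b->size + n)
			cap *= 2;
		unsigned char *d = realloc(b->data, cap);
		if (d == NULL)
			return -1;
		b->data = d;
		b->cap = cap;
	}
	memcpy(b->data + b->size, p, n);
	b->size += n;
	return 0;
}

/* Big-endian store: the byte order msgpack uses for lengths. */
static void
store_u32_be(unsigned char *p, uint32_t v)
{
	p[0] = (unsigned char)(v >> 24);
	p[1] = (unsigned char)(v >> 16);
	p[2] = (unsigned char)(v >> 8);
	p[3] = (unsigned char)v;
}

/*
 * Encode a NULL-terminated list of already-encoded msgpack items
 * as one array whose length is only known at the end.
 */
static int
encode_array(struct outbuf *b, const unsigned char *const *items,
	     const size_t *sizes)
{
	assert(b != NULL && items != NULL && sizes != NULL);
	/* array32 header with a placeholder count. */
	static const unsigned char header[5] = {0xdd, 0xff, 0xff, 0xff, 0xff};
	/* An index survives realloc(); a pointer would not. */
	size_t count_pos = b->size + 1;
	if (outbuf_append(b, header, sizeof(header)) != 0)
		return -1;
	uint32_t count = 0;
	for (; items[count] != NULL; ++count) {
		if (outbuf_append(b, items[count], sizes[count]) != 0)
			return -1;
	}
	/* Patch the real count in place. */
	store_u32_be(b->data + count_pos, count);
	return 0;
}
```

The sketch remembers the count's position as an index because growing the buffer may move its data; the patch gets the same effect by counting back from obuf->wpos via result_len_offset.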