[PATCH v4 4/4] Add merger for tuple streams (Lua part)
Alexander Turenko
alexander.turenko at tarantool.org
Wed May 8 01:30:48 MSK 2019
Fixes #3276.
@TarantoolBot document
Title: Merger for tuple streams
The main concept of the merger is a source. It is an object that
provides a stream of tuples. There are four types of sources: a tuple
source, a table source, a buffer source and a merger itself.
A tuple source just returns one tuple. However, this source (as well as the
table and buffer ones) supports fetching the next data chunk, so the
API allows creating it from a Lua iterator:
`merger.new_tuple_source(gen, param, state)`. A `gen` function should
return `state, tuple` on each call and then return `nil` when no more
tuples are available. Consider the example:
```lua
box.cfg({})
merger = require('merger')
box.schema.space.create('s')
box.space.s:create_index('pk')
box.space.s:insert({1})
box.space.s:insert({2})
box.space.s:insert({3})
s = merger.new_tuple_source(box.space.s:pairs())
s:select()
---
- - [1]
- [2]
- [3]
...
s = merger.new_tuple_source(box.space.s:pairs())
s:pairs():totable()
---
- - [1]
- [2]
- [3]
...
```
As we see, a source (this is common for all sources) has `:select()` and
`:pairs()` methods. The first one has two options: `buffer` and `limit`
with the same meaning as the ones in net.box `:select()`. The `:pairs()`
method (or its `:ipairs()` alias) returns a luafun iterator (it is a Lua
iterator, but it also provides a set of handy methods to operate in a
functional style).
The same API exists to create a table and a buffer source:
`merger.new_table_source(gen, param, state)` and
`merger.new_buffer_source(gen, param, state)`. A `gen` function should
return a table or a buffer on each call.
There are also helpers that are useful when all data are available at
once: `merger.new_source_fromtable(tbl)` and
`merger.new_source_frombuffer(buf)`.
A merger is a special kind of source, which is created from a key_def
object and a set of sources. It performs a kind of merge sort: on each
step it chooses the source with the minimal / maximal tuple, consumes
a tuple from this source and repeats. The API to create a merger is the
following:
```lua
local key_def_lib = require('key_def')
local merger = require('merger')
local key_def = key_def_lib.new(<...>)
local sources = {<...>}
local merger_inst = merger.new(key_def, sources, {
-- Ascending (false) or descending (true) order.
-- Default is ascending.
reverse = <boolean> or <nil>,
})
```
An instance of a merger has the same `:select()` and `:pairs()` methods
as any other source.
The `key_def_lib.new()` function takes a table of key parts as an
argument in the same format as box.space.<...>.index.<...>.parts or
conn.space.<...>.index.<...>.parts (where conn is a net.box connection):
```
local key_parts = {
{
fieldno = <number>,
type = <string>,
[ is_nullable = <boolean>, ]
[ collation_id = <number>, ]
[ collation = <string>, ]
},
...
}
local key_def = key_def_lib.new(key_parts)
```
A key_def can be cached across requests with the same ordering rules
(typically requests to the same space).
---
src/box/CMakeLists.txt | 2 +
src/box/lua/init.c | 7 +-
src/box/lua/merger.c | 1143 ++++++++++++++++++++++++++++++++++
src/box/lua/merger.h | 47 ++
src/box/lua/merger.lua | 41 ++
test/box-tap/merger.test.lua | 768 +++++++++++++++++++++++
6 files changed, 2007 insertions(+), 1 deletion(-)
create mode 100644 src/box/lua/merger.c
create mode 100644 src/box/lua/merger.h
create mode 100644 src/box/lua/merger.lua
create mode 100755 test/box-tap/merger.test.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ce328fb95..0864c3433 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -13,6 +13,7 @@ lua_source(lua_sources lua/upgrade.lua)
lua_source(lua_sources lua/console.lua)
lua_source(lua_sources lua/xlog.lua)
lua_source(lua_sources lua/key_def.lua)
+lua_source(lua_sources lua/merger.lua)
set(bin_sources)
bin_source(bin_sources bootstrap.snap bootstrap.h)
@@ -144,6 +145,7 @@ add_library(box STATIC
lua/xlog.c
lua/execute.c
lua/key_def.c
+ lua/merger.c
${bin_sources})
target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble
diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 6b8be5096..76b987b4b 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -60,6 +60,7 @@
#include "box/lua/tuple.h"
#include "box/lua/execute.h"
#include "box/lua/key_def.h"
+#include "box/lua/merger.h"
extern char session_lua[],
tuple_lua[],
@@ -70,7 +71,8 @@ extern char session_lua[],
feedback_daemon_lua[],
net_box_lua[],
upgrade_lua[],
- console_lua[];
+ console_lua[],
+ merger_lua[];
static const char *lua_sources[] = {
"box/session", session_lua,
@@ -83,6 +85,7 @@ static const char *lua_sources[] = {
"box/load_cfg", load_cfg_lua,
"box/xlog", xlog_lua,
"box/key_def", key_def_lua,
+ "box/merger", merger_lua,
NULL
};
@@ -317,6 +320,8 @@ box_lua_init(struct lua_State *L)
lua_pop(L, 1);
luaopen_key_def(L);
lua_pop(L, 1);
+ luaopen_merger(L);
+ lua_pop(L, 1);
/* Load Lua extension */
for (const char **s = lua_sources; *s; s += 2) {
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
new file mode 100644
index 000000000..a61adb389
--- /dev/null
+++ b/src/box/lua/merger.c
@@ -0,0 +1,1143 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "box/lua/merger.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <lua.h> /* lua_*() */
+#include <lauxlib.h> /* luaL_*() */
+
+#include "fiber.h" /* fiber() */
+#include "diag.h" /* diag_set() */
+
+#include "box/tuple.h" /* tuple_format_runtime,
+ tuple_*(), ... */
+
+#include "lua/error.h" /* luaT_error() */
+#include "lua/utils.h" /* luaL_pushcdata(),
+ luaL_iterator_*() */
+
+#include "box/lua/key_def.h" /* luaT_check_key_def() */
+#include "box/lua/tuple.h" /* luaT_tuple_new() */
+
+#include "small/ibuf.h" /* struct ibuf */
+#include "msgpuck.h" /* mp_*() */
+
+#include "box/merger.h" /* merge_source_*, merger_*() */
+
+/** Ctype id of `struct merge_source &`, set in luaopen_merger(). */
+static uint32_t CTID_STRUCT_MERGE_SOURCE_REF;
+/** Ctype id of `struct ibuf`, set in luaopen_merger(). */
+static uint32_t CTID_STRUCT_IBUF;
+
+/**
+ * Constructor of a merge source from a Lua iterator.
+ *
+ * Such a function reads the iterator triplet (gen, param, state)
+ * from the top of the Lua stack and returns a new source, or NULL
+ * with a diag set. It is passed to lbox_merge_source_new().
+ */
+typedef struct merge_source *
+(*luaL_merge_source_new_f)(struct lua_State *L);
+
+/* {{{ Helpers */
+
+/**
+ * Return the ibuf stored at @a idx on the Lua stack.
+ *
+ * Return NULL when the value is not a cdata<struct ibuf>.
+ */
+static struct ibuf *
+luaT_check_ibuf(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) == LUA_TCDATA) {
+		uint32_t ctypeid;
+		struct ibuf *buf = luaL_checkcdata(L, idx, &ctypeid);
+		if (buf != NULL && ctypeid == CTID_STRUCT_IBUF)
+			return buf;
+	}
+	return NULL;
+}
+
+/**
+ * Extract a merge source from the Lua stack.
+ *
+ * Return NULL when the value at @a idx is not a
+ * cdata<struct merge_source &>. The type is checked before
+ * luaL_checkcdata() is called (as luaT_check_ibuf() does),
+ * because luaL_checkcdata() raises a Lua error on a non-cdata
+ * value, while callers expect NULL so that they can report their
+ * own error and release their resources.
+ */
+static struct merge_source *
+luaT_check_merge_source(struct lua_State *L, int idx)
+{
+	if (lua_type(L, idx) != LUA_TCDATA)
+		return NULL;
+
+	uint32_t cdata_type;
+	struct merge_source **source_ptr = luaL_checkcdata(L, idx, &cdata_type);
+	if (source_ptr == NULL || cdata_type != CTID_STRUCT_MERGE_SOURCE_REF)
+		return NULL;
+	return *source_ptr;
+}
+
+/**
+ * Decode the msgpack array header that wraps tuples in @a buf.
+ *
+ * On success move the read position past the header, store the
+ * array length to @a len_p and return 0. An empty buffer is
+ * accepted as a zero-length array. Return -1 when the buffer is
+ * inconsistent or does not start with a valid array header.
+ */
+static int
+decode_header(struct ibuf *buf, size_t *len_p)
+{
+	/* A read position past the write one means a broken buffer. */
+	if (buf->rpos > buf->wpos)
+		return -1;
+
+	/* Nothing to decode: treat it as an empty array. */
+	if (ibuf_used(buf) == 0) {
+		*len_p = 0;
+		return 0;
+	}
+
+	if (mp_typeof(*buf->rpos) != MP_ARRAY)
+		return -1;
+	if (mp_check_array(buf->rpos, buf->wpos) > 0)
+		return -1;
+	*len_p = mp_decode_array((const char **) &buf->rpos);
+	return 0;
+}
+
+/**
+ * Encode a msgpack array header for @a result_len tuples at the
+ * write position of @a output_buffer.
+ *
+ * Return 0 on success. Return -1 and set a diag when the buffer
+ * cannot be grown; nothing is written in this case (previously
+ * the header was written past the reserved area).
+ */
+static int
+encode_header(struct ibuf *output_buffer, uint32_t result_len)
+{
+	size_t size = mp_sizeof_array(result_len);
+	if (ibuf_reserve(output_buffer, size) == NULL) {
+		diag_set(OutOfMemory, size, "ibuf_reserve", "header");
+		return -1;
+	}
+	output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len);
+	return 0;
+}
+
+/**
+ * Get a tuple from the value at @a idx on a Lua stack.
+ *
+ * An existing tuple is returned as is after validation against
+ * @a format (validation is skipped when @a format is NULL).
+ * Otherwise a new tuple is created from a Lua table using
+ * @a format, or the runtime format when @a format is NULL.
+ *
+ * In case of an error return NULL and set a diag.
+ */
+static struct tuple *
+luaT_gettuple(struct lua_State *L, int idx, struct tuple_format *format)
+{
+	struct tuple *tuple = luaT_istuple(L, idx);
+	if (tuple != NULL) {
+		/* A tuple: validate it when a format is given. */
+		bool invalid = format != NULL &&
+			       tuple_validate(format, tuple) != 0;
+		return invalid ? NULL : tuple;
+	}
+	/* Not a tuple: build one from a Lua table. */
+	return luaT_tuple_new(L, idx, format != NULL ? format :
+			      tuple_format_runtime);
+}
+
+/* }}} */
+
+/* {{{ Create, destroy structures from Lua */
+
+/**
+ * __gc handler of a merge source cdata: drop the source
+ * reference held by the cdata.
+ */
+static int
+lbox_merge_source_gc(struct lua_State *L)
+{
+	struct merge_source *src = luaT_check_merge_source(L, 1);
+	assert(src != NULL);
+	merge_source_unref(src);
+	/* A finalizer returns no values. */
+	return 0;
+}
+
+/**
+ * Create a new source from a Lua iterator and push it onto the
+ * Lua stack.
+ *
+ * It is the helper for lbox_merger_new_buffer_source(),
+ * lbox_merger_new_table_source() and
+ * lbox_merger_new_tuple_source(): @a func_name is used in the
+ * usage error message, @a luaL_merge_source_new constructs a
+ * source of the particular type.
+ *
+ * Raise a Lua error on wrong arguments or when the source cannot
+ * be created.
+ */
+static int
+lbox_merge_source_new(struct lua_State *L, const char *func_name,
+		      luaL_merge_source_new_f luaL_merge_source_new)
+{
+	int top = lua_gettop(L);
+	if (top < 1 || top > 3 || !luaL_iscallable(L, 1))
+		return luaL_error(L, "Usage: %s(gen, param, state)", func_name);
+
+	/*
+	 * luaL_merge_source_new() reads exactly three top values.
+	 */
+	while (lua_gettop(L) < 3)
+		lua_pushnil(L);
+
+	struct merge_source *source = luaL_merge_source_new(L);
+	/*
+	 * Nothing to unreference on failure: there is no source
+	 * object (unreferencing NULL would dereference it), and the
+	 * constructor has set a diag.
+	 */
+	if (source == NULL)
+		return luaT_error(L);
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = source;
+	/* The cdata holds the source reference: drop it at GC. */
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error describing the merger.new() signature.
+ *
+ * When @a param_name is not NULL, it is named as the bad
+ * parameter.
+ */
+static int
+lbox_merger_new_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merger.new(key_def, "
+				   "{source, source, ...}[, {"
+				   "reverse = <boolean> or <nil>}])";
+	if (param_name != NULL)
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+	return luaL_error(L, "Bad params, use: %s", usage);
+}
+
+/**
+ * Parse a second parameter of merger.new() into an array of
+ * sources.
+ *
+ * Return a malloc'ed array of pointers to sources (the caller
+ * frees it) and set @a source_count_ptr. In case of an error set
+ * a diag and return NULL.
+ *
+ * Each table element is popped right after it is inspected, so
+ * the Lua stack does not grow with the number of sources (Lua
+ * only guarantees a small number of free stack slots). The
+ * sources stay alive, because the table at @a idx holds them.
+ *
+ * It is the helper for lbox_merger_new().
+ */
+static struct merge_source **
+luaT_merger_new_parse_sources(struct lua_State *L, int idx,
+			      uint32_t *source_count_ptr)
+{
+	uint32_t source_count = lua_objlen(L, idx);
+	/*
+	 * Allocate at least one slot: malloc(0) is allowed to
+	 * return NULL, which would be reported as OOM for an empty
+	 * sources table.
+	 */
+	const size_t sources_size = sizeof(struct merge_source *) *
+		(source_count > 0 ? source_count : 1);
+	struct merge_source **sources = malloc(sources_size);
+	if (sources == NULL) {
+		diag_set(OutOfMemory, sources_size, "malloc", "sources");
+		return NULL;
+	}
+
+	/* Save all sources. */
+	for (uint32_t i = 0; i < source_count; ++i) {
+		lua_pushinteger(L, i + 1);
+		lua_gettable(L, idx);
+
+		/*
+		 * Check the type first: luaL_checkcdata() raises a Lua
+		 * error on a non-cdata value, which would leak
+		 * @a sources and hide the error below.
+		 */
+		struct merge_source *source = NULL;
+		if (lua_type(L, -1) == LUA_TCDATA)
+			source = luaT_check_merge_source(L, -1);
+		lua_pop(L, 1);
+		if (source == NULL) {
+			free(sources);
+			diag_set(IllegalParams,
+				 "Unknown source type at index %u", i + 1);
+			return NULL;
+		}
+		sources[i] = source;
+	}
+
+	*source_count_ptr = source_count;
+	return sources;
+}
+
+/**
+ * Create a new merger and push it to a Lua stack as a merge
+ * source.
+ *
+ * Expect cdata<struct key_def>, a table of sources and
+ * (optionally) a table of options on a Lua stack.
+ *
+ * Raise a Lua error on bad arguments or when the merger cannot
+ * be created.
+ */
+static int
+lbox_merger_new(struct lua_State *L)
+{
+	struct key_def *key_def = NULL;
+	int top = lua_gettop(L);
+	bool ok = (top == 2 || top == 3) &&
+		/* key_def. */
+		(key_def = luaT_check_key_def(L, 1)) != NULL &&
+		/* Sources. */
+		lua_istable(L, 2) == 1 &&
+		/* Opts. */
+		(lua_isnoneornil(L, 3) == 1 || lua_istable(L, 3) == 1);
+	if (!ok)
+		return lbox_merger_new_usage(L, NULL);
+
+	/* Options. */
+	bool reverse = false;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 3)) {
+		/* Parse reverse. */
+		lua_pushstring(L, "reverse");
+		lua_gettable(L, 3);
+		if (!lua_isnil(L, -1)) {
+			if (lua_isboolean(L, -1))
+				reverse = lua_toboolean(L, -1);
+			else
+				return lbox_merger_new_usage(L, "reverse");
+		}
+		lua_pop(L, 1);
+	}
+
+	uint32_t source_count = 0;
+	struct merge_source **sources = luaT_merger_new_parse_sources(L, 2,
+		&source_count);
+	if (sources == NULL)
+		return luaT_error(L);
+
+	struct merge_source *merger = merger_new(key_def, sources, source_count,
+						 reverse);
+	/* The array itself is not needed after merger_new(). */
+	free(sources);
+	/*
+	 * Nothing to unreference on failure: there is no merger
+	 * object (unreferencing NULL would dereference it).
+	 */
+	if (merger == NULL)
+		return luaT_error(L);
+
+	*(struct merge_source **)
+		luaL_pushcdata(L, CTID_STRUCT_MERGE_SOURCE_REF) = merger;
+	/* The cdata holds the merger reference: drop it at GC. */
+	lua_pushcfunction(L, lbox_merge_source_gc);
+	luaL_setcdatagc(L, -2);
+
+	return 1;
+}
+
+/* }}} */
+
+/* {{{ Buffer merge source */
+
+/** A merge source reading msgpack tuples from ibuf chunks. */
+struct merge_source_buffer {
+	/** Base class. */
+	struct merge_source base;
+	/** Lua iterator that yields a next chunk (a buffer). */
+	struct luaL_iterator *fetch_it;
+	/**
+	 * Lua registry reference to the current buffer, 0 when
+	 * there is none. It prevents LuaJIT from collecting the
+	 * buffer while the source considers it the current one.
+	 */
+	int ref;
+	/** The current chunk of tuples. */
+	struct ibuf *buf;
+	/**
+	 * How many tuples of the current chunk are not read yet,
+	 * as given by the chunk's array header. A merger stops
+	 * before the end of a buffer when it is not the last
+	 * merger in the chain.
+	 */
+	size_t remaining_tuple_count;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base);
+
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+			      struct tuple_format *format,
+			      struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the buffer type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_buffer_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_buffer_vtab = {
+		.destroy = luaL_merge_source_buffer_destroy,
+		.next = luaL_merge_source_buffer_next,
+	};
+
+	struct merge_source_buffer *source = malloc(
+		sizeof(struct merge_source_buffer));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_buffer),
+			 "malloc", "merge_source_buffer");
+		return NULL;
+	}
+
+	/*
+	 * Create the iterator before the base. On failure the
+	 * source is not initialized yet and can be freed directly.
+	 * The destructor and the fetch function need a valid
+	 * iterator.
+	 * NOTE(review): assumes luaL_iterator_new() sets a diag
+	 * when it returns NULL; confirm in lua/utils.c.
+	 */
+	source->fetch_it = luaL_iterator_new(L, 0);
+	if (source->fetch_it == NULL) {
+		free(source);
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_buffer_vtab);
+
+	source->ref = 0;
+	source->buf = NULL;
+	source->remaining_tuple_count = 0;
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to get a next data chunk (a
+ * buffer).
+ *
+ * Return 1 when a new buffer is received, 0 when a buffers
+ * iterator ends and -1 at error and set a diag.
+ *
+ * The values returned by gen are popped both on success and on
+ * a malformed result, so the fiber's Lua stack stays balanced.
+ * A malformed result does not drop the current buffer.
+ */
+static int
+luaL_merge_source_buffer_fetch(struct merge_source_buffer *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		lua_pop(L, nresult);
+		diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Stack: state, buffer. Check the buffer before using it. */
+	struct ibuf *buf = luaT_check_ibuf(L, -1);
+	if (buf == NULL) {
+		lua_pop(L, nresult);
+		diag_set(IllegalParams, "Expected <state>, <buffer>");
+		return -1;
+	}
+
+	/* Set a new buffer as the current chunk. */
+	if (source->ref > 0) {
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+		source->ref = 0;
+	}
+	source->buf = buf;
+	lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	lua_pop(L, nresult);
+
+	/* Update remaining_tuple_count and skip the header. */
+	if (decode_header(source->buf, &source->remaining_tuple_count) != 0) {
+		diag_set(IllegalParams, "Invalid merge source %p",
+			 &source->base);
+		return -1;
+	}
+	return 1;
+}
+
+/* Virtual methods */
+
+/** Release the iterator and the current buffer, then free the source. */
+static void
+luaL_merge_source_buffer_destroy(struct merge_source *base)
+{
+	struct merge_source_buffer *source;
+	source = container_of(base, struct merge_source_buffer, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	/* Drop the registry reference to the current buffer, if any. */
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+/**
+ * Read a next tuple from the current buffer, fetching new
+ * buffers when the current one is exhausted.
+ *
+ * Set @a out to a referenced tuple of @a format (the runtime
+ * format when NULL), or to NULL when the chunks iterator ends.
+ * Return 0 on success, -1 and set a diag on an error.
+ */
+static int
+luaL_merge_source_buffer_next(struct merge_source *base,
+			      struct tuple_format *format,
+			      struct tuple **out)
+{
+	struct merge_source_buffer *source = container_of(base,
+		struct merge_source_buffer, base);
+
+	/*
+	 * Handle the case when all data were processed: ask a
+	 * next chunk until a non-empty chunk is received or a
+	 * chunks iterator ends.
+	 */
+	while (source->remaining_tuple_count == 0) {
+		int rc = luaL_merge_source_buffer_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+	}
+	if (ibuf_used(source->buf) == 0) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	const char *tuple_beg = source->buf->rpos;
+	const char *tuple_end = tuple_beg;
+	if (mp_check(&tuple_end, source->buf->wpos) != 0) {
+		diag_set(IllegalParams, "Unexpected msgpack buffer end");
+		return -1;
+	}
+	/*
+	 * Tuple data must be a msgpack array: reject other values
+	 * instead of passing them to tuple_new().
+	 */
+	if (mp_typeof(*tuple_beg) != MP_ARRAY) {
+		diag_set(IllegalParams, "A tuple must be an array");
+		return -1;
+	}
+	--source->remaining_tuple_count;
+	source->buf->rpos = (char *) tuple_end;
+	if (format == NULL)
+		format = tuple_format_runtime;
+	struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+	if (tuple == NULL)
+		return -1;
+
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * merger.new_buffer_source(gen, param, state): wrap a Lua
+ * iterator over buffers into a merge source and push it.
+ */
+static int
+lbox_merger_new_buffer_source(struct lua_State *L)
+{
+	const char *func_name = "merger.new_buffer_source";
+	return lbox_merge_source_new(L, func_name,
+				     luaL_merge_source_buffer_new);
+}
+
+/* }}} */
+
+/* {{{ Table merge source */
+
+/** A merge source reading tuples from Lua table chunks. */
+struct merge_source_table {
+	/** Base class. */
+	struct merge_source base;
+	/** Lua iterator that yields a next chunk (a table). */
+	struct luaL_iterator *fetch_it;
+	/**
+	 * Lua registry reference to the current table, 0 when
+	 * there is none.
+	 */
+	int ref;
+	/** 1-based index of the next tuple in the current table. */
+	int next_idx;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_table_destroy(struct merge_source *base);
+
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the table type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_table_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_table_vtab = {
+		.destroy = luaL_merge_source_table_destroy,
+		.next = luaL_merge_source_table_next,
+	};
+
+	struct merge_source_table *source = malloc(
+		sizeof(struct merge_source_table));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_table),
+			 "malloc", "merge_source_table");
+		return NULL;
+	}
+
+	/*
+	 * Create the iterator before the base. On failure the
+	 * source is not initialized yet and can be freed directly.
+	 * The destructor and the fetch function need a valid
+	 * iterator.
+	 * NOTE(review): assumes luaL_iterator_new() sets a diag
+	 * when it returns NULL; confirm in lua/utils.c.
+	 */
+	source->fetch_it = luaL_iterator_new(L, 0);
+	if (source->fetch_it == NULL) {
+		free(source);
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_table_vtab);
+
+	source->ref = 0;
+	source->next_idx = 1;
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * Return 0 when a tables iterator ends, 1 when a new table is
+ * received and -1 at an error (set a diag).
+ *
+ * The values returned by gen are popped both on success and on
+ * a malformed result, so the fiber's Lua stack stays balanced.
+ * A malformed result does not drop the current table.
+ */
+static int
+luaL_merge_source_table_fetch(struct merge_source_table *source)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		lua_pop(L, nresult);
+		diag_set(IllegalParams, "Expected <state>, <table>, got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Stack: state, table. Check the table before using it. */
+	if (lua_istable(L, -1) == 0) {
+		lua_pop(L, nresult);
+		diag_set(IllegalParams, "Expected <state>, <table>");
+		return -1;
+	}
+
+	/* Set a new table as the current chunk. */
+	if (source->ref > 0) {
+		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
+		source->ref = 0;
+	}
+	lua_pushvalue(L, -1); /* Popped by luaL_ref(). */
+	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	source->next_idx = 1;
+	lua_pop(L, nresult);
+
+	return 1;
+}
+
+/* Virtual methods */
+
+/** Release the iterator and the current table, then free the source. */
+static void
+luaL_merge_source_table_destroy(struct merge_source *base)
+{
+	struct merge_source_table *source;
+	source = container_of(base, struct merge_source_table, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	/* Drop the registry reference to the current table, if any. */
+	if (source->ref > 0)
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
+
+	free(source);
+}
+
+/**
+ * Read a next tuple from the current table, fetching new tables
+ * when a nil element marks the end of the current one.
+ *
+ * Set @a out to a referenced tuple, or to NULL when the tables
+ * iterator ends. Return 0 on success, -1 and set a diag on an
+ * error. Values this function pushes are popped on all paths.
+ */
+static int
+luaL_merge_source_table_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merge_source_table *source = container_of(base,
+		struct merge_source_table, base);
+
+	if (source->ref > 0) {
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+	/*
+	 * If all data were processed, try to fetch more.
+	 */
+	while (source->ref == 0 || lua_isnil(L, -1)) {
+		if (source->ref > 0)
+			lua_pop(L, 2);
+		int rc = luaL_merge_source_table_fetch(source);
+		if (rc < 0)
+			return -1;
+		if (rc == 0) {
+			*out = NULL;
+			return 0;
+		}
+		/*
+		 * Retry tuple extracting when a next table is
+		 * received.
+		 */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, source->ref);
+		lua_pushinteger(L, source->next_idx);
+		lua_gettable(L, -2);
+	}
+	/* Stack: table, element. */
+
+	struct tuple *tuple = luaT_gettuple(L, -1, format);
+	/* Pop on both paths: an error must not unbalance the stack. */
+	lua_pop(L, 2);
+	if (tuple == NULL)
+		return -1;
+
+	++source->next_idx;
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * merger.new_table_source(gen, param, state): wrap a Lua
+ * iterator over tables of tuples into a merge source and push it.
+ */
+static int
+lbox_merger_new_table_source(struct lua_State *L)
+{
+	const char *func_name = "merger.new_table_source";
+	return lbox_merge_source_new(L, func_name,
+				     luaL_merge_source_table_new);
+}
+
+/* }}} */
+
+/* {{{ Tuple merge source */
+
+/** A merge source reading tuples one by one from a Lua iterator. */
+struct merge_source_tuple {
+	/** Base class. */
+	struct merge_source base;
+	/** Lua iterator that yields a next tuple. */
+	struct luaL_iterator *fetch_it;
+};
+
+/* Virtual methods declarations */
+
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base);
+
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out);
+
+/* Non-virtual methods */
+
+/**
+ * Create a new merge source of the tuple type.
+ *
+ * Reads gen, param, state from the top of a Lua stack.
+ *
+ * In case of an error it returns NULL and sets a diag.
+ */
+static struct merge_source *
+luaL_merge_source_tuple_new(struct lua_State *L)
+{
+	static struct merge_source_vtab merge_source_tuple_vtab = {
+		.destroy = luaL_merge_source_tuple_destroy,
+		.next = luaL_merge_source_tuple_next,
+	};
+
+	struct merge_source_tuple *source = malloc(
+		sizeof(struct merge_source_tuple));
+	if (source == NULL) {
+		diag_set(OutOfMemory, sizeof(struct merge_source_tuple),
+			 "malloc", "merge_source_tuple");
+		return NULL;
+	}
+
+	/*
+	 * Create the iterator before the base. On failure the
+	 * source is not initialized yet and can be freed directly.
+	 * The destructor and the fetch function need a valid
+	 * iterator.
+	 * NOTE(review): assumes luaL_iterator_new() sets a diag
+	 * when it returns NULL; confirm in lua/utils.c.
+	 */
+	source->fetch_it = luaL_iterator_new(L, 0);
+	if (source->fetch_it == NULL) {
+		free(source);
+		return NULL;
+	}
+
+	merge_source_create(&source->base, &merge_source_tuple_vtab);
+
+	return &source->base;
+}
+
+/**
+ * Call a user provided function to fill the source.
+ *
+ * This function does not check whether a user-provided value
+ * is a tuple. A caller should check it on its side.
+ *
+ * Return 1 at success and push a resulting tuple to the Lua
+ * stack.
+ * Return 0 when no more data.
+ * Return -1 at error (set a diag). When gen returns a wrong
+ * number of values, those values are popped.
+ */
+static int
+luaL_merge_source_tuple_fetch(struct merge_source_tuple *source,
+			      struct lua_State *L)
+{
+	int nresult = luaL_iterator_next(L, source->fetch_it);
+
+	/* Handle a Lua error in a gen function. */
+	if (nresult == -1)
+		return -1;
+
+	/* No more data: do nothing. */
+	if (nresult == 0)
+		return 0;
+
+	/* Handle incorrect results count. */
+	if (nresult != 2) {
+		lua_pop(L, nresult);
+		diag_set(IllegalParams, "Expected <state>, <tuple> got %d "
+			 "return values", nresult);
+		return -1;
+	}
+
+	/* Stack: state, tuple. Leave only the tuple. */
+	lua_remove(L, -2);
+
+	return 1;
+}
+
+/* Virtual methods */
+
+/** Release the iterator and free the source. */
+static void
+luaL_merge_source_tuple_destroy(struct merge_source *base)
+{
+	struct merge_source_tuple *source;
+	source = container_of(base, struct merge_source_tuple, base);
+
+	assert(source->fetch_it != NULL);
+	luaL_iterator_delete(source->fetch_it);
+	free(source);
+}
+
+/**
+ * Fetch a next tuple from the user iterator.
+ *
+ * Set @a out to a referenced tuple (validated against, or
+ * created with, @a format) or to NULL when the iterator ends.
+ * Return 0 on success, -1 and set a diag on an error.
+ */
+static int
+luaL_merge_source_tuple_next(struct merge_source *base,
+			     struct tuple_format *format,
+			     struct tuple **out)
+{
+	struct lua_State *L = fiber()->storage.lua.stack;
+	struct merge_source_tuple *source = container_of(base,
+		struct merge_source_tuple, base);
+
+	int rc = luaL_merge_source_tuple_fetch(source, L);
+	if (rc < 0)
+		return -1;
+	/*
+	 * Check whether a tuple appears after the fetch.
+	 */
+	if (rc == 0) {
+		*out = NULL;
+		return 0;
+	}
+
+	/*
+	 * Pop the fetched value on both paths: leaving it on the
+	 * fiber's Lua stack on an error would unbalance the stack.
+	 */
+	struct tuple *tuple = luaT_gettuple(L, -1, format);
+	lua_pop(L, 1);
+	if (tuple == NULL)
+		return -1;
+
+	tuple_ref(tuple);
+	*out = tuple;
+	return 0;
+}
+
+/* Lua functions */
+
+/**
+ * merger.new_tuple_source(gen, param, state): wrap a Lua
+ * iterator over tuples into a merge source and push it.
+ */
+static int
+lbox_merger_new_tuple_source(struct lua_State *L)
+{
+	const char *func_name = "merger.new_tuple_source";
+	return lbox_merge_source_new(L, func_name,
+				     luaL_merge_source_tuple_new);
+}
+
+/* }}} */
+
+/* {{{ Merge source Lua methods */
+
+/**
+ * Iterator gen function to traverse source results.
+ *
+ * Expected a nil as the first parameter (param) and a
+ * merge_source as the second parameter (state) on a Lua stack.
+ *
+ * Push the received state value (as a new state) and a next
+ * tuple, or nil, nil when the source is exhausted.
+ */
+static int
+lbox_merge_source_gen(struct lua_State *L)
+{
+	struct merge_source *source = NULL;
+	bool ok = lua_gettop(L) == 2 && lua_isnil(L, 1) &&
+		(source = luaT_check_merge_source(L, 2)) != NULL;
+	if (!ok)
+		return luaL_error(L, "Bad params, use: lbox_merge_source_gen("
+				  "nil, merge_source)");
+
+	struct tuple *tuple;
+	if (merge_source_next(source, NULL, &tuple) != 0)
+		return luaT_error(L);
+	if (tuple == NULL) {
+		lua_pushnil(L);
+		lua_pushnil(L);
+		return 2;
+	}
+
+	/*
+	 * Push merge_source, tuple. Return the very same state
+	 * value rather than a fresh cdata. This avoids allocating
+	 * a new cdata per tuple. It also means the iterator keeps
+	 * referencing the original cdata, whose __gc handler holds
+	 * the source reference; a fresh copy would not keep the
+	 * source alive.
+	 */
+	lua_pushvalue(L, 2);
+	luaT_pushtuple(L, tuple);
+
+	/*
+	 * luaT_pushtuple() references the tuple, so we
+	 * unreference it on merger's side.
+	 */
+	tuple_unref(tuple);
+
+	return 2;
+}
+
+/**
+ * Iterate over merge source results from Lua.
+ *
+ * Return the results of fun.wrap(gen, nil, merge_source), i.e.
+ * three values:
+ *
+ * 1. gen (lbox_merge_source_gen wrapped by fun.wrap());
+ * 2. param (nil);
+ * 3. state (merge_source).
+ */
+static int
+lbox_merge_source_ipairs(struct lua_State *L)
+{
+	bool ok = lua_gettop(L) == 1 &&
+		luaT_check_merge_source(L, 1) != NULL;
+	if (!ok)
+		return luaL_error(L, "Usage: merge_source:ipairs()");
+	/* Stack: merge_source. */
+
+	luaL_loadstring(L, "return require('fun').wrap");
+	lua_call(L, 0, 1);
+	/* Stack: merge_source, wrap. */
+
+	lua_pushcfunction(L, lbox_merge_source_gen);
+	/* Param is nil: all the needed state is in the merge source. */
+	lua_pushnil(L);
+	lua_pushvalue(L, 1);
+	/* Stack: merge_source, wrap, gen, nil, merge_source. */
+
+	lua_call(L, 3, 3);
+	/* Stack: merge_source, gen', param', state'. */
+	return 3;
+}
+
+/**
+ * Write source results into ibuf.
+ *
+ * Encode at most @a limit tuples as a msgpack array at the write
+ * position of @a output_buffer and return 0 (no Lua values).
+ * Raise a Lua error when the source fails or the buffer cannot
+ * grow. In that case the write position is restored, so no
+ * partial result with a bogus header is left in the buffer.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+encode_result_buffer(struct lua_State *L, struct merge_source *source,
+		     struct ibuf *output_buffer, uint32_t limit)
+{
+	/*
+	 * Remember where the result starts as an offset from the
+	 * read position. ibuf_reserve() may move the data, but the
+	 * used area stays contiguous and starts at rpos.
+	 */
+	const size_t start = ibuf_used(output_buffer);
+	const uint32_t header_size = mp_sizeof_array(UINT32_MAX);
+	uint32_t result_len = 0;
+
+	/*
+	 * Reserve maximum size for the array around resulting
+	 * tuples to set it later. After the check below,
+	 * encode_header() writes into already reserved space.
+	 */
+	if (ibuf_reserve(output_buffer, header_size) == NULL) {
+		diag_set(OutOfMemory, header_size, "ibuf_reserve",
+			 "output_buffer");
+		return luaT_error(L);
+	}
+	encode_header(output_buffer, UINT32_MAX);
+
+	/* Fetch, merge and copy tuples to the buffer. */
+	struct tuple *tuple;
+	int rc = 0;
+	while (result_len < limit && (rc =
+	       merge_source_next(source, NULL, &tuple)) == 0 &&
+	       tuple != NULL) {
+		uint32_t bsize = tuple->bsize;
+		if (ibuf_reserve(output_buffer, bsize) == NULL) {
+			tuple_unref(tuple);
+			diag_set(OutOfMemory, bsize, "ibuf_reserve",
+				 "output_buffer");
+			rc = -1;
+			break;
+		}
+		memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
+		output_buffer->wpos += bsize;
+		++result_len;
+
+		/* The received tuple is not more needed. */
+		tuple_unref(tuple);
+	}
+
+	if (rc != 0) {
+		/* Drop the partially written result. */
+		output_buffer->wpos = output_buffer->rpos + start;
+		return luaT_error(L);
+	}
+
+	/*
+	 * Write the real array size. The header of an array of
+	 * UINT32_MAX length is a one-byte marker followed by a
+	 * 4-byte length, so the length starts one byte after the
+	 * header start.
+	 */
+	mp_store_u32(output_buffer->rpos + start + 1, result_len);
+
+	return 0;
+}
+
+/**
+ * Write source results into a new Lua table.
+ *
+ * Push a table with at most @a limit tuples and return 1. Raise
+ * a Lua error when the source fails.
+ *
+ * It is the helper for lbox_merge_source_select().
+ */
+static int
+create_result_table(struct lua_State *L, struct merge_source *source,
+		    uint32_t limit)
+{
+	/* The resulting table. */
+	lua_newtable(L);
+
+	for (uint32_t count = 0; count < limit; ++count) {
+		struct tuple *tuple;
+		if (merge_source_next(source, NULL, &tuple) != 0)
+			return luaT_error(L);
+		/* The source is exhausted. */
+		if (tuple == NULL)
+			break;
+		luaT_pushtuple(L, tuple);
+		lua_rawseti(L, -2, count + 1);
+		/*
+		 * luaT_pushtuple() references the tuple, so we
+		 * unreference it on merger's side.
+		 */
+		tuple_unref(tuple);
+	}
+
+	return 1;
+}
+
+/**
+ * Raise a Lua error describing the merge_source:select()
+ * signature, naming the bad parameter when @a param_name is not
+ * NULL.
+ */
+static int
+lbox_merge_source_select_usage(struct lua_State *L, const char *param_name)
+{
+	static const char *usage = "merge_source:select([{"
+				   "buffer = <cdata<struct ibuf>> or <nil>, "
+				   "limit = <number> or <nil>}])";
+	if (param_name != NULL)
+		return luaL_error(L, "Bad param \"%s\", use: %s", param_name,
+				  usage);
+	return luaL_error(L, "Bad params, use: %s", usage);
+}
+
+/**
+ * Pull results of a merge source to a Lua stack.
+ *
+ * Write results into a buffer or a Lua table depending on
+ * options.
+ *
+ * Expected a merge source and options (optional) on a Lua stack.
+ *
+ * Return a Lua table, or nothing when a 'buffer' option is
+ * provided. A 'limit' option must be a non-negative number.
+ * Values of UINT32_MAX and above (including math.huge) mean
+ * no limit.
+ */
+static int
+lbox_merge_source_select(struct lua_State *L)
+{
+	struct merge_source *source = NULL;
+	int top = lua_gettop(L);
+	bool ok = (top == 1 || top == 2) &&
+		/* Merge source. */
+		(source = luaT_check_merge_source(L, 1)) != NULL &&
+		/* Opts. */
+		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
+	if (!ok)
+		return lbox_merge_source_select_usage(L, NULL);
+
+	uint32_t limit = UINT32_MAX;
+	struct ibuf *output_buffer = NULL;
+
+	/* Parse options. */
+	if (!lua_isnoneornil(L, 2)) {
+		/* Parse buffer. */
+		lua_pushstring(L, "buffer");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if ((output_buffer = luaT_check_ibuf(L, -1)) == NULL)
+				return lbox_merge_source_select_usage(L,
+					"buffer");
+		}
+		lua_pop(L, 1);
+
+		/* Parse limit. */
+		lua_pushstring(L, "limit");
+		lua_gettable(L, 2);
+		if (!lua_isnil(L, -1)) {
+			if (!lua_isnumber(L, -1))
+				return lbox_merge_source_select_usage(L,
+					"limit");
+			/*
+			 * Check the range before narrowing to uint32_t.
+			 * Otherwise a negative value or one above
+			 * UINT32_MAX wraps around (2^32 would become 0).
+			 * The negated comparison also rejects NaN.
+			 */
+			lua_Number value = lua_tonumber(L, -1);
+			if (!(value >= 0))
+				return lbox_merge_source_select_usage(L,
+					"limit");
+			limit = value >= UINT32_MAX ? UINT32_MAX :
+				(uint32_t) value;
+		}
+		lua_pop(L, 1);
+	}
+
+	if (output_buffer == NULL)
+		return create_result_table(L, source, limit);
+	else
+		return encode_result_buffer(L, source, output_buffer, limit);
+}
+
+/* }}} */
+
+/**
+ * Register the module.
+ *
+ * Define the needed ctypes, register the merger.new*()
+ * constructors and the merger.internal.{select,ipairs}()
+ * helpers, and leave the module table on the Lua stack.
+ */
+LUA_API int
+luaopen_merger(struct lua_State *L)
+{
+	static const struct luaL_Reg module_funcs[] = {
+		{"new", lbox_merger_new},
+		{"new_buffer_source", lbox_merger_new_buffer_source},
+		{"new_table_source", lbox_merger_new_table_source},
+		{"new_tuple_source", lbox_merger_new_tuple_source},
+		{NULL, NULL}
+	};
+	static const struct luaL_Reg internal_funcs[] = {
+		{"select", lbox_merge_source_select},
+		{"ipairs", lbox_merge_source_ipairs},
+		{NULL, NULL}
+	};
+
+	luaL_cdef(L, "struct merge_source;");
+	luaL_cdef(L, "struct ibuf;");
+
+	CTID_STRUCT_MERGE_SOURCE_REF = luaL_ctypeid(L, "struct merge_source&");
+	CTID_STRUCT_IBUF = luaL_ctypeid(L, "struct ibuf");
+
+	/* Export C functions to Lua. */
+	luaL_register_module(L, "merger", module_funcs);
+
+	/* Stack: merger. Fill merger.internal. */
+	lua_newtable(L);
+	for (const struct luaL_Reg *reg = internal_funcs; reg->name != NULL;
+	     ++reg) {
+		lua_pushcfunction(L, reg->func);
+		lua_setfield(L, -2, reg->name);
+	}
+	lua_setfield(L, -2, "internal");
+
+	return 1;
+}
diff --git a/src/box/lua/merger.h b/src/box/lua/merger.h
new file mode 100644
index 000000000..c3f648678
--- /dev/null
+++ b/src/box/lua/merger.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+#define TARANTOOL_BOX_LUA_MERGER_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct lua_State;
+
+/**
+ * Register the built-in 'merger' Lua module.
+ *
+ * Declares the FFI types the module uses, registers the
+ * merger.new*() constructors and the merger.internal table
+ * with the source select() / ipairs() implementations.
+ *
+ * @param L Lua state to register the module in.
+ * @return 1: the module table is left on the Lua stack.
+ */
+int
+luaopen_merger(struct lua_State *L);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_LUA_MERGER_H_INCLUDED */
diff --git a/src/box/lua/merger.lua b/src/box/lua/merger.lua
new file mode 100644
index 000000000..11abd1a25
--- /dev/null
+++ b/src/box/lua/merger.lua
@@ -0,0 +1,41 @@
+local ffi = require('ffi')
+local fun = require('fun')
+local merger = require('merger')
+
+-- FFI ctypes of the objects this module checks or decorates.
+local ibuf_t = ffi.typeof('struct ibuf')
+local merge_source_t = ffi.typeof('struct merge_source')
+
+--- Create a source that yields the content of a single buffer.
+-- @param buf a `struct ibuf` cdata; it is expected to hold
+--            msgpack-encoded tuples.
+-- @return a buffer source (see merger.new_buffer_source()).
+-- @raise a 'Usage: ...' error when `buf` is not a `struct ibuf`.
+merger.new_source_frombuffer = function(buf)
+    local is_ibuf = type(buf) == 'cdata' and ffi.istype(ibuf_t, buf)
+    if not is_ibuf then
+        local usage = 'Usage: %s(<cdata<struct ibuf>>)'
+        error(usage:format('merger.new_source_frombuffer'), 0)
+    end
+    -- One-shot fetch iterator: the whole buffer is the only chunk.
+    return merger.new_buffer_source(fun.iter({buf}))
+end
+
+--- Create a source that yields the tuples of a single Lua table.
+-- @param tbl a Lua table of tuples (box tuples or plain tables).
+-- @return a table source (see merger.new_table_source()).
+-- @raise a 'Usage: ...' error when `tbl` is not a table.
+merger.new_source_fromtable = function(tbl)
+    if type(tbl) == 'table' then
+        -- The whole table is the only chunk of the source.
+        return merger.new_table_source(fun.iter({tbl}))
+    end
+    error(('Usage: %s(<table>)'):format('merger.new_source_fromtable'), 0)
+end
+
+-- Methods shared by every merge source, mergers included.
+local methods = {
+    select = merger.internal.select,
+    pairs = merger.internal.ipairs,
+    ipairs = merger.internal.ipairs,
+}
+
+ffi.metatype(merge_source_t, {
+    -- Method lookup: source:select(), source:pairs(), ...
+    __index = methods,
+    -- Lua 5.2 compatibility: pairs(source) / ipairs(source).
+    __pairs = merger.internal.ipairs,
+    __ipairs = merger.internal.ipairs,
+})
diff --git a/test/box-tap/merger.test.lua b/test/box-tap/merger.test.lua
new file mode 100755
index 000000000..ee9eaeaed
--- /dev/null
+++ b/test/box-tap/merger.test.lua
@@ -0,0 +1,768 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local buffer = require('buffer')
+local msgpackffi = require('msgpackffi')
+local digest = require('digest')
+local key_def_lib = require('key_def')
+local merger = require('merger')
+local fiber = require('fiber')
+local utf8 = require('utf8')
+local ffi = require('ffi')
+local fun = require('fun')
+
+-- Maximum number of tuples fetch_source_gen() puts into one chunk
+-- of a table or buffer source. A tuple source always receives a
+-- single tuple per chunk.
+local FETCH_BLOCK_SIZE = 10
+
+-- Build the error message merger.new() raises on bad arguments.
+-- When `param` is given, the message names the rejected option.
+local function merger_new_usage(param)
+    local usage = 'merger.new(key_def, {source, source, ...}[, {' ..
+        'reverse = <boolean> or <nil>}])'
+    if param then
+        return ('Bad param "%s", use: %s'):format(param, usage)
+    end
+    return ('Bad params, use: %s'):format(usage)
+end
+
+-- Build the error message merge_source:select() raises on bad
+-- arguments. When `param` is given, the message names the
+-- rejected option.
+local function merger_select_usage(param)
+    local usage = table.concat({
+        'merge_source:select([{',
+        'buffer = <cdata<struct ibuf>> or <nil>, ',
+        'limit = <number> or <nil>}])',
+    })
+    if not param then
+        return ('Bad params, use: %s'):format(usage)
+    end
+    return ('Bad param "%s", use: %s'):format(param, usage)
+end
+
+-- Encode `data` as msgpack and return an ibuf holding all of it
+-- except the last `trunc` bytes. The `trunc` bytes right after the
+-- write position are zeroed, presumably so that a reader running
+-- past the truncated end sees zeros instead of leftover memory.
+local function truncated_msgpack_buffer(data, trunc)
+    local encoded = msgpackffi.encode(data)
+    local kept = encoded:sub(1, #encoded - trunc)
+    local kept_len = #kept
+    local buf = buffer.ibuf()
+    -- Room for the kept bytes plus the zero padding after them.
+    buf:reserve(kept_len + trunc)
+    local wpos = buf:alloc(kept_len)
+    ffi.copy(wpos, kept .. ('\0'):rep(trunc), kept_len + trunc)
+    return buf
+end
+
+-- A buffer source over truncated_msgpack_buffer(data, trunc).
+local function truncated_msgpack_source(data, trunc)
+    return merger.new_source_frombuffer(
+        truncated_msgpack_buffer(data, trunc))
+end
+
+-- Source constructor calls with bad arguments: every function in
+-- `funcs` is called with `params` and must fail with an error
+-- matching the Lua pattern `exp_err`.
+local bad_source_new_calls = {
+    {
+        'Bad fetch iterator',
+        funcs = {
+            'new_buffer_source',
+            'new_table_source',
+            'new_tuple_source',
+        },
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(gen, param, state%)$',
+    },
+    {
+        'Bad chunk type',
+        funcs = {
+            'new_source_frombuffer',
+            'new_source_fromtable',
+        },
+        params = {1},
+        exp_err = '^Usage: merger%.[a-z_]+%(<.+>%)$',
+    },
+    {
+        'Bad buffer chunk',
+        funcs = {
+            'new_source_frombuffer',
+        },
+        params = {ffi.new('char *')},
+        exp_err = '^Usage: merger%.[a-z_]+%(<cdata<struct ibuf>>%)$',
+    },
+}
+
+-- Malformed chunks returned by a source's fetch iterator: the
+-- source built with merger[func] must fail on the first fetch
+-- with an error matching `exp_err`.
+local bad_chunks = {
+    {
+        'Bad buffer source chunk (not cdata)',
+        func = 'new_buffer_source',
+        chunk = 1,
+        exp_err = 'Expected <state>, <buffer>',
+    },
+    {
+        'Bad buffer source chunk (wrong ctype)',
+        func = 'new_buffer_source',
+        chunk = ffi.new('char *'),
+        exp_err = 'Expected <state>, <buffer>',
+    },
+    {
+        'Bad table source chunk',
+        func = 'new_table_source',
+        chunk = 1,
+        exp_err = 'Expected <state>, <table>',
+    },
+    {
+        'Bad tuple source chunk (not cdata)',
+        func = 'new_tuple_source',
+        chunk = 1,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad tuple source chunk (wrong ctype)',
+        func = 'new_tuple_source',
+        chunk = ffi.new('char *'),
+        exp_err = 'A tuple or a table expected, got cdata',
+    },
+}
+
+-- merger.new() calls with bad options and the exact error text
+-- each must raise.
+local bad_merger_new_calls = {
+    {
+        'Bad opts',
+        sources = {},
+        opts = 1,
+        exp_err = merger_new_usage(nil),
+    },
+    {
+        'Bad opts.reverse',
+        sources = {},
+        opts = {reverse = 1},
+        exp_err = merger_new_usage('reverse'),
+    },
+}
+
+-- merger:select() calls that must fail, either because of broken
+-- source data or because of bad options, with the exact error
+-- text each must raise.
+local bad_merger_select_calls = {
+    {
+        'Wrong source of table type',
+        sources = {merger.new_source_fromtable({1})},
+        opts = nil,
+        exp_err = 'A tuple or a table expected, got number',
+    },
+    {
+        'Bad msgpack source: wrong length of the tuples array',
+        -- Drop the last tuple from the msgpack data while the
+        -- array header still announces the old tuple count.
+        sources = {truncated_msgpack_source({{''}, {''}, {''}}, 2)},
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad msgpack source: wrong length of a tuple',
+        -- Drop half of the last tuple while its header still
+        -- announces the old tuple size.
+        sources = {truncated_msgpack_source({{''}, {''}, {''}}, 1)},
+        opts = {},
+        exp_err = 'Unexpected msgpack buffer end',
+    },
+    {
+        'Bad opts.buffer (wrong type)',
+        sources = {},
+        opts = {buffer = 1},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.buffer (wrong cdata type)',
+        sources = {},
+        opts = {buffer = ffi.new('char *')},
+        exp_err = merger_select_usage('buffer'),
+    },
+    {
+        'Bad opts.limit (wrong type)',
+        sources = {},
+        opts = {limit = 'hello'},
+        exp_err = merger_select_usage('limit'),
+    },
+}
+
+-- Key layouts the merge cases run on. Every schema provides the
+-- key_def `parts`, a generator `gen_tuple(i)` of the i-th tuple
+-- and, optionally, `tuple_count` (run_case() defaults it to 100).
+local schemas = {
+    {
+        name = 'small_unsigned',
+        parts = {
+            {
+                fieldno = 2,
+                type = 'unsigned',
+            }
+        },
+        gen_tuple = function(tupleno)
+            return {'id_' .. tostring(tupleno), tupleno}
+        end,
+    },
+    -- Test with N-1 equal parts and Nth different.
+    {
+        name = 'many_parts',
+        parts = (function()
+            local parts = {}
+            for i = 1, 16 do
+                parts[i] = {
+                    fieldno = i,
+                    type = 'unsigned',
+                }
+            end
+            return parts
+        end)(),
+        gen_tuple = function(tupleno)
+            local tuple = {}
+            -- 15 constant parts
+            for i = 1, 15 do
+                tuple[i] = i
+            end
+            -- 16th part is varying
+            tuple[16] = tupleno
+            return tuple
+        end,
+        -- reduce tuple count to decrease test run time
+        tuple_count = 16,
+    },
+    -- Test null value in nullable field of an index.
+    {
+        name = 'nullable',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'unsigned',
+            },
+            {
+                fieldno = 2,
+                type = 'string',
+                is_nullable = true,
+            },
+        },
+        -- Odd tuples carry a string in the nullable part and even
+        -- ones carry box.NULL, so both kinds of values, and their
+        -- relative order, are exercised. (`i % 1` is always 0, so
+        -- an `i % 1 == 1` check would generate NULLs only.)
+        gen_tuple = function(i)
+            if i % 2 == 1 then
+                return {0, tostring(i)}
+            else
+                return {0, box.NULL}
+            end
+        end,
+    },
+    -- Test index part with 'collation_id' option (as in net.box's
+    -- response).
+    {
+        name = 'collation_id',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation_id = 2, -- unicode_ci
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+    -- Test index part with 'collation' option (as in local index
+    -- parts).
+    {
+        name = 'collation',
+        parts = {
+            {
+                fieldno = 1,
+                type = 'string',
+                collation = 'unicode_ci',
+            },
+        },
+        gen_tuple = function(i)
+            local letters = {'a', 'b', 'c', 'A', 'B', 'C'}
+            if i <= #letters then
+                return {letters[i]}
+            else
+                return {''}
+            end
+        end,
+    },
+}
+
+-- Whether a key part compares strings with the unicode_ci
+-- collation, given either by name or by id (2).
+local function is_unicode_ci_part(part)
+    if part.collation == 'unicode_ci' then
+        return true
+    end
+    return part.collation_id == 2
+end
+
+-- Three-way comparison of tuples `a` and `b` (plain tables or box
+-- tuples) by the key `parts`; used to build the expected merge
+-- order. The first differing part decides: a nil field sorts
+-- first, unicode_ci parts use utf8.casecmp() (its result is
+-- returned as is), other values compare with `<`.
+-- Returns -1, 1 (or the casecmp result), or 0 when all parts match.
+local function tuple_comparator(a, b, parts)
+    for _, part in ipairs(parts) do
+        local fieldno = part.fieldno
+        local lhs, rhs = a[fieldno], b[fieldno]
+        if lhs ~= rhs then
+            if lhs == nil then
+                return -1
+            end
+            if rhs == nil then
+                return 1
+            end
+            if is_unicode_ci_part(part) then
+                return utf8.casecmp(lhs, rhs)
+            end
+            if lhs < rhs then
+                return -1
+            end
+            return 1
+        end
+    end
+    return 0
+end
+
+-- Sort `tuples` in place by the key `parts`: ascending, or
+-- descending when `opts.reverse` is set. Equal keys never count
+-- as "less", as table.sort() requires.
+local function sort_tuples(tuples, parts, opts)
+    table.sort(tuples, function(a, b)
+        local cmp = tuple_comparator(a, b, parts)
+        if cmp == 0 then
+            return false
+        end
+        if cmp < 0 then
+            return not opts.reverse
+        end
+        return opts.reverse
+    end)
+end
+
+-- Lower-case, in place, the non-empty unicode_ci key fields of
+-- every tuple (a plain table) in `tuples`, so that values equal
+-- up to case compare equal afterwards.
+local function lowercase_unicode_ci_fields(tuples, parts)
+    -- Field numbers of the unicode_ci parts, in `parts` order.
+    local ci_fieldnos = {}
+    for _, part in ipairs(parts) do
+        if is_unicode_ci_part(part) then
+            table.insert(ci_fieldnos, part.fieldno)
+        end
+    end
+
+    for i = 1, #tuples do
+        local tuple = tuples[i]
+        for _, fieldno in ipairs(ci_fieldnos) do
+            -- Workaround #3709: empty strings are left untouched.
+            if tuple[fieldno]:len() > 0 then
+                tuple[fieldno] = utf8.lower(tuple[fieldno])
+            end
+        end
+    end
+end
+
+-- Fetch iterator `gen` used by the use_fetch_source cases. Every
+-- call returns a new state and the next chunk of `param.tuples`
+-- (starting after `state.last_pos`), shaped for
+-- `param.input_type`:
+--  * 'table'  - a Lua table of up to FETCH_BLOCK_SIZE tuples;
+--  * 'buffer' - an ibuf with those tuples msgpack-encoded;
+--  * 'tuple'  - exactly one tuple.
+-- Returns nothing once all tuples are consumed.
+local function fetch_source_gen(param, state)
+    local input_type = param.input_type
+    local last_pos = state.last_pos
+    -- A chunk size is always 1 for a tuple source.
+    local fetch_block_size = input_type == 'tuple' and 1 or
+        FETCH_BLOCK_SIZE
+    local data = fun.iter(param.tuples):drop(last_pos):take(
+        fetch_block_size):totable()
+    if #data == 0 then
+        return
+    end
+
+    local new_state = {last_pos = last_pos + #data}
+    if input_type == 'table' then
+        return new_state, data
+    elseif input_type == 'buffer' then
+        local buf = buffer.ibuf()
+        msgpackffi.internal.encode_r(buf, data, 0)
+        return new_state, buf
+    elseif input_type == 'tuple' then
+        -- Non-empty (checked above) and at most one tuple long.
+        assert(#data == 1)
+        return new_state, data[1]
+    end
+    error(('Unexpected input_type: %s'):format(tostring(input_type)))
+end
+
+-- Build the (gen, param, state) triple that feeds `tuples` in
+-- chunks to merger.new_<input_type>_source().
+local function fetch_source_iterator(input_type, tuples)
+    local param = {input_type = input_type, tuples = tuples}
+    return fetch_source_gen, param, {last_pos = 0}
+end
+
+-- Generate `tuple_count` tuples with `schema.gen_tuple`, spread
+-- them over `source_count` sources using digest.guava(), sort
+-- every source and the expected output by the schema key
+-- (honouring `opts.reverse`), and wrap the data into merge
+-- sources of `opts.input_type`.
+-- Returns the list of sources and the expected merge result
+-- (plain tables).
+local function prepare_data(schema, tuple_count, source_count, opts)
+    opts = opts or {}
+    local input_type = opts.input_type
+
+    -- One list of tuples per source; empty sources must be empty
+    -- tables, not nils.
+    local tuples = {}
+    for i = 1, source_count do
+        tuples[i] = {}
+    end
+
+    local exp_result = {}
+    for i = 1, tuple_count do
+        -- Source number in [1, source_count].
+        local source_no = digest.guava(i, source_count) + 1
+        local tuple = schema.gen_tuple(i)
+        table.insert(exp_result, tuple)
+        if not opts.use_table_as_tuple then
+            -- Buffer sources are filled from plain tables only.
+            assert(input_type ~= 'buffer')
+            tuple = box.tuple.new(tuple)
+        end
+        table.insert(tuples[source_no], tuple)
+    end
+
+    -- Every source, as well as the expected output, is sorted.
+    for _, source_tuples in pairs(tuples) do
+        sort_tuples(source_tuples, schema.parts, opts)
+    end
+    sort_tuples(exp_result, schema.parts, opts)
+
+    local sources = {}
+    if opts.use_fetch_source then
+        local ctor = merger[('new_%s_source'):format(input_type)]
+        for i = 1, source_count do
+            sources[i] = ctor(fetch_source_iterator(input_type, tuples[i]))
+        end
+    elseif input_type == 'table' then
+        -- Imitate netbox's select w/o {buffer = ...}.
+        for i = 1, source_count do
+            sources[i] = merger.new_source_fromtable(tuples[i])
+        end
+    elseif input_type == 'buffer' then
+        -- Imitate netbox's select with {buffer = ...}.
+        for i = 1, source_count do
+            local buf = buffer.ibuf()
+            sources[i] = merger.new_source_frombuffer(buf)
+            msgpackffi.internal.encode_r(buf, tuples[i], 0)
+        end
+    else
+        -- 'tuple' sources exist only with a fetch iterator; any
+        -- other input type is a test bug.
+        assert(false)
+    end
+
+    return sources, exp_result
+end
+
+-- Human-readable suffix listing the options set in `opts`, e.g.
+-- ' (input_type: table, output_type: buffer, reverse)'; an empty
+-- string when no option is set.
+local function test_case_opts_str(opts)
+    local params = {}
+
+    -- Valued options, printed as 'name: value'.
+    for _, key in ipairs({'input_type', 'output_type'}) do
+        if opts[key] then
+            table.insert(params, key .. ': ' .. opts[key])
+        end
+    end
+
+    -- Boolean flags, printed by name only.
+    local flags = {'reverse', 'use_table_as_tuple', 'use_fetch_source'}
+    for _, flag in ipairs(flags) do
+        if opts[flag] then
+            table.insert(params, flag)
+        end
+    end
+
+    if #params == 0 then
+        return ''
+    end
+    return (' (%s)'):format(table.concat(params, ', '))
+end
+
+-- Merge freshly prepared sources with `schema.key_def`, read the
+-- result in the `opts.output_type` form ('table', 'buffer' or
+-- 'tuple') and check it against the expected order.
+local function run_merger(test, schema, tuple_count, source_count, opts)
+    fiber.yield()
+
+    opts = opts or {}
+
+    local sources, exp_result = prepare_data(schema, tuple_count,
+                                             source_count, opts)
+    local merger_inst = merger.new(schema.key_def, sources,
+                                   {reverse = opts.reverse})
+
+    local res
+    local output_type = opts.output_type
+    if output_type == 'table' then
+        res = merger_inst:select()
+    elseif output_type == 'buffer' then
+        local output_buffer = buffer.ibuf()
+        merger_inst:select({buffer = output_buffer})
+        res = msgpackffi.decode(output_buffer.rpos)
+    else
+        assert(output_type == 'tuple')
+        res = merger_inst:pairs():totable()
+    end
+
+    -- Box tuples become plain tables for the comparison.
+    for i = 1, #res do
+        local item = res[i]
+        if type(item) ~= 'table' then
+            res[i] = item:totable()
+        end
+    end
+
+    -- unicode_ci does not tell 'A' from 'a', so their relative
+    -- order is arbitrary: such fields are compared lower-cased.
+    lowercase_unicode_ci_fields(res, schema.parts)
+    lowercase_unicode_ci_fields(exp_result, schema.parts)
+
+    local descr = ('check order on %3d tuples in %4d sources%s'):format(
+        tuple_count, source_count, test_case_opts_str(opts))
+    test:is_deeply(res, exp_result, descr)
+end
+
+-- Run one merge subtest for `schema` and the option combination
+-- `opts`: four checks with 1, 2, 5 and 128 sources. Meaningless
+-- combinations are skipped without adding a subtest.
+local function run_case(test, schema, opts)
+    opts = opts or {}
+
+    -- Buffer sources are filled from plain tables only, and tuple
+    -- sources exist only with a fetch iterator.
+    local skip = (opts.input_type == 'buffer' and
+                  not opts.use_table_as_tuple) or
+                 (opts.input_type == 'tuple' and
+                  not opts.use_fetch_source)
+    if skip then
+        return
+    end
+
+    local case_name = ('testing on schema %s%s'):format(
+        schema.name, test_case_opts_str(opts))
+    local tuple_count = schema.tuple_count or 100
+
+    test:test(case_name, function(test)
+        -- A few sources, then more sources than tuples.
+        local source_counts = {1, 2, 5, 128}
+        test:plan(#source_counts)
+        for _, source_count in ipairs(source_counts) do
+            run_merger(test, schema, tuple_count, source_count, opts)
+        end
+    end)
+end
+
+local test = tap.test('merger')
+
+-- Standalone subtests: a source shared by two mergers, three
+-- source reuse checks, limit and cascade mergers.
+local STANDALONE_TEST_COUNT = 6
+-- Merge cases per schema: 3 input x 3 output types x 2 reverse x
+-- 2 use_table_as_tuple x 2 use_fetch_source = 72 combinations,
+-- minus the 24 that run_case() skips.
+local CASES_PER_SCHEMA = 48
+test:plan(#bad_source_new_calls + #bad_chunks + #bad_merger_new_calls +
+    #bad_merger_select_calls + STANDALONE_TEST_COUNT +
+    #schemas * CASES_PER_SCHEMA)
+
+-- Configure box: the collation-based schemas need it.
+box.cfg{}
+
+-- Every constructor of a case must reject the case's params with
+-- an error matching its pattern.
+for _, case in ipairs(bad_source_new_calls) do
+    test:test(case[1], function(test)
+        test:plan(#case.funcs)
+        for _, func_name in ipairs(case.funcs) do
+            local ok, err = pcall(merger[func_name], unpack(case.params))
+            test:ok(ok == false and err:match(case.exp_err), func_name)
+        end
+    end)
+end
+
+-- A source whose fetch iterator returns a malformed chunk must
+-- fail on the first fetch with the expected error.
+for _, case in ipairs(bad_chunks) do
+    local function gen(_, state)
+        return state, case.chunk
+    end
+    local source = merger[case.func](gen, {}, {})
+    local ok, err = pcall(function()
+        return source:pairs():take(1):totable()
+    end)
+    test:ok(ok == false and err:match(case.exp_err), case[1])
+end
+
+-- Key on string field 1, used by the test cases below that do
+-- not come with their own schema.
+local key_def = key_def_lib.new({{fieldno = 1, type = 'string'}})
+
+-- merger.new() must reject bad options with the exact message.
+for _, case in ipairs(bad_merger_new_calls) do
+    local ok, err = pcall(merger.new, key_def, case.sources, case.opts)
+    -- The error may be a cdata object: compare its string form.
+    test:is_deeply({ok, tostring(err)}, {false, case.exp_err}, case[1])
+end
+
+-- Broken sources and/or bad options must make merger:select()
+-- fail with the exact message.
+for _, case in ipairs(bad_merger_select_calls) do
+    local merger_inst = merger.new(key_def, case.sources)
+    local ok, err = pcall(merger_inst.select, merger_inst, case.opts)
+    -- The error may be a cdata object: compare its string form.
+    test:is_deeply({ok, tostring(err)}, {false, case.exp_err}, case[1])
+end
+
+-- Attach a key_def built from its parts to every schema.
+for i = 1, #schemas do
+    local schema = schemas[i]
+    schema.key_def = key_def_lib.new(schema.parts)
+end
+
+-- Two mergers draw from one shared source, so each tuple reaches
+-- only one of them. The expectations below imply that merger 1
+-- has already taken 'b' off the source when merger 2 starts.
+test:test('use a source in two mergers', function(test)
+    test:plan(5)
+
+    local data = {{'a'}, {'b'}, {'c'}}
+    local source = merger.new_source_fromtable(data)
+    local iter_1 = merger.new(key_def, {source}):pairs()
+    local iter_2 = merger.new(key_def, {source}):pairs()
+
+    test:is_deeply(iter_1:head():totable(), data[1],
+                   'tuple 1 from merger 1')
+    test:is_deeply(iter_2:head():totable(), data[3],
+                   'tuple 3 from merger 2')
+    test:is_deeply(iter_1:head():totable(), data[2],
+                   'tuple 2 from merger 1')
+
+    test:ok(iter_1:is_null(), 'merger 1 ends')
+    test:ok(iter_2:is_null(), 'merger 2 ends')
+end)
+
+-- Fetch iterator over `param.chunks`, a list that may contain nil
+-- holes. Each call advances the position kept in `param.idx`
+-- and returns (box.NULL, chunk). It returns nothing at a hole
+-- and, without advancing, past table.maxn(chunks).
+local function reusable_source_gen(param)
+    local chunks = param.chunks
+    local idx = param.idx or 1
+    if idx > table.maxn(chunks) then
+        return
+    end
+
+    param.idx = idx + 1
+    local chunk = chunks[idx]
+    if chunk ~= nil then
+        return box.NULL, chunk
+    end
+end
+
+-- Drain `source` three times: the passes must yield {1}, {2};
+-- then {3}, {4}, {5}; then nothing.
+local function verify_reusable_source(test, source)
+    local passes = {
+        {exp = {{1}, {2}}, name = '1st use'},
+        {exp = {{3}, {4}, {5}}, name = '2nd use'},
+        {exp = {}, name = 'end'},
+    }
+    test:plan(#passes)
+    for _, pass in ipairs(passes) do
+        local res = source:pairs():map(box.tuple.totable):totable()
+        test:is_deeply(res, pass.exp, pass.name)
+    end
+end
+
+test:test('reuse a tuple source', function(test)
+    -- The nil hole separates the first pass from the second one.
+    local source = merger.new_tuple_source(reusable_source_gen, {
+        chunks = {{1}, {2}, nil, {3}, {4}, {5}},
+    })
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a table source', function(test)
+    -- Per the expected passes, empty chunks do not end a pass;
+    -- the nil hole does.
+    local source = merger.new_table_source(reusable_source_gen, {
+        chunks = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}},
+    })
+    verify_reusable_source(test, source)
+end)
+
+test:test('reuse a buffer source', function(test)
+    local chunks_tbl = {{{1}}, {{2}}, {}, nil, {{3}}, {{4}}, {}, {{5}}}
+
+    -- Encode every chunk into its own buffer; the nil hole stays.
+    local chunks = {}
+    for i = 1, table.maxn(chunks_tbl) do
+        local chunk = chunks_tbl[i]
+        if chunk ~= nil then
+            local buf = buffer.ibuf()
+            msgpackffi.internal.encode_r(buf, chunk, 0)
+            chunks[i] = buf
+        end
+    end
+
+    local source = merger.new_buffer_source(reusable_source_gen,
+                                            {chunks = chunks})
+    verify_reusable_source(test, source)
+end)
+
+test:test('use limit', function(test)
+    test:plan(6)
+
+    local data = {{'a'}, {'b'}}
+
+    -- A fresh single-source merger over `data` for every check.
+    local function new_merger()
+        return merger.new(key_def, {merger.new_source_fromtable(data)})
+    end
+
+    local res = new_merger():select({limit = 0})
+    test:is(#res, 0, 'table output with limit 0')
+
+    res = new_merger():select({limit = 1})
+    test:is(#res, 1, 'table output with limit 1')
+    test:is_deeply(res[1]:totable(), data[1], 'tuple content')
+
+    local output_buffer = buffer.ibuf()
+    new_merger():select({buffer = output_buffer, limit = 0})
+    res = msgpackffi.decode(output_buffer.rpos)
+    test:is(#res, 0, 'buffer output with limit 0')
+
+    output_buffer:recycle()
+    new_merger():select({buffer = output_buffer, limit = 1})
+    res = msgpackffi.decode(output_buffer.rpos)
+    test:is(#res, 1, 'buffer output with limit 1')
+    test:is_deeply(res[1], data[1], 'tuple content')
+end)
+
+test:test('cascade mergers', function(test)
+    test:plan(2)
+
+    local data = {{'a'}, {'b'}}
+
+    -- Feed a merger over `data` into an outer merger built with
+    -- `outer_key_def`; the output must be `data` unchanged.
+    local function check(outer_key_def, descr)
+        local source = merger.new_source_fromtable(data)
+        local inner = merger.new(key_def, {source})
+        local outer = merger.new(outer_key_def, {inner})
+        local res = outer:pairs():map(box.tuple.totable):totable()
+        test:is_deeply(res, data, descr)
+    end
+
+    check(key_def, 'same key_def')
+
+    local key_def_unicode = key_def_lib.new({{
+        fieldno = 1,
+        type = 'string',
+        collation = 'unicode',
+    }})
+    check(key_def_unicode, 'different key_defs')
+end)
+
+-- Merging cases: every option combination for every schema;
+-- run_case() skips the meaningless combinations.
+local io_types = {'buffer', 'table', 'tuple'}
+local bools = {false, true}
+for _, input_type in ipairs(io_types) do
+    for _, output_type in ipairs(io_types) do
+        for _, reverse in ipairs(bools) do
+            for _, use_table_as_tuple in ipairs(bools) do
+                for _, use_fetch_source in ipairs(bools) do
+                    for _, schema in ipairs(schemas) do
+                        run_case(test, schema, {
+                            input_type = input_type,
+                            output_type = output_type,
+                            reverse = reverse,
+                            use_table_as_tuple = use_table_as_tuple,
+                            use_fetch_source = use_fetch_source,
+                        })
+                    end
+                end
+            end
+        end
+    end
+end
+
+os.exit(test:check() and 0 or 1)
--
2.21.0
More information about the Tarantool-patches
mailing list