[PATCH v3 7/7] Add merger for tuple streams (Lua part)
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Apr 30 20:37:24 MSK 2019
On Wed, Apr 10, 2019 at 06:21:25PM +0300, Alexander Turenko wrote:
> A merger is a special kind of a source, which is created from a key_def
> object and a set of sources. It performs a kind of the merge sort:
> chooses a source with a minimal / maximal tuple on each step, consumes
> a tuple from this source and repeats. The API to create a merger is the
> following:
>
> ```lua
> local ctx = merger.context.new(key_def.new(<...>))
We discussed this verbally and agreed that the performance gain from
using merger.context is somewhat dubious. It's compelling to drop it
altogether and pass key_def to merger.new directly, creating
a format every time (we can reuse formats in the future if necessary).
Alexander will think about it.
Other than that, the API and the patch look good to me.
See a few minor comments below.
> local sources = {<...>}
> local merger_inst = merger.new(ctx, sources, {
> -- Ascending (false) or descending (true) order.
> -- Default is ascending.
> reverse = <boolean> or <nil>,
> })
> ```
> diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
> new file mode 100644
> index 000000000..ebe60bc8d
> --- /dev/null
> +++ b/src/box/lua/merger.c
> @@ -0,0 +1,1184 @@
> +static uint32_t merger_source_type_id = 0;
> +static uint32_t merger_context_type_id = 0;
> +static uint32_t ibuf_type_id = 0;
We typically use upper case for naming variables storing Lua type ids,
e.g. CTID_STRUCT_TUPLE_REF.
> +static int
> +lbox_merger_source_gc(struct lua_State *L)
> +{
> + struct merger_source *source;
> + if ((source = luaT_check_merger_source(L, 1)) == NULL)
> + return 0;
Is it actually possible for luaT_check_merger_source() to return NULL here?
> + merger_source_unref(source);
> + return 0;
> +}
> +static struct merger_source **
> +luaT_merger_new_parse_sources(struct lua_State *L, int idx,
> + uint32_t *sources_count_ptr)
> +{
> + /* Allocate sources array. */
> + uint32_t sources_count = lua_objlen(L, idx);
> + const ssize_t sources_size =
> + sources_count * sizeof(struct merger_source *);
> + struct merger_source **sources =
> + (struct merger_source **) malloc(sources_size);
Pointless type conversion.
> + if (sources == NULL) {
> + diag_set(OutOfMemory, sources_size, "malloc", "sources");
> + return NULL;
> + }
> +
> + /* Save all sources. */
> + for (uint32_t i = 0; i < sources_count; ++i) {
> + lua_pushinteger(L, i + 1);
> + lua_gettable(L, idx);
> +
> + /* Extract a source from a Lua stack. */
> + struct merger_source *source = luaT_check_merger_source(L, -1);
> + if (source == NULL) {
> + free(sources);
> + diag_set(IllegalParams,
> + "Unknown source type at index %d", i + 1);
> + return NULL;
> + }
> + sources[i] = source;
> + }
> + lua_pop(L, sources_count);
> +
> + *sources_count_ptr = sources_count;
Please rename to source_count.
> +struct merger_source_buffer {
> + struct merger_source base;
> + /*
> + * A reference to a Lua iterator to fetch a next chunk of
> + * tuples.
> + */
> + struct luaL_iterator *fetch_it;
> + /*
> + * A reference a buffer with a current chunk of tuples.
A reference to a buffer storing the current chunk of tuples.
> + * It is needed to prevent LuaJIT from collecting the
> + * buffer while the source consider it as the current
> + * one.
> + */
> + int ref;
> + /*
> + * A buffer with a current chunk of tuples.
> + */
> + struct ibuf *buf;
> + /*
> + * A merger stops before end of a buffer when it is not
> + * the last merger in the chain.
> + */
> + size_t remaining_tuples_cnt;
Please rename to remaining_tuple_count.
> +static int
> +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
> +{
> + struct lua_State *L = fiber()->storage.lua.stack;
> + int nresult = luaL_iterator_next(L, source->fetch_it);
> +
> + /* Handle a Lua error in a gen function. */
> + if (nresult == -1)
> + return -1;
> +
> + /* No more data: do nothing. */
> + if (nresult == 0)
> + return 0;
> +
> + /* Handle incorrect results count. */
> + if (nresult != 2) {
> + diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
> + "return values", nresult);
> + return -1;
> + }
> +
> + /* Set a new buffer as the current chunk. */
> + if (source->ref > 0)
> + luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
> + lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
> + source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
> + source->buf = luaT_check_ibuf(L, -1);
> + assert(source->buf != NULL);
| tarantool> m = merger.new(c, {merger.new_buffer_source(box.space._space:pairs())})
| ---
| ...
|
| tarantool> m:select()
| luaL_merger_source_buffer_fetch: Assertion `source->buf != NULL' failed.
Please fail gracefully in this case.
> + lua_pop(L, nresult);
> +
> + /* Update remaining_tuples_cnt and skip the header. */
> + if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
> + diag_set(IllegalParams, "Invalid merger source %p",
> + &source->base);
> + return -1;
> + }
> + return 1;
> +}
> +
> +/* Virtual methods */
> +
> +static void
> +luaL_merger_source_buffer_delete(struct merger_source *base)
> +{
> + struct merger_source_buffer *source = container_of(base,
> + struct merger_source_buffer, base);
> +
> + assert(source->fetch_it != NULL);
> + luaL_iterator_delete(source->fetch_it);
> + if (source->ref > 0)
> + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
> +
> + free(source);
> +}
> +
> +static int
> +luaL_merger_source_buffer_next(struct merger_source *base,
> + box_tuple_format_t *format,
> + struct tuple **out)
> +{
> + struct merger_source_buffer *source = container_of(base,
> + struct merger_source_buffer, base);
> +
> + /*
> + * Handle the case when all data were processed: ask a
> + * next chunk until a non-empty chunk is received or a
> + * chunks iterator ends.
> + */
> + while (source->remaining_tuples_cnt == 0) {
> + int rc = luaL_merger_source_buffer_fetch(source);
> + if (rc < 0)
> + return -1;
> + if (rc == 0) {
> + *out = NULL;
> + return 0;
> + }
> + }
> + if (ibuf_used(source->buf) == 0) {
> + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> + return -1;
> + }
> + const char *tuple_beg = source->buf->rpos;
> + const char *tuple_end = tuple_beg;
> + /*
> + * mp_next() is faster then mp_check(), but can read bytes
> + * outside of the buffer and so can cause segmentation
> + * faults or an incorrect result.
> + *
> + * We check buffer boundaries after the mp_next() call and
> + * throw an error when the boundaries are violated, but it
> + * does not save us from possible segmentation faults.
> + *
> + * It is in a user responsibility to provide valid
> + * msgpack.
Ugh, I'd check the buffer with mp_check() anyway. I would probably provide
an option to skip the check if required ('unchecked'), though I'm not sure
it's really necessary.
> + */
> + mp_next(&tuple_end);
> + --source->remaining_tuples_cnt;
> + if (tuple_end > source->buf->wpos) {
> + diag_set(IllegalParams, "Unexpected msgpack buffer end");
> + return -1;
> + }
> + source->buf->rpos = (char *) tuple_end;
> + if (format == NULL)
> + format = box_tuple_format_default();
I'd pass tuple_format_runtime explicitly to the ->next callback.
Special-casing NULL is kinda ugly.
> + struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
> + if (tuple == NULL)
> + return -1;
> +
> + box_tuple_ref(tuple);
> + *out = tuple;
> + return 0;
> +}
> +static int
> +encode_result_buffer(struct lua_State *L, struct merger_source *source,
> + struct ibuf *obuf, uint32_t limit)
Better rename 'obuf' to 'out' or 'buf' or something - we have struct
obuf so 'struct ibuf *obuf' looks misleading.
> +{
> + uint32_t result_len = 0;
> + uint32_t result_len_offset = 4;
> +
> + /*
> + * Reserve maximum size for the array around resulting
> + * tuples to set it later.
> + */
> + encode_header(obuf, UINT32_MAX);
> +
> + /* Fetch, merge and copy tuples to the buffer. */
> + struct tuple *tuple;
> + int rc = 0;
> + while (result_len < limit && (rc =
> + source->vtab->next(source, NULL, &tuple)) == 0 &&
> + tuple != NULL) {
> + uint32_t bsize = tuple->bsize;
> + ibuf_reserve(obuf, bsize);
> + memcpy(obuf->wpos, tuple_data(tuple), bsize);
> + obuf->wpos += bsize;
> + result_len_offset += bsize;
> + ++result_len;
> +
> + /* The received tuple is not more needed. */
> + box_tuple_unref(tuple);
> + }
> +
> + if (rc != 0)
> + return luaT_error(L);
> +
> + /* Write the real array size. */
> + mp_store_u32(obuf->wpos - result_len_offset, result_len);
> +
> + return 0;
> +}
> +static int
> +lbox_merger_source_select(struct lua_State *L)
> +{
> + struct merger_source *source;
> + int top = lua_gettop(L);
> + bool ok = (top == 1 || top == 2) &&
> + /* Merger source. */
> + (source = luaT_check_merger_source(L, 1)) != NULL &&
> + /* Opts. */
> + (lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
> + if (!ok)
> + return lbox_merger_source_select_usage(L, NULL);
> +
> + uint32_t limit = 0xFFFFFFFF;
Hmm, UINT32_MAX?
More information about the Tarantool-patches
mailing list