[PATCH v3 7/7] Add merger for tuple streams (Lua part)

Alexander Turenko alexander.turenko at tarantool.org
Wed May 8 01:14:43 MSK 2019


Thanks!

I'll send the 4th version of the patchset.

WBR, Alexander Turenko.

On Tue, Apr 30, 2019 at 08:37:24PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:25PM +0300, Alexander Turenko wrote:
> > A merger is a special kind of a source, which is created from a key_def
> > object and a set of sources. It performs a kind of the merge sort:
> > chooses a source with a minimal / maximal tuple on each step, consumes
> > a tuple from this source and repeats. The API to create a merger is the
> > following:
> > 
> > ```lua
> > local ctx = merger.context.new(key_def.new(<...>))
> 
> Discussed verbally, agreed that a performance gain from using
> merger.context is somewhat dubious. It's compelling to drop it
> altogether and pass key_def to merger.new directly, creating
> a format every time (we can reuse formats if necessary in future).
> Alexander will think about it.

Okay. Removed.

I'll suggest caching a key_def instead of a merge context in
tarantool-merger-examples.

> 
> Other than that the API and the patch looks good to me.
> 
> See a few minor comments below.
> 
> > local sources = {<...>}
> > local merger_inst = merger.new(ctx, sources, {
> >     -- Ascending (false) or descending (true) order.
> >     -- Default is ascending.
> >     reverse = <boolean> or <nil>,
> > })
> > ```
> 
> > diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
> > new file mode 100644
> > index 000000000..ebe60bc8d
> > --- /dev/null
> > +++ b/src/box/lua/merger.c
> > @@ -0,0 +1,1184 @@
> 
> > +static uint32_t merger_source_type_id = 0;
> > +static uint32_t merger_context_type_id = 0;
> > +static uint32_t ibuf_type_id = 0;
> 
> We typically use upper case for naming variables storing Lua type ids,
> e.g. CTID_STRUCT_TUPLE_REF.

Changed to CTID_STRUCT_IBUF and CTID_STRUCT_MERGE_SOURCE_REF.

> 
> > +static int
> > +lbox_merger_source_gc(struct lua_State *L)
> > +{
> > +	struct merger_source *source;
> > +	if ((source = luaT_check_merger_source(L, 1)) == NULL)
> > +		return 0;
> 
> Is it actually possible?

No. It was carried over from the original shard/driver.c, but tarantool's
code doesn't perform such checks. Replaced with an assert.

> 
> > +	merger_source_unref(source);
> > +	return 0;
> > +}
> 
> > +static struct merger_source **
> > +luaT_merger_new_parse_sources(struct lua_State *L, int idx,
> > +			      uint32_t *sources_count_ptr)
> > +{
> > +	/* Allocate sources array. */
> > +	uint32_t sources_count = lua_objlen(L, idx);
> > +	const ssize_t sources_size =
> > +		sources_count * sizeof(struct merger_source *);
> > +	struct merger_source **sources =
> > +		(struct merger_source **) malloc(sources_size);
> 
> Pointless type conversion.

Removed. Also changed the order from `count * sizeof(...)` to
`sizeof(...) * count` where we allocate arrays to do the arithmetic in
size_t. There is no real reason, just to do things in the right way.

Also fixed the place (in this function) where I mistakenly saved an
array size into a ssize_t (signed) variable.

> 
> > +	if (sources == NULL) {
> > +		diag_set(OutOfMemory, sources_size, "malloc", "sources");
> > +		return NULL;
> > +	}
> > +
> > +	/* Save all sources. */
> > +	for (uint32_t i = 0; i < sources_count; ++i) {
> > +		lua_pushinteger(L, i + 1);
> > +		lua_gettable(L, idx);
> > +
> > +		/* Extract a source from a Lua stack. */
> > +		struct merger_source *source = luaT_check_merger_source(L, -1);
> > +		if (source == NULL) {
> > +			free(sources);
> > +			diag_set(IllegalParams,
> > +				 "Unknown source type at index %d", i + 1);
> > +			return NULL;
> > +		}
> > +		sources[i] = source;
> > +	}
> > +	lua_pop(L, sources_count);
> > +
> > +	*sources_count_ptr = sources_count;
> 
> source_count

Changed all *_count names in that way and also changed all *_cnt to
*_count.

> 
> > +struct merger_source_buffer {
> > +	struct merger_source base;
> > +	/*
> > +	 * A reference to a Lua iterator to fetch a next chunk of
> > +	 * tuples.
> > +	 */
> > +	struct luaL_iterator *fetch_it;
> > +	/*
> > +	 * A reference a buffer with a current chunk of tuples.
> 
> A reference to a buffer storing the current chunk of tuples.

I don't get why 'THE current chunk', but changed as you suggested
anyway.

> 
> > +	 * It is needed to prevent LuaJIT from collecting the
> > +	 * buffer while the source consider it as the current
> > +	 * one.
> > +	 */
> > +	int ref;
> > +	/*
> > +	 * A buffer with a current chunk of tuples.
> > +	 */
> > +	struct ibuf *buf;
> > +	/*
> > +	 * A merger stops before end of a buffer when it is not
> > +	 * the last merger in the chain.
> > +	 */
> > +	size_t remaining_tuples_cnt;
> 
> Please rename to remaining_tuple_count.

Done.

> 
> > +static int
> > +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
> > +{
> > +	struct lua_State *L = fiber()->storage.lua.stack;
> > +	int nresult = luaL_iterator_next(L, source->fetch_it);
> > +
> > +	/* Handle a Lua error in a gen function. */
> > +	if (nresult == -1)
> > +		return -1;
> > +
> > +	/* No more data: do nothing. */
> > +	if (nresult == 0)
> > +		return 0;
> > +
> > +	/* Handle incorrect results count. */
> > +	if (nresult != 2) {
> > +		diag_set(IllegalParams, "Expected <state>, <buffer>, got %d "
> > +			 "return values", nresult);
> > +		return -1;
> > +	}
> > +
> > +	/* Set a new buffer as the current chunk. */
> > +	if (source->ref > 0)
> > +		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
> > +	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
> > +	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
> > +	source->buf = luaT_check_ibuf(L, -1);
> > +	assert(source->buf != NULL);
> 
>  | tarantool> m = merger.new(c, {merger.new_buffer_source(box.space._space:pairs())})
>  | ---
>  | ...
>  | 
>  | tarantool> m:select()
>  | luaL_merger_source_buffer_fetch: Assertion `source->buf != NULL' failed.
> 
> Please fail gracefully in this case.

Nice catch. Fixed. Added 'bad_chunks' test cases.

> 
> > +	lua_pop(L, nresult);
> > +
> > +	/* Update remaining_tuples_cnt and skip the header. */
> > +	if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
> > +		diag_set(IllegalParams, "Invalid merger source %p",
> > +			 &source->base);
> > +		return -1;
> > +	}
> > +	return 1;
> > +}
> > +
> > +/* Virtual methods */
> > +
> > +static void
> > +luaL_merger_source_buffer_delete(struct merger_source *base)
> > +{
> > +	struct merger_source_buffer *source = container_of(base,
> > +		struct merger_source_buffer, base);
> > +
> > +	assert(source->fetch_it != NULL);
> > +	luaL_iterator_delete(source->fetch_it);
> > +	if (source->ref > 0)
> > +		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
> > +
> > +	free(source);
> > +}
> > +
> > +static int
> > +luaL_merger_source_buffer_next(struct merger_source *base,
> > +			       box_tuple_format_t *format,
> > +			       struct tuple **out)
> > +{
> > +	struct merger_source_buffer *source = container_of(base,
> > +		struct merger_source_buffer, base);
> > +
> > +	/*
> > +	 * Handle the case when all data were processed: ask a
> > +	 * next chunk until a non-empty chunk is received or a
> > +	 * chunks iterator ends.
> > +	 */
> > +	while (source->remaining_tuples_cnt == 0) {
> > +		int rc = luaL_merger_source_buffer_fetch(source);
> > +		if (rc < 0)
> > +			return -1;
> > +		if (rc == 0) {
> > +			*out = NULL;
> > +			return 0;
> > +		}
> > +	}
> > +	if (ibuf_used(source->buf) == 0) {
> > +		diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > +		return -1;
> > +	}
> > +	const char *tuple_beg = source->buf->rpos;
> > +	const char *tuple_end = tuple_beg;
> > +	/*
> > +	 * mp_next() is faster then mp_check(), but can	read bytes
> > +	 * outside of the buffer and so can cause segmentation
> > +	 * faults or an incorrect result.
> > +	 *
> > +	 * We check buffer boundaries after the mp_next() call and
> > +	 * throw an error when the boundaries are violated, but it
> > +	 * does not save us from possible segmentation faults.
> > +	 *
> > +	 * It is in a user responsibility to provide valid
> > +	 * msgpack.
> 
> Ugh, I'd check the buffer with mp_check anyway. Would probably provide
> an option to skip the check if required ('unchecked'). Not sure if it's
> really necessary though.

Okay.

> 
> > +	 */
> > +	mp_next(&tuple_end);
> > +	--source->remaining_tuples_cnt;
> > +	if (tuple_end > source->buf->wpos) {
> > +		diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > +		return -1;
> > +	}
> > +	source->buf->rpos = (char *) tuple_end;
> > +	if (format == NULL)
> > +		format = box_tuple_format_default();
> 
> I'd pass tuple_format_runtime explicitly to the ->next callback.
> Special-casing NULL is kinda ugly.

A NULL value for a format has a special meaning for source->next(): the
caller does not care which format the resulting tuple will be in (see
the comment for next() in struct merge_source_vtab). It does not always
mean that a tuple will be in the runtime format: say, for a tuple source
it means that the source will return tuples in their original format.
The same applies to a table source when a chunk contains tuples
(not Lua tables).

This does not matter much for a buffer source, but it is part of the
source contract in general. I'll leave it as is if you don't mind.

> 
> > +	struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > +	if (tuple == NULL)
> > +		return -1;
> > +
> > +	box_tuple_ref(tuple);
> > +	*out = tuple;
> > +	return 0;
> > +}
> 
> > +static int
> > +encode_result_buffer(struct lua_State *L, struct merger_source *source,
> > +		     struct ibuf *obuf, uint32_t limit)
> 
> Better rename 'obuf' to 'out' or 'buf' or something - we have struct
> obuf so 'struct ibuf *obuf' looks misleading.

We use 'buf' when reading from it, so I would use some other name
when writing. 'out' looks more appropriate for a scalar / pointer value
we'll replace in a function. I'll use 'output_buffer' here and in other
such places in C and Lua if you don't mind.

> 
> > +{
> > +	uint32_t result_len = 0;
> > +	uint32_t result_len_offset = 4;
> > +
> > +	/*
> > +	 * Reserve maximum size for the array around resulting
> > +	 * tuples to set it later.
> > +	 */
> > +	encode_header(obuf, UINT32_MAX);
> > +
> > +	/* Fetch, merge and copy tuples to the buffer. */
> > +	struct tuple *tuple;
> > +	int rc = 0;
> > +	while (result_len < limit && (rc =
> > +	       source->vtab->next(source, NULL, &tuple)) == 0 &&
> > +	       tuple != NULL) {
> > +		uint32_t bsize = tuple->bsize;
> > +		ibuf_reserve(obuf, bsize);
> > +		memcpy(obuf->wpos, tuple_data(tuple), bsize);
> > +		obuf->wpos += bsize;
> > +		result_len_offset += bsize;
> > +		++result_len;
> > +
> > +		/* The received tuple is not more needed. */
> > +		box_tuple_unref(tuple);
> > +	}
> > +
> > +	if (rc != 0)
> > +		return luaT_error(L);
> > +
> > +	/* Write the real array size. */
> > +	mp_store_u32(obuf->wpos - result_len_offset, result_len);
> > +
> > +	return 0;
> > +}
> 
> > +static int
> > +lbox_merger_source_select(struct lua_State *L)
> > +{
> > +	struct merger_source *source;
> > +	int top = lua_gettop(L);
> > +	bool ok = (top == 1 || top == 2) &&
> > +		/* Merger source. */
> > +		(source = luaT_check_merger_source(L, 1)) != NULL &&
> > +		/* Opts. */
> > +		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
> > +	if (!ok)
> > +		return lbox_merger_source_select_usage(L, NULL);
> > +
> > +	uint32_t limit = 0xFFFFFFFF;
> 
> Hmm, UINT32_MAX?

Okay, changed.



More information about the Tarantool-patches mailing list