From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Date: Wed, 8 May 2019 01:14:43 +0300
From: Alexander Turenko
Subject: Re: [PATCH v3 7/7] Add merger for tuple streams (Lua part)
Message-ID: <20190507221442.gocz6rpnctp245pj@tkn_work_nb>
References: <9af4b8f1311537ef696d71a1b09bc1721bde8ef0.1554906327.git.alexander.turenko@tarantool.org> <20190430173724.fgvtexkhbdutrt5x@esperanza>
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Disposition: inline
In-Reply-To: <20190430173724.fgvtexkhbdutrt5x@esperanza>
To: Vladimir Davydov
Cc: tarantool-patches@freelists.org
List-ID:

Thanks!

I'll send the 4th version of the patchset.

WBR, Alexander Turenko.

On Tue, Apr 30, 2019 at 08:37:24PM +0300, Vladimir Davydov wrote:
> On Wed, Apr 10, 2019 at 06:21:25PM +0300, Alexander Turenko wrote:
> > A merger is a special kind of a source, which is created from a key_def
> > object and a set of sources. It performs a kind of the merge sort:
> > chooses a source with a minimal / maximal tuple on each step, consumes
> > a tuple from this source and repeats. The API to create a merger is the
> > following:
> >
> > ```lua
> > local ctx = merger.context.new(key_def.new(<...>))
>
> Discussed verbally, agreed that a performance gain from using
> merger.context is somewhat dubious. It's compelling to drop it
> altogether and pass key_def to merger.new directly, creating
> a format every time (we can reuse formats if necessary in future).
> Alexander will think about it.

Okay. Removed.

I'll suggest caching a key_def instead of a merge context in
tarantool-merger-examples.

>
> Other than that the API and the patch looks good to me.
>
> See a few minor comments below.
>
> > local sources = {<...>}
> > local merger_inst = merger.new(ctx, sources, {
> >     -- Ascending (false) or descending (true) order.
> >     -- Default is ascending.
> >     reverse = <boolean> or <nil>,
> > })
> > ```
>
> > diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
> > new file mode 100644
> > index 000000000..ebe60bc8d
> > --- /dev/null
> > +++ b/src/box/lua/merger.c
> > @@ -0,0 +1,1184 @@
>
> > +static uint32_t merger_source_type_id = 0;
> > +static uint32_t merger_context_type_id = 0;
> > +static uint32_t ibuf_type_id = 0;
>
> We typically use upper case for naming variables storing Lua type ids,
> e.g. CTID_STRUCT_TUPLE_REF.

Changed to CTID_STRUCT_IBUF and CTID_STRUCT_MERGE_SOURCE_REF.

>
> > +static int
> > +lbox_merger_source_gc(struct lua_State *L)
> > +{
> > +	struct merger_source *source;
> > +	if ((source = luaT_check_merger_source(L, 1)) == NULL)
> > +		return 0;
>
> Is it actually possible?

No. It was carried over from the original shard/driver.c, but
tarantool's code doesn't perform such checks. Replaced with assert.

>
> > +	merger_source_unref(source);
> > +	return 0;
> > +}
>
> > +static struct merger_source **
> > +luaT_merger_new_parse_sources(struct lua_State *L, int idx,
> > +			      uint32_t *sources_count_ptr)
> > +{
> > +	/* Allocate sources array. */
> > +	uint32_t sources_count = lua_objlen(L, idx);
> > +	const ssize_t sources_size =
> > +		sources_count * sizeof(struct merger_source *);
> > +	struct merger_source **sources =
> > +		(struct merger_source **) malloc(sources_size);
>
> Pointless type conversion.

Removed.

Also changed the order from `count * sizeof(...)` to
`sizeof(...) * count` where we allocate arrays to do the arithmetic in
size_t. There is no real reason, just to do things in the right way.

Also fixed the place (it is this function) where I mistakenly saved an
array size into a ssize_t (signed) variable.

>
> > +	if (sources == NULL) {
> > +		diag_set(OutOfMemory, sources_size, "malloc", "sources");
> > +		return NULL;
> > +	}
> > +
> > +	/* Save all sources. */
> > +	for (uint32_t i = 0; i < sources_count; ++i) {
> > +		lua_pushinteger(L, i + 1);
> > +		lua_gettable(L, idx);
> > +
> > +		/* Extract a source from a Lua stack. */
> > +		struct merger_source *source = luaT_check_merger_source(L, -1);
> > +		if (source == NULL) {
> > +			free(sources);
> > +			diag_set(IllegalParams,
> > +				 "Unknown source type at index %d", i + 1);
> > +			return NULL;
> > +		}
> > +		sources[i] = source;
> > +	}
> > +	lua_pop(L, sources_count);
> > +
> > +	*sources_count_ptr = sources_count;
>
> source_count

Changed all *_count names in that way and also changed all *_cnt to
*_count.

>
> > +struct merger_source_buffer {
> > +	struct merger_source base;
> > +	/*
> > +	 * A reference to a Lua iterator to fetch a next chunk of
> > +	 * tuples.
> > +	 */
> > +	struct luaL_iterator *fetch_it;
> > +	/*
> > +	 * A reference a buffer with a current chunk of tuples.
>
> A reference to a buffer storing the current chunk of tuples.

I don't get why 'THE current chunk', but changed as you suggested
anyway.

>
> > +	 * It is needed to prevent LuaJIT from collecting the
> > +	 * buffer while the source consider it as the current
> > +	 * one.
> > +	 */
> > +	int ref;
> > +	/*
> > +	 * A buffer with a current chunk of tuples.
> > +	 */
> > +	struct ibuf *buf;
> > +	/*
> > +	 * A merger stops before end of a buffer when it is not
> > +	 * the last merger in the chain.
> > +	 */
> > +	size_t remaining_tuples_cnt;
>
> Please rename to remaining_tuple_count.

Done.

>
> > +static int
> > +luaL_merger_source_buffer_fetch(struct merger_source_buffer *source)
> > +{
> > +	struct lua_State *L = fiber()->storage.lua.stack;
> > +	int nresult = luaL_iterator_next(L, source->fetch_it);
> > +
> > +	/* Handle a Lua error in a gen function. */
> > +	if (nresult == -1)
> > +		return -1;
> > +
> > +	/* No more data: do nothing. */
> > +	if (nresult == 0)
> > +		return 0;
> > +
> > +	/* Handle incorrect results count. */
> > +	if (nresult != 2) {
> > +		diag_set(IllegalParams, "Expected , , got %d "
> > +			 "return values", nresult);
> > +		return -1;
> > +	}
> > +
> > +	/* Set a new buffer as the current chunk. */
> > +	if (source->ref > 0)
> > +		luaL_unref(L, LUA_REGISTRYINDEX, source->ref);
> > +	lua_pushvalue(L, -nresult + 1); /* Popped by luaL_ref(). */
> > +	source->ref = luaL_ref(L, LUA_REGISTRYINDEX);
> > +	source->buf = luaT_check_ibuf(L, -1);
> > +	assert(source->buf != NULL);
>
> | tarantool> m = merger.new(c, {merger.new_buffer_source(box.space._space:pairs())})
> | ---
> | ...
> |
> | tarantool> m:select()
> | luaL_merger_source_buffer_fetch: Assertion `source->buf != NULL' failed.
>
> Please fail gracefully in this case.

Nice catch. Fixed. Added 'bad_chunks' test cases.

>
> > +	lua_pop(L, nresult);
> > +
> > +	/* Update remaining_tuples_cnt and skip the header. */
> > +	if (decode_header(source->buf, &source->remaining_tuples_cnt) != 0) {
> > +		diag_set(IllegalParams, "Invalid merger source %p",
> > +			 &source->base);
> > +		return -1;
> > +	}
> > +	return 1;
> > +}
> > +
> > +/* Virtual methods */
> > +
> > +static void
> > +luaL_merger_source_buffer_delete(struct merger_source *base)
> > +{
> > +	struct merger_source_buffer *source = container_of(base,
> > +		struct merger_source_buffer, base);
> > +
> > +	assert(source->fetch_it != NULL);
> > +	luaL_iterator_delete(source->fetch_it);
> > +	if (source->ref > 0)
> > +		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, source->ref);
> > +
> > +	free(source);
> > +}
> > +
> > +static int
> > +luaL_merger_source_buffer_next(struct merger_source *base,
> > +			       box_tuple_format_t *format,
> > +			       struct tuple **out)
> > +{
> > +	struct merger_source_buffer *source = container_of(base,
> > +		struct merger_source_buffer, base);
> > +
> > +	/*
> > +	 * Handle the case when all data were processed: ask a
> > +	 * next chunk until a non-empty chunk is received or a
> > +	 * chunks iterator ends.
> > +	 */
> > +	while (source->remaining_tuples_cnt == 0) {
> > +		int rc = luaL_merger_source_buffer_fetch(source);
> > +		if (rc < 0)
> > +			return -1;
> > +		if (rc == 0) {
> > +			*out = NULL;
> > +			return 0;
> > +		}
> > +	}
> > +	if (ibuf_used(source->buf) == 0) {
> > +		diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > +		return -1;
> > +	}
> > +	const char *tuple_beg = source->buf->rpos;
> > +	const char *tuple_end = tuple_beg;
> > +	/*
> > +	 * mp_next() is faster then mp_check(), but can read bytes
> > +	 * outside of the buffer and so can cause segmentation
> > +	 * faults or an incorrect result.
> > +	 *
> > +	 * We check buffer boundaries after the mp_next() call and
> > +	 * throw an error when the boundaries are violated, but it
> > +	 * does not save us from possible segmentation faults.
> > +	 *
> > +	 * It is in a user responsibility to provide valid
> > +	 * msgpack.
>
> Ugh, I'd check the buffer with mp_check anyway. Would probably provide
> an option to skip the check if required ('unchecked'). Not sure if it's
> really necessary though.

Okay.

>
> > +	 */
> > +	mp_next(&tuple_end);
> > +	--source->remaining_tuples_cnt;
> > +	if (tuple_end > source->buf->wpos) {
> > +		diag_set(IllegalParams, "Unexpected msgpack buffer end");
> > +		return -1;
> > +	}
> > +	source->buf->rpos = (char *) tuple_end;
> > +	if (format == NULL)
> > +		format = box_tuple_format_default();
>
> I'd pass tuple_format_runtime explicitly to the ->next callback.
> Special-casing NULL is kinda ugly.

A NULL value for a format has a special meaning for source->next(): it
does not matter to the caller which format a resulting tuple will be in
(see the comment for next() in struct merge_source_vtab). It does not
always mean that a tuple will be in the runtime format: say, for a
tuple source it means that it will return tuples in their original
format. The same applies to a table source when a chunk contains
tuples (not Lua tables).
This does not matter much for a buffer source, but it is part of the
source contract in general.

I'll leave it as is if you don't mind.

>
> > +	struct tuple *tuple = box_tuple_new(format, tuple_beg, tuple_end);
> > +	if (tuple == NULL)
> > +		return -1;
> > +
> > +	box_tuple_ref(tuple);
> > +	*out = tuple;
> > +	return 0;
> > +}
>
> > +static int
> > +encode_result_buffer(struct lua_State *L, struct merger_source *source,
> > +		     struct ibuf *obuf, uint32_t limit)
>
> Better rename 'obuf' to 'out' or 'buf' or something - we have struct
> obuf so 'struct ibuf *obuf' looks misleading.

We use 'buf' when reading from it, so I would use another name when
writing. 'out' looks more appropriate for a scalar / pointer value
we'll replace in a function. I'll use 'output_buffer' here and in other
such places in C and Lua if you don't mind.

>
> > +{
> > +	uint32_t result_len = 0;
> > +	uint32_t result_len_offset = 4;
> > +
> > +	/*
> > +	 * Reserve maximum size for the array around resulting
> > +	 * tuples to set it later.
> > +	 */
> > +	encode_header(obuf, UINT32_MAX);
> > +
> > +	/* Fetch, merge and copy tuples to the buffer. */
> > +	struct tuple *tuple;
> > +	int rc = 0;
> > +	while (result_len < limit && (rc =
> > +	       source->vtab->next(source, NULL, &tuple)) == 0 &&
> > +	       tuple != NULL) {
> > +		uint32_t bsize = tuple->bsize;
> > +		ibuf_reserve(obuf, bsize);
> > +		memcpy(obuf->wpos, tuple_data(tuple), bsize);
> > +		obuf->wpos += bsize;
> > +		result_len_offset += bsize;
> > +		++result_len;
> > +
> > +		/* The received tuple is not more needed. */
> > +		box_tuple_unref(tuple);
> > +	}
> > +
> > +	if (rc != 0)
> > +		return luaT_error(L);
> > +
> > +	/* Write the real array size. */
> > +	mp_store_u32(obuf->wpos - result_len_offset, result_len);
> > +
> > +	return 0;
> > +}
>
> > +static int
> > +lbox_merger_source_select(struct lua_State *L)
> > +{
> > +	struct merger_source *source;
> > +	int top = lua_gettop(L);
> > +	bool ok = (top == 1 || top == 2) &&
> > +		/* Merger source. */
> > +		(source = luaT_check_merger_source(L, 1)) != NULL &&
> > +		/* Opts. */
> > +		(lua_isnoneornil(L, 2) == 1 || lua_istable(L, 2) == 1);
> > +	if (!ok)
> > +		return lbox_merger_source_select_usage(L, NULL);
> > +
> > +	uint32_t limit = 0xFFFFFFFF;
>
> Hmm, UINT32_MAX?

Okay, changed.
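
A side note on the encode_result_buffer() hunk quoted above: the idea of
reserving an array header with a placeholder length and writing the real
length once the loop ends can be sketched in standalone C. This is only
an illustration under stated assumptions, not the patch code: struct
byte_buf, byte_buf_reserve(), store_be32() and
encode_array_with_late_length() are hypothetical names standing in for
struct ibuf and the msgpuck helpers, and the items are restricted to
msgpack positive fixints (0..127) to keep the example small.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical growable byte buffer, a stand-in for struct ibuf. */
struct byte_buf {
	uint8_t *data;
	size_t used;
	size_t capacity;
};

/* Ensure room for `size` more bytes. Returns 0 on success, -1 on OOM. */
static int
byte_buf_reserve(struct byte_buf *buf, size_t size)
{
	if (buf->capacity - buf->used >= size)
		return 0;
	size_t new_capacity = buf->capacity == 0 ? 64 : buf->capacity;
	while (new_capacity - buf->used < size)
		new_capacity *= 2;
	uint8_t *data = realloc(buf->data, new_capacity);
	if (data == NULL)
		return -1;
	buf->data = data;
	buf->capacity = new_capacity;
	return 0;
}

/* Store a 32-bit value in big-endian byte order, as msgpack requires. */
static void
store_be32(uint8_t *pos, uint32_t value)
{
	pos[0] = (uint8_t)(value >> 24);
	pos[1] = (uint8_t)(value >> 16);
	pos[2] = (uint8_t)(value >> 8);
	pos[3] = (uint8_t)value;
}

/*
 * Encode up to `limit` items as a msgpack array32. The header is written
 * first with a placeholder length and patched after the loop, when the
 * real number of encoded items is known.
 */
static int
encode_array_with_late_length(struct byte_buf *buf, const uint8_t *items,
			      uint32_t item_count, uint32_t limit)
{
	assert(buf != NULL);
	/* array32 header: 0xdd marker followed by a 4-byte length. */
	if (byte_buf_reserve(buf, 5) != 0)
		return -1;
	/*
	 * Remember an offset rather than a pointer: realloc() may move
	 * the data while the items are appended.
	 */
	size_t len_offset = buf->used + 1;
	buf->data[buf->used] = 0xdd;
	store_be32(buf->data + len_offset, UINT32_MAX);
	buf->used += 5;

	uint32_t result_len = 0;
	while (result_len < limit && result_len < item_count) {
		if (byte_buf_reserve(buf, 1) != 0)
			return -1;
		/* Positive fixint: a single byte in the range 0..127. */
		buf->data[buf->used++] = items[result_len] & 0x7f;
		++result_len;
	}

	/* Write the real array size over the placeholder. */
	store_be32(buf->data + len_offset, result_len);
	return 0;
}
```

Keeping the header position as an offset from the start of the data is
a design choice of this sketch; the quoted patch instead tracks the
distance back from wpos in result_len_offset, which serves the same
purpose.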