From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 09/14] vinyl: clean up write iterator source destruction Date: Wed, 13 Mar 2019 11:52:55 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: By convention we have two methods in each write iterator stream implementation (including the write iterator itself as it implements the interface too): 'stop' and 'close'. The 'stop' method is called in a worker thread. It reverses the effect of 'start'. We need it to unreference all tuples referenced during the iteration (we must do it in the worker thread, where the tuples were referenced in the first place so as not to unreference tuple formats, see vy_tuple_delete). The 'close' method is called from the tx thread to unreference tuple formats if necessary and release memory. For the write iterator itself we follow this convention. However, for individual sources, for vy_slice_stream source to be more exact, we do not - the write iterator calls both 'stop' and 'close' from its own 'stop' method. Let's clean up this mess and make the write iterator follow the convention. We'll need it in the next patch. --- src/box/vy_run.c | 8 +++---- src/box/vy_write_iterator.c | 54 ++++++++++++++++++++++++++------------------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/src/box/vy_run.c b/src/box/vy_run.c index 57f62b72..59ae906a 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -2654,9 +2654,9 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret) * Free resources. 
*/ static void -vy_slice_stream_close(struct vy_stmt_stream *virt_stream) +vy_slice_stream_stop(struct vy_stmt_stream *virt_stream) { - assert(virt_stream->iface->close == vy_slice_stream_close); + assert(virt_stream->iface->stop == vy_slice_stream_stop); struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream; if (stream->page != NULL) { vy_page_delete(stream->page); @@ -2671,8 +2671,8 @@ vy_slice_stream_close(struct vy_stmt_stream *virt_stream) static const struct vy_stmt_stream_iface vy_slice_stream_iface = { .start = vy_slice_stream_search, .next = vy_slice_stream_next, - .stop = NULL, - .close = vy_slice_stream_close + .stop = vy_slice_stream_stop, + .close = NULL, }; void diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c index 2c4dbf58..8988daa3 100644 --- a/src/box/vy_write_iterator.c +++ b/src/box/vy_write_iterator.c @@ -265,6 +265,7 @@ vy_write_iterator_new_src(struct vy_write_iterator *stream) "malloc", "vinyl write stream"); return NULL; } + heap_node_create(&res->heap_node); res->is_end_of_key = false; rlist_add(&stream->src_list, &res->in_src_list); return res; @@ -278,8 +279,6 @@ vy_write_iterator_delete_src(struct vy_write_iterator *stream, { (void)stream; assert(!src->is_end_of_key); - if (src->stream.iface->stop != NULL) - src->stream.iface->stop(&src->stream); if (src->stream.iface->close != NULL) src->stream.iface->close(&src->stream); rlist_del(&src->in_src_list); @@ -287,8 +286,8 @@ vy_write_iterator_delete_src(struct vy_write_iterator *stream, } /** - * Add a source to the write iterator heap. The added source - * must be open. + * Start iteration in the given source, retrieve the first tuple, + * and add the source to the write iterator heap. * * @return 0 - success, not 0 - error. 
*/ @@ -298,35 +297,38 @@ vy_write_iterator_add_src(struct vy_write_iterator *stream, { if (src->stream.iface->start != NULL) { int rc = src->stream.iface->start(&src->stream); - if (rc != 0) { - vy_write_iterator_delete_src(stream, src); + if (rc != 0) return rc; - } } int rc = src->stream.iface->next(&src->stream, &src->tuple); - if (rc != 0 || src->tuple == NULL) { - vy_write_iterator_delete_src(stream, src); - return rc; - } + if (rc != 0 || src->tuple == NULL) + goto stop; + rc = vy_source_heap_insert(&stream->src_heap, src); if (rc != 0) { diag_set(OutOfMemory, sizeof(void *), "malloc", "vinyl write stream heap"); - vy_write_iterator_delete_src(stream, src); - return rc; + goto stop; } return 0; +stop: + if (src->stream.iface->stop != NULL) + src->stream.iface->stop(&src->stream); + return rc; } /** - * Remove a source from the heap, destroy and free it. + * Remove a source from the heap and stop iteration. */ static void vy_write_iterator_remove_src(struct vy_write_iterator *stream, struct vy_write_src *src) { + if (heap_node_is_stray(&src->heap_node)) + return; /* already removed */ vy_source_heap_delete(&stream->src_heap, src); - vy_write_iterator_delete_src(stream, src); + if (src->stream.iface->stop != NULL) + src->stream.iface->stop(&src->stream); } static const struct vy_stmt_stream_iface vy_slice_stream_iface; @@ -394,16 +396,20 @@ vy_write_iterator_start(struct vy_stmt_stream *vstream) { assert(vstream->iface->start == vy_write_iterator_start); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - struct vy_write_src *src, *tmp; - rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) { + struct vy_write_src *src; + rlist_foreach_entry(src, &stream->src_list, in_src_list) { if (vy_write_iterator_add_src(stream, src) != 0) - return -1; + goto fail; } return 0; +fail: + rlist_foreach_entry(src, &stream->src_list, in_src_list) + vy_write_iterator_remove_src(stream, src); + return -1; } /** - * Free all resources. 
+ * Stop iteration in all sources. */ static void vy_write_iterator_stop(struct vy_stmt_stream *vstream) @@ -412,9 +418,9 @@ vy_write_iterator_stop(struct vy_stmt_stream *vstream) struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; for (int i = 0; i < stream->rv_count; ++i) vy_read_view_stmt_destroy(&stream->read_views[i]); - struct vy_write_src *src, *tmp; - rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) - vy_write_iterator_delete_src(stream, src); + struct vy_write_src *src; + rlist_foreach_entry(src, &stream->src_list, in_src_list) + vy_write_iterator_remove_src(stream, src); if (stream->last_stmt != NULL) { vy_stmt_unref_if_possible(stream->last_stmt); stream->last_stmt = NULL; @@ -439,7 +445,9 @@ vy_write_iterator_close(struct vy_stmt_stream *vstream) { assert(vstream->iface->close == vy_write_iterator_close); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - vy_write_iterator_stop(vstream); + struct vy_write_src *src, *tmp; + rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) + vy_write_iterator_delete_src(stream, src); vy_source_heap_destroy(&stream->src_heap); tuple_format_unref(stream->format); free(stream); -- 2.11.0