[PATCH v2 09/14] vinyl: clean up write iterator source destruction

Vladimir Davydov vdavydov.dev at gmail.com
Wed Mar 13 11:52:55 MSK 2019


By convention we have two methods in each write iterator stream
implementation (including the write iterator itself as it implements
the interface too): 'stop' and 'close'. The 'stop' method is called
in a worker thread. It reverses the effect of 'start'. We need it to
unreference all tuples referenced during the iteration (we must do
it in the worker thread, where the tuples were referenced in the first
place, so as not to unreference tuple formats, see vy_tuple_delete).
The 'close' method is called from the tx thread to unreference tuple
formats if necessary and release memory.

For the write iterator itself we follow this convention. However,
for individual sources, for vy_slice_stream source to be more exact,
we do not - the write iterator calls both 'stop' and 'close' from
its own 'stop' method. Let's clean up this mess and make the write
iterator follow the convention. We'll need it in the next patch.
---
 src/box/vy_run.c            |  8 +++----
 src/box/vy_write_iterator.c | 54 ++++++++++++++++++++++++++-------------------
 2 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 57f62b72..59ae906a 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -2654,9 +2654,9 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
  * Free resources.
  */
 static void
-vy_slice_stream_close(struct vy_stmt_stream *virt_stream)
+vy_slice_stream_stop(struct vy_stmt_stream *virt_stream)
 {
-	assert(virt_stream->iface->close == vy_slice_stream_close);
+	assert(virt_stream->iface->stop == vy_slice_stream_stop);
 	struct vy_slice_stream *stream = (struct vy_slice_stream *)virt_stream;
 	if (stream->page != NULL) {
 		vy_page_delete(stream->page);
@@ -2671,8 +2671,8 @@ vy_slice_stream_close(struct vy_stmt_stream *virt_stream)
 static const struct vy_stmt_stream_iface vy_slice_stream_iface = {
 	.start = vy_slice_stream_search,
 	.next = vy_slice_stream_next,
-	.stop = NULL,
-	.close = vy_slice_stream_close
+	.stop = vy_slice_stream_stop,
+	.close = NULL,
 };
 
 void
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 2c4dbf58..8988daa3 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -265,6 +265,7 @@ vy_write_iterator_new_src(struct vy_write_iterator *stream)
 			 "malloc", "vinyl write stream");
 		return NULL;
 	}
+	heap_node_create(&res->heap_node);
 	res->is_end_of_key = false;
 	rlist_add(&stream->src_list, &res->in_src_list);
 	return res;
@@ -278,8 +279,6 @@ vy_write_iterator_delete_src(struct vy_write_iterator *stream,
 {
 	(void)stream;
 	assert(!src->is_end_of_key);
-	if (src->stream.iface->stop != NULL)
-		src->stream.iface->stop(&src->stream);
 	if (src->stream.iface->close != NULL)
 		src->stream.iface->close(&src->stream);
 	rlist_del(&src->in_src_list);
@@ -287,8 +286,8 @@ vy_write_iterator_delete_src(struct vy_write_iterator *stream,
 }
 
 /**
- * Add a source to the write iterator heap. The added source
- * must be open.
+ * Start iteration in the given source, retrieve the first tuple,
+ * and add the source to the write iterator heap.
  *
  * @return 0 - success, not 0 - error.
  */
@@ -298,35 +297,38 @@ vy_write_iterator_add_src(struct vy_write_iterator *stream,
 {
 	if (src->stream.iface->start != NULL) {
 		int rc = src->stream.iface->start(&src->stream);
-		if (rc != 0) {
-			vy_write_iterator_delete_src(stream, src);
+		if (rc != 0)
 			return rc;
-		}
 	}
 	int rc = src->stream.iface->next(&src->stream, &src->tuple);
-	if (rc != 0 || src->tuple == NULL) {
-		vy_write_iterator_delete_src(stream, src);
-		return rc;
-	}
+	if (rc != 0 || src->tuple == NULL)
+		goto stop;
+
 	rc = vy_source_heap_insert(&stream->src_heap, src);
 	if (rc != 0) {
 		diag_set(OutOfMemory, sizeof(void *),
 			 "malloc", "vinyl write stream heap");
-		vy_write_iterator_delete_src(stream, src);
-		return rc;
+		goto stop;
 	}
 	return 0;
+stop:
+	if (src->stream.iface->stop != NULL)
+		src->stream.iface->stop(&src->stream);
+	return rc;
 }
 
 /**
- * Remove a source from the heap, destroy and free it.
+ * Remove a source from the heap and stop iteration.
  */
 static void
 vy_write_iterator_remove_src(struct vy_write_iterator *stream,
 			   struct vy_write_src *src)
 {
+	if (heap_node_is_stray(&src->heap_node))
+		return; /* already removed */
 	vy_source_heap_delete(&stream->src_heap, src);
-	vy_write_iterator_delete_src(stream, src);
+	if (src->stream.iface->stop != NULL)
+		src->stream.iface->stop(&src->stream);
 }
 
 static const struct vy_stmt_stream_iface vy_slice_stream_iface;
@@ -394,16 +396,20 @@ vy_write_iterator_start(struct vy_stmt_stream *vstream)
 {
 	assert(vstream->iface->start == vy_write_iterator_start);
 	struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream;
-	struct vy_write_src *src, *tmp;
-	rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) {
+	struct vy_write_src *src;
+	rlist_foreach_entry(src, &stream->src_list, in_src_list) {
 		if (vy_write_iterator_add_src(stream, src) != 0)
-			return -1;
+			goto fail;
 	}
 	return 0;
+fail:
+	rlist_foreach_entry(src, &stream->src_list, in_src_list)
+		vy_write_iterator_remove_src(stream, src);
+	return -1;
 }
 
 /**
- * Free all resources.
+ * Stop iteration in all sources.
  */
 static void
 vy_write_iterator_stop(struct vy_stmt_stream *vstream)
@@ -412,9 +418,9 @@ vy_write_iterator_stop(struct vy_stmt_stream *vstream)
 	struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream;
 	for (int i = 0; i < stream->rv_count; ++i)
 		vy_read_view_stmt_destroy(&stream->read_views[i]);
-	struct vy_write_src *src, *tmp;
-	rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp)
-		vy_write_iterator_delete_src(stream, src);
+	struct vy_write_src *src;
+	rlist_foreach_entry(src, &stream->src_list, in_src_list)
+		vy_write_iterator_remove_src(stream, src);
 	if (stream->last_stmt != NULL) {
 		vy_stmt_unref_if_possible(stream->last_stmt);
 		stream->last_stmt = NULL;
@@ -439,7 +445,9 @@ vy_write_iterator_close(struct vy_stmt_stream *vstream)
 {
 	assert(vstream->iface->close == vy_write_iterator_close);
 	struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream;
-	vy_write_iterator_stop(vstream);
+	struct vy_write_src *src, *tmp;
+	rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp)
+		vy_write_iterator_delete_src(stream, src);
 	vy_source_heap_destroy(&stream->src_heap);
 	tuple_format_unref(stream->format);
 	free(stream);
-- 
2.11.0




More information about the Tarantool-patches mailing list