[PATCH 1/3] vinyl: introduce vy_run_writer

Vladimir Davydov vdavydov.dev at gmail.com
Thu Mar 1 13:57:30 MSK 2018


From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>

vy_run_writer incapsulates logic of writing statements to a run.
It provides API to write statements one by one. It is needed so
that we can abort a run writing task before waiting for it to
finish writing the file.

Edited by @locker:
 - add region_truncate() wherever necessary
 - do not reallocate row index ibuf for each page
 - remove vy_run_write(), use vy_run_writer directly instead

Needed for #3166
---
 src/box/vy_run.c            | 511 +++++++++++++++++++++-----------------------
 src/box/vy_run.h            |  91 +++++++-
 src/box/vy_scheduler.c      |  60 ++++--
 test/unit/vy_point_lookup.c |  47 +++-
 4 files changed, 412 insertions(+), 297 deletions(-)

diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 60180841..c677dc09 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1890,9 +1890,6 @@ vy_run_dump_stmt(const struct tuple *value, struct xlog *data_xlog,
 		 struct vy_page_info *info, const struct key_def *key_def,
 		 bool is_primary)
 {
-	struct region *region = &fiber()->gc;
-	size_t used = region_used(region);
-
 	struct xrow_header xrow;
 	int rc = (is_primary ?
 		  vy_stmt_encode_primary(value, key_def, 0, &xrow) :
@@ -1904,8 +1901,6 @@ vy_run_dump_stmt(const struct tuple *value, struct xlog *data_xlog,
 	if ((row_size = xlog_write_row(data_xlog, &xrow)) < 0)
 		return -1;
 
-	region_truncate(region, used);
-
 	info->unpacked_size += row_size;
 	info->row_count++;
 	return 0;
@@ -1967,247 +1962,6 @@ vy_run_alloc_page_info(struct vy_run *run, uint32_t *page_info_capacity)
 	return 0;
 }
 
-/**
- * Write statements from the iterator to a new page in the run,
- * update page and run statistics.
- *
- *  @retval  1 all is ok, the iterator is finished
- *  @retval  0 all is ok, the iterator isn't finished
- *  @retval -1 error occurred
- */
-static int
-vy_run_write_page(struct vy_run *run, struct xlog *data_xlog,
-		  struct vy_stmt_stream *wi, struct tuple **curr_stmt,
-		  uint64_t page_size, struct bloom_spectrum *bs,
-		  const struct key_def *cmp_def,
-		  const struct key_def *key_def, bool is_primary,
-		  uint32_t *page_info_capacity)
-{
-	assert(curr_stmt != NULL);
-	assert(*curr_stmt != NULL);
-	struct vy_page_info *page = NULL;
-	const char *region_key;
-	bool end_of_run = false;
-	/* Last written statement */
-	struct tuple *last_stmt = *curr_stmt;
-	vy_stmt_ref_if_possible(last_stmt);
-
-	/* row offsets accumulator */
-	struct ibuf row_index_buf;
-	ibuf_create(&row_index_buf, &cord()->slabc, sizeof(uint32_t) * 4096);
-
-	if (run->info.page_count >= *page_info_capacity &&
-	    vy_run_alloc_page_info(run, page_info_capacity) != 0)
-		goto error_row_index;
-	assert(*page_info_capacity >= run->info.page_count);
-
-	/* See comment to run_info->max_key allocation below. */
-	region_key = tuple_extract_key(*curr_stmt, cmp_def, NULL);
-	if (region_key == NULL)
-		goto error_row_index;
-
-	if (run->info.page_count == 0) {
-		assert(run->info.min_key == NULL);
-		run->info.min_key = vy_key_dup(region_key);
-		if (run->info.min_key == NULL)
-			goto error_row_index;
-	}
-
-	page = run->page_info + run->info.page_count;
-	if (vy_page_info_create(page, data_xlog->offset, region_key) != 0)
-		goto error_row_index;
-	xlog_tx_begin(data_xlog);
-
-	do {
-		uint32_t *offset = (uint32_t *) ibuf_alloc(&row_index_buf,
-							   sizeof(uint32_t));
-		if (offset == NULL) {
-			diag_set(OutOfMemory, sizeof(uint32_t),
-				 "ibuf", "row index");
-			goto error_rollback;
-		}
-		*offset = page->unpacked_size;
-
-		if (vy_run_dump_stmt(*curr_stmt, data_xlog, page,
-				     cmp_def, is_primary) != 0)
-			goto error_rollback;
-
-		if (bs != NULL)
-			bloom_spectrum_add(bs, tuple_hash(*curr_stmt, key_def));
-
-		int64_t lsn = vy_stmt_lsn(*curr_stmt);
-		run->info.min_lsn = MIN(run->info.min_lsn, lsn);
-		run->info.max_lsn = MAX(run->info.max_lsn, lsn);
-
-		if (wi->iface->next(wi, curr_stmt))
-			goto error_rollback;
-
-		if (*curr_stmt == NULL) {
-			end_of_run = true;
-		} else {
-			vy_stmt_unref_if_possible(last_stmt);
-			last_stmt = *curr_stmt;
-			vy_stmt_ref_if_possible(last_stmt);
-		}
-	} while (end_of_run == false &&
-		 obuf_size(&data_xlog->obuf) < page_size);
-
-	/* We don't write empty pages. */
-	assert(last_stmt != NULL);
-
-	if (end_of_run) {
-		/*
-		 * Tuple_extract_key allocates the key on a
-		 * region, but the max_key must be allocated on
-		 * the heap, because the max_key can live longer
-		 * than a fiber. To reach this, we must copy the
-		 * key into malloced memory.
-		 */
-		region_key = tuple_extract_key(last_stmt, cmp_def, NULL);
-		if (region_key == NULL)
-			goto error_rollback;
-		assert(run->info.max_key == NULL);
-		run->info.max_key = vy_key_dup(region_key);
-		if (run->info.max_key == NULL)
-			goto error_rollback;
-	}
-	vy_stmt_unref_if_possible(last_stmt);
-	last_stmt = NULL;
-
-	/* Save offset to row index  */
-	page->row_index_offset = page->unpacked_size;
-
-	/* Write row index */
-	struct xrow_header xrow;
-	const uint32_t *row_index = (const uint32_t *) row_index_buf.rpos;
-	assert(ibuf_used(&row_index_buf) == sizeof(uint32_t) * page->row_count);
-	if (vy_row_index_encode(row_index, page->row_count, &xrow) < 0)
-		goto error_rollback;
-
-	ssize_t written = xlog_write_row(data_xlog, &xrow);
-	if (written < 0)
-		goto error_rollback;
-
-	page->unpacked_size += written;
-
-	written = xlog_tx_commit(data_xlog);
-	if (written == 0)
-		written = xlog_flush(data_xlog);
-	if (written < 0)
-		goto error_row_index;
-
-	page->size = written;
-
-	assert(page->row_count > 0);
-
-	run->info.page_count++;
-	vy_run_acct_page(run, page);
-
-	ibuf_destroy(&row_index_buf);
-	return !end_of_run ? 0: 1;
-
-error_rollback:
-	xlog_tx_rollback(data_xlog);
-error_row_index:
-	ibuf_destroy(&row_index_buf);
-	if (last_stmt != NULL)
-		vy_stmt_unref_if_possible(last_stmt);
-	return -1;
-}
-
-/**
- * Write statements from the iterator to a new run file.
- *
- *  @retval 0 success
- *  @retval -1 error occurred
- */
-static int
-vy_run_write_data(struct vy_run *run, const char *dirpath,
-		  uint32_t space_id, uint32_t iid,
-		  struct vy_stmt_stream *wi, uint64_t page_size,
-		  const struct key_def *cmp_def,
-		  const struct key_def *key_def,
-		  size_t max_output_count, double bloom_fpr)
-{
-	struct tuple *stmt;
-
-	/* Start iteration. */
-	if (wi->iface->start(wi) != 0)
-		goto err;
-	if (wi->iface->next(wi, &stmt) != 0)
-		goto err;
-
-	/* Do not create empty run files. */
-	if (stmt == NULL)
-		goto done;
-
-	struct bloom_spectrum bs;
-	bool has_bloom = bloom_fpr < 1;
-	if (has_bloom && bloom_spectrum_create(&bs, max_output_count,
-					bloom_fpr, runtime.quota) != 0) {
-		diag_set(OutOfMemory, 0,
-			 "bloom_spectrum_create", "bloom_spectrum");
-		goto err;
-	}
-
-	char path[PATH_MAX];
-	vy_run_snprint_path(path, sizeof(path), dirpath,
-			    space_id, iid, run->id, VY_FILE_RUN);
-
-	say_info("writing `%s'", path);
-
-	struct xlog data_xlog;
-	struct xlog_meta meta = {
-		.filetype = XLOG_META_TYPE_RUN,
-		.instance_uuid = INSTANCE_UUID,
-	};
-	if (xlog_create(&data_xlog, path, 0, &meta) < 0)
-		goto err_free_bloom;
-
-	run->info.min_lsn = INT64_MAX;
-	run->info.max_lsn = -1;
-
-	assert(run->page_info == NULL);
-	uint32_t page_info_capacity = 0;
-	int rc;
-	do {
-		rc = vy_run_write_page(run, &data_xlog, wi, &stmt, page_size,
-				       has_bloom ? &bs : NULL, cmp_def, key_def,
-				       iid == 0, &page_info_capacity);
-		if (rc < 0)
-			goto err_close_xlog;
-		fiber_gc();
-	} while (rc == 0);
-
-	/* Sync data and link the file to the final name. */
-	if (xlog_sync(&data_xlog) < 0 ||
-	    xlog_rename(&data_xlog) < 0)
-		goto err_close_xlog;
-
-	run->fd = data_xlog.fd;
-	xlog_close(&data_xlog, true);
-	fiber_gc();
-
-	if (has_bloom) {
-		bloom_spectrum_choose(&bs, &run->info.bloom);
-		run->info.has_bloom = true;
-		bloom_spectrum_destroy(&bs, runtime.quota);
-	}
-done:
-	wi->iface->stop(wi);
-	return 0;
-
-err_close_xlog:
-	xlog_close(&data_xlog, false);
-	fiber_gc();
-err_free_bloom:
-	if (has_bloom)
-		bloom_spectrum_destroy(&bs, runtime.quota);
-err:
-	wi->iface->stop(wi);
-	return -1;
-}
-
 /** {{{ vy_page_info */
 
 /**
@@ -2446,41 +2200,262 @@ fail:
 	return -1;
 }
 
-/*
- * Create a run file, write statements returned by a write
- * iterator to it, and create an index file.
- */
 int
-vy_run_write(struct vy_run *run, const char *dirpath,
-	     uint32_t space_id, uint32_t iid,
-	     struct vy_stmt_stream *wi, uint64_t page_size,
-	     const struct key_def *cmp_def,
-	     const struct key_def *key_def,
-	     size_t max_output_count, double bloom_fpr)
+vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run,
+		const char *dirpath, uint32_t space_id, uint32_t iid,
+		const struct key_def *cmp_def, const struct key_def *key_def,
+		uint64_t page_size, double bloom_fpr, size_t max_output_count)
+{
+	memset(writer, 0, sizeof(*writer));
+	writer->run = run;
+	writer->dirpath = dirpath;
+	writer->space_id = space_id;
+	writer->iid = iid;
+	writer->cmp_def = cmp_def;
+	writer->key_def = key_def;
+	writer->page_size = page_size;
+	writer->has_bloom = (max_output_count > 0 && bloom_fpr < 1);
+	if (writer->has_bloom &&
+	    bloom_spectrum_create(&writer->bloom, max_output_count,
+				  bloom_fpr, runtime.quota) != 0) {
+		diag_set(OutOfMemory, 0,
+			 "bloom_spectrum_create", "bloom_spectrum");
+		return -1;
+	}
+	xlog_clear(&writer->data_xlog);
+	ibuf_create(&writer->row_index_buf, &cord()->slabc,
+		    4096 * sizeof(uint32_t));
+	run->info.min_lsn = INT64_MAX;
+	run->info.max_lsn = -1;
+	assert(run->page_info == NULL);
+	return 0;
+}
+
+/**
+ * Create an xlog to write run.
+ * @param writer Run writer.
+ * @retval -1 Memory or IO error.
+ * @retval  0 Success.
+ */
+static int
+vy_run_writer_create_xlog(struct vy_run_writer *writer)
 {
-	ERROR_INJECT(ERRINJ_VY_RUN_WRITE,
-		     {diag_set(ClientError, ER_INJECTION,
-			       "vinyl dump"); return -1;});
+	assert(!xlog_is_open(&writer->data_xlog));
+	char path[PATH_MAX];
+	vy_run_snprint_path(path, sizeof(path), writer->dirpath,
+			    writer->space_id, writer->iid, writer->run->id,
+			    VY_FILE_RUN);
+	say_info("writing `%s'", path);
+	const struct xlog_meta meta = {
+		.filetype = XLOG_META_TYPE_RUN,
+		.instance_uuid = INSTANCE_UUID,
+	};
+	return xlog_create(&writer->data_xlog, path, 0, &meta);
+}
 
-	struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_TIMEOUT, ERRINJ_DOUBLE);
-	if (inj != NULL && inj->dparam > 0)
-		usleep(inj->dparam * 1000000);
+/**
+ * Start a new page with a min_key stored in @a first_stmt.
+ * @param writer Run writer.
+ * @param first_stmt First statement of a page.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success.
+ */
+static int
+vy_run_writer_start_page(struct vy_run_writer *writer,
+			 const struct tuple *first_stmt)
+{
+	struct vy_run *run = writer->run;
+	if (run->info.page_count >= writer->page_info_capacity &&
+	    vy_run_alloc_page_info(run, &writer->page_info_capacity) != 0)
+		return -1;
+	const char *key = tuple_extract_key(first_stmt, writer->cmp_def, NULL);
+	if (key == NULL)
+		return -1;
+	if (run->info.page_count == 0) {
+		assert(run->info.min_key == NULL);
+		run->info.min_key = vy_key_dup(key);
+		if (run->info.min_key == NULL)
+			return -1;
+	}
+	struct vy_page_info *page = run->page_info + run->info.page_count;
+	if (vy_page_info_create(page, writer->data_xlog.offset, key) != 0)
+		return -1;
+	xlog_tx_begin(&writer->data_xlog);
+	return 0;
+}
 
-	if (vy_run_write_data(run, dirpath, space_id, iid,
-			      wi, page_size, cmp_def, key_def,
-			      max_output_count, bloom_fpr) != 0)
+/**
+ * Write @a stmt into a current page.
+ * @param writer Run writer.
+ * @param stmt Statement to write.
+ *
+ * @retval -1 Memory or IO error.
+ * @retval  0 Success.
+ */
+static int
+vy_run_writer_write_to_page(struct vy_run_writer *writer, struct tuple *stmt)
+{
+	if (writer->last_stmt != NULL)
+		vy_stmt_unref_if_possible(writer->last_stmt);
+	writer->last_stmt = stmt;
+	vy_stmt_ref_if_possible(stmt);
+	struct vy_run *run = writer->run;
+	struct vy_page_info *page = run->page_info + run->info.page_count;
+	uint32_t *offset = (uint32_t *)ibuf_alloc(&writer->row_index_buf,
+						  sizeof(uint32_t));
+	if (offset == NULL) {
+		diag_set(OutOfMemory, sizeof(uint32_t), "ibuf", "row index");
+		return -1;
+	}
+	*offset = page->unpacked_size;
+	if (vy_run_dump_stmt(stmt, &writer->data_xlog, page,
+			     writer->cmp_def, writer->iid == 0) != 0)
 		return -1;
+	if (writer->has_bloom) {
+		bloom_spectrum_add(&writer->bloom,
+				   tuple_hash(stmt, writer->key_def));
+	}
+	int64_t lsn = vy_stmt_lsn(stmt);
+	run->info.min_lsn = MIN(run->info.min_lsn, lsn);
+	run->info.max_lsn = MAX(run->info.max_lsn, lsn);
+	return 0;
+}
 
-	if (vy_run_is_empty(run))
-		return 0;
+/**
+ * Finish a current page.
+ * @param writer Run writer.
+ * @retval -1 Memory or IO error.
+ * @retval  0 Success.
+ */
+static int
+vy_run_writer_end_page(struct vy_run_writer *writer)
+{
+	struct vy_run *run = writer->run;
+	struct vy_page_info *page = run->page_info + run->info.page_count;
+
+	assert(page->row_count > 0);
+	assert(ibuf_used(&writer->row_index_buf) ==
+	       sizeof(uint32_t) * page->row_count);
 
-	if (vy_run_write_index(run, dirpath, space_id, iid) != 0)
+	struct xrow_header xrow;
+	uint32_t *row_index = (uint32_t *)writer->row_index_buf.rpos;
+	if (vy_row_index_encode(row_index, page->row_count, &xrow) < 0)
+		return -1;
+	ssize_t written = xlog_write_row(&writer->data_xlog, &xrow);
+	if (written < 0)
 		return -1;
+	page->row_index_offset = page->unpacked_size;
+	page->unpacked_size += written;
 
+	written = xlog_tx_commit(&writer->data_xlog);
+	if (written == 0)
+		written = xlog_flush(&writer->data_xlog);
+	if (written < 0)
+		return -1;
+	page->size = written;
+	run->info.page_count++;
+	vy_run_acct_page(run, page);
+	ibuf_reset(&writer->row_index_buf);
 	return 0;
 }
 
 int
+vy_run_writer_append_stmt(struct vy_run_writer *writer, struct tuple *stmt)
+{
+	int rc = -1;
+	size_t region_svp = region_used(&fiber()->gc);
+	if (!xlog_is_open(&writer->data_xlog) &&
+	    vy_run_writer_create_xlog(writer) != 0)
+		goto out;
+	if (ibuf_used(&writer->row_index_buf) == 0 &&
+	    vy_run_writer_start_page(writer, stmt) != 0)
+		goto out;
+	if (vy_run_writer_write_to_page(writer, stmt) != 0)
+		goto out;
+	if (obuf_size(&writer->data_xlog.obuf) >= writer->page_size &&
+	    vy_run_writer_end_page(writer) != 0)
+		goto out;
+	rc = 0;
+out:
+	region_truncate(&fiber()->gc, region_svp);
+	return rc;
+}
+
+/**
+ * Destroy a run writer.
+ * @param writer Writer to destroy.
+ * @param reuse_fd True in a case of success run write. And else
+ *        false.
+ */
+static void
+vy_run_writer_destroy(struct vy_run_writer *writer, bool reuse_fd)
+{
+	if (writer->last_stmt != NULL)
+		vy_stmt_unref_if_possible(writer->last_stmt);
+	if (xlog_is_open(&writer->data_xlog))
+		xlog_close(&writer->data_xlog, reuse_fd);
+	if (writer->has_bloom)
+		bloom_spectrum_destroy(&writer->bloom, runtime.quota);
+	ibuf_destroy(&writer->row_index_buf);
+}
+
+int
+vy_run_writer_commit(struct vy_run_writer *writer)
+{
+	int rc = -1;
+	size_t region_svp = region_used(&fiber()->gc);
+
+	if (ibuf_used(&writer->row_index_buf) != 0 &&
+	    vy_run_writer_end_page(writer) != 0)
+		goto out;
+
+	struct vy_run *run = writer->run;
+	if (vy_run_is_empty(run)) {
+		vy_run_writer_destroy(writer, false);
+		rc = 0;
+		goto out;
+	}
+
+	assert(writer->last_stmt != NULL);
+	const char *key = tuple_extract_key(writer->last_stmt,
+					    writer->cmp_def, NULL);
+	if (key == NULL)
+		goto out;
+
+	assert(run->info.max_key == NULL);
+	run->info.max_key = vy_key_dup(key);
+	if (run->info.max_key == NULL)
+		goto out;
+
+	/* Sync data and link the file to the final name. */
+	if (xlog_sync(&writer->data_xlog) < 0 ||
+	    xlog_rename(&writer->data_xlog) < 0)
+		goto out;
+
+	if (writer->has_bloom) {
+		bloom_spectrum_choose(&writer->bloom, &run->info.bloom);
+		run->info.has_bloom = true;
+	}
+	if (vy_run_write_index(run, writer->dirpath,
+			       writer->space_id, writer->iid) != 0)
+		goto out;
+
+	run->fd = writer->data_xlog.fd;
+	vy_run_writer_destroy(writer, true);
+	rc = 0;
+out:
+	region_truncate(&fiber()->gc, region_svp);
+	return rc;
+}
+
+void
+vy_run_writer_abort(struct vy_run_writer *writer)
+{
+	vy_run_writer_destroy(writer, false);
+}
+
+int
 vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		     uint32_t space_id, uint32_t iid,
 		     const struct key_def *cmp_def,
diff --git a/src/box/vy_run.h b/src/box/vy_run.h
index de54937c..60a29d73 100644
--- a/src/box/vy_run.h
+++ b/src/box/vy_run.h
@@ -41,6 +41,7 @@
 #include "vy_read_view.h"
 #include "vy_stat.h"
 #include "index_def.h"
+#include "xlog.h"
 
 #include "small/mempool.h"
 #include "salad/bloom.h"
@@ -418,14 +419,6 @@ int
 vy_run_remove_files(const char *dir, uint32_t space_id,
 		    uint32_t iid, int64_t run_id);
 
-int
-vy_run_write(struct vy_run *run, const char *dirpath,
-	     uint32_t space_id, uint32_t iid,
-	     struct vy_stmt_stream *wi, uint64_t page_size,
-	     const struct key_def *cmp_def,
-	     const struct key_def *key_def,
-	     size_t max_output_count, double bloom_fpr);
-
 /**
  * Allocate a new run slice.
  * This function increments @run->refs.
@@ -575,6 +568,88 @@ vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
 		     const struct key_def *cmp_def, struct tuple_format *format,
 		     struct tuple_format *upsert_format, bool is_primary);
 
+/**
+ * Run_writer fills a created run with statements one by one,
+ * splitting them into pages.
+ */
+struct vy_run_writer {
+	/** Run to fill. */
+	struct vy_run *run;
+	/** Path to directory with run files. */
+	const char *dirpath;
+	/** Identifier of a space owning the run. */
+	uint32_t space_id;
+	/** Identifier of an index owning the run. */
+	uint32_t iid;
+	/**
+	 * Key definition to extract from tuple and store as page
+	 * min key, run min/max keys, and secondary index
+	 * statements.
+	 */
+	const struct key_def *cmp_def;
+	/** Key definition to calculate bloom. */
+	const struct key_def *key_def;
+	/**
+	 * Minimal page size. When a page becames bigger, it is
+	 * dumped.
+	 */
+	uint64_t page_size;
+	/**
+	 * Current page info capacity. Can grow with page number.
+	 */
+	uint32_t page_info_capacity;
+	/** Xlog to write data. */
+	struct xlog data_xlog;
+	/** Set iff bloom filter is available. */
+	bool has_bloom;
+	/** Bloom filter. */
+	struct bloom_spectrum bloom;
+	/** Buffer of a current page row offsets. */
+	struct ibuf row_index_buf;
+	/**
+	 * Remember a last written statement to use it as a source
+	 * of max key of a finished run.
+	 */
+	struct tuple *last_stmt;
+};
+
+/** Create a run writer to fill a run with statements. */
+int
+vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run,
+		const char *dirpath, uint32_t space_id, uint32_t iid,
+		const struct key_def *cmp_def, const struct key_def *key_def,
+		uint64_t page_size, double bloom_fpr, size_t max_output_count);
+
+/**
+ * Write a specified statement into a run.
+ * @param writer Writer to write a statement.
+ * @param stmt Statement to write.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success.
+ */
+int
+vy_run_writer_append_stmt(struct vy_run_writer *writer, struct tuple *stmt);
+
+/**
+ * Finalize run writing by writing run index into file. The writer
+ * is deleted after call.
+ * @param writer Run writer.
+ * @retval -1 Memory or IO error.
+ * @retval  0 Success.
+ */
+int
+vy_run_writer_commit(struct vy_run_writer *writer);
+
+/**
+ * Abort run writing. Can not delete a run and run's file here,
+ * becase it must be done from tx thread. The writer is deleted
+ * after call.
+ * @param Run writer.
+ */
+void
+vy_run_writer_abort(struct vy_run_writer *writer);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index df8eb87f..e4cf5601 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -622,15 +622,55 @@ vy_run_discard(struct vy_run *run)
 }
 
 static int
-vy_task_dump_execute(struct vy_task *task)
+vy_task_write_run(struct vy_task *task)
 {
 	struct vy_index *index = task->index;
+	struct vy_stmt_stream *wi = task->wi;
+
+	ERROR_INJECT(ERRINJ_VY_RUN_WRITE,
+		     {diag_set(ClientError, ER_INJECTION,
+			       "vinyl dump"); return -1;});
+
+	struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0)
+		usleep(inj->dparam * 1000000);
+
+	struct vy_run_writer writer;
+	if (vy_run_writer_create(&writer, task->new_run, index->env->path,
+				 index->space_id, index->id,
+				 index->cmp_def, index->key_def,
+				 task->page_size, task->bloom_fpr,
+				 task->max_output_count) != 0)
+		goto fail;
+
+	if (wi->iface->start(wi) != 0)
+		goto fail_abort_writer;
+	int rc;
+	struct tuple *stmt = NULL;
+	while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
+		rc = vy_run_writer_append_stmt(&writer, stmt);
+		if (rc != 0)
+			break;
+	}
+	wi->iface->stop(wi);
 
-	return vy_run_write(task->new_run, index->env->path,
-			    index->space_id, index->id, task->wi,
-			    task->page_size, index->cmp_def,
-			    index->key_def, task->max_output_count,
-			    task->bloom_fpr);
+	if (rc == 0)
+		rc = vy_run_writer_commit(&writer);
+	if (rc != 0)
+		goto fail_abort_writer;
+
+	return 0;
+
+fail_abort_writer:
+	vy_run_writer_abort(&writer);
+fail:
+	return -1;
+}
+
+static int
+vy_task_dump_execute(struct vy_task *task)
+{
+	return vy_task_write_run(task);
 }
 
 static int
@@ -993,13 +1033,7 @@ err:
 static int
 vy_task_compact_execute(struct vy_task *task)
 {
-	struct vy_index *index = task->index;
-
-	return vy_run_write(task->new_run, index->env->path,
-			    index->space_id, index->id, task->wi,
-			    task->page_size, index->cmp_def,
-			    index->key_def, task->max_output_count,
-			    task->bloom_fpr);
+	return vy_task_write_run(task);
 }
 
 static int
diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c
index d360b3b4..0cc8dc22 100644
--- a/test/unit/vy_point_lookup.c
+++ b/test/unit/vy_point_lookup.c
@@ -13,13 +13,48 @@
 
 uint32_t schema_version;
 
+static int
+write_run(struct vy_run *run, const char *dir_name,
+	  struct vy_index *index, struct vy_stmt_stream *wi)
+{
+	struct vy_run_writer writer;
+	if (vy_run_writer_create(&writer, run, dir_name,
+				 index->space_id, index->id,
+				 index->cmp_def, index->key_def,
+				 4096, 0.1, 100500) != 0)
+		goto fail;
+
+	if (wi->iface->start(wi) != 0)
+		goto fail_abort_writer;
+	int rc;
+	struct tuple *stmt = NULL;
+	while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
+		rc = vy_run_writer_append_stmt(&writer, stmt);
+		if (rc != 0)
+			break;
+	}
+	wi->iface->stop(wi);
+
+	if (rc == 0)
+		rc = vy_run_writer_commit(&writer);
+	if (rc != 0)
+		goto fail_abort_writer;
+
+	return 0;
+
+fail_abort_writer:
+	vy_run_writer_abort(&writer);
+fail:
+	return -1;
+}
+
 static void
 test_basic()
 {
 	header();
 	plan(15);
 
-	/** Suppress info messages from vy_run_write(). */
+	/** Suppress info messages from vy_run_writer. */
 	say_set_log_level(S_WARN);
 
 	const size_t QUOTA = 100 * 1024 * 1024;
@@ -72,7 +107,7 @@ test_basic()
 	isnt(dir_name, NULL, "temp dir name is not NULL")
 	char path[PATH_MAX];
 	strcpy(path, dir_name);
-	strcat(path, "/0");
+	strcat(path, "/512");
 	rc = mkdir(path, 0777);
 	is(rc, 0, "temp dir create (2)");
 	strcat(path, "/0");
@@ -163,9 +198,7 @@ test_basic()
 	struct vy_run *run = vy_run_new(&run_env, 1);
 	isnt(run, NULL, "vy_run_new");
 
-	rc = vy_run_write(run, dir_name, 0, pk->id,
-			  write_stream, 4096, pk->cmp_def, pk->key_def,
-			  100500, 0.1);
+	rc = write_run(run, dir_name, pk, write_stream);
 	is(rc, 0, "vy_run_write");
 
 	write_stream->iface->close(write_stream);
@@ -200,9 +233,7 @@ test_basic()
 	run = vy_run_new(&run_env, 2);
 	isnt(run, NULL, "vy_run_new");
 
-	rc = vy_run_write(run, dir_name, 0, pk->id,
-			  write_stream, 4096, pk->cmp_def, pk->key_def,
-			  100500, 0.1);
+	rc = write_run(run, dir_name, pk, write_stream);
 	is(rc, 0, "vy_run_write");
 
 	write_stream->iface->close(write_stream);
-- 
2.11.0




More information about the Tarantool-patches mailing list