[PATCH 3/5] vinyl: encapsulate reader thread selection logic in a helper function

Vladimir Davydov vdavydov.dev at gmail.com
Wed May 29 18:12:49 MSK 2019


Page reading code is intermixed with the reader thread selection in the
same function, which makes the page reading code difficult to extend. So
let's introduce a helper function that encapsulates executing a call on
behalf of a reader thread.
---
 src/box/vy_run.c | 106 +++++++++++++++++++++++++++++--------------------------
 1 file changed, 55 insertions(+), 51 deletions(-)

diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 5b990992..a8ca3afe 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -197,6 +197,44 @@ vy_run_env_enable_coio(struct vy_run_env *env)
 }
 
 /**
+ * Run func(msg) in a reader thread (or inline if there is no reader pool).
+ * Returns 0 on success, -1 on failure; msg is passed to free_cb on failure.
+ */
+static int
+vy_run_env_coio_call(struct vy_run_env *env, struct cbus_call_msg *msg,
+		     cbus_call_f func, cbus_call_f free_cb, double timeout)
+{
+	/* No reader pool (WAL recovery): do blocking I/O in this thread. */
+	if (env->reader_pool == NULL) {
+		if (func(msg) != 0)
+			goto fail;
+		return 0;
+	}
+
+	/* Pick a reader thread in round-robin order. */
+	struct vy_run_reader *reader;
+	reader = &env->reader_pool[env->next_reader++];
+	env->next_reader %= env->reader_pool_size;
+
+	/* Post the task to the reader thread and wait for the result. */
+	int rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe,
+			   msg, func, free_cb, timeout);
+	if (!msg->complete) {
+		/*
+		 * Timed out or the fiber was cancelled: the task has not
+		 * completed yet, free_cb will be called on its completion.
+		 */
+		return -1;
+	}
+	if (rc != 0)
+		goto fail;
+	return 0;
+fail:
+	free_cb(msg);
+	return -1;
+}
+
+/**
  * Initialize page info struct
  *
  * @retval 0 for Success
@@ -996,58 +1034,24 @@ vy_run_iterator_load_page(struct vy_run_iterator *itr, uint32_t page_no,
 		return -1;
 
 	/* Read page data from the disk */
-	int rc;
-	if (env->reader_pool != NULL) {
-		/* Allocate a cbus task. */
-		struct vy_page_read_task *task;
-		task = mempool_alloc(&env->read_task_pool);
-		if (task == NULL) {
-			diag_set(OutOfMemory, sizeof(*task), "mempool",
-				 "vy_page_read_task");
-			vy_page_delete(page);
-			return -1;
-		}
-
-		/* Pick a reader thread. */
-		struct vy_run_reader *reader;
-		reader = &env->reader_pool[env->next_reader++];
-		env->next_reader %= env->reader_pool_size;
-
-		task->run = slice->run;
-		task->page_info = page_info;
-		task->page = page;
-		vy_run_ref(task->run);
-
-		/* Post task to the reader thread. */
-		rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe,
-			       &task->base, vy_page_read_cb,
-			       vy_page_read_cb_free, TIMEOUT_INFINITY);
-		if (!task->base.complete)
-			return -1; /* timed out or cancelled */
-
-		vy_run_unref(task->run);
-		mempool_free(&env->read_task_pool, task);
-
-		if (rc != 0) {
-			/* posted, but failed */
-			vy_page_delete(page);
-			return -1;
-		}
-	} else {
-		/*
-		 * Optimization: use blocked I/O for non-TX threads or
-		 * during WAL recovery (env->status != VINYL_ONLINE).
-		 */
-		ZSTD_DStream *zdctx = vy_env_get_zdctx(env);
-		if (zdctx == NULL) {
-			vy_page_delete(page);
-			return -1;
-		}
-		if (vy_page_read(page, page_info, slice->run, zdctx) != 0) {
-			vy_page_delete(page);
-			return -1;
-		}
+	struct vy_page_read_task *task = mempool_alloc(&env->read_task_pool);
+	if (task == NULL) {
+		diag_set(OutOfMemory, sizeof(*task),
+			 "mempool", "vy_page_read_task");
+		vy_page_delete(page);
+		return -1;
 	}
+	task->run = slice->run;
+	task->page_info = page_info;
+	task->page = page;
+	vy_run_ref(task->run);
+
+	if (vy_run_env_coio_call(env, &task->base, vy_page_read_cb,
+				 vy_page_read_cb_free, TIMEOUT_INFINITY) != 0)
+		return -1;
+
+	vy_run_unref(task->run);
+	mempool_free(&env->read_task_pool, task);
 
 	/* Update cache */
 	if (itr->prev_page != NULL)
-- 
2.11.0




More information about the Tarantool-patches mailing list