[PATCH 3/5] vinyl: encapsulate reader thread selection logic in a helper function
Vladimir Davydov
vdavydov.dev at gmail.com
Wed May 29 18:12:49 MSK 2019
Page reading code is intermixed with the reader thread selection in the
same function, which makes it difficult to extend the former. So let's
introduce a helper function encapsulating a call on behalf of a reader
thread.
---
src/box/vy_run.c | 106 +++++++++++++++++++++++++++++--------------------------
1 file changed, 55 insertions(+), 51 deletions(-)
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 5b990992..a8ca3afe 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -197,6 +197,44 @@ vy_run_env_enable_coio(struct vy_run_env *env)
}
/**
+ * Execute func(msg) on a reader thread, or inline if there is no reader pool.
+ * Returns 0 on success; on failure returns -1 and free_cb(msg) is invoked.
+ */
+static int
+vy_run_env_coio_call(struct vy_run_env *env, struct cbus_call_msg *msg,
+ cbus_call_f func, cbus_call_f free_cb, double timeout)
+{
+ /* No reader pool (e.g. during WAL recovery): do blocking I/O in place. */
+ if (env->reader_pool == NULL) {
+ if (func(msg) != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* Pick the next reader thread in round-robin order. */
+ struct vy_run_reader *reader;
+ reader = &env->reader_pool[env->next_reader++];
+ env->next_reader %= env->reader_pool_size;
+
+ /* Post the task to the reader thread. */
+ int rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe,
+ msg, func, free_cb, timeout);
+ if (!msg->complete) {
+ /*
+ * Operation timed out or the fiber was cancelled.
+ * free_cb will be called on task completion, so skip it here.
+ */
+ return -1;
+ }
+ if (rc != 0)
+ goto fail;
+ return 0;
+fail:
+ free_cb(msg);
+ return -1;
+}
+
+/**
* Initialize page info struct
*
* @retval 0 for Success
@@ -996,58 +1034,24 @@ vy_run_iterator_load_page(struct vy_run_iterator *itr, uint32_t page_no,
return -1;
/* Read page data from the disk */
- int rc;
- if (env->reader_pool != NULL) {
- /* Allocate a cbus task. */
- struct vy_page_read_task *task;
- task = mempool_alloc(&env->read_task_pool);
- if (task == NULL) {
- diag_set(OutOfMemory, sizeof(*task), "mempool",
- "vy_page_read_task");
- vy_page_delete(page);
- return -1;
- }
-
- /* Pick a reader thread. */
- struct vy_run_reader *reader;
- reader = &env->reader_pool[env->next_reader++];
- env->next_reader %= env->reader_pool_size;
-
- task->run = slice->run;
- task->page_info = page_info;
- task->page = page;
- vy_run_ref(task->run);
-
- /* Post task to the reader thread. */
- rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe,
- &task->base, vy_page_read_cb,
- vy_page_read_cb_free, TIMEOUT_INFINITY);
- if (!task->base.complete)
- return -1; /* timed out or cancelled */
-
- vy_run_unref(task->run);
- mempool_free(&env->read_task_pool, task);
-
- if (rc != 0) {
- /* posted, but failed */
- vy_page_delete(page);
- return -1;
- }
- } else {
- /*
- * Optimization: use blocked I/O for non-TX threads or
- * during WAL recovery (env->status != VINYL_ONLINE).
- */
- ZSTD_DStream *zdctx = vy_env_get_zdctx(env);
- if (zdctx == NULL) {
- vy_page_delete(page);
- return -1;
- }
- if (vy_page_read(page, page_info, slice->run, zdctx) != 0) {
- vy_page_delete(page);
- return -1;
- }
+ struct vy_page_read_task *task = mempool_alloc(&env->read_task_pool);
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task),
+ "mempool", "vy_page_read_task");
+ vy_page_delete(page);
+ return -1;
}
+ task->run = slice->run;
+ task->page_info = page_info;
+ task->page = page;
+ vy_run_ref(task->run);
+
+ if (vy_run_env_coio_call(env, &task->base, vy_page_read_cb,
+ vy_page_read_cb_free, TIMEOUT_INFINITY) != 0)
+ return -1;
+
+ vy_run_unref(task->run);
+ mempool_free(&env->read_task_pool, task);
/* Update cache */
if (itr->prev_page != NULL)
--
2.11.0
More information about the Tarantool-patches
mailing list