From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 3/5] vinyl: encapsulate reader thread selection logic in a helper function Date: Wed, 29 May 2019 18:12:49 +0300 Message-Id: <82705410db4b7fa6c31be8c20366bbd3d99a83e1.1559142561.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org List-ID: Page reading code is intermixed with the reader thread selection in the same function, which makes it difficult to extend the former. So let's introduce a helper function encapsulating a call on behalf of a reader thread. --- src/box/vy_run.c | 106 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/src/box/vy_run.c b/src/box/vy_run.c index 5b990992..a8ca3afe 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -197,6 +197,44 @@ vy_run_env_enable_coio(struct vy_run_env *env) } /** + * Execute a task on behalf of a reader thread. + * Calls free_cb on failure. + */ +static int +vy_run_env_coio_call(struct vy_run_env *env, struct cbus_call_msg *msg, + cbus_call_f func, cbus_call_f free_cb, double timeout) +{ + /* Optimization: use blocking I/O during WAL recovery. */ + if (env->reader_pool == NULL) { + if (func(msg) != 0) + goto fail; + return 0; + } + + /* Pick a reader thread. */ + struct vy_run_reader *reader; + reader = &env->reader_pool[env->next_reader++]; + env->next_reader %= env->reader_pool_size; + + /* Post the task to the reader thread. */ + int rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe, + msg, func, free_cb, timeout); + if (!msg->complete) { + /* + * Operation timed out or the fiber was cancelled. + * free_cb will be called on task completion. + */ + return -1; + } + if (rc != 0) + goto fail; + return 0; +fail: + free_cb(msg); + return -1; +} + +/** * Initialize page info struct * * @retval 0 for Success @@ -996,58 +1034,24 @@ vy_run_iterator_load_page(struct vy_run_iterator *itr, uint32_t page_no, return -1; /* Read page data from the disk */ - int rc; - if (env->reader_pool != NULL) { - /* Allocate a cbus task. */ - struct vy_page_read_task *task; - task = mempool_alloc(&env->read_task_pool); - if (task == NULL) { - diag_set(OutOfMemory, sizeof(*task), "mempool", - "vy_page_read_task"); - vy_page_delete(page); - return -1; - } - - /* Pick a reader thread. */ - struct vy_run_reader *reader; - reader = &env->reader_pool[env->next_reader++]; - env->next_reader %= env->reader_pool_size; - - task->run = slice->run; - task->page_info = page_info; - task->page = page; - vy_run_ref(task->run); - - /* Post task to the reader thread. */ - rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe, - &task->base, vy_page_read_cb, - vy_page_read_cb_free, TIMEOUT_INFINITY); - if (!task->base.complete) - return -1; /* timed out or cancelled */ - - vy_run_unref(task->run); - mempool_free(&env->read_task_pool, task); - - if (rc != 0) { - /* posted, but failed */ - vy_page_delete(page); - return -1; - } - } else { - /* - * Optimization: use blocked I/O for non-TX threads or - * during WAL recovery (env->status != VINYL_ONLINE). - */ - ZSTD_DStream *zdctx = vy_env_get_zdctx(env); - if (zdctx == NULL) { - vy_page_delete(page); - return -1; - } - if (vy_page_read(page, page_info, slice->run, zdctx) != 0) { - vy_page_delete(page); - return -1; - } + struct vy_page_read_task *task = mempool_alloc(&env->read_task_pool); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), + "mempool", "vy_page_read_task"); + vy_page_delete(page); + return -1; } + task->run = slice->run; + task->page_info = page_info; + task->page = page; + vy_run_ref(task->run); + + if (vy_run_env_coio_call(env, &task->base, vy_page_read_cb, + vy_page_read_cb_free, TIMEOUT_INFINITY) != 0) + return -1; + + vy_run_unref(task->run); + mempool_free(&env->read_task_pool, task); /* Update cache */ if (itr->prev_page != NULL) -- 2.11.0