From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
From: Vladimir Davydov
Subject: [PATCH 13/25] vinyl: use cbus for communication between scheduler and worker threads
Date: Fri, 27 Jul 2018 14:29:53 +0300
Message-Id: <513069ac9272442697671bbff2c06ccc9698900a.1532689066.git.vdavydov.dev@gmail.com>
In-Reply-To:
References:
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
List-ID:

We need cbus for forwarding deferred DELETE statements generated in a
worker thread during primary index compaction to the tx thread, where
they can be inserted into secondary indexes. Since pthread mutex/cond
and cbus are incompatible by their nature, let's rework the
communication channel between the tx and worker threads using cbus.

Needed for #2129
---
 src/box/vy_scheduler.c | 215 ++++++++++++++++++++++++++++++-------------------
 src/box/vy_scheduler.h |  25 +-----
 2 files changed, 134 insertions(+), 106 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1ae6dd02..4813f6f4 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -46,6 +46,7 @@
 #include "errinj.h"
 #include "fiber.h"
 #include "fiber_cond.h"
+#include "cbus.h"
 #include "salad/stailq.h"
 #include "say.h"
 #include "vy_lsm.h"
@@ -55,14 +56,34 @@
 #include "vy_run.h"
 #include "vy_write_iterator.h"
 #include "trivia/util.h"
-#include "tt_pthread.h"

 /* Min and max values for vy_scheduler::timeout. */
 #define VY_SCHEDULER_TIMEOUT_MIN 1
 #define VY_SCHEDULER_TIMEOUT_MAX 60

-static void *vy_worker_f(void *);
+static int vy_worker_f(va_list);
 static int vy_scheduler_f(va_list);
+static void vy_task_execute_f(struct cmsg *);
+static void vy_task_complete_f(struct cmsg *);
+
+static const struct cmsg_hop vy_task_execute_route[] = {
+        { vy_task_execute_f, NULL },
+};
+
+static const struct cmsg_hop vy_task_complete_route[] = {
+        { vy_task_complete_f, NULL },
+};
+
+/** Vinyl worker thread. */
+struct vy_worker {
+        struct cord cord;
+        /** Pipe from tx to the worker thread. */
+        struct cpipe worker_pipe;
+        /** Pipe from the worker thread to tx. */
+        struct cpipe tx_pipe;
+        /** Link in vy_scheduler::idle_workers. */
+        struct stailq_entry in_idle;
+};

 struct vy_task;

@@ -89,10 +110,22 @@ struct vy_task_ops {
 };

 struct vy_task {
+        /**
+         * CBus message used for sending the task to/from
+         * a worker thread.
+         */
+        struct cmsg cmsg;
         /** Virtual method table. */
         const struct vy_task_ops *ops;
         /** Pointer to the scheduler. */
         struct vy_scheduler *scheduler;
+        /** Worker thread this task is assigned to. */
+        struct vy_worker *worker;
+        /**
+         * Fiber that is currently executing this task in
+         * a worker thread.
+         */
+        struct fiber *fiber;
         /** Return code of ->execute. */
         int status;
         /** If ->execute fails, the error is stored here. */
@@ -126,8 +159,6 @@ struct vy_task {
          */
         double bloom_fpr;
         int64_t page_size;
-        /** Link in vy_scheduler::pending_tasks. */
-        struct stailq_entry in_pending;
         /** Link in vy_scheduler::processed_tasks. */
         struct stailq_entry in_processed;
 };
@@ -241,16 +272,6 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_NAME

 static void
-vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
-{
-        (void)loop;
-        (void)events;
-        struct vy_scheduler *scheduler = container_of(watcher,
-                        struct vy_scheduler, scheduler_async);
-        fiber_cond_signal(&scheduler->scheduler_cond);
-}
-
-static void
 vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 {
         assert(!scheduler->is_worker_pool_running);
@@ -260,17 +281,19 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)

         scheduler->is_worker_pool_running = true;
         scheduler->idle_worker_count = scheduler->worker_pool_size;
         scheduler->worker_pool = calloc(scheduler->worker_pool_size,
-                                        sizeof(struct cord));
+                                        sizeof(*scheduler->worker_pool));
         if (scheduler->worker_pool == NULL)
                 panic("failed to allocate vinyl worker pool");
-        ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async);
         for (int i = 0; i < scheduler->worker_pool_size; i++) {
                 char name[FIBER_NAME_MAX];
                 snprintf(name, sizeof(name), "vinyl.writer.%d", i);
-                if (cord_start(&scheduler->worker_pool[i], name,
-                               vy_worker_f, scheduler) != 0)
+                struct vy_worker *worker = &scheduler->worker_pool[i];
+                if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
                         panic("failed to start vinyl worker thread");
+                cpipe_create(&worker->worker_pipe, name);
+                stailq_add_tail_entry(&scheduler->idle_workers,
+                                      worker, in_idle);
         }
 }

@@ -280,16 +303,12 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
         assert(scheduler->is_worker_pool_running);
         scheduler->is_worker_pool_running = false;

-        /* Wake up worker threads. */
-        tt_pthread_mutex_lock(&scheduler->mutex);
-        pthread_cond_broadcast(&scheduler->worker_cond);
-        tt_pthread_mutex_unlock(&scheduler->mutex);
-
-        /* Wait for worker threads to exit. */
-        for (int i = 0; i < scheduler->worker_pool_size; i++)
-                cord_join(&scheduler->worker_pool[i]);
-        ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async);
-
+        for (int i = 0; i < scheduler->worker_pool_size; i++) {
+                struct vy_worker *worker = &scheduler->worker_pool[i];
+                cbus_stop_loop(&worker->worker_pipe);
+                cpipe_destroy(&worker->worker_pipe);
+                cord_join(&worker->cord);
+        }
         free(scheduler->worker_pool);
         scheduler->worker_pool = NULL;
 }
@@ -310,19 +329,14 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
         if (scheduler->scheduler_fiber == NULL)
                 panic("failed to allocate vinyl scheduler fiber");

-        scheduler->scheduler_loop = loop();
         fiber_cond_create(&scheduler->scheduler_cond);
-        ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb);

         scheduler->worker_pool_size = write_threads;
         mempool_create(&scheduler->task_pool, cord_slab_cache(),
                        sizeof(struct vy_task));
-        stailq_create(&scheduler->pending_tasks);
+        stailq_create(&scheduler->idle_workers);
         stailq_create(&scheduler->processed_tasks);

-        tt_pthread_cond_init(&scheduler->worker_cond, NULL);
-        tt_pthread_mutex_init(&scheduler->mutex, NULL);
-
         vy_dump_heap_create(&scheduler->dump_heap);
         vy_compact_heap_create(&scheduler->compact_heap);

@@ -344,9 +358,6 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
         if (scheduler->is_worker_pool_running)
                 vy_scheduler_stop_workers(scheduler);

-        tt_pthread_cond_destroy(&scheduler->worker_cond);
-        tt_pthread_mutex_destroy(&scheduler->mutex);
-
         diag_destroy(&scheduler->diag);
         mempool_destroy(&scheduler->task_pool);
         fiber_cond_destroy(&scheduler->dump_cond);
@@ -647,6 +658,8 @@ vy_run_discard(struct vy_run *run)
 static int
 vy_task_write_run(struct vy_task *task)
 {
+        enum { YIELD_LOOPS = 32 };
+
         struct vy_lsm *lsm = task->lsm;
         struct vy_stmt_stream *wi = task->wi;

@@ -668,6 +681,7 @@ vy_task_write_run(struct vy_task *task)
         if (wi->iface->start(wi) != 0)
                 goto fail_abort_writer;
         int rc;
+        int loops = 0;
         struct tuple *stmt = NULL;
         while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
                 inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
@@ -678,7 +692,9 @@ vy_task_write_run(struct vy_task *task)
                 if (rc != 0)
                         break;

-                if (!task->scheduler->is_worker_pool_running) {
+                if (++loops % YIELD_LOOPS == 0)
+                        fiber_sleep(0);
+                if (fiber_is_cancelled()) {
                         diag_set(FiberIsCancelled);
                         rc = -1;
                         break;
@@ -1316,6 +1332,62 @@ err_task:
 }

 /**
+ * Fiber function that actually executes a vinyl task.
+ * After finishing a task, it sends it back to tx.
+ */
+static int
+vy_task_f(va_list va)
+{
+        struct vy_task *task = va_arg(va, struct vy_task *);
+        task->status = task->ops->execute(task);
+        if (task->status != 0) {
+                struct diag *diag = diag_get();
+                assert(!diag_is_empty(diag));
+                diag_move(diag, &task->diag);
+        }
+        cmsg_init(&task->cmsg, vy_task_complete_route);
+        cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+        task->fiber = NULL;
+        return 0;
+}
+
+/**
+ * Callback invoked by a worker thread upon receiving a task.
+ * It schedules a fiber which actually executes the task, so
+ * as not to block the event loop.
+ */
+static void
+vy_task_execute_f(struct cmsg *cmsg)
+{
+        struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+        assert(task->fiber == NULL);
+        task->fiber = fiber_new("task", vy_task_f);
+        if (task->fiber == NULL) {
+                task->status = -1;
+                diag_move(diag_get(), &task->diag);
+                cmsg_init(&task->cmsg, vy_task_complete_route);
+                cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+        } else {
+                fiber_start(task->fiber, task);
+        }
+}
+
+/**
+ * Callback invoked by the tx thread upon receiving an executed
+ * task from a worker thread. It adds the task to the processed
+ * task queue and wakes up the scheduler so that it can complete
+ * it.
+ */
+static void
+vy_task_complete_f(struct cmsg *cmsg)
+{
+        struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+        stailq_add_tail_entry(&task->scheduler->processed_tasks,
+                              task, in_processed);
+        fiber_cond_signal(&task->scheduler->scheduler_cond);
+}
+
+/**
  * Create a task for dumping an LSM tree. The new task is returned
  * in @ptask. If there's no LSM tree that needs to be dumped @ptask
  * is set to NULL.
@@ -1503,13 +1575,10 @@ vy_scheduler_f(va_list va)
                 struct stailq processed_tasks;
                 struct vy_task *task, *next;
                 int tasks_failed = 0, tasks_done = 0;
-                bool was_empty;

                 /* Get the list of processed tasks. */
                 stailq_create(&processed_tasks);
-                tt_pthread_mutex_lock(&scheduler->mutex);
                 stailq_concat(&processed_tasks, &scheduler->processed_tasks);
-                tt_pthread_mutex_unlock(&scheduler->mutex);

                 /* Complete and delete all processed tasks. */
                 stailq_foreach_entry_safe(task, next, &processed_tasks,
@@ -1518,6 +1587,8 @@ vy_scheduler_f(va_list va)
                                 tasks_failed++;
                         else
                                 tasks_done++;
+                        stailq_add_entry(&scheduler->idle_workers,
+                                         task->worker, in_idle);
                         vy_task_delete(task);
                         scheduler->idle_worker_count++;
                         assert(scheduler->idle_worker_count <=
@@ -1553,15 +1624,13 @@ vy_scheduler_f(va_list va)
                         goto wait;

                 /* Queue the task and notify workers if necessary. */
-                tt_pthread_mutex_lock(&scheduler->mutex);
-                was_empty = stailq_empty(&scheduler->pending_tasks);
-                stailq_add_tail_entry(&scheduler->pending_tasks,
-                                      task, in_pending);
-                if (was_empty)
-                        tt_pthread_cond_signal(&scheduler->worker_cond);
-                tt_pthread_mutex_unlock(&scheduler->mutex);
-
+                assert(!stailq_empty(&scheduler->idle_workers));
+                task->worker = stailq_shift_entry(&scheduler->idle_workers,
+                                                  struct vy_worker, in_idle);
                 scheduler->idle_worker_count--;
+                cmsg_init(&task->cmsg, vy_task_execute_route);
+                cpipe_push(&task->worker->worker_pipe, &task->cmsg);
+
                 fiber_reschedule();
                 continue;
 error:
@@ -1597,41 +1666,17 @@ wait:
         return 0;
 }

-static void *
-vy_worker_f(void *arg)
+static int
+vy_worker_f(va_list ap)
 {
-        struct vy_scheduler *scheduler = arg;
-        struct vy_task *task = NULL;
-
-        tt_pthread_mutex_lock(&scheduler->mutex);
-        while (scheduler->is_worker_pool_running) {
-                /* Wait for a task */
-                if (stailq_empty(&scheduler->pending_tasks)) {
-                        /* Wake scheduler up if there are no more tasks */
-                        ev_async_send(scheduler->scheduler_loop,
-                                      &scheduler->scheduler_async);
-                        tt_pthread_cond_wait(&scheduler->worker_cond,
-                                             &scheduler->mutex);
-                        continue;
-                }
-                task = stailq_shift_entry(&scheduler->pending_tasks,
-                                          struct vy_task, in_pending);
-                tt_pthread_mutex_unlock(&scheduler->mutex);
-                assert(task != NULL);
-
-                /* Execute task */
-                task->status = task->ops->execute(task);
-                if (task->status != 0) {
-                        struct diag *diag = diag_get();
-                        assert(!diag_is_empty(diag));
-                        diag_move(diag, &task->diag);
-                }
-
-                /* Return processed task to scheduler */
-                tt_pthread_mutex_lock(&scheduler->mutex);
-                stailq_add_tail_entry(&scheduler->processed_tasks,
-                                      task, in_processed);
-        }
-        tt_pthread_mutex_unlock(&scheduler->mutex);
-        return NULL;
+        struct vy_worker *worker = va_arg(ap, struct vy_worker *);
+        struct cbus_endpoint endpoint;
+
+        cpipe_create(&worker->tx_pipe, "tx");
+        cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
+                             fiber_schedule_cb, fiber());
+        cbus_loop(&endpoint);
+        cbus_endpoint_destroy(&endpoint, cbus_process);
+        cpipe_destroy(&worker->tx_pipe);
+        return 0;
 }
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 284f666e..a235aa6f 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -42,16 +42,15 @@
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 #include "salad/stailq.h"
-#include "tt_pthread.h"

 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */

-struct cord;
 struct fiber;
 struct vy_lsm;
 struct vy_run_env;
+struct vy_worker;
 struct vy_scheduler;

 typedef void
@@ -61,42 +60,26 @@ typedef void
 struct vy_scheduler {
         /** Scheduler fiber. */
         struct fiber *scheduler_fiber;
-        /** Scheduler event loop. */
-        struct ev_loop *scheduler_loop;
         /** Used to wake up the scheduler fiber from TX. */
         struct fiber_cond scheduler_cond;
-        /** Used to wake up the scheduler from a worker thread. */
-        struct ev_async scheduler_async;
         /**
          * Array of worker threads used for performing
          * dump/compaction tasks.
          */
-        struct cord *worker_pool;
+        struct vy_worker *worker_pool;
         /** Set if the worker threads are running. */
         bool is_worker_pool_running;
         /** Total number of worker threads. */
         int worker_pool_size;
         /** Number worker threads that are currently idle. */
         int idle_worker_count;
+        /** List of idle workers, linked by vy_worker::in_idle. */
+        struct stailq idle_workers;
         /** Memory pool used for allocating vy_task objects. */
         struct mempool task_pool;
-        /** Queue of pending tasks, linked by vy_task::in_pending. */
-        struct stailq pending_tasks;
         /** Queue of processed tasks, linked by vy_task::in_processed. */
         struct stailq processed_tasks;
         /**
-         * Signaled to wake up a worker when there is
-         * a pending task in the input queue. Also used
-         * to stop worker threads on shutdown.
-         */
-        pthread_cond_t worker_cond;
-        /**
-         * Mutex protecting input and output queues and
-         * the condition variable used to wake up worker
-         * threads.
-         */
-        pthread_mutex_t mutex;
-        /**
          * Heap of LSM trees, ordered by dump priority,
          * linked by vy_lsm::in_dump.
          */
-- 
2.11.0
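
For readers not familiar with cbus, the tx <-> worker round trip used above
boils down to the pattern below. This is only an illustrative sketch built
from the same primitives the patch relies on (cmsg_hop routes, cmsg_init(),
cpipe_push(), container_of()); the example_* names are made up and do not
exist in the tree.

#include "cbus.h"               /* struct cmsg, struct cmsg_hop, struct cpipe */
#include "trivia/util.h"        /* container_of() */

struct example_task {
        struct cmsg cmsg;       /* located via container_of() in the callbacks */
        struct cpipe *tx_pipe;  /* pipe leading back to tx */
        int status;
};

static void example_execute_f(struct cmsg *cmsg);
static void example_complete_f(struct cmsg *cmsg);

/* One-hop routes: run the callback in the destination thread, then stop. */
static const struct cmsg_hop example_execute_route[] = {
        { example_execute_f, NULL },
};
static const struct cmsg_hop example_complete_route[] = {
        { example_complete_f, NULL },
};

/* Runs in the worker thread. */
static void
example_execute_f(struct cmsg *cmsg)
{
        struct example_task *task = container_of(cmsg, struct example_task, cmsg);
        task->status = 0;       /* do the actual work here */
        /* Reuse the same cmsg for the way back: re-init it with the
         * return route and push it into the pipe leading to tx. */
        cmsg_init(&task->cmsg, example_complete_route);
        cpipe_push(task->tx_pipe, &task->cmsg);
}

/* Runs in tx. */
static void
example_complete_f(struct cmsg *cmsg)
{
        struct example_task *task = container_of(cmsg, struct example_task, cmsg);
        (void)task;     /* queue the task for completion and wake up the scheduler */
}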
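
A note on the vy_task_write_run() change: the task body now runs in a fiber
on the worker's event loop instead of directly in a detached thread, so a
long write loop has to yield every once in a while and honor fiber
cancellation rather than poll scheduler->is_worker_pool_running. Roughly,
the loop takes the following shape (a sketch only; write_loop_sketch() is
not a real function, and the real code also feeds each statement to the run
writer and handles error injection):

enum { YIELD_LOOPS = 32 };

static int
write_loop_sketch(struct vy_stmt_stream *wi)
{
        struct tuple *stmt = NULL;
        int loops = 0;
        int rc;
        while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
                /* ... write the statement to the run file ... */
                if (++loops % YIELD_LOOPS == 0)
                        fiber_sleep(0);         /* let other fibers on this cord run */
                if (fiber_is_cancelled()) {
                        diag_set(FiberIsCancelled);
                        rc = -1;
                        break;
                }
        }
        return rc;
}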
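
Finally, the worker thread lifecycle after this patch, condensed from
vy_scheduler_start_workers(), vy_scheduler_stop_workers() and vy_worker_f()
above. Again, this is only a sketch: error handling and the idle-list
bookkeeping are omitted, and worker_lifecycle_sketch() is not a function
from the patch.

static void
worker_lifecycle_sketch(void)
{
        struct vy_worker worker;

        /* Start the thread; its name doubles as its cbus endpoint name. */
        if (cord_costart(&worker.cord, "vinyl.writer.0", vy_worker_f, &worker) != 0)
                panic("failed to start vinyl worker thread");
        /* Connect the tx -> worker pipe to the endpoint created in vy_worker_f(). */
        cpipe_create(&worker.worker_pipe, "vinyl.writer.0");

        /* ... cmsg's are pushed to worker.worker_pipe, replies come back
         * through worker.tx_pipe ... */

        /* Shutdown: make cbus_loop() in the worker return, then join the cord. */
        cbus_stop_loop(&worker.worker_pipe);
        cpipe_destroy(&worker.worker_pipe);
        cord_join(&worker.cord);
}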