From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
Subject: [PATCH 13/25] vinyl: use cbus for communication between scheduler and worker threads
Date: Fri, 27 Jul 2018 14:29:53 +0300
Message-ID: <513069ac9272442697671bbff2c06ccc9698900a.1532689066.git.vdavydov.dev@gmail.com>
In-Reply-To: <cover.1532689065.git.vdavydov.dev@gmail.com>

We need cbus for forwarding deferred DELETE statements generated in a
worker thread during primary index compaction to the tx thread, where
they can be inserted into secondary indexes. Since pthread mutex/cond
and cbus are incompatible by nature, let's rework the communication
channel between the tx and worker threads on top of cbus.

Needed for #2129
---
 src/box/vy_scheduler.c | 215 ++++++++++++++++++++++++++++++-------------------
 src/box/vy_scheduler.h |  25 +----
 2 files changed, 134 insertions(+), 106 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 1ae6dd02..4813f6f4 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -46,6 +46,7 @@
 #include "errinj.h"
 #include "fiber.h"
 #include "fiber_cond.h"
+#include "cbus.h"
 #include "salad/stailq.h"
 #include "say.h"
 #include "vy_lsm.h"
@@ -55,14 +56,34 @@
 #include "vy_run.h"
 #include "vy_write_iterator.h"
 #include "trivia/util.h"
-#include "tt_pthread.h"
 
 /* Min and max values for vy_scheduler::timeout. */
 #define VY_SCHEDULER_TIMEOUT_MIN 1
 #define VY_SCHEDULER_TIMEOUT_MAX 60
 
-static void *vy_worker_f(void *);
+static int vy_worker_f(va_list);
 static int vy_scheduler_f(va_list);
+static void vy_task_execute_f(struct cmsg *);
+static void vy_task_complete_f(struct cmsg *);
+
+static const struct cmsg_hop vy_task_execute_route[] = {
+	{ vy_task_execute_f, NULL },
+};
+
+static const struct cmsg_hop vy_task_complete_route[] = {
+	{ vy_task_complete_f, NULL },
+};
+
+/** Vinyl worker thread. */
+struct vy_worker {
+	struct cord cord;
+	/** Pipe from tx to the worker thread. */
+	struct cpipe worker_pipe;
+	/** Pipe from the worker thread to tx. */
+	struct cpipe tx_pipe;
+	/** Link in vy_scheduler::idle_workers. */
+	struct stailq_entry in_idle;
+};
 
 struct vy_task;
 
@@ -89,10 +110,22 @@ struct vy_task_ops {
 };
 
 struct vy_task {
+	/**
+	 * CBus message used for sending the task to/from
+	 * a worker thread.
+	 */
+	struct cmsg cmsg;
 	/** Virtual method table. */
 	const struct vy_task_ops *ops;
 	/** Pointer to the scheduler. */
 	struct vy_scheduler *scheduler;
+	/** Worker thread this task is assigned to. */
+	struct vy_worker *worker;
+	/**
+	 * Fiber that is currently executing this task in
+	 * a worker thread.
+	 */
+	struct fiber *fiber;
 	/** Return code of ->execute. */
 	int status;
 	/** If ->execute fails, the error is stored here. */
@@ -126,8 +159,6 @@ struct vy_task {
 	 */
 	double bloom_fpr;
 	int64_t page_size;
-	/** Link in vy_scheduler::pending_tasks. */
-	struct stailq_entry in_pending;
 	/** Link in vy_scheduler::processed_tasks. */
 	struct stailq_entry in_processed;
 };
 
@@ -241,16 +272,6 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_NAME
 
 static void
-vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
-{
-	(void)loop;
-	(void)events;
-	struct vy_scheduler *scheduler = container_of(watcher,
-			struct vy_scheduler, scheduler_async);
-	fiber_cond_signal(&scheduler->scheduler_cond);
-}
-
-static void
 vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 {
 	assert(!scheduler->is_worker_pool_running);
@@ -260,17 +281,19 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 	scheduler->is_worker_pool_running = true;
 	scheduler->idle_worker_count = scheduler->worker_pool_size;
 	scheduler->worker_pool = calloc(scheduler->worker_pool_size,
-					sizeof(struct cord));
+					sizeof(*scheduler->worker_pool));
 	if (scheduler->worker_pool == NULL)
 		panic("failed to allocate vinyl worker pool");
 
-	ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async);
 	for (int i = 0; i < scheduler->worker_pool_size; i++) {
 		char name[FIBER_NAME_MAX];
 		snprintf(name, sizeof(name), "vinyl.writer.%d", i);
-		if (cord_start(&scheduler->worker_pool[i], name,
-			       vy_worker_f, scheduler) != 0)
+		struct vy_worker *worker = &scheduler->worker_pool[i];
+		if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
 			panic("failed to start vinyl worker thread");
+		cpipe_create(&worker->worker_pipe, name);
+		stailq_add_tail_entry(&scheduler->idle_workers,
+				      worker, in_idle);
 	}
 }
 
@@ -280,16 +303,12 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
 	assert(scheduler->is_worker_pool_running);
 	scheduler->is_worker_pool_running = false;
 
-	/* Wake up worker threads. */
-	tt_pthread_mutex_lock(&scheduler->mutex);
-	pthread_cond_broadcast(&scheduler->worker_cond);
-	tt_pthread_mutex_unlock(&scheduler->mutex);
-
-	/* Wait for worker threads to exit. */
-	for (int i = 0; i < scheduler->worker_pool_size; i++)
-		cord_join(&scheduler->worker_pool[i]);
-	ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async);
-
+	for (int i = 0; i < scheduler->worker_pool_size; i++) {
+		struct vy_worker *worker = &scheduler->worker_pool[i];
+		cbus_stop_loop(&worker->worker_pipe);
+		cpipe_destroy(&worker->worker_pipe);
+		cord_join(&worker->cord);
+	}
 	free(scheduler->worker_pool);
 	scheduler->worker_pool = NULL;
 }
@@ -310,19 +329,14 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
 	if (scheduler->scheduler_fiber == NULL)
 		panic("failed to allocate vinyl scheduler fiber");
 
-	scheduler->scheduler_loop = loop();
 	fiber_cond_create(&scheduler->scheduler_cond);
-	ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb);
 
 	scheduler->worker_pool_size = write_threads;
 	mempool_create(&scheduler->task_pool, cord_slab_cache(),
 		       sizeof(struct vy_task));
-	stailq_create(&scheduler->pending_tasks);
+	stailq_create(&scheduler->idle_workers);
 	stailq_create(&scheduler->processed_tasks);
 
-	tt_pthread_cond_init(&scheduler->worker_cond, NULL);
-	tt_pthread_mutex_init(&scheduler->mutex, NULL);
-
 	vy_dump_heap_create(&scheduler->dump_heap);
 	vy_compact_heap_create(&scheduler->compact_heap);
 
@@ -344,9 +358,6 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
 	if (scheduler->is_worker_pool_running)
 		vy_scheduler_stop_workers(scheduler);
 
-	tt_pthread_cond_destroy(&scheduler->worker_cond);
-	tt_pthread_mutex_destroy(&scheduler->mutex);
-
 	diag_destroy(&scheduler->diag);
 	mempool_destroy(&scheduler->task_pool);
 	fiber_cond_destroy(&scheduler->dump_cond);
@@ -647,6 +658,8 @@ vy_run_discard(struct vy_run *run)
 static int
 vy_task_write_run(struct vy_task *task)
 {
+	enum { YIELD_LOOPS = 32 };
+
 	struct vy_lsm *lsm = task->lsm;
 	struct vy_stmt_stream *wi = task->wi;
 
@@ -668,6 +681,7 @@ vy_task_write_run(struct vy_task *task)
 	if (wi->iface->start(wi) != 0)
 		goto fail_abort_writer;
 	int rc;
+	int loops = 0;
 	struct tuple *stmt = NULL;
 	while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
 		inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
@@ -678,7 +692,9 @@ vy_task_write_run(struct vy_task *task)
 		if (rc != 0)
 			break;
 
-		if (!task->scheduler->is_worker_pool_running) {
+		if (++loops % YIELD_LOOPS == 0)
+			fiber_sleep(0);
+		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			rc = -1;
 			break;
@@ -1316,6 +1332,62 @@ err_task:
 }
 
 /**
+ * Fiber function that actually executes a vinyl task.
+ * After finishing a task, it sends it back to tx.
+ */
+static int
+vy_task_f(va_list va)
+{
+	struct vy_task *task = va_arg(va, struct vy_task *);
+	task->status = task->ops->execute(task);
+	if (task->status != 0) {
+		struct diag *diag = diag_get();
+		assert(!diag_is_empty(diag));
+		diag_move(diag, &task->diag);
+	}
+	cmsg_init(&task->cmsg, vy_task_complete_route);
+	cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+	task->fiber = NULL;
+	return 0;
+}
+
+/**
+ * Callback invoked by a worker thread upon receiving a task.
+ * It schedules a fiber which actually executes the task, so
+ * as not to block the event loop.
+ */
+static void
+vy_task_execute_f(struct cmsg *cmsg)
+{
+	struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+	assert(task->fiber == NULL);
+	task->fiber = fiber_new("task", vy_task_f);
+	if (task->fiber == NULL) {
+		task->status = -1;
+		diag_move(diag_get(), &task->diag);
+		cmsg_init(&task->cmsg, vy_task_complete_route);
+		cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+	} else {
+		fiber_start(task->fiber, task);
+	}
+}
+
+/**
+ * Callback invoked by the tx thread upon receiving an executed
+ * task from a worker thread. It adds the task to the processed
+ * task queue and wakes up the scheduler so that it can complete
+ * it.
+ */
+static void
+vy_task_complete_f(struct cmsg *cmsg)
+{
+	struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+	stailq_add_tail_entry(&task->scheduler->processed_tasks,
+			      task, in_processed);
+	fiber_cond_signal(&task->scheduler->scheduler_cond);
+}
+
+/**
  * Create a task for dumping an LSM tree. The new task is returned
  * in @ptask. If there's no LSM tree that needs to be dumped @ptask
  * is set to NULL.
@@ -1503,13 +1575,10 @@ vy_scheduler_f(va_list va)
 		struct stailq processed_tasks;
 		struct vy_task *task, *next;
 		int tasks_failed = 0, tasks_done = 0;
-		bool was_empty;
 
 		/* Get the list of processed tasks. */
 		stailq_create(&processed_tasks);
-		tt_pthread_mutex_lock(&scheduler->mutex);
 		stailq_concat(&processed_tasks, &scheduler->processed_tasks);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
 
 		/* Complete and delete all processed tasks. */
 		stailq_foreach_entry_safe(task, next, &processed_tasks,
@@ -1518,6 +1587,8 @@ vy_scheduler_f(va_list va)
 				tasks_failed++;
 			else
 				tasks_done++;
+			stailq_add_entry(&scheduler->idle_workers,
+					 task->worker, in_idle);
 			vy_task_delete(task);
 			scheduler->idle_worker_count++;
 			assert(scheduler->idle_worker_count <=
@@ -1553,15 +1624,13 @@
 			goto wait;
 
 		/* Queue the task and notify workers if necessary. */
-		tt_pthread_mutex_lock(&scheduler->mutex);
-		was_empty = stailq_empty(&scheduler->pending_tasks);
-		stailq_add_tail_entry(&scheduler->pending_tasks,
-				      task, in_pending);
-		if (was_empty)
-			tt_pthread_cond_signal(&scheduler->worker_cond);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
-
+		assert(!stailq_empty(&scheduler->idle_workers));
+		task->worker = stailq_shift_entry(&scheduler->idle_workers,
+						  struct vy_worker, in_idle);
 		scheduler->idle_worker_count--;
+		cmsg_init(&task->cmsg, vy_task_execute_route);
+		cpipe_push(&task->worker->worker_pipe, &task->cmsg);
+		fiber_reschedule();
 		continue;
 
 error:
@@ -1597,41 +1666,17 @@ wait:
 	return 0;
 }
 
-static void *
-vy_worker_f(void *arg)
+static int
+vy_worker_f(va_list ap)
 {
-	struct vy_scheduler *scheduler = arg;
-	struct vy_task *task = NULL;
-
-	tt_pthread_mutex_lock(&scheduler->mutex);
-	while (scheduler->is_worker_pool_running) {
-		/* Wait for a task */
-		if (stailq_empty(&scheduler->pending_tasks)) {
-			/* Wake scheduler up if there are no more tasks */
-			ev_async_send(scheduler->scheduler_loop,
-				      &scheduler->scheduler_async);
-			tt_pthread_cond_wait(&scheduler->worker_cond,
-					     &scheduler->mutex);
-			continue;
-		}
-		task = stailq_shift_entry(&scheduler->pending_tasks,
-					  struct vy_task, in_pending);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
-		assert(task != NULL);
-
-		/* Execute task */
-		task->status = task->ops->execute(task);
-		if (task->status != 0) {
-			struct diag *diag = diag_get();
-			assert(!diag_is_empty(diag));
-			diag_move(diag, &task->diag);
-		}
-
-		/* Return processed task to scheduler */
-		tt_pthread_mutex_lock(&scheduler->mutex);
-		stailq_add_tail_entry(&scheduler->processed_tasks,
-				      task, in_processed);
-	}
-	tt_pthread_mutex_unlock(&scheduler->mutex);
-	return NULL;
+	struct vy_worker *worker = va_arg(ap, struct vy_worker *);
+	struct cbus_endpoint endpoint;
+
+	cpipe_create(&worker->tx_pipe, "tx");
+	cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
+			     fiber_schedule_cb, fiber());
+	cbus_loop(&endpoint);
+	cbus_endpoint_destroy(&endpoint, cbus_process);
+	cpipe_destroy(&worker->tx_pipe);
+	return 0;
 }
 
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 284f666e..a235aa6f 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -42,16 +42,15 @@
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 #include "salad/stailq.h"
-#include "tt_pthread.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-struct cord;
 struct fiber;
 struct vy_lsm;
 struct vy_run_env;
+struct vy_worker;
 struct vy_scheduler;
 
 typedef void
@@ -61,42 +60,26 @@
 struct vy_scheduler {
 	/** Scheduler fiber. */
 	struct fiber *scheduler_fiber;
-	/** Scheduler event loop. */
-	struct ev_loop *scheduler_loop;
 	/** Used to wake up the scheduler fiber from TX. */
 	struct fiber_cond scheduler_cond;
-	/** Used to wake up the scheduler from a worker thread. */
-	struct ev_async scheduler_async;
 	/**
 	 * Array of worker threads used for performing
 	 * dump/compaction tasks.
 	 */
-	struct cord *worker_pool;
+	struct vy_worker *worker_pool;
 	/** Set if the worker threads are running. */
 	bool is_worker_pool_running;
 	/** Total number of worker threads. */
 	int worker_pool_size;
 	/** Number worker threads that are currently idle. */
 	int idle_worker_count;
+	/** List of idle workers, linked by vy_worker::in_idle. */
+	struct stailq idle_workers;
 	/** Memory pool used for allocating vy_task objects. */
 	struct mempool task_pool;
-	/** Queue of pending tasks, linked by vy_task::in_pending. */
-	struct stailq pending_tasks;
 	/** Queue of processed tasks, linked by vy_task::in_processed. */
 	struct stailq processed_tasks;
 	/**
-	 * Signaled to wake up a worker when there is
-	 * a pending task in the input queue. Also used
-	 * to stop worker threads on shutdown.
-	 */
-	pthread_cond_t worker_cond;
-	/**
-	 * Mutex protecting input and output queues and
-	 * the condition variable used to wake up worker
-	 * threads.
-	 */
-	pthread_mutex_t mutex;
-	/**
 	 * Heap of LSM trees, ordered by dump priority,
 	 * linked by vy_lsm::in_dump.
 	 */
-- 
2.11.0
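
For readers new to cbus, the tx <-> worker round trip introduced by this
patch boils down to the pattern sketched below. This is an illustration
only, not part of the diff: the echo_* names are invented, and only the
cbus/cpipe calls already used in the patch are assumed.

	static void echo_execute_f(struct cmsg *msg);
	static void echo_complete_f(struct cmsg *msg);

	/* Single-hop routes: the callback runs in the receiving thread. */
	static const struct cmsg_hop echo_execute_route[] = {
		{ echo_execute_f, NULL },
	};
	static const struct cmsg_hop echo_complete_route[] = {
		{ echo_complete_f, NULL },
	};

	struct echo_msg {
		struct cmsg cmsg;
		struct vy_worker *worker;
	};

	/* Runs in the worker thread after tx pushes the message. */
	static void
	echo_execute_f(struct cmsg *msg)
	{
		struct echo_msg *echo = container_of(msg, struct echo_msg, cmsg);
		/* ... do the heavy lifting off the tx thread ... */
		/* Re-route the same message back to tx. */
		cmsg_init(msg, echo_complete_route);
		cpipe_push(&echo->worker->tx_pipe, msg);
	}

	/* Runs back in tx once the worker is done. */
	static void
	echo_complete_f(struct cmsg *msg)
	{
		(void)msg;	/* e.g. wake up the scheduler fiber here */
	}

	/*
	 * Sender side (tx thread):
	 *	cmsg_init(&msg->cmsg, echo_execute_route);
	 *	cpipe_push(&worker->worker_pipe, &msg->cmsg);
	 */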