From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 3/8] vinyl: factor out worker pool from scheduler struct Date: Tue, 4 Sep 2018 20:23:46 +0300 Message-Id: <0f531f93b5c8acadeb5cd43e21af0c10117fa2db.1536080993.git.vdavydov.dev@gmail.com> In-Reply-To: References: <20180904115404.el6kdswgitsnopgf@esperanza> In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: A worker pool is an independent entity that provides the scheduler with worker threads on demand. Let's factor it out so that we can introduce separate pools for dump and compaction tasks. --- src/box/vy_scheduler.c | 104 ++++++++++++++++++++++++++++++++----------------- src/box/vy_scheduler.h | 23 ++++++----- 2 files changed, 82 insertions(+), 45 deletions(-) diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index fc4fdf8c..03126e5e 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -93,7 +93,7 @@ struct vy_worker { * or NULL if the worker is idle. */ struct vy_task *task; - /** Link in vy_scheduler::idle_workers. */ + /** Link in vy_worker_pool::idle_workers. */ struct stailq_entry in_idle; /** Route for sending deferred DELETEs back to tx. */ struct cmsg_hop deferred_delete_route[2]; @@ -333,27 +333,25 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b) #undef HEAP_NAME static void -vy_scheduler_start_workers(struct vy_scheduler *scheduler) +vy_worker_pool_start(struct vy_worker_pool *pool) { - assert(scheduler->worker_pool == NULL); + assert(pool->workers == NULL); /* One thread is reserved for dumps, see vy_schedule(). */ - assert(scheduler->worker_pool_size >= 2); + assert(pool->size >= 2); - scheduler->idle_worker_count = scheduler->worker_pool_size; - scheduler->worker_pool = calloc(scheduler->worker_pool_size, - sizeof(*scheduler->worker_pool)); - if (scheduler->worker_pool == NULL) + pool->idle_worker_count = pool->size; + pool->workers = calloc(pool->size, sizeof(*pool->workers)); + if (pool->workers == NULL) panic("failed to allocate vinyl worker pool"); - for (int i = 0; i < scheduler->worker_pool_size; i++) { + for (int i = 0; i < pool->size; i++) { char name[FIBER_NAME_MAX]; snprintf(name, sizeof(name), "vinyl.writer.%d", i); - struct vy_worker *worker = &scheduler->worker_pool[i]; + struct vy_worker *worker = &pool->workers[i]; if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0) panic("failed to start vinyl worker thread"); cpipe_create(&worker->worker_pipe, name); - stailq_add_tail_entry(&scheduler->idle_workers, - worker, in_idle); + stailq_add_tail_entry(&pool->idle_workers, worker, in_idle); struct cmsg_hop *route = worker->deferred_delete_route; route[0].f = vy_deferred_delete_batch_process_f; @@ -364,17 +362,60 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler) } static void -vy_scheduler_stop_workers(struct vy_scheduler *scheduler) +vy_worker_pool_stop(struct vy_worker_pool *pool) { - assert(scheduler->worker_pool != NULL); - for (int i = 0; i < scheduler->worker_pool_size; i++) { - struct vy_worker *worker = &scheduler->worker_pool[i]; + assert(pool->workers != NULL); + for (int i = 0; i < pool->size; i++) { + struct vy_worker *worker = &pool->workers[i]; cbus_stop_loop(&worker->worker_pipe); cpipe_destroy(&worker->worker_pipe); cord_join(&worker->cord); } - free(scheduler->worker_pool); - scheduler->worker_pool = NULL; + free(pool->workers); + pool->workers = NULL; +} + +static void +vy_worker_pool_create(struct vy_worker_pool *pool, int size) +{ + pool->size = size; + pool->workers = NULL; + stailq_create(&pool->idle_workers); + pool->idle_worker_count = 0; +} + +static void +vy_worker_pool_destroy(struct vy_worker_pool *pool) +{ + if (pool->workers != NULL) + vy_worker_pool_stop(pool); +} + +/** + * Get an idle worker from a pool. + */ +static struct vy_worker * +vy_worker_pool_get(struct vy_worker_pool *pool) +{ + struct vy_worker *worker = NULL; + if (!stailq_empty(&pool->idle_workers)) { + assert(pool->idle_worker_count > 0); + pool->idle_worker_count--; + worker = stailq_shift_entry(&pool->idle_workers, + struct vy_worker, in_idle); + } + return worker; +} + +/** + * Put a worker back to a pool once it's done its job. + */ +static void +vy_worker_pool_put(struct vy_worker_pool *pool, struct vy_worker *worker) +{ + assert(pool->idle_worker_count < pool->size); + pool->idle_worker_count++; + stailq_add_entry(&pool->idle_workers, worker, in_idle); } void @@ -394,9 +435,7 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads, panic("failed to allocate vinyl scheduler fiber"); fiber_cond_create(&scheduler->scheduler_cond); - - scheduler->worker_pool_size = write_threads; - stailq_create(&scheduler->idle_workers); + vy_worker_pool_create(&scheduler->worker_pool, write_threads); stailq_create(&scheduler->processed_tasks); vy_dump_heap_create(&scheduler->dump_heap); @@ -417,9 +456,7 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler) fiber_cond_signal(&scheduler->dump_cond); fiber_cond_signal(&scheduler->scheduler_cond); - if (scheduler->worker_pool != NULL) - vy_scheduler_stop_workers(scheduler); - + vy_worker_pool_destroy(&scheduler->worker_pool); diag_destroy(&scheduler->diag); fiber_cond_destroy(&scheduler->dump_cond); fiber_cond_destroy(&scheduler->scheduler_cond); @@ -1812,7 +1849,7 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask) if (*ptask != NULL) return 0; - if (scheduler->idle_worker_count <= 1) { + if (scheduler->worker_pool.idle_worker_count <= 1) { /* * If all worker threads are busy doing compaction * when we run out of quota, ongoing transactions will @@ -1888,7 +1925,7 @@ vy_scheduler_f(va_list va) if (scheduler->scheduler_fiber == NULL) return 0; /* destroyed */ - vy_scheduler_start_workers(scheduler); + vy_worker_pool_start(&scheduler->worker_pool); while (scheduler->scheduler_fiber != NULL) { struct stailq processed_tasks; @@ -1906,12 +1943,9 @@ vy_scheduler_f(va_list va) tasks_failed++; else tasks_done++; - stailq_add_entry(&scheduler->idle_workers, - task->worker, in_idle); + vy_worker_pool_put(&scheduler->worker_pool, + task->worker); vy_task_delete(task); - scheduler->idle_worker_count++; - assert(scheduler->idle_worker_count <= - scheduler->worker_pool_size); } /* * Reset the timeout if we managed to successfully @@ -1933,7 +1967,7 @@ vy_scheduler_f(va_list va) if (tasks_failed > 0) goto error; /* All worker threads are busy. */ - if (scheduler->idle_worker_count == 0) + if (scheduler->worker_pool.idle_worker_count == 0) goto wait; /* Get a task to schedule. */ if (vy_schedule(scheduler, &task) != 0) @@ -1943,10 +1977,8 @@ vy_scheduler_f(va_list va) goto wait; /* Queue the task and notify workers if necessary. */ - assert(!stailq_empty(&scheduler->idle_workers)); - task->worker = stailq_shift_entry(&scheduler->idle_workers, - struct vy_worker, in_idle); - scheduler->idle_worker_count--; + task->worker = vy_worker_pool_get(&scheduler->worker_pool); + assert(task->worker != NULL); cmsg_init(&task->cmsg, vy_task_execute_route); cpipe_push(&task->worker->worker_pipe, &task->cmsg); diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h index db85711a..606f3b31 100644 --- a/src/box/vy_scheduler.h +++ b/src/box/vy_scheduler.h @@ -56,22 +56,27 @@ typedef void (*vy_scheduler_dump_complete_f)(struct vy_scheduler *scheduler, int64_t dump_generation, double dump_duration); +struct vy_worker_pool { + /** Number of worker threads in the pool. */ + int size; + /** Array of all worker threads in the pool. */ + struct vy_worker *workers; + /** List of workers that are currently idle. */ + struct stailq idle_workers; + /** Length of @idle_workers list. */ + int idle_worker_count; +}; + struct vy_scheduler { /** Scheduler fiber. */ struct fiber *scheduler_fiber; /** Used to wake up the scheduler fiber from TX. */ struct fiber_cond scheduler_cond; /** - * Array of worker threads used for performing - * dump/compaction tasks. + * Pool of threads used for performing dump/compaction + * tasks in the background. */ - struct vy_worker *worker_pool; - /** Total number of worker threads. */ - int worker_pool_size; - /** Number worker threads that are currently idle. */ - int idle_worker_count; - /** List of idle workers, linked by vy_worker::in_idle. */ - struct stailq idle_workers; + struct vy_worker_pool worker_pool; /** Queue of processed tasks, linked by vy_task::in_processed. */ struct stailq processed_tasks; /** -- 2.11.0