From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 4/8] vinyl: move worker allocation closer to task creation Date: Tue, 4 Sep 2018 20:23:47 +0300 Message-Id: In-Reply-To: References: <20180904115404.el6kdswgitsnopgf@esperanza> In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Call vy_worker_pool_get() from vy_scheduler_peek_{dump,compact} so that we can use different worker pools for dump and compaction tasks. --- src/box/vy_scheduler.c | 80 +++++++++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 03126e5e..5d6960cd 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -229,8 +229,8 @@ vy_task_deferred_delete_iface; * does not free it from under us. */ static struct vy_task * -vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, - const struct vy_task_ops *ops) +vy_task_new(struct vy_scheduler *scheduler, struct vy_worker *worker, + struct vy_lsm *lsm, const struct vy_task_ops *ops) { struct vy_task *task = calloc(1, sizeof(*task)); if (task == NULL) { @@ -241,6 +241,7 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, memset(task, 0, sizeof(*task)); task->ops = ops; task->scheduler = scheduler; + task->worker = worker; task->lsm = lsm; task->cmp_def = key_def_dup(lsm->cmp_def); if (task->cmp_def == NULL) { @@ -1269,8 +1270,8 @@ vy_task_dump_abort(struct vy_task *task) * trees created at @scheduler->dump_generation. 
*/ static int -vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, - struct vy_task **p_task) +vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker, + struct vy_lsm *lsm, struct vy_task **p_task) { static struct vy_task_ops dump_ops = { .execute = vy_task_dump_execute, @@ -1323,7 +1324,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, return 0; } - struct vy_task *task = vy_task_new(scheduler, lsm, &dump_ops); + struct vy_task *task = vy_task_new(scheduler, worker, lsm, &dump_ops); if (task == NULL) goto err; @@ -1579,8 +1580,8 @@ vy_task_compact_abort(struct vy_task *task) } static int -vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, - struct vy_task **p_task) +vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_worker *worker, + struct vy_lsm *lsm, struct vy_task **p_task) { static struct vy_task_ops compact_ops = { .execute = vy_task_compact_execute, @@ -1604,7 +1605,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, return 0; } - struct vy_task *task = vy_task_new(scheduler, lsm, &compact_ops); + struct vy_task *task = vy_task_new(scheduler, worker, lsm, &compact_ops); if (task == NULL) goto err_task; @@ -1746,8 +1747,8 @@ vy_task_complete_f(struct cmsg *cmsg) /** * Create a task for dumping an LSM tree. The new task is returned - * in @ptask. If there's no LSM tree that needs to be dumped @ptask - * is set to NULL. + * in @ptask. If there's no LSM tree that needs to be dumped or all + * workers are currently busy, @ptask is set to NULL. * * We only dump an LSM tree if it needs to be snapshotted or the quota * on memory usage is exceeded. 
In either case, the oldest LSM tree @@ -1759,6 +1760,7 @@ vy_task_complete_f(struct cmsg *cmsg) static int vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask) { + struct vy_worker *worker = NULL; retry: *ptask = NULL; if (!vy_scheduler_dump_in_progress(scheduler)) { @@ -1766,7 +1768,7 @@ retry: * All memory trees of past generations have * been dumped, nothing to do. */ - return 0; + goto no_task; } /* * Look up the oldest LSM tree eligible for dump. @@ -1778,7 +1780,7 @@ retry: * Complete the current dump round. */ vy_scheduler_complete_dump(scheduler); - return 0; + goto no_task; } struct vy_lsm *lsm = container_of(pn, struct vy_lsm, in_dump); if (!lsm->is_dumping && lsm->pin_count == 0 && @@ -1788,8 +1790,15 @@ retry: * contains data that must be dumped at the current * round. Try to create a task for it. */ - if (vy_task_dump_new(scheduler, lsm, ptask) != 0) + if (worker == NULL) { + worker = vy_worker_pool_get(&scheduler->worker_pool); + if (worker == NULL) + return 0; /* all workers are busy */ + } + if (vy_task_dump_new(scheduler, worker, lsm, ptask) != 0) { + vy_worker_pool_put(&scheduler->worker_pool, worker); return -1; + } if (*ptask != NULL) return 0; /* new task */ /* @@ -1805,13 +1814,16 @@ retry: * is complete. */ assert(scheduler->dump_task_count > 0); +no_task: + if (worker != NULL) + vy_worker_pool_put(&scheduler->worker_pool, worker); return 0; } /** * Create a task for compacting a range. The new task is returned - * in @ptask. If there's no range that needs to be compacted @ptask - * is set to NULL. + * in @ptask. If there's no range that needs to be compacted or all + * workers are currently busy, @ptask is set to NULL. * * We compact ranges that have more runs in a level than specified * by run_count_per_level configuration option. 
Among those runs we @@ -1824,19 +1836,31 @@ static int vy_scheduler_peek_compact(struct vy_scheduler *scheduler, struct vy_task **ptask) { + struct vy_worker *worker = NULL; retry: *ptask = NULL; struct heap_node *pn = vy_compact_heap_top(&scheduler->compact_heap); if (pn == NULL) - return 0; /* nothing to do */ + goto no_task; /* nothing to do */ struct vy_lsm *lsm = container_of(pn, struct vy_lsm, in_compact); if (vy_lsm_compact_priority(lsm) <= 1) - return 0; /* nothing to do */ - if (vy_task_compact_new(scheduler, lsm, ptask) != 0) + goto no_task; /* nothing to do */ + if (worker == NULL) { + worker = vy_worker_pool_get(&scheduler->worker_pool); + if (worker == NULL) + return 0; /* all workers are busy */ + } + if (vy_task_compact_new(scheduler, worker, lsm, ptask) != 0) { + vy_worker_pool_put(&scheduler->worker_pool, worker); return -1; + } if (*ptask == NULL) goto retry; /* LSM tree dropped or range split/coalesced */ return 0; /* new task */ +no_task: + if (worker != NULL) + vy_worker_pool_put(&scheduler->worker_pool, worker); + return 0; } static int @@ -1966,19 +1990,17 @@ vy_scheduler_f(va_list va) /* Throttle for a while if a task failed. */ if (tasks_failed > 0) goto error; - /* All worker threads are busy. */ - if (scheduler->worker_pool.idle_worker_count == 0) - goto wait; /* Get a task to schedule. */ if (vy_schedule(scheduler, &task) != 0) goto error; - /* Nothing to do. */ - if (task == NULL) - goto wait; + /* Nothing to do or all workers are busy. */ + if (task == NULL) { + /* Wait for changes. */ + fiber_cond_wait(&scheduler->scheduler_cond); + continue; + } - /* Queue the task and notify workers if necessary. */ - task->worker = vy_worker_pool_get(&scheduler->worker_pool); - assert(task->worker != NULL); + /* Queue the task for execution. 
*/ cmsg_init(&task->cmsg, vy_task_execute_route); cpipe_push(&task->worker->worker_pipe, &task->cmsg); @@ -2008,10 +2030,6 @@ error: scheduler->is_throttled = true; fiber_sleep(scheduler->timeout); scheduler->is_throttled = false; - continue; -wait: - /* Wait for changes */ - fiber_cond_wait(&scheduler->scheduler_cond); } return 0; -- 2.11.0