[PATCH 4/8] vinyl: move worker allocation closer to task creation

Vladimir Davydov <vdavydov.dev@gmail.com>
Tue Sep 4 20:23:47 MSK 2018


Call vy_worker_pool_get() from vy_scheduler_peek_{dump,compact}() so
that we can use separate worker pools for dump and compaction tasks.
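
The resulting pattern in both peek functions looks roughly like the
sketch below (illustrative stand-in types and names, not the exact
vinyl API): the worker is acquired lazily, only once we know there is
work to do, and is handed back on every path that does not attach it
to a task.

    #include <stdlib.h>

    /* Illustrative stand-ins for the real vinyl types. */
    struct worker { int busy; };
    struct task { struct worker *worker; };

    static struct worker workers[2];

    static struct worker *
    worker_pool_get(void)
    {
        for (int i = 0; i < 2; i++) {
            if (!workers[i].busy) {
                workers[i].busy = 1;
                return &workers[i];
            }
        }
        return NULL; /* all workers are busy */
    }

    static void
    worker_pool_put(struct worker *w)
    {
        w->busy = 0;
    }

    /*
     * Shape of vy_scheduler_peek_{dump,compact} after this patch:
     * acquire a worker only once there is work to do, and return it
     * on every path that does not hand it over to a task.
     */
    static int
    peek_task(int have_work, struct task **ptask)
    {
        *ptask = NULL;
        if (!have_work)
            return 0; /* nothing to do */
        struct worker *w = worker_pool_get();
        if (w == NULL)
            return 0; /* all workers busy, *ptask stays NULL */
        struct task *t = calloc(1, sizeof(*t));
        if (t == NULL) {
            worker_pool_put(w); /* don't leak the worker on error */
            return -1;
        }
        t->worker = w;
        *ptask = t;
        return 0;
    }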
---
 src/box/vy_scheduler.c | 80 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 49 insertions(+), 31 deletions(-)
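
Note for reviewers: since the peek functions now report "all workers
are busy" by leaving *ptask NULL, the main scheduler loop loses its
separate idle-worker pre-check and the wait: label. Schematically (a
simplified sketch; the stand-in helpers are illustrative, not the
real API):

    #include <stddef.h>

    struct task; /* opaque stand-in */

    /* Illustrative stand-ins for the real scheduler API. */
    extern int schedule(struct task **ptask); /* NULL: no work or all busy */
    extern void wait_for_changes(void);
    extern void queue_on_worker(struct task *t);

    static void
    scheduler_loop(void)
    {
        struct task *t;
        for (;;) {
            if (schedule(&t) != 0)
                break; /* error handling elided */
            if (t == NULL) {
                /* Nothing to do or all workers are busy. */
                wait_for_changes();
                continue;
            }
            /* The task already carries its worker; just queue it. */
            queue_on_worker(t);
        }
    }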

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 03126e5e..5d6960cd 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -229,8 +229,8 @@ vy_task_deferred_delete_iface;
  * does not free it from under us.
  */
 static struct vy_task *
-vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
-	    const struct vy_task_ops *ops)
+vy_task_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
+	    struct vy_lsm *lsm, const struct vy_task_ops *ops)
 {
 	struct vy_task *task = calloc(1, sizeof(*task));
 	if (task == NULL) {
@@ -241,6 +241,7 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 	memset(task, 0, sizeof(*task));
 	task->ops = ops;
 	task->scheduler = scheduler;
+	task->worker = worker;
 	task->lsm = lsm;
 	task->cmp_def = key_def_dup(lsm->cmp_def);
 	if (task->cmp_def == NULL) {
@@ -1269,8 +1270,8 @@ vy_task_dump_abort(struct vy_task *task)
  * trees created at @scheduler->dump_generation.
  */
 static int
-vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
-		 struct vy_task **p_task)
+vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
+		 struct vy_lsm *lsm, struct vy_task **p_task)
 {
 	static struct vy_task_ops dump_ops = {
 		.execute = vy_task_dump_execute,
@@ -1323,7 +1324,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 		return 0;
 	}
 
-	struct vy_task *task = vy_task_new(scheduler, lsm, &dump_ops);
+	struct vy_task *task = vy_task_new(scheduler, worker, lsm, &dump_ops);
 	if (task == NULL)
 		goto err;
 
@@ -1579,8 +1580,8 @@ vy_task_compact_abort(struct vy_task *task)
 }
 
 static int
-vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
-		    struct vy_task **p_task)
+vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
+		    struct vy_lsm *lsm, struct vy_task **p_task)
 {
 	static struct vy_task_ops compact_ops = {
 		.execute = vy_task_compact_execute,
@@ -1604,7 +1605,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
 		return 0;
 	}
 
-	struct vy_task *task = vy_task_new(scheduler, lsm, &compact_ops);
+	struct vy_task *task = vy_task_new(scheduler, worker, lsm, &compact_ops);
 	if (task == NULL)
 		goto err_task;
 
@@ -1746,8 +1747,8 @@ vy_task_complete_f(struct cmsg *cmsg)
 
 /**
  * Create a task for dumping an LSM tree. The new task is returned
- * in @ptask. If there's no LSM tree that needs to be dumped @ptask
- * is set to NULL.
+ * in @ptask. If there's no LSM tree that needs to be dumped or all
+ * workers are currently busy, @ptask is set to NULL.
  *
  * We only dump an LSM tree if it needs to be snapshotted or the quota
  * on memory usage is exceeded. In either case, the oldest LSM tree
@@ -1759,6 +1760,7 @@ vy_task_complete_f(struct cmsg *cmsg)
 static int
 vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask)
 {
+	struct vy_worker *worker = NULL;
 retry:
 	*ptask = NULL;
 	if (!vy_scheduler_dump_in_progress(scheduler)) {
@@ -1766,7 +1768,7 @@ retry:
 		 * All memory trees of past generations have
 		 * been dumped, nothing to do.
 		 */
-		return 0;
+		goto no_task;
 	}
 	/*
 	 * Look up the oldest LSM tree eligible for dump.
@@ -1778,7 +1780,7 @@ retry:
 		 * Complete the current dump round.
 		 */
 		vy_scheduler_complete_dump(scheduler);
-		return 0;
+		goto no_task;
 	}
 	struct vy_lsm *lsm = container_of(pn, struct vy_lsm, in_dump);
 	if (!lsm->is_dumping && lsm->pin_count == 0 &&
@@ -1788,8 +1790,15 @@ retry:
 		 * contains data that must be dumped at the current
 		 * round. Try to create a task for it.
 		 */
-		if (vy_task_dump_new(scheduler, lsm, ptask) != 0)
+		if (worker == NULL) {
+			worker = vy_worker_pool_get(&scheduler->worker_pool);
+			if (worker == NULL)
+				return 0; /* all workers are busy */
+		}
+		if (vy_task_dump_new(scheduler, worker, lsm, ptask) != 0) {
+			vy_worker_pool_put(&scheduler->worker_pool, worker);
 			return -1;
+		}
 		if (*ptask != NULL)
 			return 0; /* new task */
 		/*
@@ -1805,13 +1814,16 @@ retry:
 	 * is complete.
 	 */
 	assert(scheduler->dump_task_count > 0);
+no_task:
+	if (worker != NULL)
+		vy_worker_pool_put(&scheduler->worker_pool, worker);
 	return 0;
 }
 
 /**
  * Create a task for compacting a range. The new task is returned
- * in @ptask. If there's no range that needs to be compacted @ptask
- * is set to NULL.
+ * in @ptask. If there's no range that needs to be compacted or all
+ * workers are currently busy, @ptask is set to NULL.
  *
  * We compact ranges that have more runs in a level than specified
  * by run_count_per_level configuration option. Among those runs we
@@ -1824,19 +1836,31 @@ static int
 vy_scheduler_peek_compact(struct vy_scheduler *scheduler,
 			  struct vy_task **ptask)
 {
+	struct vy_worker *worker = NULL;
 retry:
 	*ptask = NULL;
 	struct heap_node *pn = vy_compact_heap_top(&scheduler->compact_heap);
 	if (pn == NULL)
-		return 0; /* nothing to do */
+		goto no_task; /* nothing to do */
 	struct vy_lsm *lsm = container_of(pn, struct vy_lsm, in_compact);
 	if (vy_lsm_compact_priority(lsm) <= 1)
-		return 0; /* nothing to do */
-	if (vy_task_compact_new(scheduler, lsm, ptask) != 0)
+		goto no_task; /* nothing to do */
+	if (worker == NULL) {
+		worker = vy_worker_pool_get(&scheduler->worker_pool);
+		if (worker == NULL)
+			return 0; /* all workers are busy */
+	}
+	if (vy_task_compact_new(scheduler, worker, lsm, ptask) != 0) {
+		vy_worker_pool_put(&scheduler->worker_pool, worker);
 		return -1;
+	}
 	if (*ptask == NULL)
 		goto retry; /* LSM tree dropped or range split/coalesced */
 	return 0; /* new task */
+no_task:
+	if (worker != NULL)
+		vy_worker_pool_put(&scheduler->worker_pool, worker);
+	return 0;
 }
 
 static int
@@ -1966,19 +1990,17 @@ vy_scheduler_f(va_list va)
 		/* Throttle for a while if a task failed. */
 		if (tasks_failed > 0)
 			goto error;
-		/* All worker threads are busy. */
-		if (scheduler->worker_pool.idle_worker_count == 0)
-			goto wait;
 		/* Get a task to schedule. */
 		if (vy_schedule(scheduler, &task) != 0)
 			goto error;
-		/* Nothing to do. */
-		if (task == NULL)
-			goto wait;
+		/* Nothing to do or all workers are busy. */
+		if (task == NULL) {
+			/* Wait for changes. */
+			fiber_cond_wait(&scheduler->scheduler_cond);
+			continue;
+		}
 
-		/* Queue the task and notify workers if necessary. */
-		task->worker = vy_worker_pool_get(&scheduler->worker_pool);
-		assert(task->worker != NULL);
+		/* Queue the task for execution. */
 		cmsg_init(&task->cmsg, vy_task_execute_route);
 		cpipe_push(&task->worker->worker_pipe, &task->cmsg);
 
@@ -2008,10 +2030,6 @@ error:
 		scheduler->is_throttled = true;
 		fiber_sleep(scheduler->timeout);
 		scheduler->is_throttled = false;
-		continue;
-wait:
-		/* Wait for changes */
-		fiber_cond_wait(&scheduler->scheduler_cond);
 	}
 
 	return 0;
-- 
2.11.0