Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
Subject: [PATCH 3/8] vinyl: factor out worker pool from scheduler struct
Date: Tue,  4 Sep 2018 20:23:46 +0300	[thread overview]
Message-ID: <0f531f93b5c8acadeb5cd43e21af0c10117fa2db.1536080993.git.vdavydov.dev@gmail.com> (raw)
In-Reply-To: <cover.1536080993.git.vdavydov.dev@gmail.com>

A worker pool is an independent entity that provides the scheduler with
worker threads on demand. Let's factor it out so that we can introduce
separate pools for dump and compaction tasks.
---
 src/box/vy_scheduler.c | 104 ++++++++++++++++++++++++++++++++-----------------
 src/box/vy_scheduler.h |  23 ++++++-----
 2 files changed, 82 insertions(+), 45 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index fc4fdf8c..03126e5e 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -93,7 +93,7 @@ struct vy_worker {
 	 * or NULL if the worker is idle.
 	 */
 	struct vy_task *task;
-	/** Link in vy_scheduler::idle_workers. */
+	/** Link in vy_worker_pool::idle_workers. */
 	struct stailq_entry in_idle;
 	/** Route for sending deferred DELETEs back to tx. */
 	struct cmsg_hop deferred_delete_route[2];
@@ -333,27 +333,25 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_NAME
 
 static void
-vy_scheduler_start_workers(struct vy_scheduler *scheduler)
+vy_worker_pool_start(struct vy_worker_pool *pool)
 {
-	assert(scheduler->worker_pool == NULL);
+	assert(pool->workers == NULL);
 	/* One thread is reserved for dumps, see vy_schedule(). */
-	assert(scheduler->worker_pool_size >= 2);
+	assert(pool->size >= 2);
 
-	scheduler->idle_worker_count = scheduler->worker_pool_size;
-	scheduler->worker_pool = calloc(scheduler->worker_pool_size,
-					sizeof(*scheduler->worker_pool));
-	if (scheduler->worker_pool == NULL)
+	pool->idle_worker_count = pool->size;
+	pool->workers = calloc(pool->size, sizeof(*pool->workers));
+	if (pool->workers == NULL)
 		panic("failed to allocate vinyl worker pool");
 
-	for (int i = 0; i < scheduler->worker_pool_size; i++) {
+	for (int i = 0; i < pool->size; i++) {
 		char name[FIBER_NAME_MAX];
 		snprintf(name, sizeof(name), "vinyl.writer.%d", i);
-		struct vy_worker *worker = &scheduler->worker_pool[i];
+		struct vy_worker *worker = &pool->workers[i];
 		if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
 			panic("failed to start vinyl worker thread");
 		cpipe_create(&worker->worker_pipe, name);
-		stailq_add_tail_entry(&scheduler->idle_workers,
-				      worker, in_idle);
+		stailq_add_tail_entry(&pool->idle_workers, worker, in_idle);
 
 		struct cmsg_hop *route = worker->deferred_delete_route;
 		route[0].f = vy_deferred_delete_batch_process_f;
@@ -364,17 +362,60 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 }
 
 static void
-vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
+vy_worker_pool_stop(struct vy_worker_pool *pool)
 {
-	assert(scheduler->worker_pool != NULL);
-	for (int i = 0; i < scheduler->worker_pool_size; i++) {
-		struct vy_worker *worker = &scheduler->worker_pool[i];
+	assert(pool->workers != NULL);
+	for (int i = 0; i < pool->size; i++) {
+		struct vy_worker *worker = &pool->workers[i];
 		cbus_stop_loop(&worker->worker_pipe);
 		cpipe_destroy(&worker->worker_pipe);
 		cord_join(&worker->cord);
 	}
-	free(scheduler->worker_pool);
-	scheduler->worker_pool = NULL;
+	free(pool->workers);
+	pool->workers = NULL;
+}
+
+static void
+vy_worker_pool_create(struct vy_worker_pool *pool, int size)
+{
+	pool->size = size;
+	pool->workers = NULL;
+	stailq_create(&pool->idle_workers);
+	pool->idle_worker_count = 0;
+}
+
+static void
+vy_worker_pool_destroy(struct vy_worker_pool *pool)
+{
+	if (pool->workers != NULL)
+		vy_worker_pool_stop(pool);
+}
+
+/**
+ * Get an idle worker from a pool.
+ */
+static struct vy_worker *
+vy_worker_pool_get(struct vy_worker_pool *pool)
+{
+	struct vy_worker *worker = NULL;
+	if (!stailq_empty(&pool->idle_workers)) {
+		assert(pool->idle_worker_count > 0);
+		pool->idle_worker_count--;
+		worker = stailq_shift_entry(&pool->idle_workers,
+					    struct vy_worker, in_idle);
+	}
+	return worker;
+}
+
+/**
+ * Put a worker back to a pool once it's done its job.
+ */
+static void
+vy_worker_pool_put(struct vy_worker_pool *pool, struct vy_worker *worker)
+{
+	assert(pool->idle_worker_count < pool->size);
+	pool->idle_worker_count++;
+	stailq_add_entry(&pool->idle_workers, worker, in_idle);
 }
 
 void
@@ -394,9 +435,7 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
 		panic("failed to allocate vinyl scheduler fiber");
 
 	fiber_cond_create(&scheduler->scheduler_cond);
-
-	scheduler->worker_pool_size = write_threads;
-	stailq_create(&scheduler->idle_workers);
+	vy_worker_pool_create(&scheduler->worker_pool, write_threads);
 	stailq_create(&scheduler->processed_tasks);
 
 	vy_dump_heap_create(&scheduler->dump_heap);
@@ -417,9 +456,7 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
 	fiber_cond_signal(&scheduler->dump_cond);
 	fiber_cond_signal(&scheduler->scheduler_cond);
 
-	if (scheduler->worker_pool != NULL)
-		vy_scheduler_stop_workers(scheduler);
-
+	vy_worker_pool_destroy(&scheduler->worker_pool);
 	diag_destroy(&scheduler->diag);
 	fiber_cond_destroy(&scheduler->dump_cond);
 	fiber_cond_destroy(&scheduler->scheduler_cond);
@@ -1812,7 +1849,7 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask)
 	if (*ptask != NULL)
 		return 0;
 
-	if (scheduler->idle_worker_count <= 1) {
+	if (scheduler->worker_pool.idle_worker_count <= 1) {
 		/*
 		 * If all worker threads are busy doing compaction
 		 * when we run out of quota, ongoing transactions will
@@ -1888,7 +1925,7 @@ vy_scheduler_f(va_list va)
 	if (scheduler->scheduler_fiber == NULL)
 		return 0; /* destroyed */
 
-	vy_scheduler_start_workers(scheduler);
+	vy_worker_pool_start(&scheduler->worker_pool);
 
 	while (scheduler->scheduler_fiber != NULL) {
 		struct stailq processed_tasks;
@@ -1906,12 +1943,9 @@ vy_scheduler_f(va_list va)
 				tasks_failed++;
 			else
 				tasks_done++;
-			stailq_add_entry(&scheduler->idle_workers,
-					 task->worker, in_idle);
+			vy_worker_pool_put(&scheduler->worker_pool,
+					   task->worker);
 			vy_task_delete(task);
-			scheduler->idle_worker_count++;
-			assert(scheduler->idle_worker_count <=
-			       scheduler->worker_pool_size);
 		}
 		/*
 		 * Reset the timeout if we managed to successfully
@@ -1933,7 +1967,7 @@ vy_scheduler_f(va_list va)
 		if (tasks_failed > 0)
 			goto error;
 		/* All worker threads are busy. */
-		if (scheduler->idle_worker_count == 0)
+		if (scheduler->worker_pool.idle_worker_count == 0)
 			goto wait;
 		/* Get a task to schedule. */
 		if (vy_schedule(scheduler, &task) != 0)
@@ -1943,10 +1977,8 @@ vy_scheduler_f(va_list va)
 			goto wait;
 
 		/* Queue the task and notify workers if necessary. */
-		assert(!stailq_empty(&scheduler->idle_workers));
-		task->worker = stailq_shift_entry(&scheduler->idle_workers,
-						  struct vy_worker, in_idle);
-		scheduler->idle_worker_count--;
+		task->worker = vy_worker_pool_get(&scheduler->worker_pool);
+		assert(task->worker != NULL);
 		cmsg_init(&task->cmsg, vy_task_execute_route);
 		cpipe_push(&task->worker->worker_pipe, &task->cmsg);
 
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index db85711a..606f3b31 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -56,22 +56,27 @@ typedef void
 (*vy_scheduler_dump_complete_f)(struct vy_scheduler *scheduler,
 				int64_t dump_generation, double dump_duration);
 
+struct vy_worker_pool {
+	/** Number of worker threads in the pool. */
+	int size;
+	/** Array of all worker threads in the pool. */
+	struct vy_worker *workers;
+	/** List of workers that are currently idle. */
+	struct stailq idle_workers;
+	/** Length of @idle_workers list. */
+	int idle_worker_count;
+};
+
 struct vy_scheduler {
 	/** Scheduler fiber. */
 	struct fiber *scheduler_fiber;
 	/** Used to wake up the scheduler fiber from TX. */
 	struct fiber_cond scheduler_cond;
 	/**
-	 * Array of worker threads used for performing
-	 * dump/compaction tasks.
+	 * Pool of threads used for performing dump/compaction
+	 * tasks in the background.
 	 */
-	struct vy_worker *worker_pool;
-	/** Total number of worker threads. */
-	int worker_pool_size;
-	/** Number worker threads that are currently idle. */
-	int idle_worker_count;
-	/** List of idle workers, linked by vy_worker::in_idle. */
-	struct stailq idle_workers;
+	struct vy_worker_pool worker_pool;
 	/** Queue of processed tasks, linked by vy_task::in_processed. */
 	struct stailq processed_tasks;
 	/**
-- 
2.11.0

  parent reply	other threads:[~2018-09-04 17:23 UTC|newest]

Thread overview: 39+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-09-02 20:18 [PATCH 0/7] vinyl: improve stats for throttling Vladimir Davydov
2018-09-02 20:18 ` [PATCH 1/7] vinyl: fix accounting of secondary index cache statements Vladimir Davydov
2018-09-02 22:26   ` [tarantool-patches] " Konstantin Osipov
2018-09-02 20:18 ` [PATCH 2/7] vinyl: add global memory stats Vladimir Davydov
2018-09-02 22:27   ` [tarantool-patches] " Konstantin Osipov
2018-09-02 22:27   ` Konstantin Osipov
2018-09-03  8:10     ` Vladimir Davydov
2018-09-02 20:18 ` [PATCH 3/7] vinyl: add global disk stats Vladimir Davydov
2018-09-02 22:30   ` [tarantool-patches] " Konstantin Osipov
2018-09-02 20:18 ` [PATCH 4/7] vinyl: fix force compaction logic Vladimir Davydov
2018-09-02 20:18 ` [PATCH 5/7] vinyl: update compact priority usual way on range split/coalesce Vladimir Davydov
2018-09-02 20:18 ` [PATCH 6/7] vinyl: keep track of compaction queue length and debt Vladimir Davydov
2018-09-02 20:19 ` [PATCH 7/7] vinyl: keep track of disk idle time Vladimir Davydov
2018-09-04 11:54   ` Vladimir Davydov
2018-09-04 17:23     ` Vladimir Davydov
2018-09-04 17:23       ` [PATCH 1/8] vinyl: add helper to check whether dump is in progress Vladimir Davydov
2018-09-06  7:33         ` Konstantin Osipov
2018-09-04 17:23       ` [PATCH 2/8] vinyl: don't use mempool for allocating background tasks Vladimir Davydov
2018-09-06  7:33         ` Konstantin Osipov
2018-09-04 17:23       ` Vladimir Davydov [this message]
2018-09-06  7:34         ` [PATCH 3/8] vinyl: factor out worker pool from scheduler struct Konstantin Osipov
2018-09-04 17:23       ` [PATCH 4/8] vinyl: move worker allocation closer to task creation Vladimir Davydov
2018-09-06  7:35         ` Konstantin Osipov
2018-09-04 17:23       ` [PATCH 5/8] vinyl: use separate thread pools for dump and compaction tasks Vladimir Davydov
2018-09-06  7:37         ` Konstantin Osipov
2018-09-06  9:48           ` Vladimir Davydov
2018-09-06 10:32             ` Konstantin Osipov
2018-09-04 17:23       ` [PATCH 6/8] vinyl: zap vy_worker_pool::idle_worker_count Vladimir Davydov
2018-09-06  7:38         ` Konstantin Osipov
2018-09-04 17:23       ` [PATCH 7/8] vinyl: don't start scheduler fiber until local recovery is complete Vladimir Davydov
2018-09-06  7:39         ` Konstantin Osipov
2018-09-04 17:23       ` [PATCH 8/8] vinyl: keep track of thread pool idle ratio Vladimir Davydov
2018-09-06  7:49         ` Konstantin Osipov
2018-09-06  8:18           ` Vladimir Davydov
2018-09-06 10:26             ` Konstantin Osipov
2018-09-06 10:52               ` Vladimir Davydov
2018-09-06 10:57                 ` Konstantin Osipov
2018-09-06 11:59                   ` Vladimir Davydov
2018-09-09 11:41 ` [PATCH 0/7] vinyl: improve stats for throttling Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=0f531f93b5c8acadeb5cd43e21af0c10117fa2db.1536080993.git.vdavydov.dev@gmail.com \
    --to=vdavydov.dev@gmail.com \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [PATCH 3/8] vinyl: factor out worker pool from scheduler struct' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.