[RFC PATCH 20/23] vinyl: use cbus for communication between scheduler and worker threads

Vladimir Davydov vdavydov.dev at gmail.com
Sun Jul 8 19:48:51 MSK 2018


We need cbus for forwarding deferred DELETE statements generated in a
worker thread during primary index compaction to the tx thread where
they can be inserted into secondary indexes. Since pthread mutex/cond
and cbus are incompatible by their nature, let's rework communication
channel between the tx and worker threads using cbus.

Needed for #2129
---
 src/box/vy_scheduler.c | 215 ++++++++++++++++++++++++++++++-------------------
 src/box/vy_scheduler.h |  25 +-----
 2 files changed, 134 insertions(+), 106 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 4d84f9bc..bd3ad4be 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -46,6 +46,7 @@
 #include "errinj.h"
 #include "fiber.h"
 #include "fiber_cond.h"
+#include "cbus.h"
 #include "salad/stailq.h"
 #include "say.h"
 #include "vy_lsm.h"
@@ -55,14 +56,34 @@
 #include "vy_run.h"
 #include "vy_write_iterator.h"
 #include "trivia/util.h"
-#include "tt_pthread.h"
 
 /* Min and max values for vy_scheduler::timeout. */
 #define VY_SCHEDULER_TIMEOUT_MIN	1
 #define VY_SCHEDULER_TIMEOUT_MAX	60
 
-static void *vy_worker_f(void *);
+static int vy_worker_f(va_list);
 static int vy_scheduler_f(va_list);
+static void vy_task_execute_f(struct cmsg *);
+static void vy_task_complete_f(struct cmsg *);
+
+static const struct cmsg_hop vy_task_execute_route[] = {
+	{ vy_task_execute_f, NULL },
+};
+
+static const struct cmsg_hop vy_task_complete_route[] = {
+	{ vy_task_complete_f, NULL },
+};
+
+/** Vinyl worker thread. */
+struct vy_worker {
+	struct cord cord;
+	/** Pipe from tx to the worker thread. */
+	struct cpipe worker_pipe;
+	/** Pipe from the worker thread to tx. */
+	struct cpipe tx_pipe;
+	/** Link in vy_scheduler::idle_workers. */
+	struct stailq_entry in_idle;
+};
 
 struct vy_task;
 
@@ -89,10 +110,22 @@ struct vy_task_ops {
 };
 
 struct vy_task {
+	/**
+	 * CBus message used for sending the task to/from
+	 * a worker thread.
+	 */
+	struct cmsg cmsg;
 	/** Virtual method table. */
 	const struct vy_task_ops *ops;
 	/** Pointer to the scheduler. */
 	struct vy_scheduler *scheduler;
+	/** Worker thread this task is assigned to. */
+	struct vy_worker *worker;
+	/**
+	 * Fiber that is currently executing this task in
+	 * a worker thread.
+	 */
+	struct fiber *fiber;
 	/** Return code of ->execute. */
 	int status;
 	/** If ->execute fails, the error is stored here. */
@@ -126,8 +159,6 @@ struct vy_task {
 	 */
 	double bloom_fpr;
 	int64_t page_size;
-	/** Link in vy_scheduler::pending_tasks. */
-	struct stailq_entry in_pending;
 	/** Link in vy_scheduler::processed_tasks. */
 	struct stailq_entry in_processed;
 };
@@ -241,16 +272,6 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_NAME
 
 static void
-vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
-{
-	(void)loop;
-	(void)events;
-	struct vy_scheduler *scheduler = container_of(watcher,
-			struct vy_scheduler, scheduler_async);
-	fiber_cond_signal(&scheduler->scheduler_cond);
-}
-
-static void
 vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 {
 	assert(!scheduler->is_worker_pool_running);
@@ -260,17 +281,19 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 	scheduler->is_worker_pool_running = true;
 	scheduler->idle_worker_count = scheduler->worker_pool_size;
 	scheduler->worker_pool = calloc(scheduler->worker_pool_size,
-					sizeof(struct cord));
+					sizeof(*scheduler->worker_pool));
 	if (scheduler->worker_pool == NULL)
 		panic("failed to allocate vinyl worker pool");
 
-	ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async);
 	for (int i = 0; i < scheduler->worker_pool_size; i++) {
 		char name[FIBER_NAME_MAX];
 		snprintf(name, sizeof(name), "vinyl.writer.%d", i);
-		if (cord_start(&scheduler->worker_pool[i], name,
-				 vy_worker_f, scheduler) != 0)
+		struct vy_worker *worker = &scheduler->worker_pool[i];
+		if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
 			panic("failed to start vinyl worker thread");
+		cpipe_create(&worker->worker_pipe, name);
+		stailq_add_tail_entry(&scheduler->idle_workers,
+				      worker, in_idle);
 	}
 }
 
@@ -280,16 +303,12 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
 	assert(scheduler->is_worker_pool_running);
 	scheduler->is_worker_pool_running = false;
 
-	/* Wake up worker threads. */
-	tt_pthread_mutex_lock(&scheduler->mutex);
-	pthread_cond_broadcast(&scheduler->worker_cond);
-	tt_pthread_mutex_unlock(&scheduler->mutex);
-
-	/* Wait for worker threads to exit. */
-	for (int i = 0; i < scheduler->worker_pool_size; i++)
-		cord_join(&scheduler->worker_pool[i]);
-	ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async);
-
+	for (int i = 0; i < scheduler->worker_pool_size; i++) {
+		struct vy_worker *worker = &scheduler->worker_pool[i];
+		cbus_stop_loop(&worker->worker_pipe);
+		cpipe_destroy(&worker->worker_pipe);
+		cord_join(&worker->cord);
+	}
 	free(scheduler->worker_pool);
 	scheduler->worker_pool = NULL;
 }
@@ -310,19 +329,14 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
 	if (scheduler->scheduler_fiber == NULL)
 		panic("failed to allocate vinyl scheduler fiber");
 
-	scheduler->scheduler_loop = loop();
 	fiber_cond_create(&scheduler->scheduler_cond);
-	ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb);
 
 	scheduler->worker_pool_size = write_threads;
 	mempool_create(&scheduler->task_pool, cord_slab_cache(),
 		       sizeof(struct vy_task));
-	stailq_create(&scheduler->pending_tasks);
+	stailq_create(&scheduler->idle_workers);
 	stailq_create(&scheduler->processed_tasks);
 
-	tt_pthread_cond_init(&scheduler->worker_cond, NULL);
-	tt_pthread_mutex_init(&scheduler->mutex, NULL);
-
 	vy_dump_heap_create(&scheduler->dump_heap);
 	vy_compact_heap_create(&scheduler->compact_heap);
 
@@ -344,9 +358,6 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
 	if (scheduler->is_worker_pool_running)
 		vy_scheduler_stop_workers(scheduler);
 
-	tt_pthread_cond_destroy(&scheduler->worker_cond);
-	tt_pthread_mutex_destroy(&scheduler->mutex);
-
 	diag_destroy(&scheduler->diag);
 	mempool_destroy(&scheduler->task_pool);
 	fiber_cond_destroy(&scheduler->dump_cond);
@@ -647,6 +658,8 @@ vy_run_discard(struct vy_run *run)
 static int
 vy_task_write_run(struct vy_task *task)
 {
+	enum { YIELD_LOOPS = 32 };
+
 	struct vy_lsm *lsm = task->lsm;
 	struct vy_stmt_stream *wi = task->wi;
 
@@ -668,6 +681,7 @@ vy_task_write_run(struct vy_task *task)
 	if (wi->iface->start(wi) != 0)
 		goto fail_abort_writer;
 	int rc;
+	int loops = 0;
 	struct tuple *stmt = NULL;
 	while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
 		inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
@@ -678,7 +692,9 @@ vy_task_write_run(struct vy_task *task)
 		if (rc != 0)
 			break;
 
-		if (!task->scheduler->is_worker_pool_running) {
+		if (++loops % YIELD_LOOPS == 0)
+			fiber_sleep(0);
+		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			rc = -1;
 			break;
@@ -1316,6 +1332,62 @@ err_task:
 }
 
 /**
+ * Fiber function that actually executes a vinyl task.
+ * After finishing a task, it sends it back to tx.
+ */
+static int
+vy_task_f(va_list va)
+{
+	struct vy_task *task = va_arg(va, struct vy_task *);
+	task->status = task->ops->execute(task);
+	if (task->status != 0) {
+		struct diag *diag = diag_get();
+		assert(!diag_is_empty(diag));
+		diag_move(diag, &task->diag);
+	}
+	cmsg_init(&task->cmsg, vy_task_complete_route);
+	cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+	task->fiber = NULL;
+	return 0;
+}
+
+/**
+ * Callback invoked by a worker thread upon receiving a task.
+ * It schedules a fiber which actually executes the task, so
+ * as not to block the event loop.
+ */
+static void
+vy_task_execute_f(struct cmsg *cmsg)
+{
+	struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+	assert(task->fiber == NULL);
+	task->fiber = fiber_new("task", vy_task_f);
+	if (task->fiber == NULL) {
+		task->status = -1;
+		diag_move(diag_get(), &task->diag);
+		cmsg_init(&task->cmsg, vy_task_complete_route);
+		cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+	} else {
+		fiber_start(task->fiber, task);
+	}
+}
+
+/**
+ * Callback invoked by the tx thread upon receiving an executed
+ * task from a worker thread. It adds the task to the processed
+ * task queue and wakes up the scheduler so that it can complete
+ * it.
+ */
+static void
+vy_task_complete_f(struct cmsg *cmsg)
+{
+	struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+	stailq_add_tail_entry(&task->scheduler->processed_tasks,
+			      task, in_processed);
+	fiber_cond_signal(&task->scheduler->scheduler_cond);
+}
+
+/**
  * Create a task for dumping an LSM tree. The new task is returned
  * in @ptask. If there's no LSM tree that needs to be dumped @ptask
  * is set to NULL.
@@ -1503,13 +1575,10 @@ vy_scheduler_f(va_list va)
 		struct stailq processed_tasks;
 		struct vy_task *task, *next;
 		int tasks_failed = 0, tasks_done = 0;
-		bool was_empty;
 
 		/* Get the list of processed tasks. */
 		stailq_create(&processed_tasks);
-		tt_pthread_mutex_lock(&scheduler->mutex);
 		stailq_concat(&processed_tasks, &scheduler->processed_tasks);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
 
 		/* Complete and delete all processed tasks. */
 		stailq_foreach_entry_safe(task, next, &processed_tasks,
@@ -1518,6 +1587,8 @@ vy_scheduler_f(va_list va)
 				tasks_failed++;
 			else
 				tasks_done++;
+			stailq_add_entry(&scheduler->idle_workers,
+					 task->worker, in_idle);
 			vy_task_delete(task);
 			scheduler->idle_worker_count++;
 			assert(scheduler->idle_worker_count <=
@@ -1553,15 +1624,13 @@ vy_scheduler_f(va_list va)
 			goto wait;
 
 		/* Queue the task and notify workers if necessary. */
-		tt_pthread_mutex_lock(&scheduler->mutex);
-		was_empty = stailq_empty(&scheduler->pending_tasks);
-		stailq_add_tail_entry(&scheduler->pending_tasks,
-				      task, in_pending);
-		if (was_empty)
-			tt_pthread_cond_signal(&scheduler->worker_cond);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
-
+		assert(!stailq_empty(&scheduler->idle_workers));
+		task->worker = stailq_shift_entry(&scheduler->idle_workers,
+						  struct vy_worker, in_idle);
 		scheduler->idle_worker_count--;
+		cmsg_init(&task->cmsg, vy_task_execute_route);
+		cpipe_push(&task->worker->worker_pipe, &task->cmsg);
+
 		fiber_reschedule();
 		continue;
 error:
@@ -1597,41 +1666,17 @@ wait:
 	return 0;
 }
 
-static void *
-vy_worker_f(void *arg)
+static int
+vy_worker_f(va_list ap)
 {
-	struct vy_scheduler *scheduler = arg;
-	struct vy_task *task = NULL;
-
-	tt_pthread_mutex_lock(&scheduler->mutex);
-	while (scheduler->is_worker_pool_running) {
-		/* Wait for a task */
-		if (stailq_empty(&scheduler->pending_tasks)) {
-			/* Wake scheduler up if there are no more tasks */
-			ev_async_send(scheduler->scheduler_loop,
-				      &scheduler->scheduler_async);
-			tt_pthread_cond_wait(&scheduler->worker_cond,
-					     &scheduler->mutex);
-			continue;
-		}
-		task = stailq_shift_entry(&scheduler->pending_tasks,
-					  struct vy_task, in_pending);
-		tt_pthread_mutex_unlock(&scheduler->mutex);
-		assert(task != NULL);
-
-		/* Execute task */
-		task->status = task->ops->execute(task);
-		if (task->status != 0) {
-			struct diag *diag = diag_get();
-			assert(!diag_is_empty(diag));
-			diag_move(diag, &task->diag);
-		}
-
-		/* Return processed task to scheduler */
-		tt_pthread_mutex_lock(&scheduler->mutex);
-		stailq_add_tail_entry(&scheduler->processed_tasks,
-				      task, in_processed);
-	}
-	tt_pthread_mutex_unlock(&scheduler->mutex);
-	return NULL;
+	struct vy_worker *worker = va_arg(ap, struct vy_worker *);
+	struct cbus_endpoint endpoint;
+
+	cpipe_create(&worker->tx_pipe, "tx");
+	cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
+			     fiber_schedule_cb, fiber());
+	cbus_loop(&endpoint);
+	cbus_endpoint_destroy(&endpoint, cbus_process);
+	cpipe_destroy(&worker->tx_pipe);
+	return 0;
 }
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 284f666e..a235aa6f 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -42,16 +42,15 @@
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 #include "salad/stailq.h"
-#include "tt_pthread.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-struct cord;
 struct fiber;
 struct vy_lsm;
 struct vy_run_env;
+struct vy_worker;
 struct vy_scheduler;
 
 typedef void
@@ -61,42 +60,26 @@ typedef void
 struct vy_scheduler {
 	/** Scheduler fiber. */
 	struct fiber *scheduler_fiber;
-	/** Scheduler event loop. */
-	struct ev_loop *scheduler_loop;
 	/** Used to wake up the scheduler fiber from TX. */
 	struct fiber_cond scheduler_cond;
-	/** Used to wake up the scheduler from a worker thread. */
-	struct ev_async scheduler_async;
 	/**
 	 * Array of worker threads used for performing
 	 * dump/compaction tasks.
 	 */
-	struct cord *worker_pool;
+	struct vy_worker *worker_pool;
 	/** Set if the worker threads are running. */
 	bool is_worker_pool_running;
 	/** Total number of worker threads. */
 	int worker_pool_size;
 	/** Number worker threads that are currently idle. */
 	int idle_worker_count;
+	/** List of idle workers, linked by vy_worker::in_idle. */
+	struct stailq idle_workers;
 	/** Memory pool used for allocating vy_task objects. */
 	struct mempool task_pool;
-	/** Queue of pending tasks, linked by vy_task::in_pending. */
-	struct stailq pending_tasks;
 	/** Queue of processed tasks, linked by vy_task::in_processed. */
 	struct stailq processed_tasks;
 	/**
-	 * Signaled to wake up a worker when there is
-	 * a pending task in the input queue. Also used
-	 * to stop worker threads on shutdown.
-	 */
-	pthread_cond_t worker_cond;
-	/**
-	 * Mutex protecting input and output queues and
-	 * the condition variable used to wake up worker
-	 * threads.
-	 */
-	pthread_mutex_t mutex;
-	/**
 	 * Heap of LSM trees, ordered by dump priority,
 	 * linked by vy_lsm::in_dump.
 	 */
-- 
2.11.0




More information about the Tarantool-patches mailing list