[Tarantool-patches] [PATCH v2 11/16] raft: move worker fiber from Raft library to box

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Nov 20 02:45:59 MSK 2020


The worker fiber is used by the Raft library to perform yielding
tasks like WAL writes, and simply long tasks like network
broadcasts. That keeps the Raft state machine from being blocked,
and allows multiple updates to be collected during an event loop
iteration and flushed all at once.

While the worker fiber was inside the Raft library, it wasn't
possible to use it for anything else. And that is exactly what is
going to be needed. The chain of reasons is quite long.

It all starts from the fact that eliminating all box usages from
the Raft library also includes relocating
box_update_ro_summary().

The only place it can be moved to is the box_raft_on_update
trigger.

The trigger is currently called from the Raft worker fiber. It
means that there is a yield between a Raft state update and the
trigger invocation. If box_update_ro_summary() were blindly moved
to the trigger, users could sometimes observe miracles like the
instance role being 'follower' while the node is still writable
(if it was a leader before), because box_raft_on_update hasn't
been invoked yet and hasn't updated the RO summary.

Assume the on_update triggers are invoked by Raft not in the
worker fiber, but right from the state machine. Then
box_update_ro_summary() would always follow a state change without
a yield.

However, that creates another problem - the trigger also calls
box_clear_synchro_queue(), which yields. But on_update triggers
must not yield, so as not to block the state machine.

This can be easily solved if it is possible to schedule
box_clear_synchro_queue() from the on_update trigger to be
executed later.

And after this patch it becomes possible, because the worker fiber
can now be used not only to handle the Raft library's async work,
but also box-Raft async work, like clearing the synchro queue.

Part of #5303
---
 src/box/raft.c    | 65 ++++++++++++++++++++++++++++++++++-
 src/box/raftlib.c | 86 +++++++++++++----------------------------------
 src/box/raftlib.h | 13 +++++--
 3 files changed, 98 insertions(+), 66 deletions(-)

diff --git a/src/box/raft.c b/src/box/raft.c
index 8a034687b..0027230da 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -49,6 +49,15 @@ struct raft box_raft_global = {
  */
 static struct trigger box_raft_on_update;
 
+/**
+ * Worker fiber does all the asynchronous work, which may need yields and can be
+ * long. These are WAL writes, network broadcasts. That allows not to block the
+ * Raft state machine.
+ */
+static struct fiber *box_raft_worker = NULL;
+/** Flag installed each time when new work appears for the worker fiber. */
+static bool box_raft_has_work = false;
+
 static void
 box_raft_msg_to_request(const struct raft_msg *msg, struct raft_request *req)
 {
@@ -71,6 +80,59 @@ box_raft_request_to_msg(const struct raft_request *req, struct raft_msg *msg)
 	};
 }
 
+static int
+box_raft_worker_f(va_list args)
+{
+	(void)args;
+	struct raft *raft = fiber()->f_arg;
+	assert(raft == box_raft());
+	while (!fiber_is_cancelled()) {
+		box_raft_has_work = false;
+
+		raft_process_async(raft);
+
+		if (!box_raft_has_work)
+			fiber_yield();
+	}
+	return 0;
+}
+
+static void
+box_raft_schedule_async(struct raft *raft)
+{
+	assert(raft == box_raft());
+	if (box_raft_worker == NULL) {
+		box_raft_worker = fiber_new("raft_worker", box_raft_worker_f);
+		if (box_raft_worker == NULL) {
+			/*
+			 * XXX: should be handled properly, no need to panic.
+			 * The issue though is that most of the Raft state
+			 * machine functions are not supposed to fail, and also
+			 * they usually wakeup the fiber when their work is
+			 * finished. So it is too late to fail. On the other
+			 * hand it looks not so good to create the fiber when
+			 * Raft is initialized. Because then it will occupy
+			 * memory even if Raft is not used.
+			 */
+			diag_log();
+			panic("Could't create Raft worker fiber");
+			return;
+		}
+		box_raft_worker->f_arg = raft;
+		fiber_set_joinable(box_raft_worker, true);
+	}
+	/*
+	 * Don't wake the fiber if it writes something (not cancellable).
+	 * Otherwise it would be a spurious wakeup breaking the WAL write not
+	 * adapted to this. Also don't wakeup the current fiber - it leads to
+	 * undefined behaviour.
+	 */
+	if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
+	    fiber() != box_raft_worker)
+		fiber_wakeup(box_raft_worker);
+	box_raft_has_work = true;
+}
+
 static int
 box_raft_on_update_f(struct trigger *trigger, void *event)
 {
@@ -242,6 +304,7 @@ box_raft_init(void)
 	static const struct raft_vtab box_raft_vtab = {
 		.broadcast = box_raft_broadcast,
 		.write = box_raft_write,
+		.schedule_async = box_raft_schedule_async,
 	};
 	raft_create(&box_raft_global, &box_raft_vtab);
 	trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
@@ -255,7 +318,7 @@ box_raft_free(void)
 	 * Can't join the fiber, because the event loop is stopped already, and
 	 * yields are not allowed.
 	 */
-	box_raft_global.worker = NULL;
+	box_raft_worker = NULL;
 	raft_destroy(&box_raft_global);
 	/*
 	 * Invalidate so as box_raft() would fail if any usage attempt happens.
diff --git a/src/box/raftlib.c b/src/box/raftlib.c
index a1fca34cd..4457a784f 100644
--- a/src/box/raftlib.c
+++ b/src/box/raftlib.c
@@ -75,6 +75,23 @@ raft_write(struct raft *raft, const struct raft_msg *req)
 	raft->vtab->write(raft, req);
 }
 
+/**
+ * Schedule async work. The Raft node owner should eventually process the async
+ * events.
+ */
+static inline void
+raft_schedule_async(struct raft *raft)
+{
+	/*
+	 * The method is called from inside of the state machine, when yields
+	 * are not allowed for its simplicity.
+	 */
+	int csw = fiber()->csw;
+	raft->vtab->schedule_async(raft);
+	assert(csw == fiber()->csw);
+	(void)csw;
+}
+
 /**
  * Check if Raft is completely synced with disk. Meaning all its critical values
  * are in WAL. Only in that state the node can become a leader or a candidate.
@@ -140,13 +157,6 @@ raft_can_vote_for(const struct raft *raft, const struct vclock *v)
 	return cmp == 0 || cmp == 1;
 }
 
-/**
- * Wakeup the Raft worker fiber in order to do some async work. If the fiber
- * does not exist yet, it is created.
- */
-static void
-raft_worker_wakeup(struct raft *raft);
-
 /** Schedule broadcast of the complete Raft state to all the followers. */
 static void
 raft_schedule_broadcast(struct raft *raft);
@@ -568,13 +578,11 @@ raft_worker_handle_broadcast(struct raft *raft)
 	raft->is_broadcast_scheduled = false;
 }
 
-static int
-raft_worker_f(va_list args)
+void
+raft_process_async(struct raft *raft)
 {
-	(void)args;
-	struct raft *raft = fiber()->f_arg;
 	bool is_idle;
-	while (!fiber_is_cancelled()) {
+	do {
 		is_idle = true;
 		if (raft->is_write_in_progress) {
 			raft_worker_handle_io(raft);
@@ -584,14 +592,8 @@ raft_worker_f(va_list args)
 			raft_worker_handle_broadcast(raft);
 			is_idle = false;
 		}
-		if (is_idle) {
-			assert(raft_is_fully_on_disk(raft));
-			fiber_yield();
-		} else {
-			fiber_sleep(0);
-		}
-	}
-	return 0;
+	} while (!is_idle);
+	assert(raft_is_fully_on_disk(raft));
 }
 
 static void
@@ -601,7 +603,7 @@ raft_sm_pause_and_dump(struct raft *raft)
 	if (raft->is_write_in_progress)
 		return;
 	ev_timer_stop(loop(), &raft->timer);
-	raft_worker_wakeup(raft);
+	raft_schedule_async(raft);
 	raft->is_write_in_progress = true;
 }
 
@@ -962,45 +964,11 @@ raft_new_term(struct raft *raft)
 		raft_sm_schedule_new_term(raft, raft->volatile_term + 1);
 }
 
-static void
-raft_worker_wakeup(struct raft *raft)
-{
-	if (raft->worker == NULL) {
-		raft->worker = fiber_new("raft_worker", raft_worker_f);
-		if (raft->worker == NULL) {
-			/*
-			 * XXX: should be handled properly, no need to panic.
-			 * The issue though is that most of the Raft state
-			 * machine functions are not supposed to fail, and also
-			 * they usually wakeup the fiber when their work is
-			 * finished. So it is too late to fail. On the other
-			 * hand it looks not so good to create the fiber when
-			 * Raft is initialized. Because then it will occupy
-			 * memory even if Raft is not used.
-			 */
-			diag_log();
-			panic("Could't create Raft worker fiber");
-			return;
-		}
-		raft->worker->f_arg = raft;
-		fiber_set_joinable(raft->worker, true);
-	}
-	/*
-	 * Don't wake the fiber if it writes something (not cancellable).
-	 * Otherwise it would be a spurious wakeup breaking the WAL write not
-	 * adapted to this. Also don't wakeup the current fiber - it leads to
-	 * undefined behaviour.
-	 */
-	if ((raft->worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
-	    fiber() != raft->worker)
-		fiber_wakeup(raft->worker);
-}
-
 static void
 raft_schedule_broadcast(struct raft *raft)
 {
 	raft->is_broadcast_scheduled = true;
-	raft_worker_wakeup(raft);
+	raft_schedule_async(raft);
 }
 
 void
@@ -1025,10 +993,4 @@ raft_destroy(struct raft *raft)
 {
 	ev_timer_stop(loop(), &raft->timer);
 	trigger_destroy(&raft->on_update);
-	if (raft->worker != NULL) {
-		raft_worker_wakeup(raft);
-		fiber_cancel(raft->worker);
-		fiber_join(raft->worker);
-		raft->worker = NULL;
-	}
 }
diff --git a/src/box/raftlib.h b/src/box/raftlib.h
index 4f4d24ca8..f545224a5 100644
--- a/src/box/raftlib.h
+++ b/src/box/raftlib.h
@@ -68,7 +68,6 @@ extern "C" {
  * than the configured one. See more details in the code.
  */
 
-struct fiber;
 struct raft;
 
 enum raft_state {
@@ -120,6 +119,7 @@ struct raft_msg {
 
 typedef void (*raft_broadcast_f)(struct raft *raft, const struct raft_msg *req);
 typedef void (*raft_write_f)(struct raft *raft, const struct raft_msg *req);
+typedef void (*raft_schedule_async_f)(struct raft *raft);
 
 /**
  * Raft connection to the environment, via which it talks to other nodes and
@@ -130,6 +130,11 @@ struct raft_vtab {
 	raft_broadcast_f broadcast;
 	/** Save a message to disk. */
 	raft_write_f write;
+	/**
+	 * Schedule asynchronous work which may yield, and it can't be done
+	 * right now.
+	 */
+	raft_schedule_async_f schedule_async;
 };
 
 struct raft {
@@ -203,8 +208,6 @@ struct raft {
 	const struct vclock *vclock;
 	/** State machine timed event trigger. */
 	struct ev_timer timer;
-	/** Worker fiber to execute blocking tasks like IO. */
-	struct fiber *worker;
 	/** Configured election timeout in seconds. */
 	double election_timeout;
 	/**
@@ -255,6 +258,10 @@ int
 raft_process_msg(struct raft *raft, const struct raft_msg *req,
 		 uint32_t source);
 
+/** Process all asynchronous events accumulated by Raft. */
+void
+raft_process_async(struct raft *raft);
+
 /**
  * Process a heartbeat message from an instance with the given ID. It is used to
  * watch leader's health and start election when necessary.
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list