[Tarantool-patches] [PATCH v2 11/16] raft: move worker fiber from Raft library to box

Serge Petrenko sergepetrenko at tarantool.org
Fri Nov 20 12:06:11 MSK 2020


20.11.2020 02:45, Vladislav Shpilevoy пишет:
> The worker fiber is used by the Raft library to perform yielding
> tasks like WAL writes, and simply long tasks like network
> broadcasts. That keeps the Raft state machine from being blocked,
> and allows collecting multiple updates during an event loop
> iteration to flush them all at once.
>
> While the worker fiber was inside Raft library, it wasn't possible
> to use it for anything else. And that is exactly what is going to
> be needed. The reason chain is quite long.
>
> It all starts from the fact that eliminating all box appearances
> from the Raft library also requires relocating
> box_update_ro_summary().
>
> The only place it can be moved to is box_raft_on_update trigger.
>
> The trigger is currently called from the Raft worker fiber. It
> means that there is a yield between a Raft state update and the
> trigger invocation. If box_update_ro_summary() were blindly moved
> to the trigger, users could sometimes observe miracles like the
> instance role being 'follower' while the node is still writable
> (if it was a leader before), because box_raft_on_update hasn't
> been invoked yet and so hasn't updated the RO summary.
>
> Assume, the on_update triggers are invoked by Raft not in the
> worker fiber, but right from the state machine. Then
> box_update_ro_summary() would always follow a state change without
> a yield.
>
> However that creates another problem - the trigger also calls
> box_clear_synchro_queue(), which yields. But on_update triggers
> must not yield so as not to block the state machine.
>
> This could be easily solved if it were possible to schedule
> box_clear_synchro_queue() from the on_update trigger to be
> executed later.
>
> And after this patch it becomes possible, because the worker fiber
> can now be used not only to handle the Raft library's async work,
> but also for box-Raft async work, like clearing the synchro queue.
>
> Part of #5303


Thanks for the patch!

LGTM.

> ---
>   src/box/raft.c    | 65 ++++++++++++++++++++++++++++++++++-
>   src/box/raftlib.c | 86 +++++++++++++----------------------------------
>   src/box/raftlib.h | 13 +++++--
>   3 files changed, 98 insertions(+), 66 deletions(-)
>
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 8a034687b..0027230da 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -49,6 +49,15 @@ struct raft box_raft_global = {
>    */
>   static struct trigger box_raft_on_update;
>   
> +/**
> + * Worker fiber does all the asynchronous work, which may need yields and can be
> + * long. These are WAL writes, network broadcasts. That allows not to block the
> + * Raft state machine.
> + */
> +static struct fiber *box_raft_worker = NULL;
> +/** Flag installed each time when new work appears for the worker fiber. */
> +static bool box_raft_has_work = false;
> +
>   static void
>   box_raft_msg_to_request(const struct raft_msg *msg, struct raft_request *req)
>   {
> @@ -71,6 +80,59 @@ box_raft_request_to_msg(const struct raft_request *req, struct raft_msg *msg)
>   	};
>   }
>   
> +static int
> +box_raft_worker_f(va_list args)
> +{
> +	(void)args;
> +	struct raft *raft = fiber()->f_arg;
> +	assert(raft == box_raft());
> +	while (!fiber_is_cancelled()) {
> +		box_raft_has_work = false;
> +
> +		raft_process_async(raft);
> +
> +		if (!box_raft_has_work)
> +			fiber_yield();
> +	}
> +	return 0;
> +}
> +
> +static void
> +box_raft_schedule_async(struct raft *raft)
> +{
> +	assert(raft == box_raft());
> +	if (box_raft_worker == NULL) {
> +		box_raft_worker = fiber_new("raft_worker", box_raft_worker_f);
> +		if (box_raft_worker == NULL) {
> +			/*
> +			 * XXX: should be handled properly, no need to panic.
> +			 * The issue though is that most of the Raft state
> +			 * machine functions are not supposed to fail, and also
> +			 * they usually wakeup the fiber when their work is
> +			 * finished. So it is too late to fail. On the other
> +			 * hand it looks not so good to create the fiber when
> +			 * Raft is initialized. Because then it will occupy
> +			 * memory even if Raft is not used.
> +			 */
> +			diag_log();
> +			panic("Could't create Raft worker fiber");
> +			return;
> +		}
> +		box_raft_worker->f_arg = raft;
> +		fiber_set_joinable(box_raft_worker, true);
> +	}
> +	/*
> +	 * Don't wake the fiber if it writes something (not cancellable).
> +	 * Otherwise it would be a spurious wakeup breaking the WAL write not
> +	 * adapted to this. Also don't wakeup the current fiber - it leads to
> +	 * undefined behaviour.
> +	 */
> +	if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
> +	    fiber() != box_raft_worker)
> +		fiber_wakeup(box_raft_worker);
> +	box_raft_has_work = true;
> +}
> +
>   static int
>   box_raft_on_update_f(struct trigger *trigger, void *event)
>   {
> @@ -242,6 +304,7 @@ box_raft_init(void)
>   	static const struct raft_vtab box_raft_vtab = {
>   		.broadcast = box_raft_broadcast,
>   		.write = box_raft_write,
> +		.schedule_async = box_raft_schedule_async,
>   	};
>   	raft_create(&box_raft_global, &box_raft_vtab);
>   	trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
> @@ -255,7 +318,7 @@ box_raft_free(void)
>   	 * Can't join the fiber, because the event loop is stopped already, and
>   	 * yields are not allowed.
>   	 */
> -	box_raft_global.worker = NULL;
> +	box_raft_worker = NULL;
>   	raft_destroy(&box_raft_global);
>   	/*
>   	 * Invalidate so as box_raft() would fail if any usage attempt happens.
> diff --git a/src/box/raftlib.c b/src/box/raftlib.c
> index a1fca34cd..4457a784f 100644
> --- a/src/box/raftlib.c
> +++ b/src/box/raftlib.c
> @@ -75,6 +75,23 @@ raft_write(struct raft *raft, const struct raft_msg *req)
>   	raft->vtab->write(raft, req);
>   }
>   
> +/**
> + * Schedule async work. The Raft node owner should eventually process the async
> + * events.
> + */
> +static inline void
> +raft_schedule_async(struct raft *raft)
> +{
> +	/*
> +	 * The method is called from inside of the state machine, when yields
> +	 * are not allowed for its simplicity.
> +	 */
> +	int csw = fiber()->csw;
> +	raft->vtab->schedule_async(raft);
> +	assert(csw == fiber()->csw);
> +	(void)csw;
> +}
> +
>   /**
>    * Check if Raft is completely synced with disk. Meaning all its critical values
>    * are in WAL. Only in that state the node can become a leader or a candidate.
> @@ -140,13 +157,6 @@ raft_can_vote_for(const struct raft *raft, const struct vclock *v)
>   	return cmp == 0 || cmp == 1;
>   }
>   
> -/**
> - * Wakeup the Raft worker fiber in order to do some async work. If the fiber
> - * does not exist yet, it is created.
> - */
> -static void
> -raft_worker_wakeup(struct raft *raft);
> -
>   /** Schedule broadcast of the complete Raft state to all the followers. */
>   static void
>   raft_schedule_broadcast(struct raft *raft);
> @@ -568,13 +578,11 @@ raft_worker_handle_broadcast(struct raft *raft)
>   	raft->is_broadcast_scheduled = false;
>   }
>   
> -static int
> -raft_worker_f(va_list args)
> +void
> +raft_process_async(struct raft *raft)
>   {
> -	(void)args;
> -	struct raft *raft = fiber()->f_arg;
>   	bool is_idle;
> -	while (!fiber_is_cancelled()) {
> +	do {
>   		is_idle = true;
>   		if (raft->is_write_in_progress) {
>   			raft_worker_handle_io(raft);
> @@ -584,14 +592,8 @@ raft_worker_f(va_list args)
>   			raft_worker_handle_broadcast(raft);
>   			is_idle = false;
>   		}
> -		if (is_idle) {
> -			assert(raft_is_fully_on_disk(raft));
> -			fiber_yield();
> -		} else {
> -			fiber_sleep(0);
> -		}
> -	}
> -	return 0;
> +	} while (!is_idle);
> +	assert(raft_is_fully_on_disk(raft));
>   }
>   
>   static void
> @@ -601,7 +603,7 @@ raft_sm_pause_and_dump(struct raft *raft)
>   	if (raft->is_write_in_progress)
>   		return;
>   	ev_timer_stop(loop(), &raft->timer);
> -	raft_worker_wakeup(raft);
> +	raft_schedule_async(raft);
>   	raft->is_write_in_progress = true;
>   }
>   
> @@ -962,45 +964,11 @@ raft_new_term(struct raft *raft)
>   		raft_sm_schedule_new_term(raft, raft->volatile_term + 1);
>   }
>   
> -static void
> -raft_worker_wakeup(struct raft *raft)
> -{
> -	if (raft->worker == NULL) {
> -		raft->worker = fiber_new("raft_worker", raft_worker_f);
> -		if (raft->worker == NULL) {
> -			/*
> -			 * XXX: should be handled properly, no need to panic.
> -			 * The issue though is that most of the Raft state
> -			 * machine functions are not supposed to fail, and also
> -			 * they usually wakeup the fiber when their work is
> -			 * finished. So it is too late to fail. On the other
> -			 * hand it looks not so good to create the fiber when
> -			 * Raft is initialized. Because then it will occupy
> -			 * memory even if Raft is not used.
> -			 */
> -			diag_log();
> -			panic("Could't create Raft worker fiber");
> -			return;
> -		}
> -		raft->worker->f_arg = raft;
> -		fiber_set_joinable(raft->worker, true);
> -	}
> -	/*
> -	 * Don't wake the fiber if it writes something (not cancellable).
> -	 * Otherwise it would be a spurious wakeup breaking the WAL write not
> -	 * adapted to this. Also don't wakeup the current fiber - it leads to
> -	 * undefined behaviour.
> -	 */
> -	if ((raft->worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
> -	    fiber() != raft->worker)
> -		fiber_wakeup(raft->worker);
> -}
> -
>   static void
>   raft_schedule_broadcast(struct raft *raft)
>   {
>   	raft->is_broadcast_scheduled = true;
> -	raft_worker_wakeup(raft);
> +	raft_schedule_async(raft);
>   }
>   
>   void
> @@ -1025,10 +993,4 @@ raft_destroy(struct raft *raft)
>   {
>   	ev_timer_stop(loop(), &raft->timer);
>   	trigger_destroy(&raft->on_update);
> -	if (raft->worker != NULL) {
> -		raft_worker_wakeup(raft);
> -		fiber_cancel(raft->worker);
> -		fiber_join(raft->worker);
> -		raft->worker = NULL;
> -	}
>   }
> diff --git a/src/box/raftlib.h b/src/box/raftlib.h
> index 4f4d24ca8..f545224a5 100644
> --- a/src/box/raftlib.h
> +++ b/src/box/raftlib.h
> @@ -68,7 +68,6 @@ extern "C" {
>    * than the configured one. See more details in the code.
>    */
>   
> -struct fiber;
>   struct raft;
>   
>   enum raft_state {
> @@ -120,6 +119,7 @@ struct raft_msg {
>   
>   typedef void (*raft_broadcast_f)(struct raft *raft, const struct raft_msg *req);
>   typedef void (*raft_write_f)(struct raft *raft, const struct raft_msg *req);
> +typedef void (*raft_schedule_async_f)(struct raft *raft);
>   
>   /**
>    * Raft connection to the environment, via which it talks to other nodes and
> @@ -130,6 +130,11 @@ struct raft_vtab {
>   	raft_broadcast_f broadcast;
>   	/** Save a message to disk. */
>   	raft_write_f write;
> +	/**
> +	 * Schedule asynchronous work which may yield, and it can't be done
> +	 * right now.
> +	 */
> +	raft_schedule_async_f schedule_async;
>   };
>   
>   struct raft {
> @@ -203,8 +208,6 @@ struct raft {
>   	const struct vclock *vclock;
>   	/** State machine timed event trigger. */
>   	struct ev_timer timer;
> -	/** Worker fiber to execute blocking tasks like IO. */
> -	struct fiber *worker;
>   	/** Configured election timeout in seconds. */
>   	double election_timeout;
>   	/**
> @@ -255,6 +258,10 @@ int
>   raft_process_msg(struct raft *raft, const struct raft_msg *req,
>   		 uint32_t source);
>   
> +/** Process all asynchronous events accumulated by Raft. */
> +void
> +raft_process_async(struct raft *raft);
> +
>   /**
>    * Process a heartbeat message from an instance with the given ID. It is used to
>    * watch leader's health and start election when necessary.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list