[Tarantool-patches] [PATCH v2 11/16] raft: move worker fiber from Raft library to box
Serge Petrenko
sergepetrenko at tarantool.org
Fri Nov 20 12:06:11 MSK 2020
20.11.2020 02:45, Vladislav Shpilevoy пишет:
> The worker fiber is used by the Raft library to perform yielding
> tasks like WAL writes, and simply long tasks like network broadcasts.
> That avoids blocking the Raft state machine, and allows collecting
> multiple updates during an event loop iteration to flush them all
> at once.
>
> While the worker fiber was inside Raft library, it wasn't possible
> to use it for anything else. And that is exactly what is going to
> be needed. The reason chain is quite long.
>
> It all starts from the fact that eliminating all references to box
> from the Raft library also requires relocating
> box_update_ro_summary().
>
> The only place it can be moved to is box_raft_on_update trigger.
>
> The trigger is currently called from the Raft worker fiber. It
> means that between a Raft state update and the trigger invocation
> there is a yield. If box_update_ro_summary() were blindly moved to
> the trigger, users could sometimes observe anomalies such as the
> instance role being 'follower' while the node is still writable
> (if it was a leader before), because box_raft_on_update hasn't been
> invoked yet and so hasn't updated the RO summary.
>
> Assume the on_update triggers are invoked by Raft not from the
> worker fiber, but directly from the state machine. Then
> box_update_ro_summary() would always follow a state change without
> a yield.
>
> However that creates another problem - the trigger also calls
> box_clear_synchro_queue(), which yields. But on_update triggers
> must not yield so as not to block the state machine.
>
> This can be easily solved if it were possible to schedule
> box_clear_synchro_queue() from the on_update trigger to be executed
> later.
>
> And after this patch it becomes possible, because the worker fiber
> now can be used not only to handle Raft library async work, but
> also for box-Raft async work, like the synchro queue clearance.
>
> Part of #5303
Thanks for the patch!
LGTM.
> ---
> src/box/raft.c | 65 ++++++++++++++++++++++++++++++++++-
> src/box/raftlib.c | 86 +++++++++++++----------------------------------
> src/box/raftlib.h | 13 +++++--
> 3 files changed, 98 insertions(+), 66 deletions(-)
>
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 8a034687b..0027230da 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -49,6 +49,15 @@ struct raft box_raft_global = {
> */
> static struct trigger box_raft_on_update;
>
> +/**
> + * Worker fiber does all the asynchronous work, which may need yields and can be
> + * long. These are WAL writes, network broadcasts. That allows not to block the
> + * Raft state machine.
> + */
> +static struct fiber *box_raft_worker = NULL;
> +/** Flag installed each time when new work appears for the worker fiber. */
> +static bool box_raft_has_work = false;
> +
> static void
> box_raft_msg_to_request(const struct raft_msg *msg, struct raft_request *req)
> {
> @@ -71,6 +80,59 @@ box_raft_request_to_msg(const struct raft_request *req, struct raft_msg *msg)
> };
> }
>
> +static int
> +box_raft_worker_f(va_list args)
> +{
> + (void)args;
> + struct raft *raft = fiber()->f_arg;
> + assert(raft == box_raft());
> + while (!fiber_is_cancelled()) {
> + box_raft_has_work = false;
> +
> + raft_process_async(raft);
> +
> + if (!box_raft_has_work)
> + fiber_yield();
> + }
> + return 0;
> +}
> +
> +static void
> +box_raft_schedule_async(struct raft *raft)
> +{
> + assert(raft == box_raft());
> + if (box_raft_worker == NULL) {
> + box_raft_worker = fiber_new("raft_worker", box_raft_worker_f);
> + if (box_raft_worker == NULL) {
> + /*
> + * XXX: should be handled properly, no need to panic.
> + * The issue though is that most of the Raft state
> + * machine functions are not supposed to fail, and also
> + * they usually wakeup the fiber when their work is
> + * finished. So it is too late to fail. On the other
> + * hand it looks not so good to create the fiber when
> + * Raft is initialized. Because then it will occupy
> + * memory even if Raft is not used.
> + */
> + diag_log();
> + panic("Could't create Raft worker fiber");
> + return;
> + }
> + box_raft_worker->f_arg = raft;
> + fiber_set_joinable(box_raft_worker, true);
> + }
> + /*
> + * Don't wake the fiber if it writes something (not cancellable).
> + * Otherwise it would be a spurious wakeup breaking the WAL write not
> + * adapted to this. Also don't wakeup the current fiber - it leads to
> + * undefined behaviour.
> + */
> + if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
> + fiber() != box_raft_worker)
> + fiber_wakeup(box_raft_worker);
> + box_raft_has_work = true;
> +}
> +
> static int
> box_raft_on_update_f(struct trigger *trigger, void *event)
> {
> @@ -242,6 +304,7 @@ box_raft_init(void)
> static const struct raft_vtab box_raft_vtab = {
> .broadcast = box_raft_broadcast,
> .write = box_raft_write,
> + .schedule_async = box_raft_schedule_async,
> };
> raft_create(&box_raft_global, &box_raft_vtab);
> trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
> @@ -255,7 +318,7 @@ box_raft_free(void)
> * Can't join the fiber, because the event loop is stopped already, and
> * yields are not allowed.
> */
> - box_raft_global.worker = NULL;
> + box_raft_worker = NULL;
> raft_destroy(&box_raft_global);
> /*
> * Invalidate so as box_raft() would fail if any usage attempt happens.
> diff --git a/src/box/raftlib.c b/src/box/raftlib.c
> index a1fca34cd..4457a784f 100644
> --- a/src/box/raftlib.c
> +++ b/src/box/raftlib.c
> @@ -75,6 +75,23 @@ raft_write(struct raft *raft, const struct raft_msg *req)
> raft->vtab->write(raft, req);
> }
>
> +/**
> + * Schedule async work. The Raft node owner should eventually process the async
> + * events.
> + */
> +static inline void
> +raft_schedule_async(struct raft *raft)
> +{
> + /*
> + * The method is called from inside of the state machine, when yields
> + * are not allowed for its simplicity.
> + */
> + int csw = fiber()->csw;
> + raft->vtab->schedule_async(raft);
> + assert(csw == fiber()->csw);
> + (void)csw;
> +}
> +
> /**
> * Check if Raft is completely synced with disk. Meaning all its critical values
> * are in WAL. Only in that state the node can become a leader or a candidate.
> @@ -140,13 +157,6 @@ raft_can_vote_for(const struct raft *raft, const struct vclock *v)
> return cmp == 0 || cmp == 1;
> }
>
> -/**
> - * Wakeup the Raft worker fiber in order to do some async work. If the fiber
> - * does not exist yet, it is created.
> - */
> -static void
> -raft_worker_wakeup(struct raft *raft);
> -
> /** Schedule broadcast of the complete Raft state to all the followers. */
> static void
> raft_schedule_broadcast(struct raft *raft);
> @@ -568,13 +578,11 @@ raft_worker_handle_broadcast(struct raft *raft)
> raft->is_broadcast_scheduled = false;
> }
>
> -static int
> -raft_worker_f(va_list args)
> +void
> +raft_process_async(struct raft *raft)
> {
> - (void)args;
> - struct raft *raft = fiber()->f_arg;
> bool is_idle;
> - while (!fiber_is_cancelled()) {
> + do {
> is_idle = true;
> if (raft->is_write_in_progress) {
> raft_worker_handle_io(raft);
> @@ -584,14 +592,8 @@ raft_worker_f(va_list args)
> raft_worker_handle_broadcast(raft);
> is_idle = false;
> }
> - if (is_idle) {
> - assert(raft_is_fully_on_disk(raft));
> - fiber_yield();
> - } else {
> - fiber_sleep(0);
> - }
> - }
> - return 0;
> + } while (!is_idle);
> + assert(raft_is_fully_on_disk(raft));
> }
>
> static void
> @@ -601,7 +603,7 @@ raft_sm_pause_and_dump(struct raft *raft)
> if (raft->is_write_in_progress)
> return;
> ev_timer_stop(loop(), &raft->timer);
> - raft_worker_wakeup(raft);
> + raft_schedule_async(raft);
> raft->is_write_in_progress = true;
> }
>
> @@ -962,45 +964,11 @@ raft_new_term(struct raft *raft)
> raft_sm_schedule_new_term(raft, raft->volatile_term + 1);
> }
>
> -static void
> -raft_worker_wakeup(struct raft *raft)
> -{
> - if (raft->worker == NULL) {
> - raft->worker = fiber_new("raft_worker", raft_worker_f);
> - if (raft->worker == NULL) {
> - /*
> - * XXX: should be handled properly, no need to panic.
> - * The issue though is that most of the Raft state
> - * machine functions are not supposed to fail, and also
> - * they usually wakeup the fiber when their work is
> - * finished. So it is too late to fail. On the other
> - * hand it looks not so good to create the fiber when
> - * Raft is initialized. Because then it will occupy
> - * memory even if Raft is not used.
> - */
> - diag_log();
> - panic("Could't create Raft worker fiber");
> - return;
> - }
> - raft->worker->f_arg = raft;
> - fiber_set_joinable(raft->worker, true);
> - }
> - /*
> - * Don't wake the fiber if it writes something (not cancellable).
> - * Otherwise it would be a spurious wakeup breaking the WAL write not
> - * adapted to this. Also don't wakeup the current fiber - it leads to
> - * undefined behaviour.
> - */
> - if ((raft->worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
> - fiber() != raft->worker)
> - fiber_wakeup(raft->worker);
> -}
> -
> static void
> raft_schedule_broadcast(struct raft *raft)
> {
> raft->is_broadcast_scheduled = true;
> - raft_worker_wakeup(raft);
> + raft_schedule_async(raft);
> }
>
> void
> @@ -1025,10 +993,4 @@ raft_destroy(struct raft *raft)
> {
> ev_timer_stop(loop(), &raft->timer);
> trigger_destroy(&raft->on_update);
> - if (raft->worker != NULL) {
> - raft_worker_wakeup(raft);
> - fiber_cancel(raft->worker);
> - fiber_join(raft->worker);
> - raft->worker = NULL;
> - }
> }
> diff --git a/src/box/raftlib.h b/src/box/raftlib.h
> index 4f4d24ca8..f545224a5 100644
> --- a/src/box/raftlib.h
> +++ b/src/box/raftlib.h
> @@ -68,7 +68,6 @@ extern "C" {
> * than the configured one. See more details in the code.
> */
>
> -struct fiber;
> struct raft;
>
> enum raft_state {
> @@ -120,6 +119,7 @@ struct raft_msg {
>
> typedef void (*raft_broadcast_f)(struct raft *raft, const struct raft_msg *req);
> typedef void (*raft_write_f)(struct raft *raft, const struct raft_msg *req);
> +typedef void (*raft_schedule_async_f)(struct raft *raft);
>
> /**
> * Raft connection to the environment, via which it talks to other nodes and
> @@ -130,6 +130,11 @@ struct raft_vtab {
> raft_broadcast_f broadcast;
> /** Save a message to disk. */
> raft_write_f write;
> + /**
> + * Schedule asynchronous work which may yield, and it can't be done
> + * right now.
> + */
> + raft_schedule_async_f schedule_async;
> };
>
> struct raft {
> @@ -203,8 +208,6 @@ struct raft {
> const struct vclock *vclock;
> /** State machine timed event trigger. */
> struct ev_timer timer;
> - /** Worker fiber to execute blocking tasks like IO. */
> - struct fiber *worker;
> /** Configured election timeout in seconds. */
> double election_timeout;
> /**
> @@ -255,6 +258,10 @@ int
> raft_process_msg(struct raft *raft, const struct raft_msg *req,
> uint32_t source);
>
> +/** Process all asynchronous events accumulated by Raft. */
> +void
> +raft_process_async(struct raft *raft);
> +
> /**
> * Process a heartbeat message from an instance with the given ID. It is used to
> * watch leader's health and start election when necessary.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list