From: Vladislav Shpilevoy
Date: Fri, 20 Nov 2020 00:45:59 +0100
Message-Id: <0140da7888b07060ce154cadd719e23f50dda23f.1605829282.git.v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v2 11/16] raft: move worker fiber from Raft library to box
To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org

The worker fiber is used by the Raft library to perform tasks that yield,
such as WAL writes, and tasks that are simply long, such as network
broadcasts. This keeps the Raft state machine from blocking, and lets it
collect multiple updates during one event loop iteration and flush them
all at once.

While the worker fiber lived inside the Raft library, it couldn't be used
for anything else, and that is exactly what is going to be needed.

The chain of reasons is quite long. It starts with the fact that removing
all box dependencies from the Raft library also means relocating
box_update_ro_summary(). The only place it can be moved to is the
box_raft_on_update trigger.

The trigger is currently called from the Raft worker fiber, which means
there is a yield between a Raft state update and the trigger invocation.
If box_update_ro_summary() were blindly moved into the trigger, users
could sometimes observe anomalies such as an instance whose role is
already 'follower' but which is still writable because it was a leader
before: box_raft_on_update hasn't been invoked yet, so the RO summary
hasn't been updated.
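To make the ordering problem above concrete, here is a minimal C sketch. It is a simplified model, not Tarantool code: struct node, node_step_down(), node_run_pending_trigger() and the other names are hypothetical, and the RO rule (only a leader is writable) is deliberately simpler than the real box_update_ro_summary(). It shows that when the RO refresh is deferred until after a yield, there is a window in which the node is a follower yet still writable.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical, simplified model of a node. NOT Tarantool's real
 * structures: only the role and a derived read-only flag are kept.
 */
enum node_role { ROLE_FOLLOWER, ROLE_LEADER };

struct node {
	enum node_role role;
	/* RO summary derived from the role (cf. box_update_ro_summary()). */
	bool is_ro;
	/* Set when the on_update trigger is deferred until after a yield. */
	bool trigger_pending;
};

/* Simplified stand-in for box_update_ro_summary(): only a leader writes. */
static void
node_update_ro_summary(struct node *n)
{
	n->is_ro = n->role != ROLE_LEADER;
}

/* Step down to follower, running the trigger now or deferring it. */
static void
node_step_down(struct node *n, bool defer_trigger)
{
	/* In this model only a leader can step down. */
	assert(n->role == ROLE_LEADER);
	n->role = ROLE_FOLLOWER;
	if (defer_trigger)
		n->trigger_pending = true;
	else
		node_update_ro_summary(n);
}

/* What a worker would do once it gets control after the yield. */
static void
node_run_pending_trigger(struct node *n)
{
	if (n->trigger_pending) {
		n->trigger_pending = false;
		node_update_ro_summary(n);
	}
}

/* True for the "follower, but still writable" state users could see. */
static bool
node_is_inconsistent(const struct node *n)
{
	return n->role == ROLE_FOLLOWER && !n->is_ro;
}
```

With defer_trigger set to true, node_is_inconsistent() stays true until node_run_pending_trigger() runs; with it set to false, the state never becomes inconsistent. That window is why box_update_ro_summary() needs to run right from the state machine, without a yield in between.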
Suppose the on_update triggers are invoked by Raft not from the worker
fiber, but right from the state machine. Then box_update_ro_summary()
would always follow a state change without a yield. However, that creates
another problem: the trigger also calls box_clear_synchro_queue(), which
yields, and on_update triggers must not yield, so as not to block the
state machine.

This is easily solved if box_clear_synchro_queue() can be scheduled from
the on_update trigger to be executed later. This patch makes that
possible: the worker fiber now belongs to box, so it can handle not only
the Raft library's async work but also box-level Raft async work, such
as the synchro queue clearance.

Part of #5303
---
 src/box/raft.c    | 65 ++++++++++++++++++++++++++++++++++-
 src/box/raftlib.c | 86 +++++++++++++---------------------------------
 src/box/raftlib.h | 13 +++++--
 3 files changed, 98 insertions(+), 66 deletions(-)

diff --git a/src/box/raft.c b/src/box/raft.c
index 8a034687b..0027230da 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -49,6 +49,15 @@ struct raft box_raft_global = {
  */
 static struct trigger box_raft_on_update;
 
+/**
+ * Worker fiber does all the asynchronous work, which may need yields and can be
+ * long. These are WAL writes and network broadcasts. That way the Raft state
+ * machine is never blocked.
+ */
+static struct fiber *box_raft_worker = NULL;
+/** Flag set each time new work appears for the worker fiber. */
+static bool box_raft_has_work = false;
+
 static void
 box_raft_msg_to_request(const struct raft_msg *msg, struct raft_request *req)
 {
@@ -71,6 +80,59 @@ box_raft_request_to_msg(const struct raft_request *req, struct raft_msg *msg)
 	};
 }
 
+static int
+box_raft_worker_f(va_list args)
+{
+	(void)args;
+	struct raft *raft = fiber()->f_arg;
+	assert(raft == box_raft());
+	while (!fiber_is_cancelled()) {
+		box_raft_has_work = false;
+
+		raft_process_async(raft);
+
+		if (!box_raft_has_work)
+			fiber_yield();
+	}
+	return 0;
+}
+
+static void
+box_raft_schedule_async(struct raft *raft)
+{
+	assert(raft == box_raft());
+	if (box_raft_worker == NULL) {
+		box_raft_worker = fiber_new("raft_worker", box_raft_worker_f);
+		if (box_raft_worker == NULL) {
+			/*
+			 * XXX: should be handled properly, no need to panic.
+			 * The issue though is that most of the Raft state
+			 * machine functions are not supposed to fail, and also
+			 * they usually wakeup the fiber when their work is
+			 * finished. So it is too late to fail. On the other
+			 * hand it looks not so good to create the fiber when
+			 * Raft is initialized. Because then it will occupy
+			 * memory even if Raft is not used.
+			 */
+			diag_log();
+			panic("Couldn't create Raft worker fiber");
+			return;
+		}
+		box_raft_worker->f_arg = raft;
+		fiber_set_joinable(box_raft_worker, true);
+	}
+	/*
+	 * Don't wake the fiber if it writes something (not cancellable).
+	 * Otherwise it would be a spurious wakeup breaking the WAL write not
+	 * adapted to this. Also don't wakeup the current fiber - it leads to
+	 * undefined behaviour.
+	 */
+	if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
+	    fiber() != box_raft_worker)
+		fiber_wakeup(box_raft_worker);
+	box_raft_has_work = true;
+}
+
 static int
 box_raft_on_update_f(struct trigger *trigger, void *event)
 {
@@ -242,6 +304,7 @@ box_raft_init(void)
 	static const struct raft_vtab box_raft_vtab = {
 		.broadcast = box_raft_broadcast,
 		.write = box_raft_write,
+		.schedule_async = box_raft_schedule_async,
 	};
 	raft_create(&box_raft_global, &box_raft_vtab);
 	trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
@@ -255,7 +318,7 @@ box_raft_free(void)
 	 * Can't join the fiber, because the event loop is stopped already, and
 	 * yields are not allowed.
 	 */
-	box_raft_global.worker = NULL;
+	box_raft_worker = NULL;
 	raft_destroy(&box_raft_global);
 	/*
 	 * Invalidate so as box_raft() would fail if any usage attempt happens.
diff --git a/src/box/raftlib.c b/src/box/raftlib.c
index a1fca34cd..4457a784f 100644
--- a/src/box/raftlib.c
+++ b/src/box/raftlib.c
@@ -75,6 +75,23 @@ raft_write(struct raft *raft, const struct raft_msg *req)
 	raft->vtab->write(raft, req);
 }
 
+/**
+ * Schedule async work. The Raft node owner should eventually process the async
+ * events.
+ */
+static inline void
+raft_schedule_async(struct raft *raft)
+{
+	/*
+	 * The method is called from inside the state machine, where yields
+	 * are not allowed, to keep the state machine simple.
+	 */
+	int csw = fiber()->csw;
+	raft->vtab->schedule_async(raft);
+	assert(csw == fiber()->csw);
+	(void)csw;
+}
+
 /**
  * Check if Raft is completely synced with disk. Meaning all its critical values
  * are in WAL. Only in that state the node can become a leader or a candidate.
@@ -140,13 +157,6 @@ raft_can_vote_for(const struct raft *raft, const struct vclock *v)
 	return cmp == 0 || cmp == 1;
 }
 
-/**
- * Wakeup the Raft worker fiber in order to do some async work. If the fiber
- * does not exist yet, it is created.
- */
-static void
-raft_worker_wakeup(struct raft *raft);
-
 /** Schedule broadcast of the complete Raft state to all the followers. */
 static void
 raft_schedule_broadcast(struct raft *raft);
@@ -568,13 +578,11 @@ raft_worker_handle_broadcast(struct raft *raft)
 	raft->is_broadcast_scheduled = false;
 }
 
-static int
-raft_worker_f(va_list args)
+void
+raft_process_async(struct raft *raft)
 {
-	(void)args;
-	struct raft *raft = fiber()->f_arg;
 	bool is_idle;
-	while (!fiber_is_cancelled()) {
+	do {
 		is_idle = true;
 		if (raft->is_write_in_progress) {
 			raft_worker_handle_io(raft);
@@ -584,14 +592,8 @@ raft_worker_f(va_list args)
 			raft_worker_handle_broadcast(raft);
 			is_idle = false;
 		}
-		if (is_idle) {
-			assert(raft_is_fully_on_disk(raft));
-			fiber_yield();
-		} else {
-			fiber_sleep(0);
-		}
-	}
-	return 0;
+	} while (!is_idle);
+	assert(raft_is_fully_on_disk(raft));
 }
 
 static void
@@ -601,7 +603,7 @@ raft_sm_pause_and_dump(struct raft *raft)
 	if (raft->is_write_in_progress)
 		return;
 	ev_timer_stop(loop(), &raft->timer);
-	raft_worker_wakeup(raft);
+	raft_schedule_async(raft);
 	raft->is_write_in_progress = true;
 }
 
@@ -962,45 +964,11 @@ raft_new_term(struct raft *raft)
 		raft_sm_schedule_new_term(raft, raft->volatile_term + 1);
 }
 
-static void
-raft_worker_wakeup(struct raft *raft)
-{
-	if (raft->worker == NULL) {
-		raft->worker = fiber_new("raft_worker", raft_worker_f);
-		if (raft->worker == NULL) {
-			/*
-			 * XXX: should be handled properly, no need to panic.
-			 * The issue though is that most of the Raft state
-			 * machine functions are not supposed to fail, and also
-			 * they usually wakeup the fiber when their work is
-			 * finished. So it is too late to fail. On the other
-			 * hand it looks not so good to create the fiber when
-			 * Raft is initialized. Because then it will occupy
-			 * memory even if Raft is not used.
-			 */
-			diag_log();
-			panic("Could't create Raft worker fiber");
-			return;
-		}
-		raft->worker->f_arg = raft;
-		fiber_set_joinable(raft->worker, true);
-	}
-	/*
-	 * Don't wake the fiber if it writes something (not cancellable).
-	 * Otherwise it would be a spurious wakeup breaking the WAL write not
-	 * adapted to this. Also don't wakeup the current fiber - it leads to
-	 * undefined behaviour.
-	 */
-	if ((raft->worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
-	    fiber() != raft->worker)
-		fiber_wakeup(raft->worker);
-}
-
 static void
 raft_schedule_broadcast(struct raft *raft)
 {
 	raft->is_broadcast_scheduled = true;
-	raft_worker_wakeup(raft);
+	raft_schedule_async(raft);
 }
 
 void
@@ -1025,10 +993,4 @@ raft_destroy(struct raft *raft)
 {
 	ev_timer_stop(loop(), &raft->timer);
 	trigger_destroy(&raft->on_update);
-	if (raft->worker != NULL) {
-		raft_worker_wakeup(raft);
-		fiber_cancel(raft->worker);
-		fiber_join(raft->worker);
-		raft->worker = NULL;
-	}
 }
diff --git a/src/box/raftlib.h b/src/box/raftlib.h
index 4f4d24ca8..f545224a5 100644
--- a/src/box/raftlib.h
+++ b/src/box/raftlib.h
@@ -68,7 +68,6 @@ extern "C" {
  * than the configured one. See more details in the code.
  */
 
-struct fiber;
 struct raft;
 
 enum raft_state {
@@ -120,6 +119,7 @@ struct raft_msg {
 
 typedef void (*raft_broadcast_f)(struct raft *raft, const struct raft_msg *req);
 typedef void (*raft_write_f)(struct raft *raft, const struct raft_msg *req);
+typedef void (*raft_schedule_async_f)(struct raft *raft);
 
 /**
  * Raft connection to the environment, via which it talks to other nodes and
@@ -130,6 +130,11 @@ struct raft_vtab {
 	raft_broadcast_f broadcast;
 	/** Save a message to disk. */
 	raft_write_f write;
+	/**
+	 * Schedule asynchronous work, which may yield and therefore can't be
+	 * done right now.
+	 */
+	raft_schedule_async_f schedule_async;
 };
 
 struct raft {
@@ -203,8 +208,6 @@ struct raft {
 	const struct vclock *vclock;
 	/** State machine timed event trigger. */
 	struct ev_timer timer;
-	/** Worker fiber to execute blocking tasks like IO. */
-	struct fiber *worker;
 	/** Configured election timeout in seconds. */
 	double election_timeout;
 	/**
@@ -255,6 +258,10 @@ int
 raft_process_msg(struct raft *raft, const struct raft_msg *req,
 		 uint32_t source);
 
+/** Process all asynchronous events accumulated by Raft. */
+void
+raft_process_async(struct raft *raft);
+
 /**
  * Process a heartbeat message from an instance with the given ID. It is used to
  * watch leader's health and start election when necessary.
-- 
2.24.3 (Apple Git-128)
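The split of responsibilities in this patch (the library records pending work and asks its owner to schedule it, the owner's worker drains it) can be modelled without fibers. The following is a hypothetical, single-threaded C sketch, not Tarantool code: struct lib, owner_schedule_async(), owner_on_update(), owner_run_worker() and the queue-clearance counter are illustrative names only, loosely mirroring raft_process_async(), box_raft_schedule_async() and box_raft_worker_f(), and an explicit loop stands in for fiber wakeups and yields.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical single-threaded model, NOT Tarantool code. An explicit loop
 * replaces fibers; names only loosely mirror struct raft and box/raft.c.
 */
struct lib;

struct lib_vtab {
	/* Must not yield: it only notes that async work is pending. */
	void (*schedule_async)(struct lib *lib);
};

struct lib {
	const struct lib_vtab *vtab;
	bool is_write_in_progress;
	bool is_broadcast_scheduled;
	int writes_done;
	int broadcasts_done;
};

/* Library side: request a broadcast (cf. raft_schedule_broadcast()). */
static void
lib_schedule_broadcast(struct lib *lib)
{
	lib->is_broadcast_scheduled = true;
	lib->vtab->schedule_async(lib);
}

/* Library side: request a disk write (cf. raft_sm_pause_and_dump()). */
static void
lib_schedule_write(struct lib *lib)
{
	if (lib->is_write_in_progress)
		return;
	lib->is_write_in_progress = true;
	lib->vtab->schedule_async(lib);
}

/* Library side: drain all pending work (cf. raft_process_async()). */
static void
lib_process_async(struct lib *lib)
{
	bool is_idle;
	do {
		is_idle = true;
		if (lib->is_write_in_progress) {
			lib->is_write_in_progress = false;
			lib->writes_done++;
			is_idle = false;
		}
		if (lib->is_broadcast_scheduled) {
			lib->is_broadcast_scheduled = false;
			lib->broadcasts_done++;
			is_idle = false;
		}
	} while (!is_idle);
	assert(!lib->is_write_in_progress && !lib->is_broadcast_scheduled);
}

/* Owner side: a flag instead of waking a fiber (cf. box_raft_has_work). */
static bool owner_has_work = false;
/* Hypothetical owner-level job, e.g. a deferred synchro queue clearance. */
static bool owner_clear_queue_pending = false;
static int owner_queue_clears = 0;

static void
owner_schedule_async(struct lib *lib)
{
	(void)lib;
	owner_has_work = true;
}

static const struct lib_vtab owner_vtab = {
	.schedule_async = owner_schedule_async,
};

/* Owner's on_update trigger: must not block, so it defers the slow part. */
static void
owner_on_update(struct lib *lib)
{
	owner_clear_queue_pending = true;
	owner_schedule_async(lib);
}

/* Owner's worker: keep going while new work was scheduled meanwhile. */
static void
owner_run_worker(struct lib *lib)
{
	while (owner_has_work) {
		owner_has_work = false;
		lib_process_async(lib);
		if (owner_clear_queue_pending) {
			owner_clear_queue_pending = false;
			owner_queue_clears++;
		}
	}
}
```

Several requests made before the worker runs collapse into a single pass over the pending flags, which is the batching the commit message mentions. The owner-level job stands in for the kind of box-level work, such as the synchro queue clearance, that a box-owned worker can take on. In the real code the schedule_async callback must not yield, which the patch checks by comparing fiber()->csw before and after the call.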