From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp38.i.mail.ru (smtp38.i.mail.ru [94.100.177.98]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C1F20469719 for ; Mon, 9 Nov 2020 16:46:20 +0300 (MSK) References: From: Serge Petrenko Message-ID: <62d3f8fa-20ae-477d-f73c-9aab04d43035@tarantool.org> Date: Mon, 9 Nov 2020 16:46:18 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 3/4] raft: add explicit raft argument to all functions List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org, gorcunov@gmail.com 08.11.2020 21:03, Vladislav Shpilevoy wrote: > All raft functions worked with a global raft object. That would > make it impossible to move raft to a separate module, where it could > be properly unit-tested with multiple raft nodes in each test. > > The patch adds an explicit raft pointer argument to each raft > function as a first part of moving raft to a separate library. > > The global object is renamed to box_raft so as to emphasize this > is a global box object, not from the future raft library. > > Part of #5303 > --- Hi! Thanks for the patch! LGTM with one question below. 
> src/box/applier.cc | 6 +- > src/box/box.cc | 27 +- > src/box/lua/info.c | 8 +- > src/box/memtx_engine.c | 4 +- > src/box/raft.c | 635 ++++++++++++++++++++++------------------- > src/box/raft.h | 68 +++-- > 6 files changed, 397 insertions(+), 351 deletions(-) > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 7686d6cbc..0b0526ce5 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -893,7 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row) > struct vclock candidate_clock; > if (xrow_decode_raft(row, &req, &candidate_clock) != 0) > return -1; > - return raft_process_msg(&req, applier->instance_id); > + return raft_process_msg(&box_raft, &req, applier->instance_id); > } > > /** > @@ -915,7 +915,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > * anything, because won't change total number of rows sent in the > * network anyway. > */ > - if (!raft_is_source_allowed(applier->instance_id)) > + if (!raft_is_source_allowed(&box_raft, applier->instance_id)) > return 0; > struct xrow_header *first_row = &stailq_first_entry(rows, > struct applier_tx_row, next)->row; > @@ -1256,7 +1256,7 @@ applier_subscribe(struct applier *applier) > struct xrow_header *first_row = > &stailq_first_entry(&rows, struct applier_tx_row, > next)->row; > - raft_process_heartbeat(applier->instance_id); > + raft_process_heartbeat(&box_raft, applier->instance_id); > if (first_row->lsn == 0) { > if (unlikely(iproto_type_is_raft_request( > first_row->type))) { > diff --git a/src/box/box.cc b/src/box/box.cc > index 18568df3b..30b1ec065 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -162,7 +162,7 @@ void > box_update_ro_summary(void) > { > bool old_is_ro_summary = is_ro_summary; > - is_ro_summary = is_ro || is_orphan || raft_is_ro(); > + is_ro_summary = is_ro || is_orphan || raft_is_ro(&box_raft); > /* In 99% nothing changes. Filter this out first. 
*/ > if (is_ro_summary == old_is_ro_summary) > return; > @@ -399,7 +399,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) > /* Vclock is never persisted in WAL by Raft. */ > if (xrow_decode_raft(row, &raft_req, NULL) != 0) > diag_raise(); > - raft_process_recovery(&raft_req); > + raft_process_recovery(&box_raft, &raft_req); > return; > } > xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); > @@ -796,8 +796,8 @@ box_set_election_mode(void) > const char *mode = box_check_election_mode(); > if (mode == NULL) > return -1; > - raft_cfg_is_candidate(strcmp(mode, "candidate") == 0); > - raft_cfg_is_enabled(strcmp(mode, "off") != 0); > + raft_cfg_is_candidate(&box_raft, strcmp(mode, "candidate") == 0); > + raft_cfg_is_enabled(&box_raft, strcmp(mode, "off") != 0); > return 0; > } > > @@ -807,7 +807,7 @@ box_set_election_timeout(void) > double d = box_check_election_timeout(); > if (d < 0) > return -1; > - raft_cfg_election_timeout(d); > + raft_cfg_election_timeout(&box_raft, d); > return 0; > } > > @@ -895,7 +895,7 @@ void > box_set_replication_timeout(void) > { > replication_timeout = box_check_replication_timeout(); > - raft_cfg_death_timeout(); > + raft_cfg_death_timeout(&box_raft); > } > > void > @@ -926,7 +926,7 @@ box_set_replication_synchro_quorum(void) > return -1; > replication_synchro_quorum = value; > txn_limbo_on_parameters_change(&txn_limbo); > - raft_cfg_election_quorum(); > + raft_cfg_election_quorum(&box_raft); > return 0; > } > > @@ -1065,7 +1065,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event) > { > (void)trigger; > (void)event; > - if (raft.state != RAFT_STATE_LEADER) > + if (box_raft.state != RAFT_STATE_LEADER) > return 0; > /* > * When the node became a leader, it means it will ignore all records > @@ -2154,7 +2154,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) > tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); > say_info("remote vclock %s local vclock %s", > 
vclock_to_string(&replica_clock), vclock_to_string(&vclock)); > - if (raft_is_enabled()) { > + if (raft_is_enabled(&box_raft)) { > /* > * Send out the current raft state of the instance. Don't do > * that if Raft is disabled. It can be that a part of the > @@ -2163,7 +2163,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) > * should be 0. > */ > struct raft_request req; > - raft_serialize_for_network(&req, &vclock); > + raft_serialize_for_network(&box_raft, &req, &vclock); > xrow_encode_raft(&row, &fiber()->gc, &req); > coio_write_xrow(io, &row); > } > @@ -2249,6 +2249,7 @@ box_free(void) > tuple_free(); > port_free(); > #endif > + box_raft_free(); > iproto_free(); > replication_free(); > sequence_free(); > @@ -2655,10 +2656,10 @@ box_init(void) > > txn_limbo_init(); > sequence_init(); > - raft_init(); > + box_raft_init(); > > trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL); > - raft_on_update(&box_raft_on_update); > + raft_on_update(&box_raft, &box_raft_on_update); > } > > bool > @@ -2814,7 +2815,7 @@ box_cfg_xc(void) > * should take the control over the situation and start a new > * term immediately. > */ > - raft_new_term(); > + raft_new_term(&box_raft); > } > > /* box.cfg.read_only is not read yet. 
*/ > diff --git a/src/box/lua/info.c b/src/box/lua/info.c > index 92d48c96c..07d09635e 100644 > --- a/src/box/lua/info.c > +++ b/src/box/lua/info.c > @@ -582,13 +582,13 @@ static int > lbox_info_election(struct lua_State *L) > { > lua_createtable(L, 0, 4); > - lua_pushstring(L, raft_state_str(raft.state)); > + lua_pushstring(L, raft_state_str(box_raft.state)); > lua_setfield(L, -2, "state"); > - luaL_pushuint64(L, raft.volatile_term); > + luaL_pushuint64(L, box_raft.volatile_term); > lua_setfield(L, -2, "term"); > - lua_pushinteger(L, raft.volatile_vote); > + lua_pushinteger(L, box_raft.volatile_vote); > lua_setfield(L, -2, "vote"); > - lua_pushinteger(L, raft.leader); > + lua_pushinteger(L, box_raft.leader); > lua_setfield(L, -2, "leader"); > return 1; > } > diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c > index 43000ba0b..f0cfdcdaa 100644 > --- a/src/box/memtx_engine.c > +++ b/src/box/memtx_engine.c > @@ -210,7 +210,7 @@ memtx_engine_recover_raft(const struct xrow_header *row) > /* Vclock is never persisted in WAL by Raft. */ > if (xrow_decode_raft(row, &req, NULL) != 0) > return -1; > - raft_process_recovery(&req); > + raft_process_recovery(&box_raft, &req); > return 0; > } > > @@ -554,7 +554,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) > opts.free_cache = true; > xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); > vclock_create(&ckpt->vclock); > - raft_serialize_for_disk(&ckpt->raft); > + raft_serialize_for_disk(&box_raft, &ckpt->raft); > ckpt->touch = false; > return ckpt; > } > diff --git a/src/box/raft.c b/src/box/raft.c > index a6a893373..c7db92494 100644 > --- a/src/box/raft.c > +++ b/src/box/raft.c > @@ -45,7 +45,7 @@ > #define RAFT_RANDOM_ELECTION_FACTOR 0.1 > > /** Raft state of this instance. */ > -struct raft raft = { > +struct raft box_raft = { > /* > * Set an invalid state to validate in all raft functions they are not > * used before raft initialization. 
> @@ -92,10 +92,10 @@ raft_state_str(uint32_t state) > * in any case. > */ > static bool > -raft_is_fully_on_disk(void) > +raft_is_fully_on_disk(const struct raft *raft) > { > - return raft.volatile_term == raft.term && > - raft.volatile_vote == raft.vote; > + return raft->volatile_term == raft->term && > + raft->volatile_vote == raft->vote; > } > > /** > @@ -106,9 +106,9 @@ raft_is_fully_on_disk(void) > * factor is a constant floating point value > 0. > */ > static inline double > -raft_new_random_election_shift(void) > +raft_new_random_election_shift(const struct raft *raft) > { > - double timeout = raft.election_timeout; > + double timeout = raft->election_timeout; > /* Translate to ms. Integer is needed to be able to use mod below. */ > uint32_t rand_part = > (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000); > @@ -132,8 +132,9 @@ raft_new_random_election_shift(void) > * restart and forget who the previous leader was. > */ > static inline bool > -raft_can_vote_for(const struct vclock *v) > +raft_can_vote_for(const struct raft *raft, const struct vclock *v) > { > + (void)raft; > int cmp = vclock_compare_ignore0(v, &replicaset.vclock); > return cmp == 0 || cmp == 1; > } > @@ -176,8 +177,9 @@ raft_can_vote_for(const struct vclock *v) > * leader election quorum is affected. So synchronous data won't be lost. > */ > static inline int > -raft_election_quorum(void) > +raft_election_quorum(const struct raft *raft) > { > + (void)raft; > return MIN(replication_synchro_quorum, replicaset.registered_count); > } > > @@ -186,11 +188,11 @@ raft_election_quorum(void) > * does not exist yet, it is created. > */ > static void > -raft_worker_wakeup(void); > +raft_worker_wakeup(struct raft *raft); > > /** Schedule broadcast of the complete Raft state to all the followers. */ > static void > -raft_schedule_broadcast(void); > +raft_schedule_broadcast(struct raft *raft); > > /** Raft state machine methods. 'sm' stands for State Machine. 
*/ > > @@ -201,7 +203,7 @@ raft_schedule_broadcast(void); > * it is writable. > */ > static void > -raft_sm_start(void); > +raft_sm_start(struct raft *raft); > > /** > * Stop the state machine. Now until Raft is re-enabled, > @@ -210,14 +212,14 @@ raft_sm_start(void); > * - this node can't vote. > */ > static void > -raft_sm_stop(void); > +raft_sm_stop(struct raft *raft); > > /** > * When the instance is a follower but is allowed to be a leader, it will wait > * for death of the current leader to start new election. > */ > static void > -raft_sm_wait_leader_dead(void); > +raft_sm_wait_leader_dead(struct raft *raft); > > /** > * Wait for the leader death timeout until a leader lets the node know he is > @@ -228,7 +230,7 @@ raft_sm_wait_leader_dead(void); > * restarts and may need some time to hear something from the leader. > */ > static void > -raft_sm_wait_leader_found(void); > +raft_sm_wait_leader_found(struct raft *raft); > > /** > * If election is started by this node, or it voted for some other node started > @@ -236,22 +238,22 @@ raft_sm_wait_leader_found(void); > * election times out. When it happens, the node will start new election. > */ > static void > -raft_sm_wait_election_end(void); > +raft_sm_wait_election_end(struct raft *raft); > > /** Bump volatile term and schedule its flush to disk. */ > static void > -raft_sm_schedule_new_term(uint64_t new_term); > +raft_sm_schedule_new_term(struct raft *raft, uint64_t new_term); > > /** Bump volatile vote and schedule its flush to disk. */ > static void > -raft_sm_schedule_new_vote(uint32_t new_vote); > +raft_sm_schedule_new_vote(struct raft *raft, uint32_t new_vote); > > /** > * Bump term and vote for self immediately. After that is persisted, the > * election timeout will be activated. Unless during that nothing newer happens. 
> */ > static void > -raft_sm_schedule_new_election(void); > +raft_sm_schedule_new_election(struct raft *raft); > > /** > * The main trigger of Raft state machine - start new election when the current > @@ -263,16 +265,16 @@ raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer, > > /** Start Raft state flush to disk. */ > static void > -raft_sm_pause_and_dump(void); > +raft_sm_pause_and_dump(struct raft *raft); > > static void > -raft_sm_become_leader(void); > +raft_sm_become_leader(struct raft *raft); > > static void > -raft_sm_follow_leader(uint32_t leader); > +raft_sm_follow_leader(struct raft *raft, uint32_t leader); > > static void > -raft_sm_become_candidate(void); > +raft_sm_become_candidate(struct raft *raft); > > static const char * > raft_request_to_string(const struct raft_request *req) > @@ -313,17 +315,17 @@ raft_request_to_string(const struct raft_request *req) > } > > void > -raft_process_recovery(const struct raft_request *req) > +raft_process_recovery(struct raft *raft, const struct raft_request *req) > { > - raft_validate(); > + raft_validate(raft); > say_verbose("RAFT: recover %s", raft_request_to_string(req)); > if (req->term != 0) { > - raft.term = req->term; > - raft.volatile_term = req->term; > + raft->term = req->term; > + raft->volatile_term = req->term; > } > if (req->vote != 0) { > - raft.vote = req->vote; > - raft.volatile_vote = req->vote; > + raft->vote = req->vote; > + raft->volatile_vote = req->vote; > } > /* > * Role is never persisted. If recovery is happening, the > @@ -338,13 +340,14 @@ raft_process_recovery(const struct raft_request *req) > */ > assert(req->vclock == NULL); > /* Raft is not enabled until recovery is finished. 
*/ > - assert(!raft_is_enabled()); > + assert(!raft_is_enabled(raft)); > } > > int > -raft_process_msg(const struct raft_request *req, uint32_t source) > +raft_process_msg(struct raft *raft, const struct raft_request *req, > + uint32_t source) > { > - raft_validate(); > + raft_validate(raft); > say_info("RAFT: message %s from %u", raft_request_to_string(req), > source); > assert(source > 0); > @@ -361,32 +364,32 @@ raft_process_msg(const struct raft_request *req, uint32_t source) > return -1; > } > /* Outdated request. */ > - if (req->term < raft.volatile_term) { > + if (req->term < raft->volatile_term) { > say_info("RAFT: the message is ignored due to outdated term - " > - "current term is %u", raft.volatile_term); > + "current term is %u", raft->volatile_term); > return 0; > } > > /* Term bump. */ > - if (req->term > raft.volatile_term) > - raft_sm_schedule_new_term(req->term); > + if (req->term > raft->volatile_term) > + raft_sm_schedule_new_term(raft, req->term); > /* > * Either a vote request during an on-going election. Or an old vote > * persisted long time ago and still broadcasted. Or a vote response. > */ > if (req->vote != 0) { > - switch (raft.state) { > + switch (raft->state) { > case RAFT_STATE_FOLLOWER: > case RAFT_STATE_LEADER: > - if (!raft.is_enabled) { > + if (!raft->is_enabled) { > say_info("RAFT: vote request is skipped - RAFT " > "is disabled"); > break; > } > - if (raft.leader != 0) { > + if (raft->leader != 0) { > say_info("RAFT: vote request is skipped - the " > "leader is already known - %u", > - raft.leader); > + raft->leader); > break; > } > if (req->vote == instance_id) { > @@ -406,13 +409,13 @@ raft_process_msg(const struct raft_request *req, uint32_t source) > "for a third node, not a request"); > break; > } > - if (raft.volatile_vote != 0) { > + if (raft->volatile_vote != 0) { > say_info("RAFT: vote request is skipped - " > "already voted in this term"); > break; > } > /* Vclock is not NULL, validated above. 
*/ > - if (!raft_can_vote_for(req->vclock)) { > + if (!raft_can_vote_for(raft, req->vclock)) { > say_info("RAFT: vote request is skipped - the " > "vclock is not acceptable"); > break; > @@ -421,7 +424,7 @@ raft_process_msg(const struct raft_request *req, uint32_t source) > * Either the term is new, or didn't vote in the current > * term yet. Anyway can vote now. > */ > - raft_sm_schedule_new_vote(req->vote); > + raft_sm_schedule_new_vote(raft, req->vote); > break; > case RAFT_STATE_CANDIDATE: > /* Check if this is a vote for a competing candidate. */ > @@ -434,39 +437,39 @@ raft_process_msg(const struct raft_request *req, uint32_t source) > * Vote for self was requested earlier in this round, > * and now was answered by some other instance. > */ > - assert(raft.volatile_vote == instance_id); > - int quorum = raft_election_quorum(); > - bool was_set = bit_set(&raft.vote_mask, source); > - raft.vote_count += !was_set; > - if (raft.vote_count < quorum) { > + assert(raft->volatile_vote == instance_id); > + int quorum = raft_election_quorum(raft); > + bool was_set = bit_set(&raft->vote_mask, source); > + raft->vote_count += !was_set; > + if (raft->vote_count < quorum) { > say_info("RAFT: accepted vote for self, vote " > - "count is %d/%d", raft.vote_count, > + "count is %d/%d", raft->vote_count, > quorum); > break; > } > - raft_sm_become_leader(); > + raft_sm_become_leader(raft); > break; > default: > unreachable(); > } > } > if (req->state != RAFT_STATE_LEADER) { > - if (source == raft.leader) { > + if (source == raft->leader) { > say_info("RAFT: the node %u has resigned from the " > - "leader role", raft.leader); > + "leader role", raft->leader); > /* > * Candidate node clears leader implicitly when starts a > * new term, but non-candidate won't do that, so clear > * it manually. 
> */ > - raft.leader = 0; > - if (raft.is_candidate) > - raft_sm_schedule_new_election(); > + raft->leader = 0; > + if (raft->is_candidate) > + raft_sm_schedule_new_election(raft); > } > return 0; > } > /* The node is a leader, but it is already known. */ > - if (source == raft.leader) > + if (source == raft->leader) > return 0; > /* > * XXX: A message from a conflicting leader. Split brain, basically. > @@ -474,21 +477,21 @@ raft_process_msg(const struct raft_request *req, uint32_t source) > * future either this node should try to become a leader, or should stop > * all writes and require manual intervention. > */ > - if (raft.leader != 0) { > + if (raft->leader != 0) { > say_warn("RAFT: conflicting leader detected in one term - " > - "known is %u, received %u", raft.leader, source); > + "known is %u, received %u", raft->leader, source); > return 0; > } > > /* New leader was elected. */ > - raft_sm_follow_leader(source); > + raft_sm_follow_leader(raft, source); > return 0; > } > > void > -raft_process_heartbeat(uint32_t source) > +raft_process_heartbeat(struct raft *raft, uint32_t source) > { > - raft_validate(); > + raft_validate(raft); > /* > * Raft handles heartbeats from all instances, including anon instances > * which don't participate in Raft. > @@ -499,19 +502,19 @@ raft_process_heartbeat(uint32_t source) > * When not a candidate - don't wait for anything. Therefore do not care > * about the leader being dead. > */ > - if (!raft.is_candidate) > + if (!raft->is_candidate) > return; > /* Don't care about heartbeats when this node is a leader itself. */ > - if (raft.state == RAFT_STATE_LEADER) > + if (raft->state == RAFT_STATE_LEADER) > return; > /* Not interested in heartbeats from not a leader. */ > - if (raft.leader != source) > + if (raft->leader != source) > return; > /* > * The instance currently is busy with writing something on disk. Can't > * react to heartbeats. 
> */ > - if (raft.is_write_in_progress) > + if (raft->is_write_in_progress) > return; > /* > * XXX: it may be expensive to reset the timer like that. It may be less > @@ -519,9 +522,9 @@ raft_process_heartbeat(uint32_t source) > * anything was heard from the leader. Then in the timer callback check > * the timestamp, and restart the timer, if it is fine. > */ > - assert(ev_is_active(&raft.timer)); > - ev_timer_stop(loop(), &raft.timer); > - raft_sm_wait_leader_dead(); > + assert(ev_is_active(&raft->timer)); > + ev_timer_stop(loop(), &raft->timer); > + raft_sm_wait_leader_dead(raft); > } > > /** Wakeup Raft state writer fiber waiting for WAL write end. */ > @@ -535,7 +538,6 @@ raft_write_cb(struct journal_entry *entry) > static void > raft_write_request(const struct raft_request *req) > { > - assert(raft.is_write_in_progress); Not related to this patch, just a thought. It looks like raft_write_request belongs in some other file now, since it does not take a raft argument. Will you move it somewhere else? I'm not sure where it should go, though. Maybe box.cc? > /* > * Vclock is never persisted by Raft. It is used only to > * be sent to network when vote for self. > @@ -579,104 +581,106 @@ fail: > > /* Dump Raft state to WAL in a blocking way. */ > static void > -raft_worker_handle_io(void) > +raft_worker_handle_io(struct raft *raft) > { > - assert(raft.is_write_in_progress); > + assert(raft->is_write_in_progress); > /* During write Raft can't be anything but a follower. */ > - assert(raft.state == RAFT_STATE_FOLLOWER); > + assert(raft->state == RAFT_STATE_FOLLOWER); > struct raft_request req; > > - if (raft_is_fully_on_disk()) { > + if (raft_is_fully_on_disk(raft)) { > end_dump: > - raft.is_write_in_progress = false; > + raft->is_write_in_progress = false; > /* > * The state machine is stable. Can see now, to what state to > * go. > */ > - if (!raft.is_candidate) { > + if (!raft->is_candidate) { > /* > * If not a candidate, can't do anything except vote for > * somebody (if Raft is enabled). 
Nothing to do except > * staying a follower without timeouts. > */ > - } else if (raft.leader != 0) { > + } else if (raft->leader != 0) { > /* There is a known leader. Wait until it is dead. */ > - raft_sm_wait_leader_dead(); > - } else if (raft.vote == instance_id) { > + raft_sm_wait_leader_dead(raft); > + } else if (raft->vote == instance_id) { > /* Just wrote own vote. */ > - if (raft_election_quorum() == 1) > - raft_sm_become_leader(); > + if (raft_election_quorum(raft) == 1) > + raft_sm_become_leader(raft); > else > - raft_sm_become_candidate(); > - } else if (raft.vote != 0) { > + raft_sm_become_candidate(raft); > + } else if (raft->vote != 0) { > /* > * Voted for some other node. Wait if it manages to > * become a leader. > */ > - raft_sm_wait_election_end(); > + raft_sm_wait_election_end(raft); > } else { > /* No leaders, no votes. */ > - raft_sm_schedule_new_vote(instance_id); > + raft_sm_schedule_new_vote(raft, instance_id); > } > } else { > memset(&req, 0, sizeof(req)); > - assert(raft.volatile_term >= raft.term); > - req.term = raft.volatile_term; > - req.vote = raft.volatile_vote; > + assert(raft->volatile_term >= raft->term); > + req.term = raft->volatile_term; > + req.vote = raft->volatile_vote; > > raft_write_request(&req); > say_info("RAFT: persisted state %s", > raft_request_to_string(&req)); > > - assert(req.term >= raft.term); > - raft.term = req.term; > - raft.vote = req.vote; > + assert(req.term >= raft->term); > + raft->term = req.term; > + raft->vote = req.vote; > /* > * Persistent state is visible, and it was changed - broadcast. > */ > - raft_schedule_broadcast(); > - if (raft_is_fully_on_disk()) > + raft_schedule_broadcast(raft); > + if (raft_is_fully_on_disk(raft)) > goto end_dump; > } > } > > /* Broadcast Raft complete state to the followers. 
*/ > static void > -raft_worker_handle_broadcast(void) > +raft_worker_handle_broadcast(struct raft *raft) > { > - assert(raft.is_broadcast_scheduled); > + assert(raft->is_broadcast_scheduled); > struct raft_request req; > memset(&req, 0, sizeof(req)); > - req.term = raft.term; > - req.vote = raft.vote; > - req.state = raft.state; > + req.term = raft->term; > + req.vote = raft->vote; > + req.state = raft->state; > if (req.state == RAFT_STATE_CANDIDATE) { > - assert(raft.vote == instance_id); > + assert(raft->vote == instance_id); > req.vclock = &replicaset.vclock; > } > replicaset_foreach(replica) > relay_push_raft(replica->relay, &req); > - trigger_run(&raft.on_update, NULL); > - raft.is_broadcast_scheduled = false; > + trigger_run(&raft->on_update, NULL); > + raft->is_broadcast_scheduled = false; > } > > static int > raft_worker_f(va_list args) > { > (void)args; > + struct raft *raft = fiber()->arg; > + assert(raft == &box_raft); > bool is_idle; > while (!fiber_is_cancelled()) { > is_idle = true; > - if (raft.is_write_in_progress) { > - raft_worker_handle_io(); > + if (raft->is_write_in_progress) { > + raft_worker_handle_io(raft); > is_idle = false; > } > - if (raft.is_broadcast_scheduled) { > - raft_worker_handle_broadcast(); > + if (raft->is_broadcast_scheduled) { > + raft_worker_handle_broadcast(raft); > is_idle = false; > } > if (is_idle) { > - assert(raft_is_fully_on_disk()); > + assert(raft_is_fully_on_disk(raft)); > fiber_yield(); > } > fiber_sleep(0); > @@ -685,111 +689,111 @@ raft_worker_f(va_list args) > } > > static void > -raft_sm_pause_and_dump(void) > +raft_sm_pause_and_dump(struct raft *raft) > { > - assert(raft.state == RAFT_STATE_FOLLOWER); > - if (raft.is_write_in_progress) > + assert(raft->state == RAFT_STATE_FOLLOWER); > + if (raft->is_write_in_progress) > return; > - ev_timer_stop(loop(), &raft.timer); > - raft_worker_wakeup(); > - raft.is_write_in_progress = true; > + ev_timer_stop(loop(), &raft->timer); > + raft_worker_wakeup(raft); > + 
raft->is_write_in_progress = true; > } > > static void > -raft_sm_become_leader(void) > +raft_sm_become_leader(struct raft *raft) > { > - assert(raft.state != RAFT_STATE_LEADER); > + assert(raft->state != RAFT_STATE_LEADER); > say_info("RAFT: enter leader state with quorum %d", > - raft_election_quorum()); > - assert(raft.leader == 0); > - assert(raft.is_candidate); > - assert(!raft.is_write_in_progress); > - raft.state = RAFT_STATE_LEADER; > - raft.leader = instance_id; > - ev_timer_stop(loop(), &raft.timer); > + raft_election_quorum(raft)); > + assert(raft->leader == 0); > + assert(raft->is_candidate); > + assert(!raft->is_write_in_progress); > + raft->state = RAFT_STATE_LEADER; > + raft->leader = instance_id; > + ev_timer_stop(loop(), &raft->timer); > /* Make read-write (if other subsystems allow that. */ > box_update_ro_summary(); > /* State is visible and it is changed - broadcast. */ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > > static void > -raft_sm_follow_leader(uint32_t leader) > +raft_sm_follow_leader(struct raft *raft, uint32_t leader) > { > say_info("RAFT: leader is %u, follow", leader); > - assert(raft.state != RAFT_STATE_LEADER); > - assert(raft.leader == 0); > - raft.state = RAFT_STATE_FOLLOWER; > - raft.leader = leader; > - if (!raft.is_write_in_progress && raft.is_candidate) { > - ev_timer_stop(loop(), &raft.timer); > - raft_sm_wait_leader_dead(); > + assert(raft->state != RAFT_STATE_LEADER); > + assert(raft->leader == 0); > + raft->state = RAFT_STATE_FOLLOWER; > + raft->leader = leader; > + if (!raft->is_write_in_progress && raft->is_candidate) { > + ev_timer_stop(loop(), &raft->timer); > + raft_sm_wait_leader_dead(raft); > } > /* State is visible and it is changed - broadcast. 
*/ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > > static void > -raft_sm_become_candidate(void) > +raft_sm_become_candidate(struct raft *raft) > { > say_info("RAFT: enter candidate state with 1 self vote"); > - assert(raft.state == RAFT_STATE_FOLLOWER); > - assert(raft.leader == 0); > - assert(raft.vote == instance_id); > - assert(raft.is_candidate); > - assert(!raft.is_write_in_progress); > - assert(raft_election_quorum() > 1); > - raft.state = RAFT_STATE_CANDIDATE; > - raft.vote_count = 1; > - raft.vote_mask = 0; > - bit_set(&raft.vote_mask, instance_id); > - raft_sm_wait_election_end(); > + assert(raft->state == RAFT_STATE_FOLLOWER); > + assert(raft->leader == 0); > + assert(raft->vote == instance_id); > + assert(raft->is_candidate); > + assert(!raft->is_write_in_progress); > + assert(raft_election_quorum(raft) > 1); > + raft->state = RAFT_STATE_CANDIDATE; > + raft->vote_count = 1; > + raft->vote_mask = 0; > + bit_set(&raft->vote_mask, instance_id); > + raft_sm_wait_election_end(raft); > /* State is visible and it is changed - broadcast. */ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > > static void > -raft_sm_schedule_new_term(uint64_t new_term) > +raft_sm_schedule_new_term(struct raft *raft, uint64_t new_term) > { > say_info("RAFT: bump term to %llu, follow", new_term); > - assert(new_term > raft.volatile_term); > - assert(raft.volatile_term >= raft.term); > - raft.volatile_term = new_term; > + assert(new_term > raft->volatile_term); > + assert(raft->volatile_term >= raft->term); > + raft->volatile_term = new_term; > /* New terms means completely new Raft state. */ > - raft.volatile_vote = 0; > - raft.leader = 0; > - raft.state = RAFT_STATE_FOLLOWER; > + raft->volatile_vote = 0; > + raft->leader = 0; > + raft->state = RAFT_STATE_FOLLOWER; > box_update_ro_summary(); > - raft_sm_pause_and_dump(); > + raft_sm_pause_and_dump(raft); > /* > * State is visible and it is changed - broadcast. 
Term is also visible, > * but only persistent term. Volatile term is not broadcasted until > * saved to disk. > */ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > > static void > -raft_sm_schedule_new_vote(uint32_t new_vote) > +raft_sm_schedule_new_vote(struct raft *raft, uint32_t new_vote) > { > - say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term); > - assert(raft.volatile_vote == 0); > - assert(raft.leader == 0); > - assert(raft.state == RAFT_STATE_FOLLOWER); > - raft.volatile_vote = new_vote; > - raft_sm_pause_and_dump(); > + say_info("RAFT: vote for %u, follow", new_vote, raft->volatile_term); > + assert(raft->volatile_vote == 0); > + assert(raft->leader == 0); > + assert(raft->state == RAFT_STATE_FOLLOWER); > + raft->volatile_vote = new_vote; > + raft_sm_pause_and_dump(raft); > /* Nothing visible is changed - no broadcast. */ > } > > static void > -raft_sm_schedule_new_election(void) > +raft_sm_schedule_new_election(struct raft *raft) > { > say_info("RAFT: begin new election round"); > - assert(raft_is_fully_on_disk()); > - assert(raft.is_candidate); > + assert(raft_is_fully_on_disk(raft)); > + assert(raft->is_candidate); > /* Everyone is a follower until its vote for self is persisted. 
*/ > - raft_sm_schedule_new_term(raft.term + 1); > - raft_sm_schedule_new_vote(instance_id); > + raft_sm_schedule_new_term(raft, raft->term + 1); > + raft_sm_schedule_new_vote(raft, instance_id); > box_update_ro_summary(); > } > > @@ -797,75 +801,77 @@ static void > raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer, > int events) > { > - assert(timer == &raft.timer); > + struct raft *raft = timer->data; > + assert(timer == &raft->timer); > + assert(raft == &box_raft); > (void)events; > ev_timer_stop(loop, timer); > - raft_sm_schedule_new_election(); > + raft_sm_schedule_new_election(raft); > } > > static void > -raft_sm_wait_leader_dead(void) > +raft_sm_wait_leader_dead(struct raft *raft) > { > - assert(!ev_is_active(&raft.timer)); > - assert(!raft.is_write_in_progress); > - assert(raft.is_candidate); > - assert(raft.state == RAFT_STATE_FOLLOWER); > - assert(raft.leader != 0); > + assert(!ev_is_active(&raft->timer)); > + assert(!raft->is_write_in_progress); > + assert(raft->is_candidate); > + assert(raft->state == RAFT_STATE_FOLLOWER); > + assert(raft->leader != 0); > double death_timeout = replication_disconnect_timeout(); > - ev_timer_set(&raft.timer, death_timeout, death_timeout); > - ev_timer_start(loop(), &raft.timer); > + ev_timer_set(&raft->timer, death_timeout, death_timeout); > + ev_timer_start(loop(), &raft->timer); > } > > static void > -raft_sm_wait_leader_found(void) > +raft_sm_wait_leader_found(struct raft *raft) > { > - assert(!ev_is_active(&raft.timer)); > - assert(!raft.is_write_in_progress); > - assert(raft.is_candidate); > - assert(raft.state == RAFT_STATE_FOLLOWER); > - assert(raft.leader == 0); > + assert(!ev_is_active(&raft->timer)); > + assert(!raft->is_write_in_progress); > + assert(raft->is_candidate); > + assert(raft->state == RAFT_STATE_FOLLOWER); > + assert(raft->leader == 0); > double death_timeout = replication_disconnect_timeout(); > - ev_timer_set(&raft.timer, death_timeout, death_timeout); > - 
ev_timer_start(loop(), &raft.timer); > + ev_timer_set(&raft->timer, death_timeout, death_timeout); > + ev_timer_start(loop(), &raft->timer); > } > > static void > -raft_sm_wait_election_end(void) > +raft_sm_wait_election_end(struct raft *raft) > { > - assert(!ev_is_active(&raft.timer)); > - assert(!raft.is_write_in_progress); > - assert(raft.is_candidate); > - assert(raft.state == RAFT_STATE_FOLLOWER || > - (raft.state == RAFT_STATE_CANDIDATE && > - raft.volatile_vote == instance_id)); > - assert(raft.leader == 0); > - double election_timeout = raft.election_timeout + > - raft_new_random_election_shift(); > - ev_timer_set(&raft.timer, election_timeout, election_timeout); > - ev_timer_start(loop(), &raft.timer); > + assert(!ev_is_active(&raft->timer)); > + assert(!raft->is_write_in_progress); > + assert(raft->is_candidate); > + assert(raft->state == RAFT_STATE_FOLLOWER || > + (raft->state == RAFT_STATE_CANDIDATE && > + raft->volatile_vote == instance_id)); > + assert(raft->leader == 0); > + double election_timeout = raft->election_timeout + > + raft_new_random_election_shift(raft); > + ev_timer_set(&raft->timer, election_timeout, election_timeout); > + ev_timer_start(loop(), &raft->timer); > } > > static void > -raft_sm_start(void) > +raft_sm_start(struct raft *raft) > { > say_info("RAFT: start state machine"); > - assert(!ev_is_active(&raft.timer)); > - assert(!raft.is_enabled); > - assert(raft.state == RAFT_STATE_FOLLOWER); > - raft.is_enabled = true; > - raft.is_candidate = raft.is_cfg_candidate; > - if (raft.is_write_in_progress) { > + assert(!ev_is_active(&raft->timer)); > + assert(!raft->is_enabled); > + assert(raft->state == RAFT_STATE_FOLLOWER); > + raft->is_enabled = true; > + raft->is_candidate = raft->is_cfg_candidate; > + if (raft->is_write_in_progress) { > /* > * Nop. If write is in progress, the state machine is frozen. It > * is continued when write ends. > */ > - } else if (!raft.is_candidate) { > + } else if (!raft->is_candidate) { > /* > * Nop. 
When a node is not a candidate, it can't initiate > * elections anyway, so it does not need to monitor the leader. > */ > - } else if (raft.leader != 0) { > - raft_sm_wait_leader_dead(); > + } else if (raft->leader != 0) { > + raft_sm_wait_leader_dead(raft); > } else { > /* > * Don't start new election. The situation is most likely > @@ -874,39 +880,40 @@ raft_sm_start(void) > * disturb the current leader. Give it time to notify this node > * that there is a leader. > */ > - raft_sm_wait_leader_found(); > + raft_sm_wait_leader_found(raft); > } > box_update_ro_summary(); > } > > static void > -raft_sm_stop(void) > +raft_sm_stop(struct raft *raft) > { > say_info("RAFT: stop state machine"); > - assert(raft.is_enabled); > - raft.is_enabled = false; > - raft.is_candidate = false; > - if (raft.state == RAFT_STATE_LEADER) > - raft.leader = 0; > - raft.state = RAFT_STATE_FOLLOWER; > - ev_timer_stop(loop(), &raft.timer); > + assert(raft->is_enabled); > + raft->is_enabled = false; > + raft->is_candidate = false; > + if (raft->state == RAFT_STATE_LEADER) > + raft->leader = 0; > + raft->state = RAFT_STATE_FOLLOWER; > + ev_timer_stop(loop(), &raft->timer); > box_update_ro_summary(); > /* State is visible and changed - broadcast. */ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > > void > -raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) > +raft_serialize_for_network(const struct raft *raft, struct raft_request *req, > + struct vclock *vclock) > { > - raft_validate(); > + raft_validate(raft); > memset(req, 0, sizeof(*req)); > /* > * Volatile state is never used for any communications. > * Use only persisted state. > */ > - req->term = raft.term; > - req->vote = raft.vote; > - req->state = raft.state; > + req->term = raft->term; > + req->vote = raft->vote; > + req->state = raft->state; > /* > * Raft does not own vclock, so it always expects it passed externally. > * Vclock is sent out only by candidate instances. 
> @@ -918,134 +925,134 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) > } > > void > -raft_serialize_for_disk(struct raft_request *req) > +raft_serialize_for_disk(const struct raft *raft, struct raft_request *req) > { > - raft_validate(); > + raft_validate(raft); > memset(req, 0, sizeof(*req)); > - req->term = raft.term; > - req->vote = raft.vote; > + req->term = raft->term; > + req->vote = raft->vote; > } > > void > -raft_on_update(struct trigger *trigger) > +raft_on_update(struct raft *raft, struct trigger *trigger) > { > - raft_validate(); > - trigger_add(&raft.on_update, trigger); > + raft_validate(raft); > + trigger_add(&raft->on_update, trigger); > } > > void > -raft_cfg_is_enabled(bool is_enabled) > +raft_cfg_is_enabled(struct raft *raft, bool is_enabled) > { > - raft_validate(); > - if (is_enabled == raft.is_enabled) > + raft_validate(raft); > + if (is_enabled == raft->is_enabled) > return; > > if (!is_enabled) > - raft_sm_stop(); > + raft_sm_stop(raft); > else > - raft_sm_start(); > + raft_sm_start(raft); > } > > void > -raft_cfg_is_candidate(bool is_candidate) > +raft_cfg_is_candidate(struct raft *raft, bool is_candidate) > { > - raft_validate(); > - bool old_is_candidate = raft.is_candidate; > - raft.is_cfg_candidate = is_candidate; > - raft.is_candidate = is_candidate && raft.is_enabled; > - if (raft.is_candidate == old_is_candidate) > + raft_validate(raft); > + bool old_is_candidate = raft->is_candidate; > + raft->is_cfg_candidate = is_candidate; > + raft->is_candidate = is_candidate && raft->is_enabled; > + if (raft->is_candidate == old_is_candidate) > return; > > - if (raft.is_candidate) { > - assert(raft.state == RAFT_STATE_FOLLOWER); > - if (raft.is_write_in_progress) { > + if (raft->is_candidate) { > + assert(raft->state == RAFT_STATE_FOLLOWER); > + if (raft->is_write_in_progress) { > /* > * If there is an on-going WAL write, it means there was > * some node who sent newer data to this node. 
So it is > * probably a better candidate. Anyway can't do anything > * until the new state is fully persisted. > */ > - } else if (raft.leader != 0) { > - raft_sm_wait_leader_dead(); > + } else if (raft->leader != 0) { > + raft_sm_wait_leader_dead(raft); > } else { > - raft_sm_wait_leader_found(); > + raft_sm_wait_leader_found(raft); > } > } else { > - if (raft.state != RAFT_STATE_LEADER) { > + if (raft->state != RAFT_STATE_LEADER) { > /* Do not wait for anything while being a voter. */ > - ev_timer_stop(loop(), &raft.timer); > + ev_timer_stop(loop(), &raft->timer); > } > - if (raft.state != RAFT_STATE_FOLLOWER) { > - if (raft.state == RAFT_STATE_LEADER) > - raft.leader = 0; > - raft.state = RAFT_STATE_FOLLOWER; > + if (raft->state != RAFT_STATE_FOLLOWER) { > + if (raft->state == RAFT_STATE_LEADER) > + raft->leader = 0; > + raft->state = RAFT_STATE_FOLLOWER; > /* State is visible and changed - broadcast. */ > - raft_schedule_broadcast(); > + raft_schedule_broadcast(raft); > } > } > box_update_ro_summary(); > } > > void > -raft_cfg_election_timeout(double timeout) > +raft_cfg_election_timeout(struct raft *raft, double timeout) > { > - raft_validate(); > - if (timeout == raft.election_timeout) > + raft_validate(raft); > + if (timeout == raft->election_timeout) > return; > > - raft.election_timeout = timeout; > - if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) { > - assert(ev_is_active(&raft.timer)); > - double timeout = ev_timer_remaining(loop(), &raft.timer) - > - raft.timer.at + raft.election_timeout; > - ev_timer_stop(loop(), &raft.timer); > - ev_timer_set(&raft.timer, timeout, timeout); > - ev_timer_start(loop(), &raft.timer); > + raft->election_timeout = timeout; > + if (raft->vote != 0 && raft->leader == 0 && raft->is_candidate) { > + assert(ev_is_active(&raft->timer)); > + double timeout = ev_timer_remaining(loop(), &raft->timer) - > + raft->timer.at + raft->election_timeout; > + ev_timer_stop(loop(), &raft->timer); > + ev_timer_set(&raft->timer, 
timeout, timeout); > + ev_timer_start(loop(), &raft->timer); > } > } > > void > -raft_cfg_election_quorum(void) > +raft_cfg_election_quorum(struct raft *raft) > { > - raft_validate(); > - if (raft.state != RAFT_STATE_CANDIDATE || > - raft.state == RAFT_STATE_LEADER) > + raft_validate(raft); > + if (raft->state != RAFT_STATE_CANDIDATE || > + raft->state == RAFT_STATE_LEADER) > return; > - if (raft.vote_count < raft_election_quorum()) > + if (raft->vote_count < raft_election_quorum(raft)) > return; > - raft_sm_become_leader(); > + raft_sm_become_leader(raft); > } > > void > -raft_cfg_death_timeout(void) > +raft_cfg_death_timeout(struct raft *raft) > { > - raft_validate(); > - if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate && > - raft.leader != 0) { > - assert(ev_is_active(&raft.timer)); > + raft_validate(raft); > + if (raft->state == RAFT_STATE_FOLLOWER && raft->is_candidate && > + raft->leader != 0) { > + assert(ev_is_active(&raft->timer)); > double death_timeout = replication_disconnect_timeout(); > - double timeout = ev_timer_remaining(loop(), &raft.timer) - > - raft.timer.at + death_timeout; > - ev_timer_stop(loop(), &raft.timer); > - ev_timer_set(&raft.timer, timeout, timeout); > - ev_timer_start(loop(), &raft.timer); > + double timeout = ev_timer_remaining(loop(), &raft->timer) - > + raft->timer.at + death_timeout; > + ev_timer_stop(loop(), &raft->timer); > + ev_timer_set(&raft->timer, timeout, timeout); > + ev_timer_start(loop(), &raft->timer); > } > } > > void > -raft_new_term(void) > +raft_new_term(struct raft *raft) > { > - raft_validate(); > - if (raft.is_enabled) > - raft_sm_schedule_new_term(raft.volatile_term + 1); > + raft_validate(raft); > + if (raft->is_enabled) > + raft_sm_schedule_new_term(raft, raft->volatile_term + 1); > } > > static void > -raft_worker_wakeup(void) > +raft_worker_wakeup(struct raft *raft) > { > - if (raft.worker == NULL) { > - raft.worker = fiber_new("raft_worker", raft_worker_f); > - if (raft.worker == NULL) { > + 
if (raft->worker == NULL) { > + raft->worker = fiber_new("raft_worker", raft_worker_f); > + if (raft->worker == NULL) { > /* > * XXX: should be handled properly, no need to panic. > * The issue though is that most of the Raft state > @@ -1060,32 +1067,64 @@ raft_worker_wakeup(void) > panic("Could't create Raft worker fiber"); > return; > } > - fiber_set_joinable(raft.worker, true); > + raft->worker->arg = raft; > + fiber_set_joinable(raft->worker, true); > } > /* > * Don't wake the fiber if it writes something. Otherwise it would be a > * spurious wakeup breaking the WAL write not adapted to this. Also > * don't wakeup the current fiber - it leads to undefined behaviour. > */ > - if (!raft.is_write_in_progress && fiber() != raft.worker) > - fiber_wakeup(raft.worker); > + if (!raft->is_write_in_progress && fiber() != raft->worker) > + fiber_wakeup(raft->worker); > } > > static void > -raft_schedule_broadcast(void) > +raft_schedule_broadcast(struct raft *raft) > { > - raft.is_broadcast_scheduled = true; > - raft_worker_wakeup(); > + raft->is_broadcast_scheduled = true; > + raft_worker_wakeup(raft); > } > > void > -raft_init(void) > +raft_create(struct raft *raft) > { > - memset(&raft, 0, sizeof(raft)); > - raft.state = RAFT_STATE_FOLLOWER; > - raft.volatile_term = 1; > - raft.term = 1; > - raft.election_timeout = 5; > - ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0); > - rlist_create(&raft.on_update); > + memset(raft, 0, sizeof(*raft)); > + raft->state = RAFT_STATE_FOLLOWER; > + raft->volatile_term = 1; > + raft->term = 1; > + raft->election_timeout = 5; > + ev_timer_init(&raft->timer, raft_sm_schedule_new_election_cb, 0, 0); > + raft->timer.data = raft; > + rlist_create(&raft->on_update); > +} > + > +void > +raft_destroy(struct raft *raft) > +{ > + trigger_destroy(&raft->on_update); > + if (raft->worker != NULL) { > + raft_worker_wakeup(raft); > + fiber_cancel(raft->worker); > + fiber_join(raft->worker); > + raft->worker = NULL; > + } > + /* 
Invalidate so as any usage attempt would fail in raft_validate(). */ > + raft->state = 0; > +} > + > +void > +box_raft_init(void) > +{ > + raft_create(&box_raft); > +} > + > +void > +box_raft_free(void) > +{ > + trigger_destroy(&box_raft.on_update); > + /* > + * Can't join the fiber, because the event loop is stopped already, and > + * yields are not allowed. > + */ > } > diff --git a/src/box/raft.h b/src/box/raft.h > index 0c60eccdf..860062857 100644 > --- a/src/box/raft.h > +++ b/src/box/raft.h > @@ -163,16 +163,16 @@ struct raft { > struct rlist on_update; > }; > > -extern struct raft raft; > +extern struct raft box_raft; > > /** > * Ensure the raft node can be used. I.e. that it is properly initialized. > * Entirely for debug purposes. > */ > static inline void > -raft_validate(void) > +raft_validate(const struct raft *raft) > { > - assert(raft.state != 0); > + assert(raft->state != 0); > } > > /** > @@ -181,61 +181,58 @@ raft_validate(void) > * affected by box.cfg.read_only, connection quorum. > */ > static inline bool > -raft_is_ro(void) > +raft_is_ro(const struct raft *raft) > { > - raft_validate(); > - return raft.is_enabled && raft.state != RAFT_STATE_LEADER; > + raft_validate(raft); > + return raft->is_enabled && raft->state != RAFT_STATE_LEADER; > } > > /** See if the instance can accept rows from an instance with the given ID. */ > static inline bool > -raft_is_source_allowed(uint32_t source_id) > +raft_is_source_allowed(const struct raft *raft, uint32_t source_id) > { > - raft_validate(); > - return !raft.is_enabled || raft.leader == source_id; > + raft_validate(raft); > + return !raft->is_enabled || raft->leader == source_id; > } > > /** Check if Raft is enabled. */ > static inline bool > -raft_is_enabled(void) > +raft_is_enabled(const struct raft *raft) > { > - raft_validate(); > - return raft.is_enabled; > + raft_validate(raft); > + return raft->is_enabled; > } > > /** Process a raft entry stored in WAL/snapshot. 
*/ > void > -raft_process_recovery(const struct raft_request *req); > +raft_process_recovery(struct raft *raft, const struct raft_request *req); > > -/** > - * Process a raft status message coming from the network. > - * @param req Raft request. > - * @param source Instance ID of the message sender. > - */ > +/** Process a raft status message coming from the network. */ > int > -raft_process_msg(const struct raft_request *req, uint32_t source); > +raft_process_msg(struct raft *raft, const struct raft_request *req, > + uint32_t source); > > /** > * Process a heartbeat message from an instance with the given ID. It is used to > * watch leader's health and start election when necessary. > */ > void > -raft_process_heartbeat(uint32_t source); > +raft_process_heartbeat(struct raft *raft, uint32_t source); > > /** Configure whether Raft is enabled. */ > void > -raft_cfg_is_enabled(bool is_enabled); > +raft_cfg_is_enabled(struct raft *raft, bool is_enabled); > > /** > * Configure whether the instance can be elected as Raft leader. Even if false, > * the node still can vote, when Raft is enabled. > */ > void > -raft_cfg_is_candidate(bool is_candidate); > +raft_cfg_is_candidate(struct raft *raft, bool is_candidate); > > /** Configure Raft leader election timeout. */ > void > -raft_cfg_election_timeout(double timeout); > +raft_cfg_election_timeout(struct raft *raft, double timeout); > > /** > * Configure Raft leader election quorum. There is no a separate option. > @@ -243,7 +240,7 @@ raft_cfg_election_timeout(double timeout); > * with synchronous replication. > */ > void > -raft_cfg_election_quorum(void); > +raft_cfg_election_quorum(struct raft *raft); > > /** > * Configure Raft leader death timeout. I.e. number of seconds without > @@ -251,7 +248,7 @@ raft_cfg_election_quorum(void); > * option. Raft uses replication timeout for that. > */ > void > -raft_cfg_death_timeout(void); > +raft_cfg_death_timeout(struct raft *raft); > > /** > * Bump the term. 
When it is persisted, the node checks if there is a leader, > @@ -259,32 +256,41 @@ raft_cfg_death_timeout(void); > * be used as tool to forcefully start new election, or restart an existing. > */ > void > -raft_new_term(void); > +raft_new_term(struct raft *raft); > > /** > * Save complete Raft state into a request to be sent to other instances of the > * cluster. It is allowed to save anything here, not only persistent state. > */ > void > -raft_serialize_for_network(struct raft_request *req, struct vclock *vclock); > +raft_serialize_for_network(const struct raft *raft, struct raft_request *req, > + struct vclock *vclock); > > /** > * Save complete Raft state into a request to be persisted on disk. Only term > * and vote are being persisted. > */ > void > -raft_serialize_for_disk(struct raft_request *req); > +raft_serialize_for_disk(const struct raft *raft, struct raft_request *req); > > /** > * Add a trigger invoked each time any of the Raft node visible attributes are > * changed. > */ > void > -raft_on_update(struct trigger *trigger); > +raft_on_update(struct raft *raft, struct trigger *trigger); > + > +void > +raft_create(struct raft *raft); > + > +void > +raft_destroy(struct raft *raft); > + > +void > +box_raft_init(void); > > -/** Initialize Raft global data structures. */ > void > -raft_init(void); > +box_raft_free(void); > > #if defined(__cplusplus) > } -- Serge Petrenko