[Tarantool-patches] [PATCH 3/4] raft: add explicit raft argument to all functions

Serge Petrenko sergepetrenko at tarantool.org
Mon Nov 9 16:46:18 MSK 2020


08.11.2020 21:03, Vladislav Shpilevoy пишет:
> All raft functions worked with a global raft object. That would
> make it impossible to move raft to a separate module, where it could
> be properly unit-tested with multiple raft nodes in each test.
>
> The patch adds an explicit raft pointer argument to each raft
> function as a first part of moving raft to a separate library.
>
> The global object is renamed to box_raft so as to emphasize this
> is a global box object, not a part of the future raft library.
>
> Part of #5303
> ---

Hi! Thanks for the patch!

LGTM with one question below.

>   src/box/applier.cc     |   6 +-
>   src/box/box.cc         |  27 +-
>   src/box/lua/info.c     |   8 +-
>   src/box/memtx_engine.c |   4 +-
>   src/box/raft.c         | 635 ++++++++++++++++++++++-------------------
>   src/box/raft.h         |  68 +++--
>   6 files changed, 397 insertions(+), 351 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7686d6cbc..0b0526ce5 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -893,7 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   	struct vclock candidate_clock;
>   	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
>   		return -1;
> -	return raft_process_msg(&req, applier->instance_id);
> +	return raft_process_msg(&box_raft, &req, applier->instance_id);
>   }
>   
>   /**
> @@ -915,7 +915,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   	 * anything, because won't change total number of rows sent in the
>   	 * network anyway.
>   	 */
> -	if (!raft_is_source_allowed(applier->instance_id))
> +	if (!raft_is_source_allowed(&box_raft, applier->instance_id))
>   		return 0;
>   	struct xrow_header *first_row = &stailq_first_entry(rows,
>   					struct applier_tx_row, next)->row;
> @@ -1256,7 +1256,7 @@ applier_subscribe(struct applier *applier)
>   		struct xrow_header *first_row =
>   			&stailq_first_entry(&rows, struct applier_tx_row,
>   					    next)->row;
> -		raft_process_heartbeat(applier->instance_id);
> +		raft_process_heartbeat(&box_raft, applier->instance_id);
>   		if (first_row->lsn == 0) {
>   			if (unlikely(iproto_type_is_raft_request(
>   							first_row->type))) {
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 18568df3b..30b1ec065 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -162,7 +162,7 @@ void
>   box_update_ro_summary(void)
>   {
>   	bool old_is_ro_summary = is_ro_summary;
> -	is_ro_summary = is_ro || is_orphan || raft_is_ro();
> +	is_ro_summary = is_ro || is_orphan || raft_is_ro(&box_raft);
>   	/* In 99% nothing changes. Filter this out first. */
>   	if (is_ro_summary == old_is_ro_summary)
>   		return;
> @@ -399,7 +399,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>   		/* Vclock is never persisted in WAL by Raft. */
>   		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
>   			diag_raise();
> -		raft_process_recovery(&raft_req);
> +		raft_process_recovery(&box_raft, &raft_req);
>   		return;
>   	}
>   	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> @@ -796,8 +796,8 @@ box_set_election_mode(void)
>   	const char *mode = box_check_election_mode();
>   	if (mode == NULL)
>   		return -1;
> -	raft_cfg_is_candidate(strcmp(mode, "candidate") == 0);
> -	raft_cfg_is_enabled(strcmp(mode, "off") != 0);
> +	raft_cfg_is_candidate(&box_raft, strcmp(mode, "candidate") == 0);
> +	raft_cfg_is_enabled(&box_raft, strcmp(mode, "off") != 0);
>   	return 0;
>   }
>   
> @@ -807,7 +807,7 @@ box_set_election_timeout(void)
>   	double d = box_check_election_timeout();
>   	if (d < 0)
>   		return -1;
> -	raft_cfg_election_timeout(d);
> +	raft_cfg_election_timeout(&box_raft, d);
>   	return 0;
>   }
>   
> @@ -895,7 +895,7 @@ void
>   box_set_replication_timeout(void)
>   {
>   	replication_timeout = box_check_replication_timeout();
> -	raft_cfg_death_timeout();
> +	raft_cfg_death_timeout(&box_raft);
>   }
>   
>   void
> @@ -926,7 +926,7 @@ box_set_replication_synchro_quorum(void)
>   		return -1;
>   	replication_synchro_quorum = value;
>   	txn_limbo_on_parameters_change(&txn_limbo);
> -	raft_cfg_election_quorum();
> +	raft_cfg_election_quorum(&box_raft);
>   	return 0;
>   }
>   
> @@ -1065,7 +1065,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
>   {
>   	(void)trigger;
>   	(void)event;
> -	if (raft.state != RAFT_STATE_LEADER)
> +	if (box_raft.state != RAFT_STATE_LEADER)
>   		return 0;
>   	/*
>   	 * When the node became a leader, it means it will ignore all records
> @@ -2154,7 +2154,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
>   		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
>   	say_info("remote vclock %s local vclock %s",
>   		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
> -	if (raft_is_enabled()) {
> +	if (raft_is_enabled(&box_raft)) {
>   		/*
>   		 * Send out the current raft state of the instance. Don't do
>   		 * that if Raft is disabled. It can be that a part of the
> @@ -2163,7 +2163,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
>   		 * should be 0.
>   		 */
>   		struct raft_request req;
> -		raft_serialize_for_network(&req, &vclock);
> +		raft_serialize_for_network(&box_raft, &req, &vclock);
>   		xrow_encode_raft(&row, &fiber()->gc, &req);
>   		coio_write_xrow(io, &row);
>   	}
> @@ -2249,6 +2249,7 @@ box_free(void)
>   		tuple_free();
>   		port_free();
>   #endif
> +		box_raft_free();
>   		iproto_free();
>   		replication_free();
>   		sequence_free();
> @@ -2655,10 +2656,10 @@ box_init(void)
>   
>   	txn_limbo_init();
>   	sequence_init();
> -	raft_init();
> +	box_raft_init();
>   
>   	trigger_create(&box_raft_on_update, box_raft_on_update_f, NULL, NULL);
> -	raft_on_update(&box_raft_on_update);
> +	raft_on_update(&box_raft, &box_raft_on_update);
>   }
>   
>   bool
> @@ -2814,7 +2815,7 @@ box_cfg_xc(void)
>   		 * should take the control over the situation and start a new
>   		 * term immediately.
>   		 */
> -		raft_new_term();
> +		raft_new_term(&box_raft);
>   	}
>   
>   	/* box.cfg.read_only is not read yet. */
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index 92d48c96c..07d09635e 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -582,13 +582,13 @@ static int
>   lbox_info_election(struct lua_State *L)
>   {
>   	lua_createtable(L, 0, 4);
> -	lua_pushstring(L, raft_state_str(raft.state));
> +	lua_pushstring(L, raft_state_str(box_raft.state));
>   	lua_setfield(L, -2, "state");
> -	luaL_pushuint64(L, raft.volatile_term);
> +	luaL_pushuint64(L, box_raft.volatile_term);
>   	lua_setfield(L, -2, "term");
> -	lua_pushinteger(L, raft.volatile_vote);
> +	lua_pushinteger(L, box_raft.volatile_vote);
>   	lua_setfield(L, -2, "vote");
> -	lua_pushinteger(L, raft.leader);
> +	lua_pushinteger(L, box_raft.leader);
>   	lua_setfield(L, -2, "leader");
>   	return 1;
>   }
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index 43000ba0b..f0cfdcdaa 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -210,7 +210,7 @@ memtx_engine_recover_raft(const struct xrow_header *row)
>   	/* Vclock is never persisted in WAL by Raft. */
>   	if (xrow_decode_raft(row, &req, NULL) != 0)
>   		return -1;
> -	raft_process_recovery(&req);
> +	raft_process_recovery(&box_raft, &req);
>   	return 0;
>   }
>   
> @@ -554,7 +554,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
>   	opts.free_cache = true;
>   	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
>   	vclock_create(&ckpt->vclock);
> -	raft_serialize_for_disk(&ckpt->raft);
> +	raft_serialize_for_disk(&box_raft, &ckpt->raft);
>   	ckpt->touch = false;
>   	return ckpt;
>   }
> diff --git a/src/box/raft.c b/src/box/raft.c
> index a6a893373..c7db92494 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -45,7 +45,7 @@
>   #define RAFT_RANDOM_ELECTION_FACTOR 0.1
>   
>   /** Raft state of this instance. */
> -struct raft raft = {
> +struct raft box_raft = {
>   	/*
>   	 * Set an invalid state to validate in all raft functions they are not
>   	 * used before raft initialization.
> @@ -92,10 +92,10 @@ raft_state_str(uint32_t state)
>    * in any case.
>    */
>   static bool
> -raft_is_fully_on_disk(void)
> +raft_is_fully_on_disk(const struct raft *raft)
>   {
> -	return raft.volatile_term == raft.term &&
> -	       raft.volatile_vote == raft.vote;
> +	return raft->volatile_term == raft->term &&
> +	       raft->volatile_vote == raft->vote;
>   }
>   
>   /**
> @@ -106,9 +106,9 @@ raft_is_fully_on_disk(void)
>    * factor is a constant floating point value > 0.
>    */
>   static inline double
> -raft_new_random_election_shift(void)
> +raft_new_random_election_shift(const struct raft *raft)
>   {
> -	double timeout = raft.election_timeout;
> +	double timeout = raft->election_timeout;
>   	/* Translate to ms. Integer is needed to be able to use mod below. */
>   	uint32_t rand_part =
>   		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
> @@ -132,8 +132,9 @@ raft_new_random_election_shift(void)
>    * restart and forget who the previous leader was.
>    */
>   static inline bool
> -raft_can_vote_for(const struct vclock *v)
> +raft_can_vote_for(const struct raft *raft, const struct vclock *v)
>   {
> +	(void)raft;
>   	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
>   	return cmp == 0 || cmp == 1;
>   }
> @@ -176,8 +177,9 @@ raft_can_vote_for(const struct vclock *v)
>    *   leader election quorum is affected. So synchronous data won't be lost.
>    */
>   static inline int
> -raft_election_quorum(void)
> +raft_election_quorum(const struct raft *raft)
>   {
> +	(void)raft;
>   	return MIN(replication_synchro_quorum, replicaset.registered_count);
>   }
>   
> @@ -186,11 +188,11 @@ raft_election_quorum(void)
>    * does not exist yet, it is created.
>    */
>   static void
> -raft_worker_wakeup(void);
> +raft_worker_wakeup(struct raft *raft);
>   
>   /** Schedule broadcast of the complete Raft state to all the followers. */
>   static void
> -raft_schedule_broadcast(void);
> +raft_schedule_broadcast(struct raft *raft);
>   
>   /** Raft state machine methods. 'sm' stands for State Machine. */
>   
> @@ -201,7 +203,7 @@ raft_schedule_broadcast(void);
>    * it is writable.
>    */
>   static void
> -raft_sm_start(void);
> +raft_sm_start(struct raft *raft);
>   
>   /**
>    * Stop the state machine. Now until Raft is re-enabled,
> @@ -210,14 +212,14 @@ raft_sm_start(void);
>    * - this node can't vote.
>    */
>   static void
> -raft_sm_stop(void);
> +raft_sm_stop(struct raft *raft);
>   
>   /**
>    * When the instance is a follower but is allowed to be a leader, it will wait
>    * for death of the current leader to start new election.
>    */
>   static void
> -raft_sm_wait_leader_dead(void);
> +raft_sm_wait_leader_dead(struct raft *raft);
>   
>   /**
>    * Wait for the leader death timeout until a leader lets the node know he is
> @@ -228,7 +230,7 @@ raft_sm_wait_leader_dead(void);
>    * restarts and may need some time to hear something from the leader.
>    */
>   static void
> -raft_sm_wait_leader_found(void);
> +raft_sm_wait_leader_found(struct raft *raft);
>   
>   /**
>    * If election is started by this node, or it voted for some other node started
> @@ -236,22 +238,22 @@ raft_sm_wait_leader_found(void);
>    * election times out. When it happens, the node will start new election.
>    */
>   static void
> -raft_sm_wait_election_end(void);
> +raft_sm_wait_election_end(struct raft *raft);
>   
>   /** Bump volatile term and schedule its flush to disk. */
>   static void
> -raft_sm_schedule_new_term(uint64_t new_term);
> +raft_sm_schedule_new_term(struct raft *raft, uint64_t new_term);
>   
>   /** Bump volatile vote and schedule its flush to disk. */
>   static void
> -raft_sm_schedule_new_vote(uint32_t new_vote);
> +raft_sm_schedule_new_vote(struct raft *raft, uint32_t new_vote);
>   
>   /**
>    * Bump term and vote for self immediately. After that is persisted, the
>    * election timeout will be activated. Unless during that nothing newer happens.
>    */
>   static void
> -raft_sm_schedule_new_election(void);
> +raft_sm_schedule_new_election(struct raft *raft);
>   
>   /**
>    * The main trigger of Raft state machine - start new election when the current
> @@ -263,16 +265,16 @@ raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
>   
>   /** Start Raft state flush to disk. */
>   static void
> -raft_sm_pause_and_dump(void);
> +raft_sm_pause_and_dump(struct raft *raft);
>   
>   static void
> -raft_sm_become_leader(void);
> +raft_sm_become_leader(struct raft *raft);
>   
>   static void
> -raft_sm_follow_leader(uint32_t leader);
> +raft_sm_follow_leader(struct raft *raft, uint32_t leader);
>   
>   static void
> -raft_sm_become_candidate(void);
> +raft_sm_become_candidate(struct raft *raft);
>   
>   static const char *
>   raft_request_to_string(const struct raft_request *req)
> @@ -313,17 +315,17 @@ raft_request_to_string(const struct raft_request *req)
>   }
>   
>   void
> -raft_process_recovery(const struct raft_request *req)
> +raft_process_recovery(struct raft *raft, const struct raft_request *req)
>   {
> -	raft_validate();
> +	raft_validate(raft);
>   	say_verbose("RAFT: recover %s", raft_request_to_string(req));
>   	if (req->term != 0) {
> -		raft.term = req->term;
> -		raft.volatile_term = req->term;
> +		raft->term = req->term;
> +		raft->volatile_term = req->term;
>   	}
>   	if (req->vote != 0) {
> -		raft.vote = req->vote;
> -		raft.volatile_vote = req->vote;
> +		raft->vote = req->vote;
> +		raft->volatile_vote = req->vote;
>   	}
>   	/*
>   	 * Role is never persisted. If recovery is happening, the
> @@ -338,13 +340,14 @@ raft_process_recovery(const struct raft_request *req)
>   	 */
>   	assert(req->vclock == NULL);
>   	/* Raft is not enabled until recovery is finished. */
> -	assert(!raft_is_enabled());
> +	assert(!raft_is_enabled(raft));
>   }
>   
>   int
> -raft_process_msg(const struct raft_request *req, uint32_t source)
> +raft_process_msg(struct raft *raft, const struct raft_request *req,
> +		 uint32_t source)
>   {
> -	raft_validate();
> +	raft_validate(raft);
>   	say_info("RAFT: message %s from %u", raft_request_to_string(req),
>   		 source);
>   	assert(source > 0);
> @@ -361,32 +364,32 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   		return -1;
>   	}
>   	/* Outdated request. */
> -	if (req->term < raft.volatile_term) {
> +	if (req->term < raft->volatile_term) {
>   		say_info("RAFT: the message is ignored due to outdated term - "
> -			 "current term is %u", raft.volatile_term);
> +			 "current term is %u", raft->volatile_term);
>   		return 0;
>   	}
>   
>   	/* Term bump. */
> -	if (req->term > raft.volatile_term)
> -		raft_sm_schedule_new_term(req->term);
> +	if (req->term > raft->volatile_term)
> +		raft_sm_schedule_new_term(raft, req->term);
>   	/*
>   	 * Either a vote request during an on-going election. Or an old vote
>   	 * persisted long time ago and still broadcasted. Or a vote response.
>   	 */
>   	if (req->vote != 0) {
> -		switch (raft.state) {
> +		switch (raft->state) {
>   		case RAFT_STATE_FOLLOWER:
>   		case RAFT_STATE_LEADER:
> -			if (!raft.is_enabled) {
> +			if (!raft->is_enabled) {
>   				say_info("RAFT: vote request is skipped - RAFT "
>   					 "is disabled");
>   				break;
>   			}
> -			if (raft.leader != 0) {
> +			if (raft->leader != 0) {
>   				say_info("RAFT: vote request is skipped - the "
>   					 "leader is already known - %u",
> -					 raft.leader);
> +					 raft->leader);
>   				break;
>   			}
>   			if (req->vote == instance_id) {
> @@ -406,13 +409,13 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   					 "for a third node, not a request");
>   				break;
>   			}
> -			if (raft.volatile_vote != 0) {
> +			if (raft->volatile_vote != 0) {
>   				say_info("RAFT: vote request is skipped - "
>   					 "already voted in this term");
>   				break;
>   			}
>   			/* Vclock is not NULL, validated above. */
> -			if (!raft_can_vote_for(req->vclock)) {
> +			if (!raft_can_vote_for(raft, req->vclock)) {
>   				say_info("RAFT: vote request is skipped - the "
>   					 "vclock is not acceptable");
>   				break;
> @@ -421,7 +424,7 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   			 * Either the term is new, or didn't vote in the current
>   			 * term yet. Anyway can vote now.
>   			 */
> -			raft_sm_schedule_new_vote(req->vote);
> +			raft_sm_schedule_new_vote(raft, req->vote);
>   			break;
>   		case RAFT_STATE_CANDIDATE:
>   			/* Check if this is a vote for a competing candidate. */
> @@ -434,39 +437,39 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   			 * Vote for self was requested earlier in this round,
>   			 * and now was answered by some other instance.
>   			 */
> -			assert(raft.volatile_vote == instance_id);
> -			int quorum = raft_election_quorum();
> -			bool was_set = bit_set(&raft.vote_mask, source);
> -			raft.vote_count += !was_set;
> -			if (raft.vote_count < quorum) {
> +			assert(raft->volatile_vote == instance_id);
> +			int quorum = raft_election_quorum(raft);
> +			bool was_set = bit_set(&raft->vote_mask, source);
> +			raft->vote_count += !was_set;
> +			if (raft->vote_count < quorum) {
>   				say_info("RAFT: accepted vote for self, vote "
> -					 "count is %d/%d", raft.vote_count,
> +					 "count is %d/%d", raft->vote_count,
>   					 quorum);
>   				break;
>   			}
> -			raft_sm_become_leader();
> +			raft_sm_become_leader(raft);
>   			break;
>   		default:
>   			unreachable();
>   		}
>   	}
>   	if (req->state != RAFT_STATE_LEADER) {
> -		if (source == raft.leader) {
> +		if (source == raft->leader) {
>   			say_info("RAFT: the node %u has resigned from the "
> -				 "leader role", raft.leader);
> +				 "leader role", raft->leader);
>   			/*
>   			 * Candidate node clears leader implicitly when starts a
>   			 * new term, but non-candidate won't do that, so clear
>   			 * it manually.
>   			 */
> -			raft.leader = 0;
> -			if (raft.is_candidate)
> -				raft_sm_schedule_new_election();
> +			raft->leader = 0;
> +			if (raft->is_candidate)
> +				raft_sm_schedule_new_election(raft);
>   		}
>   		return 0;
>   	}
>   	/* The node is a leader, but it is already known. */
> -	if (source == raft.leader)
> +	if (source == raft->leader)
>   		return 0;
>   	/*
>   	 * XXX: A message from a conflicting leader. Split brain, basically.
> @@ -474,21 +477,21 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   	 * future either this node should try to become a leader, or should stop
>   	 * all writes and require manual intervention.
>   	 */
> -	if (raft.leader != 0) {
> +	if (raft->leader != 0) {
>   		say_warn("RAFT: conflicting leader detected in one term - "
> -			 "known is %u, received %u", raft.leader, source);
> +			 "known is %u, received %u", raft->leader, source);
>   		return 0;
>   	}
>   
>   	/* New leader was elected. */
> -	raft_sm_follow_leader(source);
> +	raft_sm_follow_leader(raft, source);
>   	return 0;
>   }
>   
>   void
> -raft_process_heartbeat(uint32_t source)
> +raft_process_heartbeat(struct raft *raft, uint32_t source)
>   {
> -	raft_validate();
> +	raft_validate(raft);
>   	/*
>   	 * Raft handles heartbeats from all instances, including anon instances
>   	 * which don't participate in Raft.
> @@ -499,19 +502,19 @@ raft_process_heartbeat(uint32_t source)
>   	 * When not a candidate - don't wait for anything. Therefore do not care
>   	 * about the leader being dead.
>   	 */
> -	if (!raft.is_candidate)
> +	if (!raft->is_candidate)
>   		return;
>   	/* Don't care about heartbeats when this node is a leader itself. */
> -	if (raft.state == RAFT_STATE_LEADER)
> +	if (raft->state == RAFT_STATE_LEADER)
>   		return;
>   	/* Not interested in heartbeats from not a leader. */
> -	if (raft.leader != source)
> +	if (raft->leader != source)
>   		return;
>   	/*
>   	 * The instance currently is busy with writing something on disk. Can't
>   	 * react to heartbeats.
>   	 */
> -	if (raft.is_write_in_progress)
> +	if (raft->is_write_in_progress)
>   		return;
>   	/*
>   	 * XXX: it may be expensive to reset the timer like that. It may be less
> @@ -519,9 +522,9 @@ raft_process_heartbeat(uint32_t source)
>   	 * anything was heard from the leader. Then in the timer callback check
>   	 * the timestamp, and restart the timer, if it is fine.
>   	 */
> -	assert(ev_is_active(&raft.timer));
> -	ev_timer_stop(loop(), &raft.timer);
> -	raft_sm_wait_leader_dead();
> +	assert(ev_is_active(&raft->timer));
> +	ev_timer_stop(loop(), &raft->timer);
> +	raft_sm_wait_leader_dead(raft);
>   }
>   
>   /** Wakeup Raft state writer fiber waiting for WAL write end. */
> @@ -535,7 +538,6 @@ raft_write_cb(struct journal_entry *entry)
>   static void
>   raft_write_request(const struct raft_request *req)
>   {
> -	assert(raft.is_write_in_progress);
Not related to this patch, just a thought.
Looks like raft_write_request() belongs in some other file now.
Will you move it somewhere else?
I don't know where to put it, though. box.cc?
>   	/*
>   	 * Vclock is never persisted by Raft. It is used only to
>   	 * be sent to network when vote for self.
> @@ -579,104 +581,106 @@ fail:
>   
>   /* Dump Raft state to WAL in a blocking way. */
>   static void
> -raft_worker_handle_io(void)
> +raft_worker_handle_io(struct raft *raft)
>   {
> -	assert(raft.is_write_in_progress);
> +	assert(raft->is_write_in_progress);
>   	/* During write Raft can't be anything but a follower. */
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
>   	struct raft_request req;
>   
> -	if (raft_is_fully_on_disk()) {
> +	if (raft_is_fully_on_disk(raft)) {
>   end_dump:
> -		raft.is_write_in_progress = false;
> +		raft->is_write_in_progress = false;
>   		/*
>   		 * The state machine is stable. Can see now, to what state to
>   		 * go.
>   		 */
> -		if (!raft.is_candidate) {
> +		if (!raft->is_candidate) {
>   			/*
>   			 * If not a candidate, can't do anything except vote for
>   			 * somebody (if Raft is enabled). Nothing to do except
>   			 * staying a follower without timeouts.
>   			 */
> -		} else if (raft.leader != 0) {
> +		} else if (raft->leader != 0) {
>   			/* There is a known leader. Wait until it is dead. */
> -			raft_sm_wait_leader_dead();
> -		} else if (raft.vote == instance_id) {
> +			raft_sm_wait_leader_dead(raft);
> +		} else if (raft->vote == instance_id) {
>   			/* Just wrote own vote. */
> -			if (raft_election_quorum() == 1)
> -				raft_sm_become_leader();
> +			if (raft_election_quorum(raft) == 1)
> +				raft_sm_become_leader(raft);
>   			else
> -				raft_sm_become_candidate();
> -		} else if (raft.vote != 0) {
> +				raft_sm_become_candidate(raft);
> +		} else if (raft->vote != 0) {
>   			/*
>   			 * Voted for some other node. Wait if it manages to
>   			 * become a leader.
>   			 */
> -			raft_sm_wait_election_end();
> +			raft_sm_wait_election_end(raft);
>   		} else {
>   			/* No leaders, no votes. */
> -			raft_sm_schedule_new_vote(instance_id);
> +			raft_sm_schedule_new_vote(raft, instance_id);
>   		}
>   	} else {
>   		memset(&req, 0, sizeof(req));
> -		assert(raft.volatile_term >= raft.term);
> -		req.term = raft.volatile_term;
> -		req.vote = raft.volatile_vote;
> +		assert(raft->volatile_term >= raft->term);
> +		req.term = raft->volatile_term;
> +		req.vote = raft->volatile_vote;
>   
>   		raft_write_request(&req);
>   		say_info("RAFT: persisted state %s",
>   			 raft_request_to_string(&req));
>   
> -		assert(req.term >= raft.term);
> -		raft.term = req.term;
> -		raft.vote = req.vote;
> +		assert(req.term >= raft->term);
> +		raft->term = req.term;
> +		raft->vote = req.vote;
>   		/*
>   		 * Persistent state is visible, and it was changed - broadcast.
>   		 */
> -		raft_schedule_broadcast();
> -		if (raft_is_fully_on_disk())
> +		raft_schedule_broadcast(raft);
> +		if (raft_is_fully_on_disk(raft))
>   			goto end_dump;
>   	}
>   }
>   
>   /* Broadcast Raft complete state to the followers. */
>   static void
> -raft_worker_handle_broadcast(void)
> +raft_worker_handle_broadcast(struct raft *raft)
>   {
> -	assert(raft.is_broadcast_scheduled);
> +	assert(raft->is_broadcast_scheduled);
>   	struct raft_request req;
>   	memset(&req, 0, sizeof(req));
> -	req.term = raft.term;
> -	req.vote = raft.vote;
> -	req.state = raft.state;
> +	req.term = raft->term;
> +	req.vote = raft->vote;
> +	req.state = raft->state;
>   	if (req.state == RAFT_STATE_CANDIDATE) {
> -		assert(raft.vote == instance_id);
> +		assert(raft->vote == instance_id);
>   		req.vclock = &replicaset.vclock;
>   	}
>   	replicaset_foreach(replica)
>   		relay_push_raft(replica->relay, &req);
> -	trigger_run(&raft.on_update, NULL);
> -	raft.is_broadcast_scheduled = false;
> +	trigger_run(&raft->on_update, NULL);
> +	raft->is_broadcast_scheduled = false;
>   }
>   
>   static int
>   raft_worker_f(va_list args)
>   {
>   	(void)args;
> +	struct raft *raft = fiber()->arg;
> +	assert(raft == &box_raft);
>   	bool is_idle;
>   	while (!fiber_is_cancelled()) {
>   		is_idle = true;
> -		if (raft.is_write_in_progress) {
> -			raft_worker_handle_io();
> +		if (raft->is_write_in_progress) {
> +			raft_worker_handle_io(raft);
>   			is_idle = false;
>   		}
> -		if (raft.is_broadcast_scheduled) {
> -			raft_worker_handle_broadcast();
> +		if (raft->is_broadcast_scheduled) {
> +			raft_worker_handle_broadcast(raft);
>   			is_idle = false;
>   		}
>   		if (is_idle) {
> -			assert(raft_is_fully_on_disk());
> +			assert(raft_is_fully_on_disk(raft));
>   			fiber_yield();
>   		}
>   		fiber_sleep(0);
> @@ -685,111 +689,111 @@ raft_worker_f(va_list args)
>   }
>   
>   static void
> -raft_sm_pause_and_dump(void)
> +raft_sm_pause_and_dump(struct raft *raft)
>   {
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	if (raft.is_write_in_progress)
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	if (raft->is_write_in_progress)
>   		return;
> -	ev_timer_stop(loop(), &raft.timer);
> -	raft_worker_wakeup();
> -	raft.is_write_in_progress = true;
> +	ev_timer_stop(loop(), &raft->timer);
> +	raft_worker_wakeup(raft);
> +	raft->is_write_in_progress = true;
>   }
>   
>   static void
> -raft_sm_become_leader(void)
> +raft_sm_become_leader(struct raft *raft)
>   {
> -	assert(raft.state != RAFT_STATE_LEADER);
> +	assert(raft->state != RAFT_STATE_LEADER);
>   	say_info("RAFT: enter leader state with quorum %d",
> -		 raft_election_quorum());
> -	assert(raft.leader == 0);
> -	assert(raft.is_candidate);
> -	assert(!raft.is_write_in_progress);
> -	raft.state = RAFT_STATE_LEADER;
> -	raft.leader = instance_id;
> -	ev_timer_stop(loop(), &raft.timer);
> +		 raft_election_quorum(raft));
> +	assert(raft->leader == 0);
> +	assert(raft->is_candidate);
> +	assert(!raft->is_write_in_progress);
> +	raft->state = RAFT_STATE_LEADER;
> +	raft->leader = instance_id;
> +	ev_timer_stop(loop(), &raft->timer);
>   	/* Make read-write (if other subsystems allow that. */
>   	box_update_ro_summary();
>   	/* State is visible and it is changed - broadcast. */
> -	raft_schedule_broadcast();
> +	raft_schedule_broadcast(raft);
>   }
>   
>   static void
> -raft_sm_follow_leader(uint32_t leader)
> +raft_sm_follow_leader(struct raft *raft, uint32_t leader)
>   {
>   	say_info("RAFT: leader is %u, follow", leader);
> -	assert(raft.state != RAFT_STATE_LEADER);
> -	assert(raft.leader == 0);
> -	raft.state = RAFT_STATE_FOLLOWER;
> -	raft.leader = leader;
> -	if (!raft.is_write_in_progress && raft.is_candidate) {
> -		ev_timer_stop(loop(), &raft.timer);
> -		raft_sm_wait_leader_dead();
> +	assert(raft->state != RAFT_STATE_LEADER);
> +	assert(raft->leader == 0);
> +	raft->state = RAFT_STATE_FOLLOWER;
> +	raft->leader = leader;
> +	if (!raft->is_write_in_progress && raft->is_candidate) {
> +		ev_timer_stop(loop(), &raft->timer);
> +		raft_sm_wait_leader_dead(raft);
>   	}
>   	/* State is visible and it is changed - broadcast. */
> -	raft_schedule_broadcast();
> +	raft_schedule_broadcast(raft);
>   }
>   
>   static void
> -raft_sm_become_candidate(void)
> +raft_sm_become_candidate(struct raft *raft)
>   {
>   	say_info("RAFT: enter candidate state with 1 self vote");
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	assert(raft.leader == 0);
> -	assert(raft.vote == instance_id);
> -	assert(raft.is_candidate);
> -	assert(!raft.is_write_in_progress);
> -	assert(raft_election_quorum() > 1);
> -	raft.state = RAFT_STATE_CANDIDATE;
> -	raft.vote_count = 1;
> -	raft.vote_mask = 0;
> -	bit_set(&raft.vote_mask, instance_id);
> -	raft_sm_wait_election_end();
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	assert(raft->leader == 0);
> +	assert(raft->vote == instance_id);
> +	assert(raft->is_candidate);
> +	assert(!raft->is_write_in_progress);
> +	assert(raft_election_quorum(raft) > 1);
> +	raft->state = RAFT_STATE_CANDIDATE;
> +	raft->vote_count = 1;
> +	raft->vote_mask = 0;
> +	bit_set(&raft->vote_mask, instance_id);
> +	raft_sm_wait_election_end(raft);
>   	/* State is visible and it is changed - broadcast. */
> -	raft_schedule_broadcast();
> +	raft_schedule_broadcast(raft);
>   }
>   
>   static void
> -raft_sm_schedule_new_term(uint64_t new_term)
> +raft_sm_schedule_new_term(struct raft *raft, uint64_t new_term)
>   {
>   	say_info("RAFT: bump term to %llu, follow", new_term);
> -	assert(new_term > raft.volatile_term);
> -	assert(raft.volatile_term >= raft.term);
> -	raft.volatile_term = new_term;
> +	assert(new_term > raft->volatile_term);
> +	assert(raft->volatile_term >= raft->term);
> +	raft->volatile_term = new_term;
>   	/* New terms means completely new Raft state. */
> -	raft.volatile_vote = 0;
> -	raft.leader = 0;
> -	raft.state = RAFT_STATE_FOLLOWER;
> +	raft->volatile_vote = 0;
> +	raft->leader = 0;
> +	raft->state = RAFT_STATE_FOLLOWER;
>   	box_update_ro_summary();
> -	raft_sm_pause_and_dump();
> +	raft_sm_pause_and_dump(raft);
>   	/*
>   	 * State is visible and it is changed - broadcast. Term is also visible,
>   	 * but only persistent term. Volatile term is not broadcasted until
>   	 * saved to disk.
>   	 */
> -	raft_schedule_broadcast();
> +	raft_schedule_broadcast(raft);
>   }
>   
>   static void
> -raft_sm_schedule_new_vote(uint32_t new_vote)
> +raft_sm_schedule_new_vote(struct raft *raft, uint32_t new_vote)
>   {
> -	say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term);
> -	assert(raft.volatile_vote == 0);
> -	assert(raft.leader == 0);
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	raft.volatile_vote = new_vote;
> -	raft_sm_pause_and_dump();
> +	say_info("RAFT: vote for %u, follow", new_vote, raft->volatile_term);
> +	assert(raft->volatile_vote == 0);
> +	assert(raft->leader == 0);
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	raft->volatile_vote = new_vote;
> +	raft_sm_pause_and_dump(raft);
>   	/* Nothing visible is changed - no broadcast. */
>   }
>   
>   static void
> -raft_sm_schedule_new_election(void)
> +raft_sm_schedule_new_election(struct raft *raft)
>   {
>   	say_info("RAFT: begin new election round");
> -	assert(raft_is_fully_on_disk());
> -	assert(raft.is_candidate);
> +	assert(raft_is_fully_on_disk(raft));
> +	assert(raft->is_candidate);
>   	/* Everyone is a follower until its vote for self is persisted. */
> -	raft_sm_schedule_new_term(raft.term + 1);
> -	raft_sm_schedule_new_vote(instance_id);
> +	raft_sm_schedule_new_term(raft, raft->term + 1);
> +	raft_sm_schedule_new_vote(raft, instance_id);
>   	box_update_ro_summary();
>   }
>   
> @@ -797,75 +801,77 @@ static void
>   raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
>   				 int events)
>   {
> -	assert(timer == &raft.timer);
> +	struct raft *raft = timer->data;
> +	assert(timer == &raft->timer);
> +	assert(raft == &box_raft);
>   	(void)events;
>   	ev_timer_stop(loop, timer);
> -	raft_sm_schedule_new_election();
> +	raft_sm_schedule_new_election(raft);
>   }
>   
>   static void
> -raft_sm_wait_leader_dead(void)
> +raft_sm_wait_leader_dead(struct raft *raft)
>   {
> -	assert(!ev_is_active(&raft.timer));
> -	assert(!raft.is_write_in_progress);
> -	assert(raft.is_candidate);
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	assert(raft.leader != 0);
> +	assert(!ev_is_active(&raft->timer));
> +	assert(!raft->is_write_in_progress);
> +	assert(raft->is_candidate);
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	assert(raft->leader != 0);
>   	double death_timeout = replication_disconnect_timeout();
> -	ev_timer_set(&raft.timer, death_timeout, death_timeout);
> -	ev_timer_start(loop(), &raft.timer);
> +	ev_timer_set(&raft->timer, death_timeout, death_timeout);
> +	ev_timer_start(loop(), &raft->timer);
>   }
>   
>   static void
> -raft_sm_wait_leader_found(void)
> +raft_sm_wait_leader_found(struct raft *raft)
>   {
> -	assert(!ev_is_active(&raft.timer));
> -	assert(!raft.is_write_in_progress);
> -	assert(raft.is_candidate);
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	assert(raft.leader == 0);
> +	assert(!ev_is_active(&raft->timer));
> +	assert(!raft->is_write_in_progress);
> +	assert(raft->is_candidate);
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	assert(raft->leader == 0);
>   	double death_timeout = replication_disconnect_timeout();
> -	ev_timer_set(&raft.timer, death_timeout, death_timeout);
> -	ev_timer_start(loop(), &raft.timer);
> +	ev_timer_set(&raft->timer, death_timeout, death_timeout);
> +	ev_timer_start(loop(), &raft->timer);
>   }
>   
>   static void
> -raft_sm_wait_election_end(void)
> +raft_sm_wait_election_end(struct raft *raft)
>   {
> -	assert(!ev_is_active(&raft.timer));
> -	assert(!raft.is_write_in_progress);
> -	assert(raft.is_candidate);
> -	assert(raft.state == RAFT_STATE_FOLLOWER ||
> -	       (raft.state == RAFT_STATE_CANDIDATE &&
> -		raft.volatile_vote == instance_id));
> -	assert(raft.leader == 0);
> -	double election_timeout = raft.election_timeout +
> -				  raft_new_random_election_shift();
> -	ev_timer_set(&raft.timer, election_timeout, election_timeout);
> -	ev_timer_start(loop(), &raft.timer);
> +	assert(!ev_is_active(&raft->timer));
> +	assert(!raft->is_write_in_progress);
> +	assert(raft->is_candidate);
> +	assert(raft->state == RAFT_STATE_FOLLOWER ||
> +	       (raft->state == RAFT_STATE_CANDIDATE &&
> +		raft->volatile_vote == instance_id));
> +	assert(raft->leader == 0);
> +	double election_timeout = raft->election_timeout +
> +				  raft_new_random_election_shift(raft);
> +	ev_timer_set(&raft->timer, election_timeout, election_timeout);
> +	ev_timer_start(loop(), &raft->timer);
>   }
>   
>   static void
> -raft_sm_start(void)
> +raft_sm_start(struct raft *raft)
>   {
>   	say_info("RAFT: start state machine");
> -	assert(!ev_is_active(&raft.timer));
> -	assert(!raft.is_enabled);
> -	assert(raft.state == RAFT_STATE_FOLLOWER);
> -	raft.is_enabled = true;
> -	raft.is_candidate = raft.is_cfg_candidate;
> -	if (raft.is_write_in_progress) {
> +	assert(!ev_is_active(&raft->timer));
> +	assert(!raft->is_enabled);
> +	assert(raft->state == RAFT_STATE_FOLLOWER);
> +	raft->is_enabled = true;
> +	raft->is_candidate = raft->is_cfg_candidate;
> +	if (raft->is_write_in_progress) {
>   		/*
>   		 * Nop. If write is in progress, the state machine is frozen. It
>   		 * is continued when write ends.
>   		 */
> -	} else if (!raft.is_candidate) {
> +	} else if (!raft->is_candidate) {
>   		/*
>   		 * Nop. When a node is not a candidate, it can't initiate
>   		 * elections anyway, so it does not need to monitor the leader.
>   		 */
> -	} else if (raft.leader != 0) {
> -		raft_sm_wait_leader_dead();
> +	} else if (raft->leader != 0) {
> +		raft_sm_wait_leader_dead(raft);
>   	} else {
>   		/*
>   		 * Don't start new election. The situation is most likely
> @@ -874,39 +880,40 @@ raft_sm_start(void)
>   		 * disturb the current leader. Give it time to notify this node
>   		 * that there is a leader.
>   		 */
> -		raft_sm_wait_leader_found();
> +		raft_sm_wait_leader_found(raft);
>   	}
>   	box_update_ro_summary();
>   }
>   
>   static void
> -raft_sm_stop(void)
> +raft_sm_stop(struct raft *raft)
>   {
>   	say_info("RAFT: stop state machine");
> -	assert(raft.is_enabled);
> -	raft.is_enabled = false;
> -	raft.is_candidate = false;
> -	if (raft.state == RAFT_STATE_LEADER)
> -		raft.leader = 0;
> -	raft.state = RAFT_STATE_FOLLOWER;
> -	ev_timer_stop(loop(), &raft.timer);
> +	assert(raft->is_enabled);
> +	raft->is_enabled = false;
> +	raft->is_candidate = false;
> +	if (raft->state == RAFT_STATE_LEADER)
> +		raft->leader = 0;
> +	raft->state = RAFT_STATE_FOLLOWER;
> +	ev_timer_stop(loop(), &raft->timer);
>   	box_update_ro_summary();
>   	/* State is visible and changed - broadcast. */
> -	raft_schedule_broadcast();
> +	raft_schedule_broadcast(raft);
>   }
>   
>   void
> -raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
> +raft_serialize_for_network(const struct raft *raft, struct raft_request *req,
> +			   struct vclock *vclock)
>   {
> -	raft_validate();
> +	raft_validate(raft);
>   	memset(req, 0, sizeof(*req));
>   	/*
>   	 * Volatile state is never used for any communications.
>   	 * Use only persisted state.
>   	 */
> -	req->term = raft.term;
> -	req->vote = raft.vote;
> -	req->state = raft.state;
> +	req->term = raft->term;
> +	req->vote = raft->vote;
> +	req->state = raft->state;
>   	/*
>   	 * Raft does not own vclock, so it always expects it passed externally.
>   	 * Vclock is sent out only by candidate instances.
> @@ -918,134 +925,134 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
>   }
>   
>   void
> -raft_serialize_for_disk(struct raft_request *req)
> +raft_serialize_for_disk(const struct raft *raft, struct raft_request *req)
>   {
> -	raft_validate();
> +	raft_validate(raft);
>   	memset(req, 0, sizeof(*req));
> -	req->term = raft.term;
> -	req->vote = raft.vote;
> +	req->term = raft->term;
> +	req->vote = raft->vote;
>   }
>   
>   void
> -raft_on_update(struct trigger *trigger)
> +raft_on_update(struct raft *raft, struct trigger *trigger)
>   {
> -	raft_validate();
> -	trigger_add(&raft.on_update, trigger);
> +	raft_validate(raft);
> +	trigger_add(&raft->on_update, trigger);
>   }
>   
>   void
> -raft_cfg_is_enabled(bool is_enabled)
> +raft_cfg_is_enabled(struct raft *raft, bool is_enabled)
>   {
> -	raft_validate();
> -	if (is_enabled == raft.is_enabled)
> +	raft_validate(raft);
> +	if (is_enabled == raft->is_enabled)
>   		return;
>   
>   	if (!is_enabled)
> -		raft_sm_stop();
> +		raft_sm_stop(raft);
>   	else
> -		raft_sm_start();
> +		raft_sm_start(raft);
>   }
>   
>   void
> -raft_cfg_is_candidate(bool is_candidate)
> +raft_cfg_is_candidate(struct raft *raft, bool is_candidate)
>   {
> -	raft_validate();
> -	bool old_is_candidate = raft.is_candidate;
> -	raft.is_cfg_candidate = is_candidate;
> -	raft.is_candidate = is_candidate && raft.is_enabled;
> -	if (raft.is_candidate == old_is_candidate)
> +	raft_validate(raft);
> +	bool old_is_candidate = raft->is_candidate;
> +	raft->is_cfg_candidate = is_candidate;
> +	raft->is_candidate = is_candidate && raft->is_enabled;
> +	if (raft->is_candidate == old_is_candidate)
>   		return;
>   
> -	if (raft.is_candidate) {
> -		assert(raft.state == RAFT_STATE_FOLLOWER);
> -		if (raft.is_write_in_progress) {
> +	if (raft->is_candidate) {
> +		assert(raft->state == RAFT_STATE_FOLLOWER);
> +		if (raft->is_write_in_progress) {
>   			/*
>   			 * If there is an on-going WAL write, it means there was
>   			 * some node who sent newer data to this node. So it is
>   			 * probably a better candidate. Anyway can't do anything
>   			 * until the new state is fully persisted.
>   			 */
> -		} else if (raft.leader != 0) {
> -			raft_sm_wait_leader_dead();
> +		} else if (raft->leader != 0) {
> +			raft_sm_wait_leader_dead(raft);
>   		} else {
> -			raft_sm_wait_leader_found();
> +			raft_sm_wait_leader_found(raft);
>   		}
>   	} else {
> -		if (raft.state != RAFT_STATE_LEADER) {
> +		if (raft->state != RAFT_STATE_LEADER) {
>   			/* Do not wait for anything while being a voter. */
> -			ev_timer_stop(loop(), &raft.timer);
> +			ev_timer_stop(loop(), &raft->timer);
>   		}
> -		if (raft.state != RAFT_STATE_FOLLOWER) {
> -			if (raft.state == RAFT_STATE_LEADER)
> -				raft.leader = 0;
> -			raft.state = RAFT_STATE_FOLLOWER;
> +		if (raft->state != RAFT_STATE_FOLLOWER) {
> +			if (raft->state == RAFT_STATE_LEADER)
> +				raft->leader = 0;
> +			raft->state = RAFT_STATE_FOLLOWER;
>   			/* State is visible and changed - broadcast. */
> -			raft_schedule_broadcast();
> +			raft_schedule_broadcast(raft);
>   		}
>   	}
>   	box_update_ro_summary();
>   }
>   
>   void
> -raft_cfg_election_timeout(double timeout)
> +raft_cfg_election_timeout(struct raft *raft, double timeout)
>   {
> -	raft_validate();
> -	if (timeout == raft.election_timeout)
> +	raft_validate(raft);
> +	if (timeout == raft->election_timeout)
>   		return;
>   
> -	raft.election_timeout = timeout;
> -	if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
> -		assert(ev_is_active(&raft.timer));
> -		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> -				 raft.timer.at + raft.election_timeout;
> -		ev_timer_stop(loop(), &raft.timer);
> -		ev_timer_set(&raft.timer, timeout, timeout);
> -		ev_timer_start(loop(), &raft.timer);
> +	raft->election_timeout = timeout;
> +	if (raft->vote != 0 && raft->leader == 0 && raft->is_candidate) {
> +		assert(ev_is_active(&raft->timer));
> +		double timeout = ev_timer_remaining(loop(), &raft->timer) -
> +				 raft->timer.at + raft->election_timeout;
> +		ev_timer_stop(loop(), &raft->timer);
> +		ev_timer_set(&raft->timer, timeout, timeout);
> +		ev_timer_start(loop(), &raft->timer);
>   	}
>   }
>   
>   void
> -raft_cfg_election_quorum(void)
> +raft_cfg_election_quorum(struct raft *raft)
>   {
> -	raft_validate();
> -	if (raft.state != RAFT_STATE_CANDIDATE ||
> -	    raft.state == RAFT_STATE_LEADER)
> +	raft_validate(raft);
> +	if (raft->state != RAFT_STATE_CANDIDATE ||
> +	    raft->state == RAFT_STATE_LEADER)
>   		return;
> -	if (raft.vote_count < raft_election_quorum())
> +	if (raft->vote_count < raft_election_quorum(raft))
>   		return;
> -	raft_sm_become_leader();
> +	raft_sm_become_leader(raft);
>   }
>   
>   void
> -raft_cfg_death_timeout(void)
> +raft_cfg_death_timeout(struct raft *raft)
>   {
> -	raft_validate();
> -	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
> -	    raft.leader != 0) {
> -		assert(ev_is_active(&raft.timer));
> +	raft_validate(raft);
> +	if (raft->state == RAFT_STATE_FOLLOWER && raft->is_candidate &&
> +	    raft->leader != 0) {
> +		assert(ev_is_active(&raft->timer));
>   		double death_timeout = replication_disconnect_timeout();
> -		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> -				 raft.timer.at + death_timeout;
> -		ev_timer_stop(loop(), &raft.timer);
> -		ev_timer_set(&raft.timer, timeout, timeout);
> -		ev_timer_start(loop(), &raft.timer);
> +		double timeout = ev_timer_remaining(loop(), &raft->timer) -
> +				 raft->timer.at + death_timeout;
> +		ev_timer_stop(loop(), &raft->timer);
> +		ev_timer_set(&raft->timer, timeout, timeout);
> +		ev_timer_start(loop(), &raft->timer);
>   	}
>   }
>   
>   void
> -raft_new_term(void)
> +raft_new_term(struct raft *raft)
>   {
> -	raft_validate();
> -	if (raft.is_enabled)
> -		raft_sm_schedule_new_term(raft.volatile_term + 1);
> +	raft_validate(raft);
> +	if (raft->is_enabled)
> +		raft_sm_schedule_new_term(raft, raft->volatile_term + 1);
>   }
>   
>   static void
> -raft_worker_wakeup(void)
> +raft_worker_wakeup(struct raft *raft)
>   {
> -	if (raft.worker == NULL) {
> -		raft.worker = fiber_new("raft_worker", raft_worker_f);
> -		if (raft.worker == NULL) {
> +	if (raft->worker == NULL) {
> +		raft->worker = fiber_new("raft_worker", raft_worker_f);
> +		if (raft->worker == NULL) {
>   			/*
>   			 * XXX: should be handled properly, no need to panic.
>   			 * The issue though is that most of the Raft state
> @@ -1060,32 +1067,64 @@ raft_worker_wakeup(void)
>   			panic("Could't create Raft worker fiber");
>   			return;
>   		}
> -		fiber_set_joinable(raft.worker, true);
> +		raft->worker->arg = raft;
> +		fiber_set_joinable(raft->worker, true);
>   	}
>   	/*
>   	 * Don't wake the fiber if it writes something. Otherwise it would be a
>   	 * spurious wakeup breaking the WAL write not adapted to this. Also
>   	 * don't wakeup the current fiber - it leads to undefined behaviour.
>   	 */
> -	if (!raft.is_write_in_progress && fiber() != raft.worker)
> -		fiber_wakeup(raft.worker);
> +	if (!raft->is_write_in_progress && fiber() != raft->worker)
> +		fiber_wakeup(raft->worker);
>   }
>   
>   static void
> -raft_schedule_broadcast(void)
> +raft_schedule_broadcast(struct raft *raft)
>   {
> -	raft.is_broadcast_scheduled = true;
> -	raft_worker_wakeup();
> +	raft->is_broadcast_scheduled = true;
> +	raft_worker_wakeup(raft);
>   }
>   
>   void
> -raft_init(void)
> +raft_create(struct raft *raft)
>   {
> -	memset(&raft, 0, sizeof(raft));
> -	raft.state = RAFT_STATE_FOLLOWER;
> -	raft.volatile_term = 1;
> -	raft.term = 1;
> -	raft.election_timeout = 5;
> -	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
> -	rlist_create(&raft.on_update);
> +	memset(raft, 0, sizeof(*raft));
> +	raft->state = RAFT_STATE_FOLLOWER;
> +	raft->volatile_term = 1;
> +	raft->term = 1;
> +	raft->election_timeout = 5;
> +	ev_timer_init(&raft->timer, raft_sm_schedule_new_election_cb, 0, 0);
> +	raft->timer.data = raft;
> +	rlist_create(&raft->on_update);
> +}
> +
> +void
> +raft_destroy(struct raft *raft)
> +{
> +	trigger_destroy(&raft->on_update);
> +	if (raft->worker != NULL) {
> +		raft_worker_wakeup(raft);
> +		fiber_cancel(raft->worker);
> +		fiber_join(raft->worker);
> +		raft->worker = NULL;
> +	}
> +	/* Invalidate so that any usage attempt fails in raft_validate(). */
> +	raft->state = 0;
> +}
> +
> +void
> +box_raft_init(void)
> +{
> +	raft_create(&box_raft);
> +}
> +
> +void
> +box_raft_free(void)
> +{
> +	trigger_destroy(&box_raft.on_update);
> +	/*
> +	 * Can't join the fiber, because the event loop is stopped already, and
> +	 * yields are not allowed.
> +	 */
>   }
> diff --git a/src/box/raft.h b/src/box/raft.h
> index 0c60eccdf..860062857 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -163,16 +163,16 @@ struct raft {
>   	struct rlist on_update;
>   };
>   
> -extern struct raft raft;
> +extern struct raft box_raft;
>   
>   /**
>    * Ensure the raft node can be used. I.e. that it is properly initialized.
>    * Entirely for debug purposes.
>    */
>   static inline void
> -raft_validate(void)
> +raft_validate(const struct raft *raft)
>   {
> -	assert(raft.state != 0);
> +	assert(raft->state != 0);
>   }
>   
>   /**
> @@ -181,61 +181,58 @@ raft_validate(void)
>    * affected by box.cfg.read_only, connection quorum.
>    */
>   static inline bool
> -raft_is_ro(void)
> +raft_is_ro(const struct raft *raft)
>   {
> -	raft_validate();
> -	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
> +	raft_validate(raft);
> +	return raft->is_enabled && raft->state != RAFT_STATE_LEADER;
>   }
>   
>   /** See if the instance can accept rows from an instance with the given ID. */
>   static inline bool
> -raft_is_source_allowed(uint32_t source_id)
> +raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
>   {
> -	raft_validate();
> -	return !raft.is_enabled || raft.leader == source_id;
> +	raft_validate(raft);
> +	return !raft->is_enabled || raft->leader == source_id;
>   }
>   
>   /** Check if Raft is enabled. */
>   static inline bool
> -raft_is_enabled(void)
> +raft_is_enabled(const struct raft *raft)
>   {
> -	raft_validate();
> -	return raft.is_enabled;
> +	raft_validate(raft);
> +	return raft->is_enabled;
>   }
>   
>   /** Process a raft entry stored in WAL/snapshot. */
>   void
> -raft_process_recovery(const struct raft_request *req);
> +raft_process_recovery(struct raft *raft, const struct raft_request *req);
>   
> -/**
> - * Process a raft status message coming from the network.
> - * @param req Raft request.
> - * @param source Instance ID of the message sender.
> - */
> +/** Process a raft status message coming from the network. */
>   int
> -raft_process_msg(const struct raft_request *req, uint32_t source);
> +raft_process_msg(struct raft *raft, const struct raft_request *req,
> +		 uint32_t source);
>   
>   /**
>    * Process a heartbeat message from an instance with the given ID. It is used to
>    * watch leader's health and start election when necessary.
>    */
>   void
> -raft_process_heartbeat(uint32_t source);
> +raft_process_heartbeat(struct raft *raft, uint32_t source);
>   
>   /** Configure whether Raft is enabled. */
>   void
> -raft_cfg_is_enabled(bool is_enabled);
> +raft_cfg_is_enabled(struct raft *raft, bool is_enabled);
>   
>   /**
>    * Configure whether the instance can be elected as Raft leader. Even if false,
>    * the node still can vote, when Raft is enabled.
>    */
>   void
> -raft_cfg_is_candidate(bool is_candidate);
> +raft_cfg_is_candidate(struct raft *raft, bool is_candidate);
>   
>   /** Configure Raft leader election timeout. */
>   void
> -raft_cfg_election_timeout(double timeout);
> +raft_cfg_election_timeout(struct raft *raft, double timeout);
>   
>   /**
>    * Configure Raft leader election quorum. There is no a separate option.
> @@ -243,7 +240,7 @@ raft_cfg_election_timeout(double timeout);
>    * with synchronous replication.
>    */
>   void
> -raft_cfg_election_quorum(void);
> +raft_cfg_election_quorum(struct raft *raft);
>   
>   /**
>    * Configure Raft leader death timeout. I.e. number of seconds without
> @@ -251,7 +248,7 @@ raft_cfg_election_quorum(void);
>    * option. Raft uses replication timeout for that.
>    */
>   void
> -raft_cfg_death_timeout(void);
> +raft_cfg_death_timeout(struct raft *raft);
>   
>   /**
>    * Bump the term. When it is persisted, the node checks if there is a leader,
> @@ -259,32 +256,41 @@ raft_cfg_death_timeout(void);
>    * be used as tool to forcefully start new election, or restart an existing.
>    */
>   void
> -raft_new_term(void);
> +raft_new_term(struct raft *raft);
>   
>   /**
>    * Save complete Raft state into a request to be sent to other instances of the
>    * cluster. It is allowed to save anything here, not only persistent state.
>    */
>   void
> -raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
> +raft_serialize_for_network(const struct raft *raft, struct raft_request *req,
> +			   struct vclock *vclock);
>   
>   /**
>    * Save complete Raft state into a request to be persisted on disk. Only term
>    * and vote are being persisted.
>    */
>   void
> -raft_serialize_for_disk(struct raft_request *req);
> +raft_serialize_for_disk(const struct raft *raft, struct raft_request *req);
>   
>   /**
>    * Add a trigger invoked each time any of the Raft node visible attributes are
>    * changed.
>    */
>   void
> -raft_on_update(struct trigger *trigger);
> +raft_on_update(struct raft *raft, struct trigger *trigger);
> +
> +void
> +raft_create(struct raft *raft);
> +
> +void
> +raft_destroy(struct raft *raft);
> +
> +void
> +box_raft_init(void);
>   
> -/** Initialize Raft global data structures. */
>   void
> -raft_init(void);
> +box_raft_free(void);
>   
>   #if defined(__cplusplus)
>   }

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list