[Tarantool-patches] [PATCH v4 07/12] raft: filter rows based on known peer terms

Serge Petrenko sergepetrenko at tarantool.org
Sun Apr 18 11:49:55 MSK 2021



17.04.2021 01:21, Vladislav Shpilevoy пишет:
> I appreciate the work you did here!
>
>> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
>> index e447f6634..a5f7e08d9 100644
>> --- a/src/lib/raft/raft.h
>> +++ b/src/lib/raft/raft.h
>> @@ -207,6 +207,19 @@ struct raft {
>>   	 * subsystems, such as Raft.
>>   	 */
>>   	const struct vclock *vclock;
>> +	/**
>> +	 * The biggest term seen by this instance and persisted in WAL as part
>> +	 * of a PROMOTE request. May be smaller than @a term, while there are
>> +	 * ongoing elections, or the leader is already known, but this instance
>> +	 * hasn't read its PROMOTE request yet.
>> +	 * During other times must be equal to @a term.
>> +	 */
>> +	uint64_t greatest_term;
>> +	/**
>> +	 * Latest terms received with PROMOTE entries from remote instances.
>> +	 * Raft uses them to determine data from which sources may be applied.
>> +	 */
>> +	struct vclock term_map;
> I am sorry for not noticing this the first time, but I realized the
> names are still not perfect - they give the impression that the terms
> are collected on any term bump. But they are only for promotions, so
> they should probably be greatest_promote_term and promote_term_map.
>
> Another issue I see after that rename - they depend on something not
> related to raft. Raft does not write PROMOTEs. You can see that these
> 2 members are not used in raft code at all, only in the limbo and
> box. On the other hand, they don't remove the terms dependency from
> the limbo, because they are part of PROMOTE, which is part of the limbo.
>
> That means, we introduced an explicit dependency on raft in the
> limbo just to store some numbers in struct raft.
>
> Maybe move these 2 members to the limbo? They have nothing to do with
> the leader election as we can see, and our lib/raft is only about that.
>
> They are for filtering once the leader is elected already, which is
> synchronous replication's job, and which in turn is the limbo.
>
> This also brings us closer to the idea I mentioned of merging the lsn
> map and the promote term map into something new inside the limbo.
>
> I tried to deal with that idea myself, and it resulted in a commit,
> which I pushed on top of your branch and pasted below.
>
> I made it so that the limbo no longer depends on raft (on its API). It
> only uses term numbers. Box is the link between raft and limbo - it
> passes the raft terms to the new promote entries in box.ctl.promote().
>
> If you agree, please squash. Otherwise, let's discuss. I didn't delete
> the unit test about this new map yet, only commented it out. You would
> need to drop it if you squash.

I see what you mean. At first it was hard to see that these methods
belong to txn_limbo, but I agree with you now.

Your version looks good. Thanks for the help! Squashed.

> ====================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 61d53fdec..b0e8fbba7 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,59 @@ apply_final_join_tx(struct stailq *rows)
>   	return rc;
>   }
>   
> +/*
> + * When elections are enabled we must filter out synchronous rows coming
> + * from an instance that fell behind the current leader. This includes
> + * both synchronous tx rows and rows for txs following unconfirmed
> + * synchronous transactions.
> + * The rows are replaced with NOPs to preserve the vclock consistency.
> + */
> +static void
> +applier_synchro_filter_tx(struct applier *applier, struct stailq *rows)
> +{
> +	/*
> +	 * XXX: in case raft is disabled, synchronous replication still works
> +	 * but without any filtering. That might lead to issues with
> +	 * unpredictable confirms after rollbacks which are supposed to be
> +	 * fixed by the filtering.
> +	 */
> +	if (!raft_is_enabled(box_raft()))
> +		return;
> +	if (!txn_limbo_is_replica_outdated(&txn_limbo, applier->instance_id))
> +		return;
> +
> +	struct xrow_header *row;
> +	row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
> +	if (row->wait_sync)
> +		goto nopify;
> +
> +	row = &stailq_first_entry(rows, struct applier_tx_row, next)->row;
> +	/*
> +	 * Not waiting for sync and not a synchro request - this makes it either
> +	 * a NOP or an asynchronous transaction not depending on any synchronous
> +	 * ones - let it go as is.
> +	 */
> +	if (!iproto_type_is_synchro_request(row->type))
> +		return;
> +	/*
> +	 * Do not NOPify promotion, otherwise we won't even know who the limbo
> +	 * owner is now.
> +	 */
> +	if (iproto_type_is_promote_request(row->type))
> +		return;
> +nopify:;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		row = &item->row;
> +		row->type = IPROTO_NOP;
> +		/*
> +		 * Row body is saved to fiber's region and will be freed
> +		 * on next fiber_gc() call.
> +		 */
> +		row->bodycnt = 0;
> +	}
> +}
> +
>   /**
>    * Apply all rows in the rows queue as a single transaction.
>    *
> @@ -1026,29 +1079,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   			}
>   		}
>   	}
> -
> -	/*
> -	 * When elections are enabled we must filter out synchronous rows coming
> -	 * from an instance that fell behind the current leader. This includes
> -	 * both synchronous tx rows and rows for txs following unconfirmed
> -	 * synchronous transactions.
> -	 * The rows are replaced with NOPs to preserve the vclock consistency.
> -	 */
> -	struct applier_tx_row *item;
> -	if (raft_is_node_outdated(box_raft(), applier->instance_id) &&
> -	    (last_row->wait_sync ||
> -	     (iproto_type_is_synchro_request(first_row->type) &&
> -	     !iproto_type_is_promote_request(first_row->type)))) {
> -		stailq_foreach_entry(item, rows, next) {
> -			struct xrow_header *row = &item->row;
> -			row->type = IPROTO_NOP;
> -			/*
> -			 * Row body is saved to fiber's region and will be freed
> -			 * on next fiber_gc() call.
> -			 */
> -			row->bodycnt = 0;
> -		}
> -	}
> +	applier_synchro_filter_tx(applier, rows);
>   	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>   		/*
>   		 * Synchro messages are not transactions, in terms
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 70cb2bd53..cc68f0168 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1516,10 +1516,12 @@ box_promote(void)
>   
>   	/*
>   	 * Do nothing when box isn't configured and when PROMOTE was already
> -	 * written for this term.
> +	 * written for this term (synchronous replication and leader election
> +	 * are in sync, and both chose this node as a leader).
>   	 */
> -	if (!is_box_configured ||
> -	    raft_node_term(box_raft(), instance_id) == box_raft()->term)
> +	if (!is_box_configured)
> +		return 0;
> +	if (txn_limbo_replica_term(&txn_limbo, instance_id) == box_raft()->term)
>   		return 0;
>   
>   	bool run_elections = false;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 0726b5a04..bafb47aaa 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -34,7 +34,6 @@
>   #include "iproto_constants.h"
>   #include "journal.h"
>   #include "box.h"
> -#include "raft.h"
>   
>   struct txn_limbo txn_limbo;
>   
> @@ -46,6 +45,8 @@ txn_limbo_create(struct txn_limbo *limbo)
>   	limbo->owner_id = REPLICA_ID_NIL;
>   	fiber_cond_create(&limbo->wait_cond);
>   	vclock_create(&limbo->vclock);
> +	vclock_create(&limbo->promote_term_map);
> +	limbo->promote_greatest_term = 0;
>   	limbo->confirmed_lsn = 0;
>   	limbo->rollback_count = 0;
>   	limbo->is_in_rollback = false;
> @@ -644,8 +645,13 @@ complete:
>   void
>   txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>   {
> -	/* It's ok to process an empty term. It'll just get ignored. */
> -	raft_process_term(box_raft(), req->origin_id, req->term);
> +	uint64_t term = req->term;
> +	uint32_t origin = req->origin_id;
> +	if (txn_limbo_replica_term(limbo, origin) < term) {
> +		vclock_follow(&limbo->promote_term_map, origin, term);
> +		if (term > limbo->promote_greatest_term)
> +			limbo->promote_greatest_term = term;
> +	}
>   	if (req->replica_id != limbo->owner_id) {
>   		/*
>   		 * Ignore CONFIRM/ROLLBACK messages for a foreign master.
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index f35771dc9..e409ac657 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -129,6 +129,24 @@ struct txn_limbo {
>   	 * transactions, created on the limbo's owner node.
>   	 */
>   	struct vclock vclock;
> +	/**
> +	 * Latest terms received with PROMOTE entries from remote instances.
> +	 * Limbo uses them to filter out the transactions coming not from the
> +	 * limbo owner, but so outdated that they are rolled back everywhere
> +	 * except outdated nodes.
> +	 */
> +	struct vclock promote_term_map;
> +	/**
> +	 * The biggest PROMOTE term seen by the instance and persisted in WAL.
> +	 * It is related to raft term, but not the same. Synchronous replication
> +	 * represented by the limbo is interested only in won elections
> +	 * that ended with a PROMOTE request.
> +	 * It means the limbo's term might be smaller than the raft term, while
> +	 * there are ongoing elections, or the leader is already known and this
> +	 * instance hasn't read its PROMOTE request yet. During other times the
> +	 * limbo and raft are in sync and the terms are the same.
> +	 */
> +	uint64_t promote_greatest_term;
>   	/**
>   	 * Maximal LSN gathered quorum and either already confirmed in WAL, or
>   	 * whose confirmation is in progress right now. Any attempt to confirm
> @@ -193,6 +211,28 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>   				in_queue);
>   }
>   
> +/**
> + * Return the latest term as seen in PROMOTE requests from instance with id
> + * @a replica_id.
> + */
> +static inline uint64_t
> +txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +{
> +	return vclock_get(&limbo->promote_term_map, replica_id);
> +}
> +
> +/**
> + * Check whether replica with id @a replica_id is too old to apply synchronous
> + * data from it. The check is only valid when elections are enabled.
> + */
> +static inline bool
> +txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +			      uint32_t replica_id)
> +{
> +	return txn_limbo_replica_term(limbo, replica_id) <
> +	       limbo->promote_greatest_term;
> +}
> +
>   /**
>    * Return the last synchronous transaction in the limbo or NULL when it is
>    * empty.
> diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
> index b21693642..874e9157e 100644
> --- a/src/lib/raft/raft.c
> +++ b/src/lib/raft/raft.c
> @@ -1012,7 +1012,6 @@ raft_create(struct raft *raft, const struct raft_vtab *vtab)
>   		.death_timeout = 5,
>   		.vtab = vtab,
>   	};
> -	vclock_create(&raft->term_map);
>   	raft_ev_timer_init(&raft->timer, raft_sm_schedule_new_election_cb,
>   			   0, 0);
>   	raft->timer.data = raft;
> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
> index 69dec63c6..f7bc205d2 100644
> --- a/src/lib/raft/raft.h
> +++ b/src/lib/raft/raft.h
> @@ -207,19 +207,6 @@ struct raft {
>   	 * subsystems, such as Raft.
>   	 */
>   	const struct vclock *vclock;
> -	/**
> -	 * The biggest term seen by this instance and persisted in WAL as part
> -	 * of a PROMOTE request. May be smaller than @a term, while there are
> -	 * ongoing elections, or the leader is already known, but this instance
> -	 * hasn't read its PROMOTE request yet.
> -	 * During other times must be equal to @a term.
> -	 */
> -	uint64_t greatest_term;
> -	/**
> -	 * Latest terms received with PROMOTE entries from remote instances.
> -	 * Raft uses them to determine data from which sources may be applied.
> -	 */
> -	struct vclock term_map;
>   	/** State machine timed event trigger. */
>   	struct ev_timer timer;
>   	/** Configured election timeout in seconds. */
> @@ -256,39 +243,6 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
>   	return !raft->is_enabled || raft->leader == source_id;
>   }
>   
> -/**
> - * Return the latest term as seen in PROMOTE requests from instance with id
> - * @a source_id.
> - */
> -static inline uint64_t
> -raft_node_term(const struct raft *raft, uint32_t source_id)
> -{
> -	assert(source_id < VCLOCK_MAX);
> -	return vclock_get(&raft->term_map, source_id);
> -}
> -
> -/**
> - * Check whether replica with id @a source_id is too old to apply synchronous
> - * data from it. The check is only valid when elections are enabled.
> - */
> -static inline bool
> -raft_is_node_outdated(const struct raft *raft, uint32_t source_id)
> -{
> -	uint64_t source_term = raft_node_term(raft, source_id);
> -	return raft->is_enabled && source_term < raft->greatest_term;
> -}
> -
> -/** Remember the last term seen for replica  with id @a source_id. */
> -static inline void
> -raft_process_term(struct raft *raft, uint32_t source_id, uint64_t term)
> -{
> -	if (raft_node_term(raft, source_id) >= term)
> -		return;
> -	vclock_follow(&raft->term_map, source_id, term);
> -	if (term > raft->greatest_term)
> -		raft->greatest_term = term;
> -}
> -
>   /** Check if Raft is enabled. */
>   static inline bool
>   raft_is_enabled(const struct raft *raft)
> diff --git a/test/unit/raft.c b/test/unit/raft.c
> index 575886932..4214dbc4c 100644
> --- a/test/unit/raft.c
> +++ b/test/unit/raft.c
> @@ -1267,38 +1267,38 @@ raft_test_too_long_wal_write(void)
>   	raft_finish_test();
>   }
>   
> -static void
> -raft_test_term_filter(void)
> -{
> -	raft_start_test(9);
> -	struct raft_node node;
> -	raft_node_create(&node);
> -
> -	is(raft_node_term(&node.raft, 1), 0, "empty node term");
> -	ok(!raft_is_node_outdated(&node.raft, 1), "not outdated initially");
> -
> -	raft_process_term(&node.raft, 1, 1);
> -	is(raft_node_term(&node.raft, 1), 1, "node term updated");
> -	ok(raft_is_node_outdated(&node.raft, 2), "other nodes are outdated");
> -
> -	raft_process_term(&node.raft, 2, 100);
> -	ok(raft_is_node_outdated(&node.raft, 1), "node outdated when others "
> -						 "have greater term");
> -	ok(!raft_is_node_outdated(&node.raft, 2), "node with greatest term "
> -						 "isn't outdated");
> -
> -	raft_process_term(&node.raft, 3, 100);
> -	ok(!raft_is_node_outdated(&node.raft, 2), "node not outdated when "
> -						 "others have the same term");
> -
> -	raft_process_term(&node.raft, 3, 99);
> -	is(raft_node_term(&node.raft, 3), 100, "node term isn't decreased");
> -	ok(!raft_is_node_outdated(&node.raft, 3), "node doesn't become "
> -						  "outdated");
> -
> -	raft_node_destroy(&node);
> -	raft_finish_test();
> -}
> +// static void
> +// raft_test_term_filter(void)
> +// {
> +// 	raft_start_test(9);
> +// 	struct raft_node node;
> +// 	raft_node_create(&node);
> +
> +// 	is(raft_node_term(&node.raft, 1), 0, "empty node term");
> +// 	ok(!raft_is_node_outdated(&node.raft, 1), "not outdated initially");
> +
> +// 	raft_process_term(&node.raft, 1, 1);
> +// 	is(raft_node_term(&node.raft, 1), 1, "node term updated");
> +// 	ok(raft_is_node_outdated(&node.raft, 2), "other nodes are outdated");
> +
> +// 	raft_process_term(&node.raft, 2, 100);
> +// 	ok(raft_is_node_outdated(&node.raft, 1), "node outdated when others "
> +// 						 "have greater term");
> +// 	ok(!raft_is_node_outdated(&node.raft, 2), "node with greatest term "
> +// 						 "isn't outdated");
> +
> +// 	raft_process_term(&node.raft, 3, 100);
> +// 	ok(!raft_is_node_outdated(&node.raft, 2), "node not outdated when "
> +// 						 "others have the same term");
> +
> +// 	raft_process_term(&node.raft, 3, 99);
> +// 	is(raft_node_term(&node.raft, 3), 100, "node term isn't decreased");
> +// 	ok(!raft_is_node_outdated(&node.raft, 3), "node doesn't become "
> +// 						  "outdated");
> +
> +// 	raft_node_destroy(&node);
> +// 	raft_finish_test();
> +// }
>   
>   static void
>   raft_test_start_stop_candidate(void)
> @@ -1332,7 +1332,7 @@ raft_test_start_stop_candidate(void)
>   static int
>   main_f(va_list ap)
>   {
> -	raft_start_test(15);
> +	raft_start_test(14);
>   
>   	(void) ap;
>   	fakeev_init();
> @@ -1350,7 +1350,7 @@ main_f(va_list ap)
>   	raft_test_death_timeout();
>   	raft_test_enable_disable();
>   	raft_test_too_long_wal_write();
> -	raft_test_term_filter();
> +	//raft_test_term_filter();
>   	raft_test_start_stop_candidate();
>   
>   	fakeev_free();
> diff --git a/test/unit/raft.result b/test/unit/raft.result
> index bb799936b..f9a8f249b 100644
> --- a/test/unit/raft.result
> +++ b/test/unit/raft.result
> @@ -1,5 +1,5 @@
>   	*** main_f ***
> -1..15
> +1..14
>   	*** raft_test_leader_election ***
>       1..24
>       ok 1 - 1 pending message at start
> @@ -220,25 +220,12 @@ ok 12 - subtests
>       ok 8 - became candidate
>   ok 13 - subtests
>   	*** raft_test_too_long_wal_write: done ***
> -	*** raft_test_term_filter ***
> -    1..9
> -    ok 1 - empty node term
> -    ok 2 - not outdated initially
> -    ok 3 - node term updated
> -    ok 4 - other nodes are outdated
> -    ok 5 - node outdated when others have greater term
> -    ok 6 - node with greatest term isn't outdated
> -    ok 7 - node not outdated when others have the same term
> -    ok 8 - node term isn't decreased
> -    ok 9 - node doesn't become outdated
> -ok 14 - subtests
> -	*** raft_test_term_filter: done ***
>   	*** raft_test_start_stop_candidate ***
>       1..4
>       ok 1 - became leader after start_candidate
>       ok 2 - remain leader after stop_candidate
>       ok 3 - vote request from 2
>       ok 4 - demote once new election starts
> -ok 15 - subtests
> +ok 14 - subtests
>   	*** raft_test_start_stop_candidate: done ***
>   	*** main_f: done ***
>

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list