[Tarantool-patches] [PATCH v4 07/12] raft: filter rows based on known peer terms
Serge Petrenko
sergepetrenko at tarantool.org
Sun Apr 18 11:49:55 MSK 2021
On 17.04.2021 01:21, Vladislav Shpilevoy wrote:
> I appreciate the work you did here!
>
>> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
>> index e447f6634..a5f7e08d9 100644
>> --- a/src/lib/raft/raft.h
>> +++ b/src/lib/raft/raft.h
>> @@ -207,6 +207,19 @@ struct raft {
>> * subsystems, such as Raft.
>> */
>> const struct vclock *vclock;
>> + /**
>> + * The biggest term seen by this instance and persisted in WAL as part
>> + * of a PROMOTE request. May be smaller than @a term, while there are
>> + * ongoing elections, or the leader is already known, but this instance
>> + * hasn't read its PROMOTE request yet.
>> + * During other times must be equal to @a term.
>> + */
>> + uint64_t greatest_term;
>> + /**
>> + * Latest terms received with PROMOTE entries from remote instances.
>> + * Raft uses them to determine data from which sources may be applied.
>> + */
>> + struct vclock term_map;
> I am sorry for not noticing this the first time, but I realized the
> names are still not perfect: they give the impression that the terms are
> collected on any term bump, but they are only for promotions. So
> they should probably be greatest_promote_term and promote_term_map.
>
> Another issue I see after that rename: they depend on something not
> related to raft. Raft does not write PROMOTEs. You can see that these
> 2 members are not used in raft code at all, only in the limbo and
> box. On the other hand, they don't remove the terms dependency from the
> limbo, because they are part of PROMOTE, which is part of the limbo.
>
> That means, we introduced an explicit dependency on raft in the
> limbo just to store some numbers in struct raft.
>
> Maybe move these 2 members to the limbo? They have nothing to do with
> the leader election as we can see, and our lib/raft is only about that.
>
> They are for filtering once the leader is already elected, which is
> the job of synchronous replication, and that in turn is the limbo.
>
> This also brings us closer to the idea I mentioned about merging the
> LSN map and the promote term map into something new inside the limbo.
>
> I tried to implement that idea myself; it resulted in a commit,
> which I pushed on top of your branch and pasted below.
>
> I made it so that the limbo no longer depends on raft (on its API). It
> only uses term numbers. Box is the link between raft and the limbo: it
> passes the raft terms to the new PROMOTE entries in box.ctl.promote().
>
> If you agree, please squash. Otherwise, let's discuss. I didn't delete
> the unit test for this new map yet, only commented it out. You will
> need to drop it if you squash.
I see what you mean. At first it was hard to see that these methods
belong to txn_limbo, but I agree with you now.
Your version looks good. Thanks for the help! Squashed.
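To make the moved bookkeeping concrete, here is a minimal standalone sketch of the two members the squashed commit adds to `struct txn_limbo`. It is not Tarantool code: `struct promote_terms`, the `promote_terms_*` helpers, `promote_is_redundant()` and `SKETCH_VCLOCK_MAX` are hypothetical names, and a plain array stands in for `struct vclock`. The update rule follows the new `txn_limbo_process()`, the outdated check follows `txn_limbo_is_replica_outdated()`, the early return follows `box_promote()`, and `check_term_filter_scenario()` replays the expectations of the commented-out `raft_test_term_filter()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: stands in for VCLOCK_MAX (the maximal number of replicas). */
enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * Hypothetical stand-in for the promote_term_map and promote_greatest_term
 * members of struct txn_limbo. A plain array replaces struct vclock.
 */
struct promote_terms {
	uint64_t term_map[SKETCH_VCLOCK_MAX];
	uint64_t greatest_term;
};

static void
promote_terms_create(struct promote_terms *t)
{
	for (int i = 0; i < SKETCH_VCLOCK_MAX; i++)
		t->term_map[i] = 0;
	t->greatest_term = 0;
}

/* Like txn_limbo_replica_term(): the last PROMOTE term seen from a replica. */
static uint64_t
promote_terms_get(const struct promote_terms *t, uint32_t replica_id)
{
	assert(replica_id < SKETCH_VCLOCK_MAX);
	return t->term_map[replica_id];
}

/*
 * Like the term update at the top of txn_limbo_process(): a replica's term
 * only grows, and the greatest term follows the largest one ever seen.
 */
static void
promote_terms_process(struct promote_terms *t, uint32_t origin, uint64_t term)
{
	if (promote_terms_get(t, origin) >= term)
		return;
	t->term_map[origin] = term;
	if (term > t->greatest_term)
		t->greatest_term = term;
}

/* Like txn_limbo_is_replica_outdated(). Knows nothing about raft state. */
static bool
promote_terms_is_outdated(const struct promote_terms *t, uint32_t replica_id)
{
	return promote_terms_get(t, replica_id) < t->greatest_term;
}

/* Like the early returns in box_promote(): nothing to do in these cases. */
static bool
promote_is_redundant(const struct promote_terms *t, bool is_configured,
		     uint32_t self_id, uint64_t raft_term)
{
	if (!is_configured)
		return true;
	return promote_terms_get(t, self_id) == raft_term;
}

/* Replays the checks of the commented-out raft_test_term_filter(). */
static void
check_term_filter_scenario(void)
{
	struct promote_terms t;
	promote_terms_create(&t);
	assert(promote_terms_get(&t, 1) == 0);
	assert(!promote_terms_is_outdated(&t, 1));

	promote_terms_process(&t, 1, 1);
	assert(promote_terms_get(&t, 1) == 1);
	assert(promote_terms_is_outdated(&t, 2));

	promote_terms_process(&t, 2, 100);
	assert(promote_terms_is_outdated(&t, 1));
	assert(!promote_terms_is_outdated(&t, 2));

	promote_terms_process(&t, 3, 100);
	assert(!promote_terms_is_outdated(&t, 2));

	/* A smaller term does not move a replica's term backwards. */
	promote_terms_process(&t, 3, 99);
	assert(promote_terms_get(&t, 3) == 100);
	assert(!promote_terms_is_outdated(&t, 3));
}
```

Because the outdated check no longer looks at whether elections are enabled, the caller has to do that itself, which is why `applier_synchro_filter_tx()` tests `raft_is_enabled()` before asking the limbo.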
> ====================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 61d53fdec..b0e8fbba7 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,59 @@ apply_final_join_tx(struct stailq *rows)
> return rc;
> }
>
> +/*
> + * When elections are enabled we must filter out synchronous rows coming
> + * from an instance that fell behind the current leader. This includes
> + * both synchronous tx rows and rows for txs following unconfirmed
> + * synchronous transactions.
> + * The rows are replaced with NOPs to preserve the vclock consistency.
> + */
> +static void
> +applier_synchro_filter_tx(struct applier *applier, struct stailq *rows)
> +{
> + /*
> + * XXX: in case raft is disabled, synchronous replication still works
> + * but without any filtering. That might lead to issues with
> + * unpredictable confirms after rollbacks which are supposed to be
> + * fixed by the filtering.
> + */
> + if (!raft_is_enabled(box_raft()))
> + return;
> + if (!txn_limbo_is_replica_outdated(&txn_limbo, applier->instance_id))
> + return;
> +
> + struct xrow_header *row;
> + row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
> + if (row->wait_sync)
> + goto nopify;
> +
> + row = &stailq_first_entry(rows, struct applier_tx_row, next)->row;
> + /*
> + * Not waiting for sync and not a synchro request: this makes it either a
> + * NOP or an asynchronous transaction not depending on any synchronous
> + * ones, so let it go as is.
> + */
> + if (!iproto_type_is_synchro_request(row->type))
> + return;
> + /*
> + * Do not NOPify promotion, otherwise we won't even know who the limbo
> + * owner is now.
> + */
> + if (iproto_type_is_promote_request(row->type))
> + return;
> +nopify:;
> + struct applier_tx_row *item;
> + stailq_foreach_entry(item, rows, next) {
> + row = &item->row;
> + row->type = IPROTO_NOP;
> + /*
> + * Row body is saved to fiber's region and will be freed
> + * on next fiber_gc() call.
> + */
> + row->bodycnt = 0;
> + }
> +}
> +
> /**
> * Apply all rows in the rows queue as a single transaction.
> *
> @@ -1026,29 +1079,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
> }
> }
> }
> -
> - /*
> - * When elections are enabled we must filter out synchronous rows coming
> - * from an instance that fell behind the current leader. This includes
> - * both synchronous tx rows and rows for txs following unconfirmed
> - * synchronous transactions.
> - * The rows are replaced with NOPs to preserve the vclock consistency.
> - */
> - struct applier_tx_row *item;
> - if (raft_is_node_outdated(box_raft(), applier->instance_id) &&
> - (last_row->wait_sync ||
> - (iproto_type_is_synchro_request(first_row->type) &&
> - !iproto_type_is_promote_request(first_row->type)))) {
> - stailq_foreach_entry(item, rows, next) {
> - struct xrow_header *row = &item->row;
> - row->type = IPROTO_NOP;
> - /*
> - * Row body is saved to fiber's region and will be freed
> - * on next fiber_gc() call.
> - */
> - row->bodycnt = 0;
> - }
> - }
> + applier_synchro_filter_tx(applier, rows);
> if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> /*
> * Synchro messages are not transactions, in terms
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 70cb2bd53..cc68f0168 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1516,10 +1516,12 @@ box_promote(void)
>
> /*
> * Do nothing when box isn't configured and when PROMOTE was already
> - * written for this term.
> + * written for this term (synchronous replication and leader election
> + * are in sync, and both chose this node as a leader).
> */
> - if (!is_box_configured ||
> - raft_node_term(box_raft(), instance_id) == box_raft()->term)
> + if (!is_box_configured)
> + return 0;
> + if (txn_limbo_replica_term(&txn_limbo, instance_id) == box_raft()->term)
> return 0;
>
> bool run_elections = false;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 0726b5a04..bafb47aaa 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -34,7 +34,6 @@
> #include "iproto_constants.h"
> #include "journal.h"
> #include "box.h"
> -#include "raft.h"
>
> struct txn_limbo txn_limbo;
>
> @@ -46,6 +45,8 @@ txn_limbo_create(struct txn_limbo *limbo)
> limbo->owner_id = REPLICA_ID_NIL;
> fiber_cond_create(&limbo->wait_cond);
> vclock_create(&limbo->vclock);
> + vclock_create(&limbo->promote_term_map);
> + limbo->promote_greatest_term = 0;
> limbo->confirmed_lsn = 0;
> limbo->rollback_count = 0;
> limbo->is_in_rollback = false;
> @@ -644,8 +645,13 @@ complete:
> void
> txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> {
> - /* It's ok to process an empty term. It'll just get ignored. */
> - raft_process_term(box_raft(), req->origin_id, req->term);
> + uint64_t term = req->term;
> + uint32_t origin = req->origin_id;
> + if (txn_limbo_replica_term(limbo, origin) < term) {
> + vclock_follow(&limbo->promote_term_map, origin, term);
> + if (term > limbo->promote_greatest_term)
> + limbo->promote_greatest_term = term;
> + }
> if (req->replica_id != limbo->owner_id) {
> /*
> * Ignore CONFIRM/ROLLBACK messages for a foreign master.
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index f35771dc9..e409ac657 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -129,6 +129,24 @@ struct txn_limbo {
> * transactions, created on the limbo's owner node.
> */
> struct vclock vclock;
> + /**
> + * Latest terms received with PROMOTE entries from remote instances.
> + * Limbo uses them to filter out the transactions coming not from the
> + * limbo owner, but so outdated that they are rolled back everywhere
> + * except outdated nodes.
> + */
> + struct vclock promote_term_map;
> + /**
> + * The biggest PROMOTE term seen by the instance and persisted in WAL.
> + * It is related to raft term, but not the same. Synchronous replication
> + * represented by the limbo is interested only in won elections that
> + * ended with a PROMOTE request.
> + * It means the limbo's term might be smaller than the raft term, while
> + * there are ongoing elections, or the leader is already known and this
> + * instance hasn't read its PROMOTE request yet. During other times the
> + * limbo and raft are in sync and the terms are the same.
> + */
> + uint64_t promote_greatest_term;
> /**
> * Maximal LSN gathered quorum and either already confirmed in WAL, or
> * whose confirmation is in progress right now. Any attempt to confirm
> @@ -193,6 +211,28 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
> in_queue);
> }
>
> +/**
> + * Return the latest term as seen in PROMOTE requests from instance with id
> + * @a replica_id.
> + */
> +static inline uint64_t
> +txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +{
> + return vclock_get(&limbo->promote_term_map, replica_id);
> +}
> +
> +/**
> + * Check whether replica with id @a replica_id is too old to apply synchronous
> + * data from it. The check is only valid when elections are enabled.
> + */
> +static inline bool
> +txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> + uint32_t replica_id)
> +{
> + return txn_limbo_replica_term(limbo, replica_id) <
> + limbo->promote_greatest_term;
> +}
> +
> /**
> * Return the last synchronous transaction in the limbo or NULL when it is
> * empty.
> diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
> index b21693642..874e9157e 100644
> --- a/src/lib/raft/raft.c
> +++ b/src/lib/raft/raft.c
> @@ -1012,7 +1012,6 @@ raft_create(struct raft *raft, const struct raft_vtab *vtab)
> .death_timeout = 5,
> .vtab = vtab,
> };
> - vclock_create(&raft->term_map);
> raft_ev_timer_init(&raft->timer, raft_sm_schedule_new_election_cb,
> 0, 0);
> raft->timer.data = raft;
> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
> index 69dec63c6..f7bc205d2 100644
> --- a/src/lib/raft/raft.h
> +++ b/src/lib/raft/raft.h
> @@ -207,19 +207,6 @@ struct raft {
> * subsystems, such as Raft.
> */
> const struct vclock *vclock;
> - /**
> - * The biggest term seen by this instance and persisted in WAL as part
> - * of a PROMOTE request. May be smaller than @a term, while there are
> - * ongoing elections, or the leader is already known, but this instance
> - * hasn't read its PROMOTE request yet.
> - * During other times must be equal to @a term.
> - */
> - uint64_t greatest_term;
> - /**
> - * Latest terms received with PROMOTE entries from remote instances.
> - * Raft uses them to determine data from which sources may be applied.
> - */
> - struct vclock term_map;
> /** State machine timed event trigger. */
> struct ev_timer timer;
> /** Configured election timeout in seconds. */
> @@ -256,39 +243,6 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
> return !raft->is_enabled || raft->leader == source_id;
> }
>
> -/**
> - * Return the latest term as seen in PROMOTE requests from instance with id
> - * @a source_id.
> - */
> -static inline uint64_t
> -raft_node_term(const struct raft *raft, uint32_t source_id)
> -{
> - assert(source_id < VCLOCK_MAX);
> - return vclock_get(&raft->term_map, source_id);
> -}
> -
> -/**
> - * Check whether replica with id @a source_id is too old to apply synchronous
> - * data from it. The check is only valid when elections are enabled.
> - */
> -static inline bool
> -raft_is_node_outdated(const struct raft *raft, uint32_t source_id)
> -{
> - uint64_t source_term = raft_node_term(raft, source_id);
> - return raft->is_enabled && source_term < raft->greatest_term;
> -}
> -
> -/** Remember the last term seen for replica with id @a source_id. */
> -static inline void
> -raft_process_term(struct raft *raft, uint32_t source_id, uint64_t term)
> -{
> - if (raft_node_term(raft, source_id) >= term)
> - return;
> - vclock_follow(&raft->term_map, source_id, term);
> - if (term > raft->greatest_term)
> - raft->greatest_term = term;
> -}
> -
> /** Check if Raft is enabled. */
> static inline bool
> raft_is_enabled(const struct raft *raft)
> diff --git a/test/unit/raft.c b/test/unit/raft.c
> index 575886932..4214dbc4c 100644
> --- a/test/unit/raft.c
> +++ b/test/unit/raft.c
> @@ -1267,38 +1267,38 @@ raft_test_too_long_wal_write(void)
> raft_finish_test();
> }
>
> -static void
> -raft_test_term_filter(void)
> -{
> - raft_start_test(9);
> - struct raft_node node;
> - raft_node_create(&node);
> -
> - is(raft_node_term(&node.raft, 1), 0, "empty node term");
> - ok(!raft_is_node_outdated(&node.raft, 1), "not outdated initially");
> -
> - raft_process_term(&node.raft, 1, 1);
> - is(raft_node_term(&node.raft, 1), 1, "node term updated");
> - ok(raft_is_node_outdated(&node.raft, 2), "other nodes are outdated");
> -
> - raft_process_term(&node.raft, 2, 100);
> - ok(raft_is_node_outdated(&node.raft, 1), "node outdated when others "
> - "have greater term");
> - ok(!raft_is_node_outdated(&node.raft, 2), "node with greatest term "
> - "isn't outdated");
> -
> - raft_process_term(&node.raft, 3, 100);
> - ok(!raft_is_node_outdated(&node.raft, 2), "node not outdated when "
> - "others have the same term");
> -
> - raft_process_term(&node.raft, 3, 99);
> - is(raft_node_term(&node.raft, 3), 100, "node term isn't decreased");
> - ok(!raft_is_node_outdated(&node.raft, 3), "node doesn't become "
> - "outdated");
> -
> - raft_node_destroy(&node);
> - raft_finish_test();
> -}
> +// static void
> +// raft_test_term_filter(void)
> +// {
> +// raft_start_test(9);
> +// struct raft_node node;
> +// raft_node_create(&node);
> +
> +// is(raft_node_term(&node.raft, 1), 0, "empty node term");
> +// ok(!raft_is_node_outdated(&node.raft, 1), "not outdated initially");
> +
> +// raft_process_term(&node.raft, 1, 1);
> +// is(raft_node_term(&node.raft, 1), 1, "node term updated");
> +// ok(raft_is_node_outdated(&node.raft, 2), "other nodes are outdated");
> +
> +// raft_process_term(&node.raft, 2, 100);
> +// ok(raft_is_node_outdated(&node.raft, 1), "node outdated when others "
> +// "have greater term");
> +// ok(!raft_is_node_outdated(&node.raft, 2), "node with greatest term "
> +// "isn't outdated");
> +
> +// raft_process_term(&node.raft, 3, 100);
> +// ok(!raft_is_node_outdated(&node.raft, 2), "node not outdated when "
> +// "others have the same term");
> +
> +// raft_process_term(&node.raft, 3, 99);
> +// is(raft_node_term(&node.raft, 3), 100, "node term isn't decreased");
> +// ok(!raft_is_node_outdated(&node.raft, 3), "node doesn't become "
> +// "outdated");
> +
> +// raft_node_destroy(&node);
> +// raft_finish_test();
> +// }
>
> static void
> raft_test_start_stop_candidate(void)
> @@ -1332,7 +1332,7 @@ raft_test_start_stop_candidate(void)
> static int
> main_f(va_list ap)
> {
> - raft_start_test(15);
> + raft_start_test(14);
>
> (void) ap;
> fakeev_init();
> @@ -1350,7 +1350,7 @@ main_f(va_list ap)
> raft_test_death_timeout();
> raft_test_enable_disable();
> raft_test_too_long_wal_write();
> - raft_test_term_filter();
> + //raft_test_term_filter();
> raft_test_start_stop_candidate();
>
> fakeev_free();
> diff --git a/test/unit/raft.result b/test/unit/raft.result
> index bb799936b..f9a8f249b 100644
> --- a/test/unit/raft.result
> +++ b/test/unit/raft.result
> @@ -1,5 +1,5 @@
> *** main_f ***
> -1..15
> +1..14
> *** raft_test_leader_election ***
> 1..24
> ok 1 - 1 pending message at start
> @@ -220,25 +220,12 @@ ok 12 - subtests
> ok 8 - became candidate
> ok 13 - subtests
> *** raft_test_too_long_wal_write: done ***
> - *** raft_test_term_filter ***
> - 1..9
> - ok 1 - empty node term
> - ok 2 - not outdated initially
> - ok 3 - node term updated
> - ok 4 - other nodes are outdated
> - ok 5 - node outdated when others have greater term
> - ok 6 - node with greatest term isn't outdated
> - ok 7 - node not outdated when others have the same term
> - ok 8 - node term isn't decreased
> - ok 9 - node doesn't become outdated
> -ok 14 - subtests
> - *** raft_test_term_filter: done ***
> *** raft_test_start_stop_candidate ***
> 1..4
> ok 1 - became leader after start_candidate
> ok 2 - remain leader after stop_candidate
> ok 3 - vote request from 2
> ok 4 - demote once new election starts
> -ok 15 - subtests
> +ok 14 - subtests
> *** raft_test_start_stop_candidate: done ***
> *** main_f: done ***
>
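Since the commit turns one compound condition in `applier_apply_tx()` into a chain of early returns in `applier_synchro_filter_tx()`, one way to convince oneself that the refactoring keeps the behaviour is to compare both forms over every combination of inputs. The sketch below is hypothetical: `struct filter_input`, `old_should_nopify()`, `new_should_nopify()` and `count_filter_mismatches()` are illustrative names, and the booleans stand in for `raft_is_enabled()`, `txn_limbo_is_replica_outdated()`, `last_row->wait_sync` and the `iproto_type_is_*_request()` checks on the first row.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical inputs of the decision whether to turn a tx into NOPs. */
struct filter_input {
	bool raft_enabled;      /* raft_is_enabled(box_raft()) */
	bool outdated;          /* term comparison only, no is_enabled check */
	bool last_wait_sync;    /* last_row->wait_sync */
	bool first_is_synchro;  /* iproto_type_is_synchro_request(first) */
	bool first_is_promote;  /* iproto_type_is_promote_request(first) */
};

/* The condition removed from applier_apply_tx(). */
static bool
old_should_nopify(const struct filter_input *in)
{
	/* raft_is_node_outdated() folded the is_enabled check into itself. */
	bool node_outdated = in->raft_enabled && in->outdated;
	return node_outdated &&
	       (in->last_wait_sync ||
		(in->first_is_synchro && !in->first_is_promote));
}

/* The control flow of the new applier_synchro_filter_tx(). */
static bool
new_should_nopify(const struct filter_input *in)
{
	if (!in->raft_enabled)
		return false;
	if (!in->outdated)
		return false;
	if (in->last_wait_sync)
		return true;
	if (!in->first_is_synchro)
		return false;
	if (in->first_is_promote)
		return false;
	return true;
}

/* Number of the 2^5 input combinations where the two versions disagree. */
static int
count_filter_mismatches(void)
{
	int mismatches = 0;
	for (unsigned mask = 0; mask < 32; mask++) {
		struct filter_input in = {
			.raft_enabled = mask & 1,
			.outdated = (mask >> 1) & 1,
			.last_wait_sync = (mask >> 2) & 1,
			.first_is_synchro = (mask >> 3) & 1,
			.first_is_promote = (mask >> 4) & 1,
		};
		if (old_should_nopify(&in) != new_should_nopify(&in))
			mismatches++;
	}
	return mismatches;
}
```

Calling `assert(count_filter_mismatches() == 0)` confirms the two forms agree. Some enumerated combinations, such as a PROMOTE row that is not a synchro request, are presumably impossible given the order of the checks in `applier_synchro_filter_tx()`; they are harmless here and only make the comparison exhaustive.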
--
Serge Petrenko