[Tarantool-patches] [PATCH v3 06/10] raft: keep track of greatest known term and filter replication sources based on that

Serge Petrenko sergepetrenko at tarantool.org
Fri Apr 16 17:16:54 MSK 2021



On 16.04.2021 02:27, Vladislav Shpilevoy wrote:
> Good job on the patch!

Thanks for the review!

>
> Please, try to reduce length of the lines in the commit
> message, or at least its title. It is suuuper long now.
Ok, fixed the title. The commit message itself looks ok, its lines are under 72 chars:


     raft: filter rows based on known peer terms

     Start writing the actual leader term together with the PROMOTE request
     and process terms in PROMOTE requests on receiver side.

     Make applier only apply synchronous transactions from the instance
     which has the greatest term as received in PROMOTE requests.

     Closes #5445


>
> See 10 comments below.
>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 4898f9f7b..3fb864686 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -790,6 +790,12 @@ apply_synchro_row_cb(struct journal_entry *entry)
>>   		applier_rollback_by_wal_io();
>>   	} else {
>>   		txn_limbo_process(&txn_limbo, synchro_entry->req);
>> +		if (iproto_type_is_promote_request(synchro_entry->req->type)) {
>> +			raft_source_update_term(box_raft(),
>> +						synchro_entry->req->origin_id,
>> +						synchro_entry->req->term);
> 1. How about moving that to txn_limbo_read_promote()? What do you think? I see
> you do it in 3 places where txn_limbo_process() or txn_limbo_read_promote()
> are called on PROMOTE rows.

I didn't want to do that, because raft and limbo are separate entities.
But this would simplify the code quite a bit, so I'm ok with it.

Another option would be to introduce a new function in `box/raft.c`
that would call txn_limbo_read_promote() and raft_update_term().
But then again we would have a separate handler for PROMOTE requests,
while CONFIRM and ROLLBACK would both be handled by txn_limbo_process().

Long story short, I'll add raft_update_term() to txn_limbo_process()
and remove txn_limbo_read_promote() from txn_limbo.h.
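To illustrate the resulting design, here is a minimal sketch with simplified, hypothetical stand-ins (`struct limbo_sketch`, `struct term_map_sketch` and `read_promote_sketch()` are invented for this example and are not Tarantool's API): the PROMOTE handler switches the limbo owner and records the sender's term in one step, so every caller that processes a PROMOTE gets the term update without extra code.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical bound on instance ids, standing in for VCLOCK_MAX. */
#define NODE_MAX 32

/* Hypothetical stand-in for the per-instance term map in struct raft. */
struct term_map_sketch {
	uint64_t term[NODE_MAX];
	uint64_t greatest_term;
};

/* Hypothetical stand-in for the part of the limbo a PROMOTE touches. */
struct limbo_sketch {
	uint32_t owner_id;
	int64_t confirmed_lsn;
};

/* Remember @a term for @a id unless it is not newer than the known one. */
static void
term_map_process(struct term_map_sketch *map, uint32_t id, uint64_t term)
{
	assert(id != 0 && id < NODE_MAX);
	if (map->term[id] >= term)
		return;
	map->term[id] = term;
	if (term > map->greatest_term)
		map->greatest_term = term;
}

/*
 * Apply a PROMOTE from @a origin_id written in @a term: the origin
 * becomes the limbo owner and its term is recorded in the same step.
 */
static void
read_promote_sketch(struct limbo_sketch *limbo, struct term_map_sketch *map,
		    uint32_t origin_id, uint64_t term)
{
	limbo->owner_id = origin_id;
	limbo->confirmed_lsn = 0;
	term_map_process(map, origin_id, term);
}
```

With this shape the applier, WAL recovery and box_clear_synchro_queue() only need to process the PROMOTE itself.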

>
>> +
>> +		}
>>   		trigger_run(&replicaset.applier.on_wal_write, NULL);
>>   	}
>>   	/* The fiber is the same on final join. */
>> @@ -1027,6 +1033,28 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		}
>>   	}
>>   
>> +	/*
>> +	 * When elections are enabled we must filter out synchronous rows coming
>> +	 * from an instance that fell behind the current leader. This includes
>> +	 * both synchronous tx rows and rows for txs following unconfirmed
>> +	 * synchronous transactions.
>> +	 * The rows are replaced with NOPs to preserve the vclock consistency.
>> +	 */
>> +	struct applier_tx_row *item;
>> +	if (raft_source_has_outdated_term(box_raft(), applier->instance_id) &&
> 2. The names are too long IMO. I would propose
>
> 	raft_is_node_outdated(raft, id)   // Check if behind
> 	raft_process_term(raft, id, term) // Set term for a node or skip if
> 						the same or older
> 	raft_node_term(raft, id)         // Get term
>
> 'source' is not really a perfect name, because raft nodes send
> messages to each other. There are no one-directional channels
> AFAIR like we have with upstream and downstream in the replication.
>
> I used 'source' in raft_process_heartbeat() as like a source of the
> heartbeat message. Not that the nodes are called sources everywhere.
>
> Also I used 'process' for the new term, because we already have
> raft_process_heartbeat() to handle info from a node with a given
> ID, and I thought it makes sense to keep them similar.

Ok, I'm fine with new names.

>
>
> 3. Why does raft_is_source_allowed() still exist when we have this wonder?

Shouldn't we still ignore asynchronous rows from non-leaders?

"The wonder" only saves us from synchro transactions.
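To make the distinction concrete, here is a hedged sketch of the two predicates side by side. `source_allowed()` mirrors the rule of raft_is_source_allowed() and `node_outdated()` mirrors raft_is_node_outdated(); the struct, the `filter_tx()` helper and its verdicts are hypothetical, and the order in which the real applier applies the checks may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NODE_MAX 32 /* Hypothetical stand-in for VCLOCK_MAX. */

/* Hypothetical subset of struct raft needed by both checks. */
struct raft_sketch {
	bool is_enabled;
	uint32_t leader;
	uint64_t node_term[NODE_MAX];
	uint64_t greatest_term;
};

/* Same rule as raft_is_source_allowed(): everyone, or only the leader. */
static bool
source_allowed(const struct raft_sketch *raft, uint32_t id)
{
	return !raft->is_enabled || raft->leader == id;
}

/* Same rule as raft_is_node_outdated(): behind the greatest PROMOTE term. */
static bool
node_outdated(const struct raft_sketch *raft, uint32_t id)
{
	assert(id != 0 && id < NODE_MAX);
	return raft->is_enabled && raft->node_term[id] < raft->greatest_term;
}

/* Hypothetical verdicts for an incoming transaction. */
enum verdict { APPLY, SKIP_SOURCE, APPLY_AS_NOP };

/* One hypothetical way of combining the checks for a single transaction. */
static enum verdict
filter_tx(const struct raft_sketch *raft, uint32_t id, bool is_sync)
{
	if (!source_allowed(raft, id))
		return SKIP_SOURCE;  /* Any row, sync or async, from a non-leader. */
	if (is_sync && node_outdated(raft, id))
		return APPLY_AS_NOP; /* Synchronous data from a stale term. */
	return APPLY;
}
```

The term check never looks at asynchronous rows, which is why the leader check is still needed for them.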

>
>> +	    (last_row->wait_sync ||
>> +	     (iproto_type_is_synchro_request(first_row->type) &&
>> +	     !iproto_type_is_promote_request(first_row->type)))) {
>> +		stailq_foreach_entry(item, rows, next) {
>> +			struct xrow_header *row = &item->row;
>> +			row->type = IPROTO_NOP;
>> +			/*
>> +			 * Row body is saved to fiber's region and will be freed
>> +			 * on next fiber_gc() call.
>> +			 */
>> +			row->bodycnt = 0;
>> +		}
>> +	}
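The filtering condition in the hunk above can be sketched in isolation as follows. This is a simplified model, assuming a hypothetical `struct row` and `enum row_type` instead of xrow_header and the IPROTO constants: a transaction from an outdated node is turned into NOPs if it is synchronous (its last row waits for sync) or if it is a CONFIRM/ROLLBACK; a PROMOTE passes through, since reading it is what updates the node's term.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical request types, loosely modelled on the IPROTO ones. */
enum row_type { ROW_INSERT, ROW_NOP, ROW_CONFIRM, ROW_ROLLBACK, ROW_PROMOTE };

/* Hypothetical, heavily simplified xrow_header. */
struct row {
	enum row_type type;
	bool wait_sync; /* Set on the last row of a synchronous tx. */
	int bodycnt;
};

static bool
is_synchro(enum row_type t)
{
	return t == ROW_CONFIRM || t == ROW_ROLLBACK || t == ROW_PROMOTE;
}

/*
 * Replace every row of a transaction with a NOP when it comes from an
 * outdated node and is a synchronous tx or a CONFIRM/ROLLBACK. Only the
 * type and body change; in the real code the LSNs stay intact, which
 * keeps the vclock consistent. Returns true if the tx was replaced.
 */
static bool
nopify_if_outdated(struct row *rows, size_t count, bool node_outdated)
{
	assert(count > 0);
	const struct row *first = &rows[0];
	const struct row *last = &rows[count - 1];
	if (!node_outdated)
		return false;
	if (!last->wait_sync &&
	    !(is_synchro(first->type) && first->type != ROW_PROMOTE))
		return false;
	for (size_t i = 0; i < count; i++) {
		rows[i].type = ROW_NOP;
		rows[i].bodycnt = 0; /* Body memory is left for the owner to free. */
	}
	return true;
}
```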
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 722fc23b7..f44dd0e54 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -426,6 +426,10 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
>>   		return -1;
>>   	}
>>   	txn_limbo_process(&txn_limbo, &syn_req);
>> +	if (syn_req.type == IPROTO_PROMOTE) {
>> +			raft_source_update_term(box_raft(), syn_req.origin_id,
>> +						syn_req.term);
> 4. Misaligned. Also see the first comment.

Thanks. Removed that altogether (moved it to txn_limbo_read_promote()).

>
>> @@ -1558,20 +1567,21 @@ box_clear_synchro_queue(bool try_wait)
>>   			rc = -1;
>>   		} else {
>>   promote:
>> -			/*
>> -			 * Term parameter is unused now, We'll pass
>> -			 * box_raft()->term there later.
>> -			 */
>> -			txn_limbo_write_promote(&txn_limbo, wait_lsn, 0);
>> +			/* We cannot possibly get here in a volatile state. */
>> +			assert(box_raft()->volatile_term == box_raft()->term);
>> +			txn_limbo_write_promote(&txn_limbo, wait_lsn,
>> +						box_raft()->term);
>>   			struct synchro_request req = {
>>   				.type = 0, /* unused */
>>   				.replica_id = 0, /* unused */
>>   				.origin_id = instance_id,
>>   				.lsn = wait_lsn,
>> -				.term = 0, /* unused */
>> +				.term = box_raft()->term,
>>   			};
>>   			txn_limbo_read_promote(&txn_limbo, &req);
>>   			assert(txn_limbo_is_empty(&txn_limbo));
>> +			raft_source_update_term(box_raft(), req.origin_id,
>> +						req.term);
> 5. See the first comment.

Removed.

>
>>   		}
>>   	}
>> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
>> index e447f6634..01f548fee 100644
>> --- a/src/lib/raft/raft.h
>> +++ b/src/lib/raft/raft.h
>> @@ -207,6 +207,19 @@ struct raft {
>>   	 * subsystems, such as Raft.
>>   	 */
>>   	const struct vclock *vclock;
>> +	/**
>> +	 * The biggest term seen by this instance and persisted in WAL as part
>> +	 * of a PROMOTE request. May be smaller than @a term, while there are
>> +	 * ongoing elections, or the leader is already known, but this instance
>> +	 * hasn't read its PROMOTE request yet.
>> +	 * During other times must be equal to @a term.
>> +	 */
>> +	uint64_t greatest_known_term;
> 6. Maybe omit 'known'. There can't be 'greatest_unknown_term'.

Ok.

>
>> +	/**
>> +	 * Latest terms received with PROMOTE entries from remote instances.
>> +	 * Raft uses them to determine data from which sources may be applied.
>> +	 */
>> +	struct vclock term_map;
> 7. I have a feeling it is similar to the limbo's LSN map, as if
> they should be merged into one thing. Can't formulate that
> properly. I hope we will see it more clearly when we move all that
> to the WAL thread someday.

Yes, they're quite similar.

Do you mean we should create some new structure instead of using vclock for
these entities? Something that would incorporate remote state:
a map of ids to their known terms and confirmed lsns?
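Purely as a hypothetical illustration of such a structure (`struct peer_state`, `struct peer_map` and the functions below are invented names, not existing code): one per-instance record holds both the last PROMOTE term and the confirmed LSN, so the term filter and the limbo would read from a single map instead of two vclocks.

```c
#include <assert.h>
#include <stdint.h>

#define PEER_MAX 32 /* Hypothetical stand-in for VCLOCK_MAX. */

/* Hypothetical state known about one remote instance. */
struct peer_state {
	uint64_t term;         /* Latest term seen in its PROMOTE. */
	int64_t confirmed_lsn; /* Latest LSN it is known to have confirmed. */
};

struct peer_map {
	struct peer_state peers[PEER_MAX];
	uint64_t greatest_term;
};

static struct peer_state *
peer_map_get(struct peer_map *map, uint32_t id)
{
	assert(id != 0 && id < PEER_MAX);
	return &map->peers[id];
}

/* Terms only grow, like vclock components. */
static void
peer_map_process_term(struct peer_map *map, uint32_t id, uint64_t term)
{
	struct peer_state *p = peer_map_get(map, id);
	if (term <= p->term)
		return;
	p->term = term;
	if (term > map->greatest_term)
		map->greatest_term = term;
}

/* Confirmed LSNs only grow as well. */
static void
peer_map_process_lsn(struct peer_map *map, uint32_t id, int64_t lsn)
{
	struct peer_state *p = peer_map_get(map, id);
	if (lsn > p->confirmed_lsn)
		p->confirmed_lsn = lsn;
}
```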

>
>>   	/** State machine timed event trigger. */
>>   	struct ev_timer timer;
>>   	/** Configured election timeout in seconds. */
>> @@ -243,6 +256,39 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
>>   	return !raft->is_enabled || raft->leader == source_id;
>>   }
>>   
>> +/**
>> + * Return the latest term as seen in PROMOTE requests from instance with id
>> + * @a source_id.
>> + */
>> +static inline uint64_t
>> +raft_source_term(const struct raft *raft, uint32_t source_id)
>> +{
>> +	assert(source_id != 0 && source_id < VCLOCK_MAX);
>> +	return vclock_get(&raft->term_map, source_id);
>> +}
>> +
>> +/**
>> + * Check whether replica with id @a source_id is too old to apply synchronous
>> + * data from it. The check is only valid when elections are enabled.
>> + */
>> +static inline bool
>> +raft_source_has_outdated_term(const struct raft *raft, uint32_t source_id)
>> +{
>> +	uint64_t source_term = vclock_get(&raft->term_map, source_id);
>> +	return raft->is_enabled && source_term < raft->greatest_known_term;
>> +}
>> +
>> +/** Remember the last term seen for replica  with id @a source_id. */
>> +static inline void
>> +raft_source_update_term(struct raft *raft, uint32_t source_id, uint64_t term)
>> +{
>> +	if ((uint64_t) vclock_get(&raft->term_map, source_id) >= term)
> 8. Probably having the term as uint64_t was a mistake from the beginning.
> Feel free to change it to int64_t if you want, in a separate commit.

I replaced this particular line with `raft_node_term() >= term`,
so it's not that ugly now.

Speaking of uint64_t -> int64_t, I don't think it's worth it.
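The reason the cast goes away without changing the type can be shown with a small sketch. It assumes a hypothetical `struct signed_map` that, like a vclock, stores components as `int64_t`: the single signed-to-unsigned conversion lives in the getter (as in raft_node_term()), so the comparison in the update function is unsigned on both sides and the call site needs no cast.

```c
#include <assert.h>
#include <stdint.h>

#define NODE_MAX 32 /* Hypothetical stand-in for VCLOCK_MAX. */

/* Hypothetical map that, like a vclock, stores signed components. */
struct signed_map {
	int64_t value[NODE_MAX];
};

/* The only place where the signed storage is converted to a term. */
static uint64_t
node_term(const struct signed_map *map, uint32_t id)
{
	assert(id != 0 && id < NODE_MAX);
	return (uint64_t)map->value[id];
}

/* Returns 1 if @a term was accepted, 0 if it was not newer. */
static int
process_term(struct signed_map *map, uint32_t id, uint64_t term)
{
	/* Both operands are uint64_t, so no cast is needed here. */
	if (node_term(map, id) >= term)
		return 0;
	/* Terms must still fit into the signed storage. */
	assert(term <= (uint64_t)INT64_MAX);
	map->value[id] = (int64_t)term;
	return 1;
}
```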

>
>> +		return;
>> +	vclock_follow(&raft->term_map, source_id, term);
>> +	if (term > raft->greatest_known_term)
>> +		raft->greatest_known_term = term;
>> +}
> 9. I see these are not used in the raft code at all. Did you think about
> moving it all to box/raft.h and box/raft.c? Or about covering this all
> with unit tests in unit/raft.c if you decide to keep it here?

AFAIU box/raft.h is about interconnecting box and raft functionality.

These functions aren't used in lib/raft indeed, but I think they belong
here, just like `raft_is_source_allowed()`.

I'll come up with some unit tests.

>
>> +
>>   /** Check if Raft is enabled. */
>>   static inline bool
>>   raft_is_enabled(const struct raft *raft)
>> diff --git a/test/replication/gh-5445-leader-inconsistency.result b/test/replication/gh-5445-leader-inconsistency.result
>> new file mode 100644
>> index 000000000..ff3104de5
>> --- /dev/null
>> +++ b/test/replication/gh-5445-leader-inconsistency.result
>> @@ -0,0 +1,291 @@
>> +-- test-run result file version 2
>> +test_run = require("test_run").new()
>> + | ---
>> + | ...
>> +
>> +is_leader_cmd = "return box.info.election.state == 'leader'"
>> + | ---
>> + | ...
>> +
>> +-- Auxiliary.
>> +test_run:cmd('setopt delimiter ";"')
>> + | ---
>> + | - true
>> + | ...
>> +function get_leader(nrs)
>> +    local leader_nr = 0
>> +    test_run:wait_cond(function()
>> +        for nr, do_check in pairs(nrs) do
>> +            if do_check then
>> +                local is_leader = test_run:eval('election_replica'..nr,
>> +                                                is_leader_cmd)[1]
>> +                if is_leader then
>> +                    leader_nr = nr
>> +                    return true
>> +                end
>> +            end
>> +        end
>> +        return false
>> +    end)
>> +    assert(leader_nr ~= 0)
>> +    return leader_nr
>> +end;
>> + | ---
>> + | ...
>> +
>> +function name(id)
>> +    return 'election_replica'..id
>> +end;
> 10. You can move this function above get_leader() and use it
> in there.

Yes, indeed, thanks!

Incremental diff:

===================================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index c3ee620a2..61d53fdec 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -790,12 +790,6 @@ apply_synchro_row_cb(struct journal_entry *entry)
          applier_rollback_by_wal_io();
      } else {
          txn_limbo_process(&txn_limbo, synchro_entry->req);
-        if (iproto_type_is_promote_request(synchro_entry->req->type)) {
-            raft_source_update_term(box_raft(),
-                        synchro_entry->req->origin_id,
-                        synchro_entry->req->term);
-
-        }
          trigger_run(&replicaset.applier.on_wal_write, NULL);
      }
      /* The fiber is the same on final join. */
@@ -1041,7 +1035,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
       * The rows are replaced with NOPs to preserve the vclock consistency.
       */
      struct applier_tx_row *item;
-    if (raft_source_has_outdated_term(box_raft(), applier->instance_id) &&
+    if (raft_is_node_outdated(box_raft(), applier->instance_id) &&
          (last_row->wait_sync ||
           (iproto_type_is_synchro_request(first_row->type) &&
           !iproto_type_is_promote_request(first_row->type)))) {
diff --git a/src/box/box.cc b/src/box/box.cc
index 9c7e92a0e..907bcca31 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -426,10 +426,6 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
          return -1;
      }
      txn_limbo_process(&txn_limbo, &syn_req);
-    if (syn_req.type == IPROTO_PROMOTE) {
-            raft_source_update_term(box_raft(), syn_req.origin_id,
-                        syn_req.term);
-    }
      return 0;
  }

@@ -1512,7 +1508,7 @@ box_clear_synchro_queue(bool try_wait)
       * written for this term.
       */
      if (!is_box_configured ||
-        raft_source_term(box_raft(), instance_id) == box_raft()->term)
+        raft_node_term(box_raft(), instance_id) == box_raft()->term)
          return 0;
      uint32_t former_leader_id = txn_limbo.owner_id;
      int64_t wait_lsn = txn_limbo.confirmed_lsn;
@@ -1580,8 +1576,6 @@ promote:
              };
              txn_limbo_read_promote(&txn_limbo, &req);
              assert(txn_limbo_is_empty(&txn_limbo));
-            raft_source_update_term(box_raft(), req.origin_id,
-                        req.term);
          }
      }
      in_clear_synchro_queue = false;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 93c8994b7..d72a0573f 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -34,6 +34,7 @@
  #include "iproto_constants.h"
  #include "journal.h"
  #include "box.h"
+#include "raft.h"

  struct txn_limbo txn_limbo;

@@ -495,6 +496,7 @@ txn_limbo_read_promote(struct txn_limbo *limbo,
      assert(txn_limbo_is_empty(&txn_limbo));
      limbo->owner_id = req->origin_id;
      limbo->confirmed_lsn = 0;
+    raft_process_term(box_raft(), req->origin_id, req->term);
  }

  void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 4a1c43856..b90c50b33 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -280,7 +280,7 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term);

  /**
   * Process a PROMOTE request, i.e. confirm all entries <= @req.lsn and rollback all
- * entries > @req.lsn.
+ * entries > @req.lsn, and update known raft term for @req.origin_id.
   */
  void
  txn_limbo_read_promote(struct txn_limbo *limbo,
diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
index 01f548fee..75512e38a 100644
--- a/src/lib/raft/raft.h
+++ b/src/lib/raft/raft.h
@@ -214,7 +214,7 @@ struct raft {
       * hasn't read its PROMOTE request yet.
       * During other times must be equal to @a term.
       */
-    uint64_t greatest_known_term;
+    uint64_t greatest_term;
      /**
       * Latest terms received with PROMOTE entries from remote instances.
       * Raft uses them to determine data from which sources may be applied.
@@ -261,7 +261,7 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
   * @a source_id.
   */
  static inline uint64_t
-raft_source_term(const struct raft *raft, uint32_t source_id)
+raft_node_term(const struct raft *raft, uint32_t source_id)
  {
      assert(source_id != 0 && source_id < VCLOCK_MAX);
      return vclock_get(&raft->term_map, source_id);
@@ -272,21 +272,21 @@ raft_source_term(const struct raft *raft, uint32_t source_id)
   * data from it. The check is only valid when elections are enabled.
   */
  static inline bool
-raft_source_has_outdated_term(const struct raft *raft, uint32_t source_id)
+raft_is_node_outdated(const struct raft *raft, uint32_t source_id)
  {
-    uint64_t source_term = vclock_get(&raft->term_map, source_id);
-    return raft->is_enabled && source_term < raft->greatest_known_term;
+    uint64_t source_term = raft_node_term(raft, source_id);
+    return raft->is_enabled && source_term < raft->greatest_term;
  }

  /** Remember the last term seen for replica  with id @a source_id. */
  static inline void
-raft_source_update_term(struct raft *raft, uint32_t source_id, uint64_t term)
+raft_process_term(struct raft *raft, uint32_t source_id, uint64_t term)
  {
-    if ((uint64_t) vclock_get(&raft->term_map, source_id) >= term)
+    if (raft_node_term(raft, source_id) >= term)
          return;
      vclock_follow(&raft->term_map, source_id, term);
-    if (term > raft->greatest_known_term)
-        raft->greatest_known_term = term;
+    if (term > raft->greatest_term)
+        raft->greatest_term = term;
  }

  /** Check if Raft is enabled. */
diff --git a/test/replication/gh-5445-leader-inconsistency.result b/test/replication/gh-5445-leader-inconsistency.result
index ff3104de5..5c6169f50 100644
--- a/test/replication/gh-5445-leader-inconsistency.result
+++ b/test/replication/gh-5445-leader-inconsistency.result
@@ -12,12 +12,18 @@ test_run:cmd('setopt delimiter ";"')
   | ---
   | - true
   | ...
+function name(id)
+    return 'election_replica'..id
+end;
+ | ---
+ | ...
+
  function get_leader(nrs)
      local leader_nr = 0
      test_run:wait_cond(function()
          for nr, do_check in pairs(nrs) do
              if do_check then
-                local is_leader = test_run:eval('election_replica'..nr,
+                local is_leader = test_run:eval(name(nr),
                                                  is_leader_cmd)[1]
                  if is_leader then
                      leader_nr = nr
@@ -33,11 +39,6 @@ end;
   | ---
   | ...

-function name(id)
-    return 'election_replica'..id
-end;
- | ---
- | ...
  test_run:cmd('setopt delimiter ""');
   | ---
   | - true
diff --git a/test/replication/gh-5445-leader-inconsistency.test.lua b/test/replication/gh-5445-leader-inconsistency.test.lua
index bf8b31886..e7952f5fa 100644
--- a/test/replication/gh-5445-leader-inconsistency.test.lua
+++ b/test/replication/gh-5445-leader-inconsistency.test.lua
@@ -4,12 +4,16 @@ is_leader_cmd = "return box.info.election.state == 'leader'"

  -- Auxiliary.
  test_run:cmd('setopt delimiter ";"')
+function name(id)
+    return 'election_replica'..id
+end;
+
  function get_leader(nrs)
      local leader_nr = 0
      test_run:wait_cond(function()
          for nr, do_check in pairs(nrs) do
              if do_check then
-                local is_leader = test_run:eval('election_replica'..nr,
+                local is_leader = test_run:eval(name(nr),
                                                  is_leader_cmd)[1]
                  if is_leader then
                      leader_nr = nr
@@ -23,9 +27,6 @@ function get_leader(nrs)
      return leader_nr
  end;

-function name(id)
-    return 'election_replica'..id
-end;
  test_run:cmd('setopt delimiter ""');

  --
diff --git a/test/unit/raft.c b/test/unit/raft.c
index d0d13d8c7..0306cefcd 100644
--- a/test/unit/raft.c
+++ b/test/unit/raft.c
@@ -1267,10 +1267,44 @@ raft_test_too_long_wal_write(void)
      raft_finish_test();
  }

+static void
+raft_test_term_filter(void)
+{
+    raft_start_test(9);
+    struct raft_node node;
+    raft_node_create(&node);
+
+    is(raft_node_term(&node.raft, 1), 0, "empty node term");
+    ok(!raft_is_node_outdated(&node.raft, 1), "not outdated initially");
+
+    raft_process_term(&node.raft, 1, 1);
+    is(raft_node_term(&node.raft, 1), 1, "node term updated");
+    ok(raft_is_node_outdated(&node.raft, 2), "other nodes are outdated");
+
+    raft_process_term(&node.raft, 2, 100);
+    ok(raft_is_node_outdated(&node.raft, 1), "node outdated when others "
+                         "have greater term");
+    ok(!raft_is_node_outdated(&node.raft, 2), "node with greatest term "
+                         "isn't outdated");
+
+    raft_process_term(&node.raft, 3, 100);
+    ok(!raft_is_node_outdated(&node.raft, 2), "node not outdated when "
+                         "others have the same term");
+
+    raft_process_term(&node.raft, 3, 99);
+    is(raft_node_term(&node.raft, 3), 100, "node term isn't decreased");
+    ok(!raft_is_node_outdated(&node.raft, 3), "node doesn't become "
+                          "outdated");
+
+
+    raft_node_destroy(&node);
+    raft_finish_test();
+}
+
  static int
  main_f(va_list ap)
  {
-    raft_start_test(13);
+    raft_start_test(14);

      (void) ap;
      fakeev_init();
@@ -1288,6 +1322,7 @@ main_f(va_list ap)
      raft_test_death_timeout();
      raft_test_enable_disable();
      raft_test_too_long_wal_write();
+    raft_test_term_filter();

      fakeev_free();

diff --git a/test/unit/raft.result b/test/unit/raft.result
index 96bfc3b86..ecb962e42 100644
--- a/test/unit/raft.result
+++ b/test/unit/raft.result
@@ -1,5 +1,5 @@
      *** main_f ***
-1..13
+1..14
      *** raft_test_leader_election ***
      1..24
      ok 1 - 1 pending message at start
@@ -220,4 +220,17 @@ ok 12 - subtests
      ok 8 - became candidate
  ok 13 - subtests
      *** raft_test_too_long_wal_write: done ***
+    *** raft_test_term_filter ***
+    1..9
+    ok 1 - empty node term
+    ok 2 - not outdated initially
+    ok 3 - node term updated
+    ok 4 - other nodes are outdated
+    ok 5 - node outdated when others have greater term
+    ok 6 - node with greatest term isn't outdated
+    ok 7 - node not outdated when others have the same term
+    ok 8 - node term isn't decreased
+    ok 9 - node doesn't become outdated
+ok 14 - subtests
+    *** raft_test_term_filter: done ***
      *** main_f: done ***

-- 
Serge Petrenko


