[Tarantool-patches] [PATCH v3 06/10] raft: keep track of greatest known term and filter replication sources based on that

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 16 02:27:40 MSK 2021


Good job on the patch!

Please try to reduce the length of the lines in the commit
message, or at least of its title. It is suuuper long now.

See 10 comments below.

> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 4898f9f7b..3fb864686 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -790,6 +790,12 @@ apply_synchro_row_cb(struct journal_entry *entry)
>  		applier_rollback_by_wal_io();
>  	} else {
>  		txn_limbo_process(&txn_limbo, synchro_entry->req);
> +		if (iproto_type_is_promote_request(synchro_entry->req->type)) {
> +			raft_source_update_term(box_raft(),
> +						synchro_entry->req->origin_id,
> +						synchro_entry->req->term);

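1. How about moving that to txn_limbo_read_promote()? What do you think? I see
you do it in 3 places, each right after txn_limbo_process() or
txn_limbo_read_promote() is called on a PROMOTE row. Since txn_limbo_process()
dispatches PROMOTE rows to txn_limbo_read_promote(), doing the update there
would cover all 3 places at once.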
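To make this concrete, here is a minimal standalone sketch, assuming heavily
simplified stand-ins: `struct raft`, `struct promote_req`, `struct limbo`,
`raft_update_term()` and `limbo_read_promote()` below are hypothetical and much
smaller than the real Tarantool structures. It only shows the shape of the
idea: the term update lives inside the function that applies a PROMOTE, so the
callers stop repeating it.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical, heavily simplified stand-ins for struct raft,
 * struct synchro_request and struct txn_limbo. Not Tarantool code.
 */
enum { NODE_MAX = 32 };

struct raft {
	uint64_t term_map[NODE_MAX]; /* Latest PROMOTE term per node id. */
	uint64_t greatest_term;      /* Max over term_map. */
};

struct promote_req {
	uint32_t origin_id;
	uint64_t term;
};

struct limbo {
	uint32_t owner_id;
	/* Pointer only to avoid a global; real code could use box_raft(). */
	struct raft *raft;
};

/* Remember the term for a node; ignore the same or an older term. */
static void
raft_update_term(struct raft *raft, uint32_t id, uint64_t term)
{
	assert(id != 0 && id < NODE_MAX);
	if (raft->term_map[id] >= term)
		return;
	raft->term_map[id] = term;
	if (term > raft->greatest_term)
		raft->greatest_term = term;
}

/*
 * Single entry point for applying a PROMOTE. Because the term update
 * lives here, recovery, the applier and the local promote path cannot
 * forget to do it.
 */
static void
limbo_read_promote(struct limbo *limbo, const struct promote_req *req)
{
	/* Stand-in for the real queue processing: just switch the owner. */
	limbo->owner_id = req->origin_id;
	raft_update_term(limbo->raft, req->origin_id, req->term);
}
```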

> +
> +		}
>  		trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	}
>  	/* The fiber is the same on final join. */
> @@ -1027,6 +1033,28 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		}
>  	}
>  
> +	/*
> +	 * When elections are enabled we must filter out synchronous rows coming
> +	 * from an instance that fell behind the current leader. This includes
> +	 * both synchronous tx rows and rows for txs following unconfirmed
> +	 * synchronous transactions.
> +	 * The rows are replaced with NOPs to preserve the vclock consistency.
> +	 */
> +	struct applier_tx_row *item;
> +	if (raft_source_has_outdated_term(box_raft(), applier->instance_id) &&

2. The names are too long IMO. I would propose:

	raft_is_node_outdated(raft, id)   // Check if the node is behind
	raft_process_term(raft, id, term) // Set the term for a node, or skip
	                                  // if it is the same or older
	raft_node_term(raft, id)          // Get the term

'source' is not really a perfect name, because raft nodes send
messages to each other. AFAIR, there are no one-directional channels
like the upstream and downstream we have in replication.

I used 'source' in raft_process_heartbeat() to mean the source of the
heartbeat message, not because nodes are called sources everywhere.

I picked 'process' for the term function because we already have
raft_process_heartbeat() to handle info from a node with a given
ID, and I thought it made sense to keep them similar.
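A minimal sketch of how the proposed names could look, assuming a plain array
instead of the vclock-based term map and a stripped-down `struct raft` (both
hypothetical, for illustration only). The bodies follow the patch's
raft_source_term(), raft_source_has_outdated_term() and
raft_source_update_term(); only the names change, plus the field name without
'known' from comment 6.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical simplified Raft state; the real struct raft has far more. */
enum { NODE_MAX = 32 };

struct raft {
	bool is_enabled;
	/* Greatest term persisted in WAL as part of a PROMOTE request. */
	uint64_t greatest_term;
	/* Latest PROMOTE term per node id; a plain array, not a vclock. */
	uint64_t term_map[NODE_MAX];
};

/** Get the latest PROMOTE term seen from node @a id. */
static inline uint64_t
raft_node_term(const struct raft *raft, uint32_t id)
{
	assert(id != 0 && id < NODE_MAX);
	return raft->term_map[id];
}

/** Check if node @a id is behind; meaningful only with elections on. */
static inline bool
raft_is_node_outdated(const struct raft *raft, uint32_t id)
{
	return raft->is_enabled &&
	       raft_node_term(raft, id) < raft->greatest_term;
}

/** Set the term for node @a id, or skip if it is the same or older. */
static inline void
raft_process_term(struct raft *raft, uint32_t id, uint64_t term)
{
	assert(id != 0 && id < NODE_MAX);
	if (raft->term_map[id] >= term)
		return;
	raft->term_map[id] = term;
	if (term > raft->greatest_term)
		raft->greatest_term = term;
}
```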


3. Why does raft_is_source_allowed() still exist when we have this wonder?

> +	    (last_row->wait_sync ||
> +	     (iproto_type_is_synchro_request(first_row->type) &&
> +	     !iproto_type_is_promote_request(first_row->type)))) {
> +		stailq_foreach_entry(item, rows, next) {
> +			struct xrow_header *row = &item->row;
> +			row->type = IPROTO_NOP;
> +			/*
> +			 * Row body is saved to fiber's region and will be freed
> +			 * on next fiber_gc() call.
> +			 */
> +			row->bodycnt = 0;
> +		}
> +	}
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 722fc23b7..f44dd0e54 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -426,6 +426,10 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
>  		return -1;
>  	}
>  	txn_limbo_process(&txn_limbo, &syn_req);
> +	if (syn_req.type == IPROTO_PROMOTE) {
> +			raft_source_update_term(box_raft(), syn_req.origin_id,
> +						syn_req.term);

4. Misaligned. Also see the first comment.

> @@ -1558,20 +1567,21 @@ box_clear_synchro_queue(bool try_wait)
>  			rc = -1;
>  		} else {
>  promote:
> -			/*
> -			 * Term parameter is unused now, We'll pass
> -			 * box_raft()->term there later.
> -			 */
> -			txn_limbo_write_promote(&txn_limbo, wait_lsn, 0);
> +			/* We cannot possibly get here in a volatile state. */
> +			assert(box_raft()->volatile_term == box_raft()->term);
> +			txn_limbo_write_promote(&txn_limbo, wait_lsn,
> +						box_raft()->term);
>  			struct synchro_request req = {
>  				.type = 0, /* unused */
>  				.replica_id = 0, /* unused */
>  				.origin_id = instance_id,
>  				.lsn = wait_lsn,
> -				.term = 0, /* unused */
> +				.term = box_raft()->term,
>  			};
>  			txn_limbo_read_promote(&txn_limbo, &req);
>  			assert(txn_limbo_is_empty(&txn_limbo));
> +			raft_source_update_term(box_raft(), req.origin_id,
> +						req.term);

5. See the first comment.

>  		}
>  	}
> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
> index e447f6634..01f548fee 100644
> --- a/src/lib/raft/raft.h
> +++ b/src/lib/raft/raft.h
> @@ -207,6 +207,19 @@ struct raft {
>  	 * subsystems, such as Raft.
>  	 */
>  	const struct vclock *vclock;
> +	/**
> +	 * The biggest term seen by this instance and persisted in WAL as part
> +	 * of a PROMOTE request. May be smaller than @a term, while there are
> +	 * ongoing elections, or the leader is already known, but this instance
> +	 * hasn't read its PROMOTE request yet.
> +	 * During other times must be equal to @a term.
> +	 */
> +	uint64_t greatest_known_term;

6. Maybe omit 'known'. There can't be a 'greatest_unknown_term' anyway.

> +	/**
> +	 * Latest terms received with PROMOTE entries from remote instances.
> +	 * Raft uses them to determine data from which sources may be applied.
> +	 */
> +	struct vclock term_map;

7. I have a feeling it is similar to the limbo's LSN map, as if
they should be merged into a single entity. I can't formulate that
properly. I hope it will become clearer when we move all of that
to the WAL thread someday.

>  	/** State machine timed event trigger. */
>  	struct ev_timer timer;
>  	/** Configured election timeout in seconds. */
> @@ -243,6 +256,39 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
>  	return !raft->is_enabled || raft->leader == source_id;
>  }
>  
> +/**
> + * Return the latest term as seen in PROMOTE requests from instance with id
> + * @a source_id.
> + */
> +static inline uint64_t
> +raft_source_term(const struct raft *raft, uint32_t source_id)
> +{
> +	assert(source_id != 0 && source_id < VCLOCK_MAX);
> +	return vclock_get(&raft->term_map, source_id);
> +}
> +
> +/**
> + * Check whether replica with id @a source_id is too old to apply synchronous
> + * data from it. The check is only valid when elections are enabled.
> + */
> +static inline bool
> +raft_source_has_outdated_term(const struct raft *raft, uint32_t source_id)
> +{
> +	uint64_t source_term = vclock_get(&raft->term_map, source_id);
> +	return raft->is_enabled && source_term < raft->greatest_known_term;
> +}
> +
> +/** Remember the last term seen for replica  with id @a source_id. */
> +static inline void
> +raft_source_update_term(struct raft *raft, uint32_t source_id, uint64_t term)
> +{
> +	if ((uint64_t) vclock_get(&raft->term_map, source_id) >= term)

8. Probably having the term as uint64_t was a mistake from the beginning:
vclock components are int64_t, which is what forces the cast above. Feel
free to change it to int64_t in a separate commit, if you want.
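To illustrate why mixing the two types needs care, a minimal standalone
example (not Tarantool code; the helper names are made up). When an int64_t is
compared with a uint64_t, C converts the signed operand to unsigned, which is
exactly what the explicit `(uint64_t)` cast in the patch does. As long as terms
stay non-negative the cast is harmless, but it would silently give the wrong
answer for a negative value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * What a mixed int64_t/uint64_t comparison does: the signed operand is
 * converted to uint64_t first, so -1 turns into UINT64_MAX.
 */
static bool
mixed_less(int64_t a, uint64_t b)
{
	return (uint64_t)a < b;
}

/* With both operands signed, the comparison follows normal arithmetic. */
static bool
signed_less(int64_t a, int64_t b)
{
	return a < b;
}
```

With a single signed type on both sides, as in signed_less(), neither the cast
nor the surprise is needed.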

> +		return;
> +	vclock_follow(&raft->term_map, source_id, term);
> +	if (term > raft->greatest_known_term)
> +		raft->greatest_known_term = term;
> +}

9. I see these functions are not used in the raft code itself. Have you
thought about moving them all to box/raft.h and box/raft.c? Or, if you
decide to keep them here, about covering them with unit tests in unit/raft.c?

> +
>  /** Check if Raft is enabled. */
>  static inline bool
>  raft_is_enabled(const struct raft *raft)
> diff --git a/test/replication/gh-5445-leader-inconsistency.result b/test/replication/gh-5445-leader-inconsistency.result
> new file mode 100644
> index 000000000..ff3104de5
> --- /dev/null
> +++ b/test/replication/gh-5445-leader-inconsistency.result
> @@ -0,0 +1,291 @@
> +-- test-run result file version 2
> +test_run = require("test_run").new()
> + | ---
> + | ...
> +
> +is_leader_cmd = "return box.info.election.state == 'leader'"
> + | ---
> + | ...
> +
> +-- Auxiliary.
> +test_run:cmd('setopt delimiter ";"')
> + | ---
> + | - true
> + | ...
> +function get_leader(nrs)
> +    local leader_nr = 0
> +    test_run:wait_cond(function()
> +        for nr, do_check in pairs(nrs) do
> +            if do_check then
> +                local is_leader = test_run:eval('election_replica'..nr,
> +                                                is_leader_cmd)[1]
> +                if is_leader then
> +                    leader_nr = nr
> +                    return true
> +                end
> +            end
> +        end
> +        return false
> +    end)
> +    assert(leader_nr ~= 0)
> +    return leader_nr
> +end;
> + | ---
> + | ...
> +
> +function name(id)
> +    return 'election_replica'..id
> +end;

10. You can move this function above get_leader() and use it
there.

