[Tarantool-patches] [PATCH v6 1/1] applier: filter incoming synchro packets via transaction initiator

Serge Petrenko sergepetrenko at tarantool.org
Thu Jun 10 17:46:51 MSK 2021



10.06.2021 16:02, Cyrill Gorcunov wrote:
> Currently we use synchro packets filtration based on their contents,
> in particular by their xrow->replica_id value. Still there was a
> question if we can optimize this moment and rather filter out all
> packets coming from non-leader replica.
>
> Raft specification requires that only data from a current leader
> should be applied to local WAL but doesn't put a concrete claim on
> the data transport, ie how exactly rows are reaching replicas. This
> implies that data propagation may reach replicas indirectly via transit
> hops. Thus we drop applier->instance_id filtering and rely on
> xrow->replica_id matching instead.
>
> In the test (inspired by Serge Petrenko's test) we recreate the situation
> where replica3 obtains master's node data (which is a raft leader)
> indirectly via replica2 node.
>
> Closes #6035

Hi! Thanks for the patch!
Please find a couple of comments below.
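
For reference, the difference between "sender" and "initiator" filtering
described in the commit message can be sketched roughly as below. This is
only a simplified standalone model, not the applier code: `toy_raft`,
`toy_row`, `filter_by_sender` and `filter_by_initiator` are hypothetical
names, and the real check on xrow->replica_id in the applier is more
involved than a single comparison with the leader id.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the real structures. */
struct toy_raft {
	/* Whether Raft-based filtering is active at all. */
	bool is_enabled;
	/* Instance id of the current leader. */
	uint32_t leader;
};

struct toy_row {
	/* Id of the instance which initiated the transaction. */
	uint32_t replica_id;
};

/*
 * "Sender" filtering: accept rows only when the peer we received
 * them from is the leader itself (what the removed
 * raft_is_source_allowed() helper did with applier->instance_id).
 */
static inline bool
filter_by_sender(const struct toy_raft *raft, uint32_t sender_id)
{
	return !raft->is_enabled || raft->leader == sender_id;
}

/*
 * "Initiator" filtering (assumed simplification): accept rows when
 * the transaction was initiated by the leader, no matter which
 * intermediate node relayed them.
 */
static inline bool
filter_by_initiator(const struct toy_raft *raft, const struct toy_row *row)
{
	return !raft->is_enabled || raft->leader == row->replica_id;
}
```

With leader id 1 and a row carrying replica_id 1 that arrives from a relay
with id 2, the sender check rejects the row while the initiator check
accepts it, which is the master -> replica 1 -> replica 2 case from the
commit message.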

>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
>   src/box/applier.cc                            |  27 ++--
>   src/lib/raft/raft.h                           |   7 -
>   .../gh-5445-leader-inconsistency.result       |  15 +++
>   .../gh-5445-leader-inconsistency.test.lua     |   5 +
>   .../replication/gh-6035-applier-filter.result | 120 ++++++++++++++++++
>   .../gh-6035-applier-filter.test.lua           |  54 ++++++++
>   test/replication/gh-6035-master.lua           |   1 +
>   test/replication/gh-6035-node.lua             |  35 +++++
>   test/replication/gh-6035-replica1.lua         |   1 +
>   test/replication/gh-6035-replica2.lua         |   1 +
>   test/replication/suite.cfg                    |   3 +
>   11 files changed, 250 insertions(+), 19 deletions(-)
>   create mode 100644 test/replication/gh-6035-applier-filter.result
>   create mode 100644 test/replication/gh-6035-applier-filter.test.lua
>   create mode 120000 test/replication/gh-6035-master.lua
>   create mode 100644 test/replication/gh-6035-node.lua
>   create mode 120000 test/replication/gh-6035-replica1.lua
>   create mode 120000 test/replication/gh-6035-replica2.lua
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 33181fdbf..d3430f582 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -1027,21 +1027,24 @@ nopify:;
>    * Return 0 for success or -1 in case of an error.
>    */
>   static int
> -applier_apply_tx(struct applier *applier, struct stailq *rows)
> +applier_apply_tx(struct stailq *rows)
>   {
>   	/*
> -	 * Rows received not directly from a leader are ignored. That is a
> -	 * protection against the case when an old leader keeps sending data
> -	 * around not knowing yet that it is not a leader anymore.
> +	 * Initially we've been filtering out data if it came from
> +	 * an applier which instance_id doesn't match raft->leader,
> +	 * but this prevents from obtaining valid leader's data when
> +	 * it comes from intermediate node. For example a series of
> +	 * replica hops
>   	 *
> -	 * XXX: it may be that this can be fine to apply leader transactions by
> -	 * looking at their replica_id field if it is equal to leader id. That
> -	 * can be investigated as an 'optimization'. Even though may not give
> -	 * anything, because won't change total number of rows sent in the
> -	 * network anyway.
> +	 *  master -> replica 1 -> replica 2
> +	 *
> +	 * where each replica carries master's initiated transaction
> +	 * in xrow->replica_id field and master's data get propagated
> +	 * indirectly.
> +	 *
> +	 * Finally we dropped such "sender" filtration and use transaction
> +	 * "initiator" filtration via xrow->replica_id only.
>   	 */
> -	if (!raft_is_source_allowed(box_raft(), applier->instance_id))
> -		return 0;
>   	struct xrow_header *first_row = &stailq_first_entry(rows,
>   					struct applier_tx_row, next)->row;
>   	struct xrow_header *last_row;
> @@ -1312,7 +1315,7 @@ applier_subscribe(struct applier *applier)
>   					diag_raise();
>   			}
>   			applier_signal_ack(applier);
> -		} else if (applier_apply_tx(applier, &rows) != 0) {
> +		} else if (applier_apply_tx(&rows) != 0) {
>   			diag_raise();
>   		}
>   
> diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
> index a8da564b0..fae30b03d 100644
> --- a/src/lib/raft/raft.h
> +++ b/src/lib/raft/raft.h
> @@ -236,13 +236,6 @@ raft_is_ro(const struct raft *raft)
>   	return raft->is_enabled && raft->state != RAFT_STATE_LEADER;
>   }
>   
> -/** See if the instance can accept rows from an instance with the given ID. */
> -static inline bool
> -raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
> -{
> -	return !raft->is_enabled || raft->leader == source_id;
> -}
> -
>   /** Check if Raft is enabled. */
>   static inline bool
>   raft_is_enabled(const struct raft *raft)
> diff --git a/test/replication/gh-5445-leader-inconsistency.result b/test/replication/gh-5445-leader-inconsistency.result
> index 5c6169f50..38d0b097c 100644
> --- a/test/replication/gh-5445-leader-inconsistency.result
> +++ b/test/replication/gh-5445-leader-inconsistency.result
> @@ -175,9 +175,17 @@ test_run:cmd('stop server '..leader)
>    | ---
>    | - true
>    | ...
> +-- And other node as well so it would notice new term on restart.

The comment should go one line below. Also, how is that related to the new
term? We're stopping the other node because otherwise the row with [2] would
get replicated to next_leader.

>   is_possible_leader[leader_nr] = false
>    | ---
>    | ...
> +test_run:cmd('stop server '..other)
> + | ---
> + | - true
> + | ...
> +is_possible_leader[other_nr] = false
> + | ---
> + | ...
>   
>   -- Emulate a situation when next_leader wins the elections. It can't do that in
>   -- this configuration, obviously, because it's behind the 'other' node, so set
> @@ -195,6 +203,13 @@ assert(get_leader(is_possible_leader) == next_leader_nr)
>    | ---
>    | - true
>    | ...
> +test_run:cmd('start server '..other..' with args="1 0.4 voter 2"')
> + | ---
> + | - true
> + | ...

I see now why it's booted as a voter: we want the old leader to win
the election once again below. Sorry for the confusion.

> +is_possible_leader[other_nr] = true
> + | ---
> + | ...
>   test_run:switch(other)
>    | ---
>    | - true
> diff --git a/test/replication/gh-5445-leader-inconsistency.test.lua b/test/replication/gh-5445-leader-inconsistency.test.lua
> index e7952f5fa..fad101881 100644
> --- a/test/replication/gh-5445-leader-inconsistency.test.lua
> +++ b/test/replication/gh-5445-leader-inconsistency.test.lua
> @@ -81,7 +81,10 @@ test_run:wait_cond(function() return box.space.test:get{2} ~= nil end)
>   -- Old leader is gone.
>   test_run:switch('default')
>   test_run:cmd('stop server '..leader)
> +-- And other node as well so it would notice new term on restart.
>   is_possible_leader[leader_nr] = false
> +test_run:cmd('stop server '..other)
> +is_possible_leader[other_nr] = false
>   
>   -- Emulate a situation when next_leader wins the elections. It can't do that in
>   -- this configuration, obviously, because it's behind the 'other' node, so set
> @@ -93,6 +96,8 @@ is_possible_leader[leader_nr] = false
>   -- a situation when some rows from the old leader were not received).
>   test_run:cmd('start server '..next_leader..' with args="1 0.4 candidate 1"')
>   assert(get_leader(is_possible_leader) == next_leader_nr)
> +test_run:cmd('start server '..other..' with args="1 0.4 voter 2"')
> +is_possible_leader[other_nr] = true
>   test_run:switch(other)
>   -- New leader didn't know about the unconfirmed rows but still rolled them back.
>   test_run:wait_cond(function() return box.space.test:get{2} == nil end)
> diff --git a/test/replication/gh-6035-applier-filter.result b/test/replication/gh-6035-applier-filter.result
> new file mode 100644
> index 000000000..fd270de64
> --- /dev/null
> +++ b/test/replication/gh-6035-applier-filter.result
> @@ -0,0 +1,120 @@
> +-- test-run result file version 2
> +--
> +-- gh-6035: verify synchronous rows filtration in applier,
> +-- we need to be sure that filtering synchronous rows is
> +-- done via transaction initiator not sender (iow via
> +-- xrow->replica_id).
> +--
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- Prepare a scheme with transitional node
> +--
> +--  master <=> replica1 => replica2
> +--
> +-- such as transaction initiated on the master node would
> +-- be replicated to the replica2 via interim replica1 node.
> +--
> +
> +test_run:cmd('create server master with script="replication/gh-6035-master.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('create server replica1 with script="replication/gh-6035-replica1.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('create server replica2 with script="replication/gh-6035-replica2.lua"')
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('start server master')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica1')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica2')
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Make the master to be RAFT leader.
> +test_run:switch('master')
> + | ---
> + | - true
> + | ...
> +box.ctl.promote()
> + | ---
> + | ...
> +_ = box.schema.space.create("sync", {is_sync = true})
> + | ---
> + | ...
> +_ = box.space.sync:create_index("pk")
> + | ---
> + | ...
> +box.space.sync:insert{1}
> + | ---
> + | - [1]
> + | ...
> +
> +--
> +-- The first hop is replica1.
> +test_run:switch('replica1')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{}
> + | ---
> + | - - [1]
> + | ...
> +
> +--
> +-- And the second hop is replica2 where
> +-- replica1 replicated the data to us.
> +test_run:switch('replica2')
> + | ---
> + | - true
> + | ...
> +test_run:wait_cond(function()                   \
> +    return box.space.sync:select{} ~= nil and   \
> +           box.space.sync:select{}[1][1] == 1   \
> +    end, 100)

You may simply say `box.space.sync:get{1} ~= nil` here.

> + | ---
> + | - true
> + | ...
> +
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('stop server master')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server master')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('stop server replica1')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica1')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('stop server replica2')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica2')
> + | ---
> + | - true
> + | ...
> diff --git a/test/replication/gh-6035-applier-filter.test.lua b/test/replication/gh-6035-applier-filter.test.lua
> new file mode 100644
> index 000000000..5a1ca3fd3
> --- /dev/null
> +++ b/test/replication/gh-6035-applier-filter.test.lua
> @@ -0,0 +1,54 @@
> +--
> +-- gh-6035: verify synchronous rows filtration in applier,
> +-- we need to be sure that filtering synchronous rows is
> +-- done via transaction initiator not sender (iow via
> +-- xrow->replica_id).
> +--
> +test_run = require('test_run').new()
> +
> +--
> +-- Prepare a scheme with transitional node
> +--
> +--  master <=> replica1 => replica2
> +--
> +-- such as transaction initiated on the master node would
> +-- be replicated to the replica2 via interim replica1 node.
> +--
> +
> +test_run:cmd('create server master with script="replication/gh-6035-master.lua"')
> +test_run:cmd('create server replica1 with script="replication/gh-6035-replica1.lua"')
> +test_run:cmd('create server replica2 with script="replication/gh-6035-replica2.lua"')
> +
> +test_run:cmd('start server master')
> +test_run:cmd('start server replica1')
> +test_run:cmd('start server replica2')
> +
> +--
> +-- Make the master to be RAFT leader.
> +test_run:switch('master')
> +box.ctl.promote()
> +_ = box.schema.space.create("sync", {is_sync = true})
> +_ = box.space.sync:create_index("pk")
> +box.space.sync:insert{1}
> +
> +--
> +-- The first hop is replica1.
> +test_run:switch('replica1')
> +box.space.sync:select{}
> +
> +--
> +-- And the second hop is replica2 where
> +-- replica1 replicated the data to us.
> +test_run:switch('replica2')
> +test_run:wait_cond(function()                   \
> +    return box.space.sync:select{} ~= nil and   \
> +           box.space.sync:select{}[1][1] == 1   \
> +    end, 100)
> +
> +test_run:switch('default')
> +test_run:cmd('stop server master')
> +test_run:cmd('delete server master')
> +test_run:cmd('stop server replica1')
> +test_run:cmd('delete server replica1')
> +test_run:cmd('stop server replica2')
> +test_run:cmd('delete server replica2')
> diff --git a/test/replication/gh-6035-master.lua b/test/replication/gh-6035-master.lua
> new file mode 120000
> index 000000000..f7ede7ef2
> --- /dev/null
> +++ b/test/replication/gh-6035-master.lua
> @@ -0,0 +1 @@
> +gh-6035-node.lua
> \ No newline at end of file
> diff --git a/test/replication/gh-6035-node.lua b/test/replication/gh-6035-node.lua
> new file mode 100644
> index 000000000..e3819471a
> --- /dev/null
> +++ b/test/replication/gh-6035-node.lua
> @@ -0,0 +1,35 @@
> +local SOCKET_DIR = require('fio').cwd()
> +local INSTANCE_ID = string.match(arg[0], "gh%-6035%-(.+)%.lua")
> +
> +local function unix_socket(name)
> +    return SOCKET_DIR .. "/" .. name .. '.sock';
> +end
> +
> +require('console').listen(os.getenv('ADMIN'))
> +
> +if INSTANCE_ID == "master" then
> +    box.cfg({
> +        listen = unix_socket("master"),
> +    })
> +elseif INSTANCE_ID == "replica1" then
> +    box.cfg({
> +        listen = unix_socket("replica1"),
> +        replication = {
> +            unix_socket("master"),
> +            unix_socket("replica1")
> +        },
> +        election_mode = 'voter'
> +    })
> +else
> +    assert(INSTANCE_ID == "replica2")
> +    box.cfg({
> +        replication = {
> +            unix_socket("master"),

It should be 'replica1' instead of 'master' here.

Also, you set up replica1 above as a voter, but it should have no
election_mode initially. Otherwise it won't be able to register replica2.
Once everyone is registered, make replica1 a voter.


> +        },
> +        election_mode = 'voter'
> +    })
> +end
> +
> +box.once("bootstrap", function()
> +    box.schema.user.grant('guest', 'super')
> +end)
> diff --git a/test/replication/gh-6035-replica1.lua b/test/replication/gh-6035-replica1.lua
> new file mode 120000
> index 000000000..f7ede7ef2
> --- /dev/null
> +++ b/test/replication/gh-6035-replica1.lua
> @@ -0,0 +1 @@
> +gh-6035-node.lua
> \ No newline at end of file
> diff --git a/test/replication/gh-6035-replica2.lua b/test/replication/gh-6035-replica2.lua
> new file mode 120000
> index 000000000..f7ede7ef2
> --- /dev/null
> +++ b/test/replication/gh-6035-replica2.lua
> @@ -0,0 +1 @@
> +gh-6035-node.lua
> \ No newline at end of file
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index 27eab20c2..55ec022ff 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -47,6 +47,9 @@
>       "gh-6032-promote-wal-write.test.lua": {},
>       "gh-6057-qsync-confirm-async-no-wal.test.lua": {},
>       "gh-6094-rs-uuid-mismatch.test.lua": {},
> +    "gh-6035-applier-filter.test.lua": {
> +        "memtx": {"engine": "memtx"}
> +    },
>       "*": {
>           "memtx": {"engine": "memtx"},
>           "vinyl": {"engine": "vinyl"}

-- 
Serge Petrenko


