[Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Dec 18 00:43:30 MSK 2020


Thanks for the patch!

See 7 comments below.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 8e0c9a160..fb9167977 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
>  
>  }
>  
> +struct lsn_watcher_data {
> +	int *ack_count;
> +	int *watcher_count;
> +};
> +
> +static void
> +count_confirm_f(void *data)
> +{
> +	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> +	(*d->ack_count)++;
> +}
> +
> +static void
> +watcher_destroy_f(void *data)
> +{
> +	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> +	(*d->watcher_count)--;

An ultra-pro-master hint - you wouldn't need the () if you used
prefix -- and ++, e.g. ++*d->ack_count. But you decide, no rules.

> +}
> +
>  int
>  box_clear_synchro_queue(bool try_wait)
>  {
> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
>  		}
>  	}
>  
> -	if (!txn_limbo_is_empty(&txn_limbo)) {
> -		int64_t lsns[VCLOCK_MAX];
> -		int len = 0;
> -		const struct vclock  *vclock;
> +	if (txn_limbo_is_empty(&txn_limbo))
> +		return 0;
> +
> +	/*
> +	 * Allocate the watchers statically to not bother with alloc/free.
> +	 * This is fine since we have a single execution guard.
> +	 */
> +	static struct relay_lsn_watcher watchers[VCLOCK_MAX];
> +	for (int i = 1; i < VCLOCK_MAX; i++)
> +		rlist_create(&watchers[i].in_list);
> +
> +	int64_t wait_lsn = 0;
> +	bool restart = false;
> +	do {
> +		wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;

1. What if the last transaction is asynchronous? Asynchronous
transactions have lsn -1 in the limbo.

> +		/*
> +		 * Take this node into account immediately.
> +		 * clear_synchro_queue() is a no-op on the limbo owner for now,
> +		 * so all the rows in the limbo must've come through the applier
> +		 * and so they already have an lsn assigned, even if their wal
> +		 * write isn't finished yet.
> +		 */
> +		assert(wait_lsn > 0);
> +		int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
> +		int watcher_count = 0;
> +		struct lsn_watcher_data data = {
> +			.ack_count = &count,
> +			.watcher_count = &watcher_count,
> +		};
> +
>  		replicaset_foreach(replica) {
> -			if (replica->relay != NULL &&
> -			    relay_get_state(replica->relay) != RELAY_OFF &&
> -			    !replica->anon) {
> -				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> -							 &replica->uuid));
> -				vclock = relay_vclock(replica->relay);
> -				int64_t lsn = vclock_get(vclock,
> -							 former_leader_id);
> -				lsns[len++] = lsn;
> +			if (replica->anon || replica->relay == NULL ||

2. replica->relay is never NULL AFAIR.

> +			    relay_get_state(replica->relay) != RELAY_FOLLOW)
> +				continue;
> +			assert(replica->id != 0);

3. Maybe it is better to use REPLICA_ID_NIL instead of 0.

> +			assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
> +
> +			if (vclock_get(relay_vclock(replica->relay),
> +				       former_leader_id) >= wait_lsn) {
> +				count++;
> +				continue;
>  			}
> +
> +			relay_lsn_watcher_create(&watchers[replica->id],
> +						 former_leader_id, wait_lsn,
> +						 count_confirm_f,
> +						 watcher_destroy_f, &data);
> +			relay_set_lsn_watcher(replica->relay,
> +					      &watchers[replica->id]);
> +			watcher_count++;
>  		}
> -		lsns[len++] = vclock_get(box_vclock, former_leader_id);
> -		assert(len < VCLOCK_MAX);
>  
> -		int64_t confirm_lsn = 0;
> -		if (len >= replication_synchro_quorum) {
> -			qsort(lsns, len, sizeof(int64_t), cmp_i64);
> -			confirm_lsn = lsns[len - replication_synchro_quorum];
> +
> +		while (count < replication_synchro_quorum &&
> +		       count + watcher_count >= replication_synchro_quorum) {
> +			fiber_yield();

4. Does it mean the fiber can hang forever? I was thinking we could wait
with a timeout and return an error if the wait fails.

Or maybe at least add a fiber cancellation check here so that this
function can be stopped somehow. Currently the function is basically
immortal and runs forever if there is no quorum for too long.

>  		}
>  
> -		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
> -		assert(txn_limbo_is_empty(&txn_limbo));
> -	}
> +		/*
> +		 * In case some new limbo entries arrived, confirm them as well.
> +		 */
> +		restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
> +
> +		/*
> +		 * Not enough replicas connected. Give user some time to
> +		 * reconfigure quorum and replicas some time to reconnect, then
> +		 * restart the watchers.
> +		 */
> +		if (count + watcher_count < replication_synchro_quorum) {
> +			say_info("clear_syncrho_queue cannot collect quorum: "
> +				 "number of connected replicas (%d) is less "
> +				 "than replication_synchro_quorum (%d). "
> +				 "Will retry in %.2f seconds",
> +				 count + watcher_count,
> +				 replication_synchro_quorum,
> +				 replication_timeout);
> +			fiber_sleep(replication_timeout);

5. The fixed-time sleep doesn't look much better than the polling in
the old version of box_clear_synchro_queue(). Can we wait for an event
instead of just sleeping?

> +			restart = true;
> +		}
> +
> +		/* Detach the watchers that haven't fired. */
> +		for (int i = 1; i < VCLOCK_MAX; i++)
> +			rlist_del_entry(&watchers[i], in_list);
> +	} while (restart);
> +
> +	txn_limbo_force_empty(&txn_limbo, wait_lsn);
> +	assert(txn_limbo_is_empty(&txn_limbo));
>  	return 0;
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> new file mode 100644
> index 000000000..e633f9e60
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> @@ -0,0 +1,137 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- gh-5435: make sure the new limbo owner commits everything there is left in
> +-- the limbo from an old owner.
> +--
> +
> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
> + | ---
> + | ...
> +
> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
> + | ---
> + | ...
> +test_run:wait_fullmesh(SERVERS)
> + | ---
> + | ...
> +
> +-- Force election_replica1 to become leader.
> +test_run:switch('election_replica2')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +test_run:switch('election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +
> +test_run:switch('election_replica1')
> + | ---
> + | - true
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +_ = box.schema.space.create('test', {is_sync=true})
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
> +test_run:cmd('stop server election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum=3}
> + | ---
> + | ...
> +
> +lsn = box.info.lsn
> + | ---
> + | ...
> +
> +for i=1,10 do\
> +    require('fiber').create(function() box.space.test:insert{i} end)\
> +end
> + | ---
> + | ...
> +
> +-- Wait for WAL write and replication.
> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('election_replica2', 'election_replica1')
> + | ---
> + | ...
> +
> +test_run:cmd('switch election_replica2')
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('stop server election_replica1')
> + | ---
> + | - true
> + | ...
> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
> +-- It will vote for 3, however, since 3 has newer data, and start replication
> +-- once 3 becomes the leader.

6. Should be 'vote for 2', '2 has newer data', '2 becomes the leader'.
Otherwise I don't understand.

Here you are at node 2, and below you make it candidate + wait_rw(). So it
becomes a leader, not 3.

> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
> + | ---
> + | - true
> + | ...
> +
> +box.cfg{election_mode='candidate'}
> + | ---
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +-- If 2 decided whether to keep the rows or not right on becoming the leader,
> +-- it would roll them all back. Make sure 2 waits till the rows are replicated
> +-- to 3.
> +box.space.test:select{}
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + |   - [4]
> + |   - [5]
> + |   - [6]
> + |   - [7]
> + |   - [8]
> + |   - [9]
> + |   - [10]
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +-- To silence the QA warning. The 1st replica is already stopped.
> +SERVERS[1] = nil
> + | ---
> + | ...
> +test_run:cmd('delete server election_replica1')
> + | ---
> + | - true
> + | ...
> +test_run:drop_cluster(SERVERS)
> + | ---
> + | ...

7. The test fails when I try to run multiple instances in parallel
like this:

	python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- 

It shows that the new leader rolled the data back.


More information about the Tarantool-patches mailing list