[Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Dec 18 00:43:30 MSK 2020
Thanks for the patch!
See 7 comments below.
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 8e0c9a160..fb9167977 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
>
> }
>
> +struct lsn_watcher_data {
> + int *ack_count;
> + int *watcher_count;
> +};
> +
> +static void
> +count_confirm_f(void *data)
> +{
> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> + (*d->ack_count)++;
> +}
> +
> +static void
> +watcher_destroy_f(void *data)
> +{
> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> + (*d->watcher_count)--;
A small hint: you wouldn't need the parentheses if you used prefix
-- and ++ (e.g. `++*d->ack_count;`). But it's up to you; there is no rule.
> +}
> +
> int
> box_clear_synchro_queue(bool try_wait)
> {
> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
> }
> }
>
> - if (!txn_limbo_is_empty(&txn_limbo)) {
> - int64_t lsns[VCLOCK_MAX];
> - int len = 0;
> - const struct vclock *vclock;
> + if (txn_limbo_is_empty(&txn_limbo))
> + return 0;
> +
> + /*
> + * Allocate the watchers statically to not bother with alloc/free.
> + * This is fine since we have a single execution guard.
> + */
> + static struct relay_lsn_watcher watchers[VCLOCK_MAX];
> + for (int i = 1; i < VCLOCK_MAX; i++)
> + rlist_create(&watchers[i].in_list);
> +
> + int64_t wait_lsn = 0;
> + bool restart = false;
> + do {
> + wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
1. What if the last transaction is asynchronous? Asynchronous transactions
have lsn -1 in the limbo, so wait_lsn would be -1 and the assert(wait_lsn > 0)
below would fail.
> + /*
> + * Take this node into account immediately.
> + * clear_synchro_queue() is a no-op on the limbo owner for now,
> + * so all the rows in the limbo must've come through the applier
> + * and so they already have an lsn assigned, even if their wal
> + * write isn't finished yet.
> + */
> + assert(wait_lsn > 0);
> + int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
> + int watcher_count = 0;
> + struct lsn_watcher_data data = {
> + .ack_count = &count,
> + .watcher_count = &watcher_count,
> + };
> +
> replicaset_foreach(replica) {
> - if (replica->relay != NULL &&
> - relay_get_state(replica->relay) != RELAY_OFF &&
> - !replica->anon) {
> - assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> - &replica->uuid));
> - vclock = relay_vclock(replica->relay);
> - int64_t lsn = vclock_get(vclock,
> - former_leader_id);
> - lsns[len++] = lsn;
> + if (replica->anon || replica->relay == NULL ||
2. As far as I remember, replica->relay is never NULL, so this check is redundant.
> + relay_get_state(replica->relay) != RELAY_FOLLOW)
> + continue;
> + assert(replica->id != 0);
3. Maybe it's better to use REPLICA_ID_NIL here instead of 0.
> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
> +
> + if (vclock_get(relay_vclock(replica->relay),
> + former_leader_id) >= wait_lsn) {
> + count++;
> + continue;
> }
> +
> + relay_lsn_watcher_create(&watchers[replica->id],
> + former_leader_id, wait_lsn,
> + count_confirm_f,
> + watcher_destroy_f, &data);
> + relay_set_lsn_watcher(replica->relay,
> + &watchers[replica->id]);
> + watcher_count++;
> }
> - lsns[len++] = vclock_get(box_vclock, former_leader_id);
> - assert(len < VCLOCK_MAX);
>
> - int64_t confirm_lsn = 0;
> - if (len >= replication_synchro_quorum) {
> - qsort(lsns, len, sizeof(int64_t), cmp_i64);
> - confirm_lsn = lsns[len - replication_synchro_quorum];
> +
> + while (count < replication_synchro_quorum &&
> + count + watcher_count >= replication_synchro_quorum) {
> + fiber_yield();
4. Does this mean the fiber can hang forever? I was thinking we could wait
with a timeout and return an error if the wait fails.
Or at least add a fiber cancellation check here, so that this function
can be stopped somehow. Currently the function is basically unkillable and
will wait forever if there is no quorum for a long time.
> }
>
> - txn_limbo_force_empty(&txn_limbo, confirm_lsn);
> - assert(txn_limbo_is_empty(&txn_limbo));
> - }
> + /*
> + * In case some new limbo entries arrived, confirm them as well.
> + */
> + restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
> +
> + /*
> + * Not enough replicas connected. Give user some time to
> + * reconfigure quorum and replicas some time to reconnect, then
> + * restart the watchers.
> + */
> + if (count + watcher_count < replication_synchro_quorum) {
> + say_info("clear_syncrho_queue cannot collect quorum: "
> + "number of connected replicas (%d) is less "
> + "than replication_synchro_quorum (%d). "
> + "Will retry in %.2f seconds",
> + count + watcher_count,
> + replication_synchro_quorum,
> + replication_timeout);
> + fiber_sleep(replication_timeout);
5. The fixed-time sleep doesn't look much better than the polling in
the old version of box_clear_synchro_queue(). Could we wait for an
event instead of just sleeping?
> + restart = true;
> + }
> +
> + /* Detach the watchers that haven't fired. */
> + for (int i = 1; i < VCLOCK_MAX; i++)
> + rlist_del_entry(&watchers[i], in_list);
> + } while (restart);
> +
> + txn_limbo_force_empty(&txn_limbo, wait_lsn);
> + assert(txn_limbo_is_empty(&txn_limbo));
> return 0;
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> new file mode 100644
> index 000000000..e633f9e60
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> @@ -0,0 +1,137 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- gh-5435: make sure the new limbo owner commits everything there is left in
> +-- the limbo from an old owner.
> +--
> +
> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
> + | ---
> + | ...
> +
> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
> + | ---
> + | ...
> +test_run:wait_fullmesh(SERVERS)
> + | ---
> + | ...
> +
> +-- Force election_replica1 to become leader.
> +test_run:switch('election_replica2')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +test_run:switch('election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +
> +test_run:switch('election_replica1')
> + | ---
> + | - true
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +_ = box.schema.space.create('test', {is_sync=true})
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
> +test_run:cmd('stop server election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum=3}
> + | ---
> + | ...
> +
> +lsn = box.info.lsn
> + | ---
> + | ...
> +
> +for i=1,10 do\
> + require('fiber').create(function() box.space.test:insert{i} end)\
> +end
> + | ---
> + | ...
> +
> +-- Wait for WAL write and replication.
> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('election_replica2', 'election_replica1')
> + | ---
> + | ...
> +
> +test_run:cmd('switch election_replica2')
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('stop server election_replica1')
> + | ---
> + | - true
> + | ...
> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
> +-- It will vote for 3, however, since 3 has newer data, and start replication
> +-- once 3 becomes the leader.
6. It should say 'vote for 2', '2 has newer data', and '2 becomes the leader'.
Otherwise I don't understand the comment.
Here you are on node 2, and below you make it a candidate and call wait_rw().
So node 2 becomes the leader, not node 3.
> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
> + | ---
> + | - true
> + | ...
> +
> +box.cfg{election_mode='candidate'}
> + | ---
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +-- If 2 decided whether to keep the rows or not right on becoming the leader,
> +-- it would roll them all back. Make sure 2 waits till the rows are replicated
> +-- to 3.
> +box.space.test:select{}
> + | ---
> + | - - [1]
> + | - [2]
> + | - [3]
> + | - [4]
> + | - [5]
> + | - [6]
> + | - [7]
> + | - [8]
> + | - [9]
> + | - [10]
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +-- To silence the QA warning. The 1st replica is already stopped.
> +SERVERS[1] = nil
> + | ---
> + | ...
> +test_run:cmd('delete server election_replica1')
> + | ---
> + | - true
> + | ...
> +test_run:drop_cluster(SERVERS)
> + | ---
> + | ...
7. The test fails when I run multiple instances of it in parallel,
like this:
python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear-
The failure shows that the new leader rolled back the data.
More information about the Tarantool-patches
mailing list