From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 6A2E94765E0 for ; Wed, 23 Dec 2020 15:04:42 +0300 (MSK) From: Serge Petrenko References: <99accb77474e59e62203f9028196620a07635a7d.1607633488.git.sergepetrenko@tarantool.org> <6cb16c86-8db2-912d-1228-af9c550f8b80@tarantool.org> Message-ID: <1b444a17-dcf1-e1a0-2f3d-60d320125d99@tarantool.org> Date: Wed, 23 Dec 2020 15:04:41 +0300 MIME-Version: 1.0 In-Reply-To: <6cb16c86-8db2-912d-1228-af9c550f8b80@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , Cyrill Gorcunov Cc: tarantool-patches@dev.tarantool.org 18.12.2020 00:43, Vladislav Shpilevoy пишет: > Thanks for the patch! > > See 7 comments below. Hi! Thanks for the review! There's v2 of the patchset, where everything's different, so my answers don't matter much, but still. >> diff --git a/src/box/box.cc b/src/box/box.cc >> index 8e0c9a160..fb9167977 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void) >> >> } >> >> +struct lsn_watcher_data { >> + int *ack_count; >> + int *watcher_count; >> +}; >> + >> +static void >> +count_confirm_f(void *data) >> +{ >> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data; >> + (*d->ack_count)++; >> +} >> + >> +static void >> +watcher_destroy_f(void *data) >> +{ >> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data; >> + (*d->watcher_count)--; > An ultra-pro-master hint - you wouldn't need () if you would use > prefix -- and ++. But you decide, no rules. Wow, cool. I didn't think of this. >> +} >> + >> int >> box_clear_synchro_queue(bool try_wait) >> { >> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait) >> } >> } >> >> - if (!txn_limbo_is_empty(&txn_limbo)) { >> - int64_t lsns[VCLOCK_MAX]; >> - int len = 0; >> - const struct vclock *vclock; >> + if (txn_limbo_is_empty(&txn_limbo)) >> + return 0; >> + >> + /* >> + * Allocate the watchers statically to not bother with alloc/free. >> + * This is fine since we have a single execution guard. >> + */ >> + static struct relay_lsn_watcher watchers[VCLOCK_MAX]; >> + for (int i = 1; i < VCLOCK_MAX; i++) >> + rlist_create(&watchers[i].in_list); >> + >> + int64_t wait_lsn = 0; >> + bool restart = false; >> + do { >> + wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn; > 1. What if the last transaction is asynchronous? They have -1 lsn in > the limbo. Oops, you're correct. I thought every limbo entry coming from a remote instance had an lsn assigned. >> + /* >> + * Take this node into account immediately. >> + * clear_synchro_queue() is a no-op on the limbo owner for now, >> + * so all the rows in the limbo must've come through the applier >> + * and so they already have an lsn assigned, even if their wal >> + * write isn't finished yet. >> + */ >> + assert(wait_lsn > 0); >> + int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; >> + int watcher_count = 0; >> + struct lsn_watcher_data data = { >> + .ack_count = &count, >> + .watcher_count = &watcher_count, >> + }; >> + >> replicaset_foreach(replica) { >> - if (replica->relay != NULL && >> - relay_get_state(replica->relay) != RELAY_OFF && >> - !replica->anon) { >> - assert(!tt_uuid_is_equal(&INSTANCE_UUID, >> - &replica->uuid)); >> - vclock = relay_vclock(replica->relay); >> - int64_t lsn = vclock_get(vclock, >> - former_leader_id); >> - lsns[len++] = lsn; >> + if (replica->anon || replica->relay == NULL || > 2. replica->relay is never NULL AFAIR. Yes, you're correct. I never noticed that. Thanks for pointing this out. >> + relay_get_state(replica->relay) != RELAY_FOLLOW) >> + continue; >> + assert(replica->id != 0); > 3. Maybe better use REPLICA_ID_NIL. I'm asserting here that watchers[0] is never set or used. This would be hidden by a check for REPLICA_ID_NIL. Is it possible that REPLICA_ID_NIL becomes, say, -1 one day? Then the assertion will lose its meaning. Anyway, by looking at `id != REPLICA_ID_NIL` you get no info on whether watchers[0] should be set or not. P.S. this part is also reworked. And you were correct, as always. >> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); >> + >> + if (vclock_get(relay_vclock(replica->relay), >> + former_leader_id) >= wait_lsn) { >> + count++; >> + continue; >> } >> + >> + relay_lsn_watcher_create(&watchers[replica->id], >> + former_leader_id, wait_lsn, >> + count_confirm_f, >> + watcher_destroy_f, &data); >> + relay_set_lsn_watcher(replica->relay, >> + &watchers[replica->id]); >> + watcher_count++; >> } >> - lsns[len++] = vclock_get(box_vclock, former_leader_id); >> - assert(len < VCLOCK_MAX); >> >> - int64_t confirm_lsn = 0; >> - if (len >= replication_synchro_quorum) { >> - qsort(lsns, len, sizeof(int64_t), cmp_i64); >> - confirm_lsn = lsns[len - replication_synchro_quorum]; >> + >> + while (count < replication_synchro_quorum && >> + count + watcher_count >= replication_synchro_quorum) { >> + fiber_yield(); > 4. Does it mean the fiber hangs infinitely? I was thinking we could wait > for the timeout and return an error when we failed to wait. Well, the fiber hangs while there is a possibility to collect quorum. Each time a relay exits, which should happen after a replication timeout when the replica is unresponsive, the corresponding watcher gets destroyed. When being destroyed it decreases watcher count and wakes the fiber up. Just like when it notifies the fiber that the `ack_count` is increased. So, once there are not enough watchers (relays) to collect a quorum, all the watchers are destroyed and the fiber sleeps for replication timeout. Then all the watchers are recreated and the process repeats. I'm either adding a timeout here, or a cancellation check. P.S. added both. > Or maybe at least add a fiber cancellation check here so as this function > could be stopped somehow. Currently the function is basically immortal and > infinite if no quorum for too long time. > >> } >> >> - txn_limbo_force_empty(&txn_limbo, confirm_lsn); >> - assert(txn_limbo_is_empty(&txn_limbo)); >> - } >> + /* >> + * In case some new limbo entries arrived, confirm them as well. >> + */ >> + restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn; >> + >> + /* >> + * Not enough replicas connected. Give user some time to >> + * reconfigure quorum and replicas some time to reconnect, then >> + * restart the watchers. >> + */ >> + if (count + watcher_count < replication_synchro_quorum) { >> + say_info("clear_syncrho_queue cannot collect quorum: " >> + "number of connected replicas (%d) is less " >> + "than replication_synchro_quorum (%d). " >> + "Will retry in %.2f seconds", >> + count + watcher_count, >> + replication_synchro_quorum, >> + replication_timeout); >> + fiber_sleep(replication_timeout); > 5. The fixed time sleep looks not much better than the polling in > the old version of box_clear_synchro_queue(). Can we not just sleep > but wait for an event? Yes, that'd be better. Wait for replica connect then? P.S. I decided to  drop the 'retry' thing altogether. Let the user re-start `clear_synchro_queue` manually if there are not enough replicas. >> + restart = true; >> + } >> + >> + /* Detach the watchers that haven't fired. */ >> + for (int i = 1; i < VCLOCK_MAX; i++) >> + rlist_del_entry(&watchers[i], in_list); >> + } while (restart); >> + >> + txn_limbo_force_empty(&txn_limbo, wait_lsn); >> + assert(txn_limbo_is_empty(&txn_limbo)); >> return 0; >> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result >> new file mode 100644 >> index 000000000..e633f9e60 >> --- /dev/null >> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result >> @@ -0,0 +1,137 @@ >> +-- test-run result file version 2 >> +test_run = require('test_run').new() >> + | --- >> + | ... >> + >> +-- >> +-- gh-5435: make sure the new limbo owner commits everything there is left in >> +-- the limbo from an old owner. >> +-- >> + >> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} >> + | --- >> + | ... >> + >> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) >> + | --- >> + | ... >> +test_run:wait_fullmesh(SERVERS) >> + | --- >> + | ... >> + >> +-- Force election_replica1 to become leader. >> +test_run:switch('election_replica2') >> + | --- >> + | - true >> + | ... >> +box.cfg{election_mode='voter'} >> + | --- >> + | ... >> +test_run:switch('election_replica3') >> + | --- >> + | - true >> + | ... >> +box.cfg{election_mode='voter'} >> + | --- >> + | ... >> + >> +test_run:switch('election_replica1') >> + | --- >> + | - true >> + | ... >> +box.ctl.wait_rw() >> + | --- >> + | ... >> + >> +_ = box.schema.space.create('test', {is_sync=true}) >> + | --- >> + | ... >> +_ = box.space.test:create_index('pk') >> + | --- >> + | ... >> + >> +-- Fill the limbo with pending entries. 3 mustn't receive them yet. >> +test_run:cmd('stop server election_replica3') >> + | --- >> + | - true >> + | ... >> +box.cfg{replication_synchro_quorum=3} >> + | --- >> + | ... >> + >> +lsn = box.info.lsn >> + | --- >> + | ... >> + >> +for i=1,10 do\ >> + require('fiber').create(function() box.space.test:insert{i} end)\ >> +end >> + | --- >> + | ... >> + >> +-- Wait for WAL write and replication. >> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) >> + | --- >> + | - true >> + | ... >> +test_run:wait_lsn('election_replica2', 'election_replica1') >> + | --- >> + | ... >> + >> +test_run:cmd('switch election_replica2') >> + | --- >> + | - true >> + | ... >> + >> +test_run:cmd('stop server election_replica1') >> + | --- >> + | - true >> + | ... >> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. >> +-- It will vote for 3, however, since 3 has newer data, and start replication >> +-- once 3 becomes the leader. > 6. 'vote for 2', '2 has newer data', '2 becomes the leader'. Otherwise I don't > understand. > > Here you are at node 2, and below you make it candidate + wait_rw(). So it > becomes a leader, not 3. Thanks! You're correct. >> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') >> + | --- >> + | - true >> + | ... >> + >> +box.cfg{election_mode='candidate'} >> + | --- >> + | ... >> +box.ctl.wait_rw() >> + | --- >> + | ... >> + >> +-- If 2 decided whether to keep the rows or not right on becoming the leader, >> +-- it would roll them all back. Make sure 2 waits till the rows are replicated >> +-- to 3. >> +box.space.test:select{} >> + | --- >> + | - - [1] >> + | - [2] >> + | - [3] >> + | - [4] >> + | - [5] >> + | - [6] >> + | - [7] >> + | - [8] >> + | - [9] >> + | - [10] >> + | ... >> + >> +test_run:cmd('switch default') >> + | --- >> + | - true >> + | ... >> +-- To silence the QA warning. The 1st replica is already stopped. >> +SERVERS[1] = nil >> + | --- >> + | ... >> +test_run:cmd('delete server election_replica1') >> + | --- >> + | - true >> + | ... >> +test_run:drop_cluster(SERVERS) >> + | --- >> + | ... > 7. The test fails when I try to run multiple instances in parallel > like this: > > python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- > > It shows that the new leader rolled the data back. Ok, will re-check. I've fixed the test, here's the diff: ================================================= diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro- queue-commit-all.result index e633f9e60..e806d9d53 100644 --- a/test/replication/gh-5435-clear-synchro-queue-commit-all.result +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result @@ -55,7 +55,7 @@ test_run:cmd('stop server election_replica3')   | ---   | - true   | ... -box.cfg{replication_synchro_quorum=3} +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}   | ---   | ... @@ -88,13 +88,20 @@ test_run:cmd('stop server election_replica1')   | - true   | ...  -- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. --- It will vote for 3, however, since 3 has newer data, and start replication --- once 3 becomes the leader. +-- It will vote for 2, however, since 2 has newer data, and start replication +-- once 2 becomes the leader.  test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')   | ---   | - true   | ... +-- Set a huge timeout for 2 reasons. +-- First, this guards us from the intance leaving clear_synchro_queue too early +-- and confirming nothing. +-- Second, it lets us test that the instance doesn't wait for the full timeout. +box.cfg{replication_synchro_timeout=1000} + | --- + | ...  box.cfg{election_mode='candidate'}   | ---   | ... -- Serge Petrenko