From: Vladislav Shpilevoy
Message-ID: <6cb16c86-8db2-912d-1228-af9c550f8b80@tarantool.org>
Date: Thu, 17 Dec 2020 22:43:30 +0100
In-Reply-To: <99accb77474e59e62203f9028196620a07635a7d.1607633488.git.sergepetrenko@tarantool.org>
References: <99accb77474e59e62203f9028196620a07635a7d.1607633488.git.sergepetrenko@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
To: Serge Petrenko, cyrillos@gmail.com
Cc: tarantool-patches@dev.tarantool.org

Thanks for the patch! See 7 comments below.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 8e0c9a160..fb9167977 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
>
>  }
>
> +struct lsn_watcher_data {
> +	int *ack_count;
> +	int *watcher_count;
> +};
> +
> +static void
> +count_confirm_f(void *data)
> +{
> +	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> +	(*d->ack_count)++;
> +}
> +
> +static void
> +watcher_destroy_f(void *data)
> +{
> +	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> +	(*d->watcher_count)--;

An ultra-pro-master hint: you wouldn't need the () if you used prefix
-- and ++. But you decide, no rules.
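To spell the hint out, here is a small standalone sketch. The struct is copied from the patch; count_postfix() and count_prefix() are hypothetical helper names used only for this illustration. Postfix ++ and -- bind tighter than unary *, so the postfix form needs the parentheses. The prefix operators have the same precedence as unary * and group right to left, so ++*d->ack_count already means ++(*(d->ack_count)).

```c
#include <assert.h>

/* Same layout as the struct in the patch. */
struct lsn_watcher_data {
	int *ack_count;
	int *watcher_count;
};

/*
 * Postfix form, as written in the patch. Without the parentheses,
 * *d->ack_count++ would parse as *((d->ack_count)++) and move the
 * pointer instead of incrementing the counter.
 */
static void
count_postfix(struct lsn_watcher_data *d)
{
	(*d->ack_count)++;
	(*d->watcher_count)--;
}

/*
 * Prefix form (hypothetical helper): unary * and prefix ++/-- group
 * right to left, so no parentheses are needed.
 */
static void
count_prefix(struct lsn_watcher_data *d)
{
	++*d->ack_count;
	--*d->watcher_count;
}
```

Both helpers change the pointed-to counters in the same way; the choice is purely stylistic.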
> +}
> +
>  int
>  box_clear_synchro_queue(bool try_wait)
>  {
> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
>  		}
>  	}
>
> -	if (!txn_limbo_is_empty(&txn_limbo)) {
> -		int64_t lsns[VCLOCK_MAX];
> -		int len = 0;
> -		const struct vclock *vclock;
> +	if (txn_limbo_is_empty(&txn_limbo))
> +		return 0;
> +
> +	/*
> +	 * Allocate the watchers statically to not bother with alloc/free.
> +	 * This is fine since we have a single execution guard.
> +	 */
> +	static struct relay_lsn_watcher watchers[VCLOCK_MAX];
> +	for (int i = 1; i < VCLOCK_MAX; i++)
> +		rlist_create(&watchers[i].in_list);
> +
> +	int64_t wait_lsn = 0;
> +	bool restart = false;
> +	do {
> +		wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;

1. What if the last transaction is asynchronous? Asynchronous
transactions have lsn -1 in the limbo.

> +		/*
> +		 * Take this node into account immediately.
> +		 * clear_synchro_queue() is a no-op on the limbo owner for now,
> +		 * so all the rows in the limbo must've come through the applier
> +		 * and so they already have an lsn assigned, even if their wal
> +		 * write isn't finished yet.
> +		 */
> +		assert(wait_lsn > 0);
> +		int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
> +		int watcher_count = 0;
> +		struct lsn_watcher_data data = {
> +			.ack_count = &count,
> +			.watcher_count = &watcher_count,
> +		};
> +
>  		replicaset_foreach(replica) {
> -			if (replica->relay != NULL &&
> -			    relay_get_state(replica->relay) != RELAY_OFF &&
> -			    !replica->anon) {
> -				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> -							 &replica->uuid));
> -				vclock = relay_vclock(replica->relay);
> -				int64_t lsn = vclock_get(vclock,
> -							 former_leader_id);
> -				lsns[len++] = lsn;
> +			if (replica->anon || replica->relay == NULL ||

2. replica->relay is never NULL AFAIR.

> +			    relay_get_state(replica->relay) != RELAY_FOLLOW)
> +				continue;
> +			assert(replica->id != 0);

3. Maybe better to use REPLICA_ID_NIL.
> +			assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
> +
> +			if (vclock_get(relay_vclock(replica->relay),
> +				       former_leader_id) >= wait_lsn) {
> +				count++;
> +				continue;
>  			}
> +
> +			relay_lsn_watcher_create(&watchers[replica->id],
> +						 former_leader_id, wait_lsn,
> +						 count_confirm_f,
> +						 watcher_destroy_f, &data);
> +			relay_set_lsn_watcher(replica->relay,
> +					      &watchers[replica->id]);
> +			watcher_count++;
>  		}
> -		lsns[len++] = vclock_get(box_vclock, former_leader_id);
> -		assert(len < VCLOCK_MAX);
>
> -		int64_t confirm_lsn = 0;
> -		if (len >= replication_synchro_quorum) {
> -			qsort(lsns, len, sizeof(int64_t), cmp_i64);
> -			confirm_lsn = lsns[len - replication_synchro_quorum];
> +
> +		while (count < replication_synchro_quorum &&
> +		       count + watcher_count >= replication_synchro_quorum) {
> +			fiber_yield();

4. Does this mean the fiber can hang forever? I was thinking we could
wait for a timeout and return an error if the wait failed. Or at least
add a fiber cancellation check here, so that the function could be
stopped somehow. Currently the function is basically immortal: it runs
for as long as there is no quorum.

>  		}
>
> -		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
> -		assert(txn_limbo_is_empty(&txn_limbo));
> -	}
> +		/*
> +		 * In case some new limbo entries arrived, confirm them as well.
> +		 */
> +		restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
> +
> +		/*
> +		 * Not enough replicas connected. Give user some time to
> +		 * reconfigure quorum and replicas some time to reconnect, then
> +		 * restart the watchers.
> +		 */
> +		if (count + watcher_count < replication_synchro_quorum) {
> +			say_info("clear_syncrho_queue cannot collect quorum: "
> +				 "number of connected replicas (%d) is less "
> +				 "than replication_synchro_quorum (%d). "
> +				 "Will retry in %.2f seconds",
> +				 count + watcher_count,
> +				 replication_synchro_quorum,
> +				 replication_timeout);
> +			fiber_sleep(replication_timeout);

5.
The fixed-time sleep doesn't look much better than the polling in the
old version of box_clear_synchro_queue(). Can we wait for an event
instead of just sleeping?

> +			restart = true;
> +		}
> +
> +		/* Detach the watchers that haven't fired. */
> +		for (int i = 1; i < VCLOCK_MAX; i++)
> +			rlist_del_entry(&watchers[i], in_list);
> +	} while (restart);
> +
> +	txn_limbo_force_empty(&txn_limbo, wait_lsn);
> +	assert(txn_limbo_is_empty(&txn_limbo));
>  	return 0;
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> new file mode 100644
> index 000000000..e633f9e60
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> @@ -0,0 +1,137 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- gh-5435: make sure the new limbo owner commits everything there is left in
> +-- the limbo from an old owner.
> +--
> +
> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
> + | ---
> + | ...
> +
> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
> + | ---
> + | ...
> +test_run:wait_fullmesh(SERVERS)
> + | ---
> + | ...
> +
> +-- Force election_replica1 to become leader.
> +test_run:switch('election_replica2')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +test_run:switch('election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +
> +test_run:switch('election_replica1')
> + | ---
> + | - true
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +_ = box.schema.space.create('test', {is_sync=true})
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
> +test_run:cmd('stop server election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum=3}
> + | ---
> + | ...
> +
> +lsn = box.info.lsn
> + | ---
> + | ...
> +
> +for i=1,10 do\
> +    require('fiber').create(function() box.space.test:insert{i} end)\
> +end
> + | ---
> + | ...
> +
> +-- Wait for WAL write and replication.
> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('election_replica2', 'election_replica1')
> + | ---
> + | ...
> +
> +test_run:cmd('switch election_replica2')
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('stop server election_replica1')
> + | ---
> + | - true
> + | ...
> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
> +-- It will vote for 3, however, since 3 has newer data, and start replication
> +-- once 3 becomes the leader.

6. 'vote for 2', '2 has newer data', '2 becomes the leader'. Otherwise I
don't understand. Here you are at node 2, and below you make it
candidate + wait_rw(). So it becomes a leader, not 3.

> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
> + | ---
> + | - true
> + | ...
> +
> +box.cfg{election_mode='candidate'}
> + | ---
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +-- If 2 decided whether to keep the rows or not right on becoming the leader,
> +-- it would roll them all back. Make sure 2 waits till the rows are replicated
> +-- to 3.
> +box.space.test:select{}
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + |   - [4]
> + |   - [5]
> + |   - [6]
> + |   - [7]
> + |   - [8]
> + |   - [9]
> + |   - [10]
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +-- To silence the QA warning. The 1st replica is already stopped.
> +SERVERS[1] = nil
> + | ---
> + | ...
> +test_run:cmd('delete server election_replica1')
> + | ---
> + | - true
> + | ...
> +test_run:drop_cluster(SERVERS)
> + | ---
> + | ...

7.
The test fails when I try to run multiple instances in parallel like this: python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- It shows that the new leader rolled the data back.
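Regarding comments 4 and 5, here is a rough sketch of the kind of bounded wait I have in mind. It is a standalone simulation, not Tarantool code: enum wait_result, struct quorum_wait, simulate_wakeup() and wait_for_quorum() are hypothetical names, and both the clock and the cancellation flag are faked. In box.cc I assume the loop would rely on something like fiber_yield_timeout() and fiber_is_cancelled() instead.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcome codes, not Tarantool identifiers. */
enum wait_result { WAIT_OK, WAIT_TIMEOUT, WAIT_CANCELLED };

/* Hypothetical stand-in for the state the wait loop inspects. */
struct quorum_wait {
	int count;		/* acks collected so far */
	int quorum;		/* plays the role of replication_synchro_quorum */
	long now_ms;		/* simulated clock, milliseconds */
	long deadline_ms;	/* absolute time at which to give up */
	bool cancelled;		/* simulated fiber cancellation flag */
};

/*
 * Simulate one wakeup: the clock advances by step_ms and new_acks
 * more replicas confirm. A real implementation would yield here.
 */
static void
simulate_wakeup(struct quorum_wait *w, long step_ms, int new_acks)
{
	assert(step_ms > 0);
	w->now_ms += step_ms;
	w->count += new_acks;
}

/*
 * Wait until the quorum is collected, but stop on cancellation or
 * once the deadline has passed, so the loop can always terminate.
 */
static enum wait_result
wait_for_quorum(struct quorum_wait *w, long step_ms, int acks_per_wakeup)
{
	while (w->count < w->quorum) {
		if (w->cancelled)
			return WAIT_CANCELLED;
		if (w->now_ms >= w->deadline_ms)
			return WAIT_TIMEOUT;
		simulate_wakeup(w, step_ms, acks_per_wakeup);
	}
	return WAIT_OK;
}
```

The point is only the shape of the loop: every iteration checks for cancellation and for the deadline before waiting again, so the caller can turn WAIT_TIMEOUT or WAIT_CANCELLED into an error instead of blocking forever.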