From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 9DDB24765E0 for ; Wed, 23 Dec 2020 20:28:05 +0300 (MSK) References: <3136023eb90fd3c6a10cb288466a9f3c8f9d2c01.1608724239.git.sergepetrenko@tarantool.org> From: Vladislav Shpilevoy Message-ID: <37c539ca-cdce-a4e9-5af9-3814b2c00131@tarantool.org> Date: Wed, 23 Dec 2020 18:28:03 +0100 MIME-Version: 1.0 In-Reply-To: <3136023eb90fd3c6a10cb288466a9f3c8f9d2c01.1608724239.git.sergepetrenko@tarantool.org> Content-Type: text/plain; charset=utf-8 Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Serge Petrenko , gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Thanks for the patch! See 9 comments below, my diff at the end of the email, and on the branch as a separate commit. > diff --git a/src/box/box.cc b/src/box/box.cc > index 2d403fc9a..38bf4034e 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -1002,6 +1002,36 @@ box_set_replication_anon(void) > > } > > +struct ack_trigger_data { > + bool fired; 1. This member can be deleted if you call trigger_clear(trigger) instead of setting the flag. > + int64_t *target_lsn; > + uint32_t *replica_id; > + int *quorum; > + int *ack_count; 2. Most of these members can be stored by value, see my diff. Even if the idea in my diff looks bad, some minor details could still be taken. > + struct fiber *waiter; > +}; > + > +struct ack_trigger { > + struct ack_trigger_data data; > + struct trigger trigger; 3. You can merge data right into the trigger object.
> +}; > + > +static int ack_trigger_f(struct trigger *trigger, void *event) 4. Normally we put return type on a separate line. > +{ > + struct relay *relay = (struct relay *)event; > + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; > + if (data->fired) > + return 0; > + if (*data->target_lsn <= vclock_get(relay_vclock(relay), > + *data->replica_id)) { > + ++*data->ack_count; > + data->fired = true; > + if (*data->ack_count >= *data->quorum) > + fiber_wakeup(data->waiter); > + } > + return 0; > +} > @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait) > break; > fiber_sleep(0.001); > } > + /* > + * Our mission was to clear the limbo from former leader's > + * transactions. Exit in case someone did that for us. > + */ > + if (txn_limbo_is_empty(&txn_limbo) || > + former_leader_id != txn_limbo.owner_id) { > + in_clear_synchro_queue = false; > + return 0; > + } > } > > - if (!txn_limbo_is_empty(&txn_limbo)) { > - int64_t lsns[VCLOCK_MAX]; > - int len = 0; > - const struct vclock *vclock; > - replicaset_foreach(replica) { > - if (replica->relay != NULL && > - relay_get_state(replica->relay) != RELAY_OFF && > - !replica->anon) { > - assert(!tt_uuid_is_equal(&INSTANCE_UUID, > - &replica->uuid)); > - vclock = relay_vclock(replica->relay); > - int64_t lsn = vclock_get(vclock, > - former_leader_id); > - lsns[len++] = lsn; > - } > - } > - lsns[len++] = vclock_get(box_vclock, former_leader_id); > - assert(len < VCLOCK_MAX); > + /* > + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows > + * in the limbo must've come through the applier meaning they already > + * have an lsn assigned, even if their WAL write hasn't finished yet. > + */ > + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; > + assert(wait_lsn > 0); > + > + struct ack_trigger triggers[VCLOCK_MAX]; > + > + /* Take this node into account immediately. 
*/ > + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; > + int trigger_count = 0; > + > + replicaset_foreach(replica) { > + if (relay_get_state(replica->relay) != RELAY_FOLLOW || > + replica->anon) > + continue; > + > + assert(replica->id != REPLICA_ID_NIL); > + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); > > - int64_t confirm_lsn = 0; > - if (len >= replication_synchro_quorum) { > - qsort(lsns, len, sizeof(int64_t), cmp_i64); > - confirm_lsn = lsns[len - replication_synchro_quorum]; > + if (vclock_get(relay_vclock(replica->relay), > + former_leader_id) >= wait_lsn) { > + ack_count++; > + continue; > } > + int i = trigger_count++; > + triggers[i].data = { > + .fired = false, > + .target_lsn = &wait_lsn, > + .replica_id = &former_leader_id, > + .quorum = &replication_synchro_quorum, > + .ack_count = &ack_count, > + .waiter = fiber(), > + }; > + trigger_create(&triggers[i].trigger, ack_trigger_f, > + &triggers[i].data, NULL); > + relay_on_status_update(replica->relay, &triggers[i].trigger); > + } > + > + assert(trigger_count <= VCLOCK_MAX); > + > + if (ack_count + trigger_count < replication_synchro_quorum) { > + /* Don't even bother waiting when not enough replicas. */ > + say_warn("clear_synchro_queue cannot gather quorum. " > + "There're only %d replicas (including this one), while" > + "quorum should be %d.", ack_count + trigger_count, > + replication_synchro_quorum); > + for (int i = 0; i < trigger_count; i++) > + trigger_clear(&triggers[i].trigger); > + goto end; 5. Better to wait anyway, because more replicas may appear soon and confirm everything during the timeout wait. Although that is probably not possible if we bind triggers to individual relays. I tried to fix it with a global trigger in my diff. > + } > + > + if (trigger_count > 0) { > + /* Allow to interrupt the function when it takes too long.
*/ > + bool cancellable = fiber_set_cancellable(true); > + fiber_sleep(replication_synchro_timeout); > + fiber_set_cancellable(cancellable); 6. I think a sleep is enough. If a user made the fiber non-cancellable, better leave it as is. We only change it to 'false', and only when doing some operation where a wakeup is not legal, such as a WAL write. > + } > + > + for (int i = 0; i < trigger_count; i++) > + trigger_clear(&triggers[i].trigger); > + > + /* > + * No point to proceed after cancellation even if got the quorum. > + * Emptying the limbo involves a pair of blocking WAL writes, > + * making the fiber sleep even longer, which isn't appropriate > + * when it's cancelled. > + */ > + if (fiber_is_cancelled()) { > + say_info("clear_synchro_queue interrupted by the fiber " > + "cancellation."); > + goto end; > + } > > - txn_limbo_force_empty(&txn_limbo, confirm_lsn); > - assert(txn_limbo_is_empty(&txn_limbo)); > + if (ack_count < replication_synchro_quorum) { > + say_warn("clear_synchro_queue timed out after %.2f " > + "seconds. Collected %d acks, quorum is %d. ", > + replication_synchro_timeout, ack_count, > + replication_synchro_quorum); > + goto end; 7. Why don't you return an error? The queue couldn't be cleared, so it looks like an error, no? I added diag_set() in my diff, but didn't change it to return -1 so far. > } > > + txn_limbo_force_empty(&txn_limbo, wait_lsn); > + assert(txn_limbo_is_empty(&txn_limbo)); > +end: > in_clear_synchro_queue = false; > return 0; > } > diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result > new file mode 100644 > index 000000000..e806d9d53 > --- /dev/null > +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result 8. Perhaps better to rename to gh-5435-qsync-...., because now it is very useful that I can run all qsync tests with the small command `python test-run.py qsync`.
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua > new file mode 100644 > index 000000000..da218624b > --- /dev/null > +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua > @@ -0,0 +1,65 @@ > +test_run = require('test_run').new() > + > +-- > +-- gh-5435: make sure the new limbo owner commits everything there is left in > +-- the limbo from an old owner. > +-- > + > +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} > + > +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) > +test_run:wait_fullmesh(SERVERS) > + > +-- Force election_replica1 to become leader. > +test_run:switch('election_replica2') > +box.cfg{election_mode='voter'} > +test_run:switch('election_replica3') > +box.cfg{election_mode='voter'} > + > +test_run:switch('election_replica1') > +box.ctl.wait_rw() > + > +_ = box.schema.space.create('test', {is_sync=true}) > +_ = box.space.test:create_index('pk') > + > +-- Fill the limbo with pending entries. 3 mustn't receive them yet. > +test_run:cmd('stop server election_replica3') > +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000} > + > +lsn = box.info.lsn > + > +for i=1,10 do\ > + require('fiber').create(function() box.space.test:insert{i} end)\ > +end > + > +-- Wait for WAL write and replication. > +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) > +test_run:wait_lsn('election_replica2', 'election_replica1') > + > +test_run:cmd('switch election_replica2') > + > +test_run:cmd('stop server election_replica1') > +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. > +-- It will vote for 2, however, since 2 has newer data, and start replication > +-- once 2 becomes the leader. > +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') > + > +-- Set a huge timeout for 2 reasons. 
> +-- First, this guards us from the intance leaving clear_synchro_queue too early 9. intance -> instance. See my changes below and on the branch in a separate commit. ==================== diff --git a/src/box/box.cc b/src/box/box.cc index 38bf4034e..90c07c342 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1002,32 +1002,102 @@ box_set_replication_anon(void) } -struct ack_trigger_data { - bool fired; - int64_t *target_lsn; - uint32_t *replica_id; - int *quorum; - int *ack_count; +/** Trigger to catch ACKs from all nodes when need to wait for quorum. */ +struct box_quorum_trigger { + /** Inherit trigger. */ + struct trigger base; + /** Minimal number of nodes who should confirm the target LSN. */ + int quorum; + /** Target LSN to wait for. */ + int64_t target_lsn; + /** Replica ID whose LSN is being waited. */ + uint32_t replica_id; + /** + * All versions of the given replica's LSN as seen by other nodes. The + * same as in the txn limbo. + */ + struct vclock vclock; + /** Number of nodes who confirmed the LSN. */ + int ack_count; + /** Fiber to wakeup when quorum is reached. 
*/ struct fiber *waiter; }; -struct ack_trigger { - struct ack_trigger_data data; - struct trigger trigger; -}; - -static int ack_trigger_f(struct trigger *trigger, void *event) +static int +box_quorum_on_ack_f(struct trigger *trigger, void *event) { - struct relay *relay = (struct relay *)event; - struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; - if (data->fired) + struct replication_ack *ack = (struct replication_ack *)event; + struct box_quorum_trigger *t = (struct box_quorum_trigger *)trigger; + int64_t new_lsn = vclock_get(ack->vclock, t->replica_id); + int64_t old_lsn = vclock_get(&t->vclock, ack->source); + if (new_lsn < t->target_lsn || old_lsn >= t->target_lsn) return 0; - if (*data->target_lsn <= vclock_get(relay_vclock(relay), - *data->replica_id)) { - ++*data->ack_count; - data->fired = true; - if (*data->ack_count >= *data->quorum) - fiber_wakeup(data->waiter); + + vclock_follow(&t->vclock, ack->source, new_lsn); + ++t->ack_count; + if (t->ack_count >= t->quorum) + fiber_wakeup(t->waiter); + trigger_clear(trigger); + return 0; +} + +/** + * Wait until at least @a quorum of nodes confirm @a target_lsn from the node + * with id @a lead_id. + */ +static int +box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum, + double timeout) +{ + struct box_quorum_trigger t; + memset(&t, 0, sizeof(t)); + vclock_create(&t.vclock); + + /* Take this node into account immediately. 
*/ + int ack_count = vclock_get(box_vclock, lead_id) >= target_lsn; + replicaset_foreach(replica) { + if (relay_get_state(replica->relay) != RELAY_FOLLOW || + replica->anon) + continue; + + assert(replica->id != REPLICA_ID_NIL); + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); + + int64_t lsn = vclock_get(relay_vclock(replica->relay), lead_id); + vclock_follow(&t.vclock, replica->id, lsn); + if (lsn >= target_lsn) { + ack_count++; + continue; + } + } + if (ack_count < quorum) { + t.quorum = quorum; + t.target_lsn = target_lsn; + t.replica_id = lead_id; + t.ack_count = ack_count; + t.waiter = fiber(); + trigger_create(&t.base, box_quorum_on_ack_f, NULL, NULL); + trigger_add(&replicaset.on_ack, &t.base); + fiber_sleep(timeout); + trigger_clear(&t.base); + ack_count = t.ack_count; + } + /* + * No point to proceed after cancellation even if got the quorum. The + * quorum is waited by limbo clear function. Emptying the limbo involves + * a pair of blocking WAL writes, making the fiber sleep even longer, + * which isn't appropriate when it's canceled. + */ + if (fiber_is_cancelled()) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, + "fiber is canceled"); + return -1; + } + if (ack_count < quorum) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, tt_sprintf( + "timeout after %.2lf seconds, collected %d acks with " + "%d quorum", timeout, ack_count, quorum)); + return -1; } return 0; } @@ -1083,85 +1153,14 @@ box_clear_synchro_queue(bool try_wait) int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; assert(wait_lsn > 0); - struct ack_trigger triggers[VCLOCK_MAX]; - - /* Take this node into account immediately. 
*/ - int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; - int trigger_count = 0; - - replicaset_foreach(replica) { - if (relay_get_state(replica->relay) != RELAY_FOLLOW || - replica->anon) - continue; - - assert(replica->id != REPLICA_ID_NIL); - assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); - - if (vclock_get(relay_vclock(replica->relay), - former_leader_id) >= wait_lsn) { - ack_count++; - continue; - } - int i = trigger_count++; - triggers[i].data = { - .fired = false, - .target_lsn = &wait_lsn, - .replica_id = &former_leader_id, - .quorum = &replication_synchro_quorum, - .ack_count = &ack_count, - .waiter = fiber(), - }; - trigger_create(&triggers[i].trigger, ack_trigger_f, - &triggers[i].data, NULL); - relay_on_status_update(replica->relay, &triggers[i].trigger); - } - - assert(trigger_count <= VCLOCK_MAX); - - if (ack_count + trigger_count < replication_synchro_quorum) { - /* Don't even bother waiting when not enough replicas. */ - say_warn("clear_synchro_queue cannot gather quorum. " - "There're only %d replicas (including this one), while" - "quorum should be %d.", ack_count + trigger_count, - replication_synchro_quorum); - for (int i = 0; i < trigger_count; i++) - trigger_clear(&triggers[i].trigger); - goto end; - } - - if (trigger_count > 0) { - /* Allow to interrupt the function when it takes too long. */ - bool cancellable = fiber_set_cancellable(true); - fiber_sleep(replication_synchro_timeout); - fiber_set_cancellable(cancellable); - } - - for (int i = 0; i < trigger_count; i++) - trigger_clear(&triggers[i].trigger); - - /* - * No point to proceed after cancellation even if got the quorum. - * Emptying the limbo involves a pair of blocking WAL writes, - * making the fiber sleep even longer, which isn't appropriate - * when it's cancelled. 
- */ - if (fiber_is_cancelled()) { - say_info("clear_synchro_queue interrupted by the fiber " - "cancellation."); - goto end; - } - - if (ack_count < replication_synchro_quorum) { - say_warn("clear_synchro_queue timed out after %.2f " - "seconds. Collected %d acks, quorum is %d. ", - replication_synchro_timeout, ack_count, - replication_synchro_quorum); - goto end; + if (box_wait_quorum(former_leader_id, wait_lsn, + replication_synchro_quorum, + replication_synchro_timeout) == 0) { + txn_limbo_force_empty(&txn_limbo, wait_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); + } else { + diag_log(); } - - txn_limbo_force_empty(&txn_limbo, wait_lsn); - assert(txn_limbo_is_empty(&txn_limbo)); -end: in_clear_synchro_queue = false; return 0; } diff --git a/src/box/errcode.h b/src/box/errcode.h index cfea5bcf2..56573688e 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -274,6 +274,7 @@ struct errcode_record { /*219 */_(ER_XLOG_GAP, "%s") \ /*220 */_(ER_TOO_EARLY_SUBSCRIBE, "Can't subscribe non-anonymous replica %s until join is done") \ /*221 */_(ER_SQL_CANT_ADD_AUTOINC, "Can't add AUTOINCREMENT: space %s can't feature more than one AUTOINCREMENT field") \ + /*222 */_(ER_QUORUM_WAIT, "Couldn't wait for quorum %d: %s") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index dbad8a680..df04f8198 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -146,8 +146,6 @@ struct relay { alignas(CACHELINE_SIZE) /** Known relay vclock. */ struct vclock vclock; - /** Triggers fired once tx knows the new relay vclock. */ - struct rlist on_status_update; /** * True if the relay needs Raft updates. 
It can live fine * without sending Raft updates, if it is a relay to an @@ -175,12 +173,6 @@ relay_vclock(const struct relay *relay) return &relay->tx.vclock; } -void -relay_on_status_update(struct relay *relay, struct trigger *trigger) -{ - trigger_add(&relay->tx.on_status_update, trigger); -} - double relay_last_row_time(const struct relay *relay) { @@ -209,7 +201,6 @@ relay_new(struct replica *replica) diag_create(&relay->diag); stailq_create(&relay->pending_gc); relay->state = RELAY_OFF; - rlist_create(&relay->tx.on_status_update); return relay; } @@ -421,6 +412,9 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + struct replication_ack ack; + ack.source = status->relay->replica->id; + ack.vclock = &status->vclock; /* * Let pending synchronous transactions know, which of * them were successfully sent to the replica. Acks are @@ -429,11 +423,10 @@ tx_status_update(struct cmsg *msg) * for master's CONFIRM message instead. */ if (txn_limbo.owner_id == instance_id) { - txn_limbo_ack(&txn_limbo, status->relay->replica->id, - vclock_get(&status->vclock, instance_id)); + txn_limbo_ack(&txn_limbo, ack.source, + vclock_get(ack.vclock, instance_id)); } - - trigger_run(&status->relay->tx.on_status_update, status->relay); + trigger_run(&replicaset.on_ack, &ack); static const struct cmsg_hop route[] = { {relay_status_update, NULL} diff --git a/src/box/relay.h b/src/box/relay.h index 4e6ecaffb..b32e2ea2a 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,10 +93,6 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); -/** Set a trigger on relay vclock update as seen by tx. */ -void -relay_on_status_update(struct relay *relay, struct trigger *trigger); - /** * Send a Raft update request to the relay channel. It is not * guaranteed that it will be delivered. The connection may break. 
diff --git a/src/box/replication.cc b/src/box/replication.cc index 931c73a37..980109597 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -98,6 +98,8 @@ replication_init(void) rlist_create(&replicaset.applier.on_rollback); rlist_create(&replicaset.applier.on_wal_write); + rlist_create(&replicaset.on_ack); + diag_create(&replicaset.applier.diag); } @@ -113,6 +115,7 @@ replication_free(void) relay_cancel(replica->relay); diag_destroy(&replicaset.applier.diag); + trigger_destroy(&replicaset.on_ack); } int diff --git a/src/box/replication.h b/src/box/replication.h index e57912848..2ad1cbf66 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -190,6 +190,14 @@ extern struct tt_uuid REPLICASET_UUID; typedef rb_tree(struct replica) replica_hash_t; +/** Ack which is passed to on ack triggers. */ +struct replication_ack { + /** Replica ID of the ACK source. */ + uint32_t source; + /** Confirmed vclock. */ + const struct vclock *vclock; +}; + /** * Replica set state. * @@ -271,6 +279,8 @@ struct replicaset { /* Shared applier diagnostic area. */ struct diag diag; } applier; + /** Triggers are invoked on each ACK from each replica. */ + struct rlist on_ack; /** Map of all known replica_id's to correspponding replica's. */ struct replica *replica_by_id[VCLOCK_MAX]; }; diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result index e806d9d53..2699231e5 100644 --- a/test/replication/gh-5435-clear-synchro-queue-commit-all.result +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result @@ -96,7 +96,7 @@ test_run:cmd('start server election_replica3 with wait=False, wait_load=False, a | ... -- Set a huge timeout for 2 reasons. --- First, this guards us from the intance leaving clear_synchro_queue too early +-- First, this guards us from the instance leaving clear_synchro_queue too early -- and confirming nothing. 
-- Second, it lets us test that the instance doesn't wait for the full timeout. box.cfg{replication_synchro_timeout=1000} diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua index da218624b..03705d96c 100644 --- a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua @@ -45,7 +45,7 @@ test_run:cmd('stop server election_replica1') test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') -- Set a huge timeout for 2 reasons. --- First, this guards us from the intance leaving clear_synchro_queue too early +-- First, this guards us from the instance leaving clear_synchro_queue too early -- and confirming nothing. -- Second, it lets us test that the instance doesn't wait for the full timeout. box.cfg{replication_synchro_timeout=1000}