From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
Received: from smtp32.i.mail.ru (smtp32.i.mail.ru [94.100.177.92])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by dev.tarantool.org (Postfix) with ESMTPS id DA8494765E0
 for ; Thu, 24 Dec 2020 19:12:22 +0300 (MSK)
From: Serge Petrenko 
References: <3136023eb90fd3c6a10cb288466a9f3c8f9d2c01.1608724239.git.sergepetrenko@tarantool.org>
 <37c539ca-cdce-a4e9-5af9-3814b2c00131@tarantool.org>
Message-ID: 
Date: Thu, 24 Dec 2020 19:12:21 +0300
MIME-Version: 1.0
In-Reply-To: <37c539ca-cdce-a4e9-5af9-3814b2c00131@tarantool.org>
Content-Type: text/plain; charset="utf-8"; format="flowed"
Content-Transfer-Encoding: 8bit
Content-Language: en-GB
Subject: Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything
List-Id: Tarantool development patches 
List-Unsubscribe: ,
List-Archive: 
List-Post: 
List-Help: 
List-Subscribe: ,
To: Vladislav Shpilevoy , gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org

23.12.2020 20:28, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> See 9 comments below, my diff in the end of the email, and in
> the branch as a separate commit.

Thanks for your review & the patch!
I've applied your diff with a couple of changes.
Please find my answers and diff below.

>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 2d403fc9a..38bf4034e 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
>>
>>  }
>>
>> +struct ack_trigger_data {
>> +	bool fired;
>
> 1. This member can be deleted, if you would call
> trigger_clear(trigger) instead of setting the flag.

True.

>> +	int64_t *target_lsn;
>> +	uint32_t *replica_id;
>> +	int *quorum;
>> +	int *ack_count;
>
> 2. Most of these members can be stored by value, see my diff.
> Even if the idea in my diff will look bad, still some minor
> details could be taken.

I store quorum by reference so that the trigger gets the new value once
quorum is changed. This would work when the quorum is increased, at least.
Speaking of the other values, I thought it might be useful to change them
on the fly one day, even though this isn't used now.
Anyway, let's just retry when quorum is changed.
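
Something like this toy sketch is what I mean by retrying (plain standalone C,
not the actual patch; collect_acks() and synchro_quorum are stand-ins for
box_wait_quorum() and replication_synchro_quorum): the quorum is taken by
value before the wait, and if the configured value grew while we slept, the
attempt is simply redone instead of confirming with a stale quorum.

/* Standalone sketch, not Tarantool code: retry when the quorum grows. */
#include <stdio.h>

enum wait_result { WAIT_OK, WAIT_TIMEOUT, WAIT_RETRY };

/* Stand-in for replication_synchro_quorum (box.cfg changes it in real code). */
static int synchro_quorum = 2;

/* Stand-in for box_wait_quorum(): pretend two acks were collected. */
static int
collect_acks(int quorum)
{
	(void)quorum;
	return 2;
}

static enum wait_result
wait_confirm(void)
{
	int quorum = synchro_quorum;	/* snapshot by value, not a pointer */
	int acks = collect_acks(quorum);
	if (acks < quorum)
		return WAIT_TIMEOUT;	/* hard failure, give up */
	if (quorum < synchro_quorum)
		return WAIT_RETRY;	/* quorum was raised while we waited */
	return WAIT_OK;			/* safe to confirm the queue */
}

int
main(void)
{
	enum wait_result res;
	do {
		res = wait_confirm();
	} while (res == WAIT_RETRY);
	printf("%s\n", res == WAIT_OK ? "confirmed" : "timed out");
	return 0;
}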

>> +	struct fiber *waiter;
>> +};
>> +
>> +struct ack_trigger {
>> +	struct ack_trigger_data data;
>> +	struct trigger trigger;
>
> 3. You can merge data right into the trigger object.
>
>> +};
>> +
>> +static int ack_trigger_f(struct trigger *trigger, void *event)
>
> 4. Normally we put return type on a separate line.

Sure. My bad.

>> +{
>> +	struct relay *relay = (struct relay *)event;
>> +	struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
>> +	if (data->fired)
>> +		return 0;
>> +	if (*data->target_lsn <= vclock_get(relay_vclock(relay),
>> +					    *data->replica_id)) {
>> +		++*data->ack_count;
>> +		data->fired = true;
>> +		if (*data->ack_count >= *data->quorum)
>> +			fiber_wakeup(data->waiter);
>> +	}
>> +	return 0;
>> +}
>> @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
>>  				break;
>>  			fiber_sleep(0.001);
>>  		}
>> +		/*
>> +		 * Our mission was to clear the limbo from former leader's
>> +		 * transactions. Exit in case someone did that for us.
>> +		 */
>> +		if (txn_limbo_is_empty(&txn_limbo) ||
>> +		    former_leader_id != txn_limbo.owner_id) {
>> +			in_clear_synchro_queue = false;
>> +			return 0;
>> +		}
>>  	}
>>
>> -	if (!txn_limbo_is_empty(&txn_limbo)) {
>> -		int64_t lsns[VCLOCK_MAX];
>> -		int len = 0;
>> -		const struct vclock *vclock;
>> -		replicaset_foreach(replica) {
>> -			if (replica->relay != NULL &&
>> -			    relay_get_state(replica->relay) != RELAY_OFF &&
>> -			    !replica->anon) {
>> -				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
>> -							 &replica->uuid));
>> -				vclock = relay_vclock(replica->relay);
>> -				int64_t lsn = vclock_get(vclock,
>> -							 former_leader_id);
>> -				lsns[len++] = lsn;
>> -			}
>> -		}
>> -		lsns[len++] = vclock_get(box_vclock, former_leader_id);
>> -		assert(len < VCLOCK_MAX);
>> +	/*
>> +	 * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
>> +	 * in the limbo must've come through the applier meaning they already
>> +	 * have an lsn assigned, even if their WAL write hasn't finished yet.
>> +	 */
>> +	int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
>> +	assert(wait_lsn > 0);
>> +
>> +	struct ack_trigger triggers[VCLOCK_MAX];
>> +
>> +	/* Take this node into account immediately. */
>> +	int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
>> +	int trigger_count = 0;
>> +
>> +	replicaset_foreach(replica) {
>> +		if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
>> +		    replica->anon)
>> +			continue;
>> +
>> +		assert(replica->id != REPLICA_ID_NIL);
>> +		assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
>>
>> -	int64_t confirm_lsn = 0;
>> -	if (len >= replication_synchro_quorum) {
>> -		qsort(lsns, len, sizeof(int64_t), cmp_i64);
>> -		confirm_lsn = lsns[len - replication_synchro_quorum];
>> +		if (vclock_get(relay_vclock(replica->relay),
>> +			       former_leader_id) >= wait_lsn) {
>> +			ack_count++;
>> +			continue;
>>  		}
>> +		int i = trigger_count++;
>> +		triggers[i].data = {
>> +			.fired = false,
>> +			.target_lsn = &wait_lsn,
>> +			.replica_id = &former_leader_id,
>> +			.quorum = &replication_synchro_quorum,
>> +			.ack_count = &ack_count,
>> +			.waiter = fiber(),
>> +		};
>> +		trigger_create(&triggers[i].trigger, ack_trigger_f,
>> +			       &triggers[i].data, NULL);
>> +		relay_on_status_update(replica->relay, &triggers[i].trigger);
>> +	}
>> +
>> +	assert(trigger_count <= VCLOCK_MAX);
>> +
>> +	if (ack_count + trigger_count < replication_synchro_quorum) {
>> +		/* Don't even bother waiting when not enough replicas. */
>> +		say_warn("clear_synchro_queue cannot gather quorum. "
>> +			 "There're only %d replicas (including this one), while"
>> +			 "quorum should be %d.", ack_count + trigger_count,
>> +			 replication_synchro_quorum);
>> +		for (int i = 0; i < trigger_count; i++)
>> +			trigger_clear(&triggers[i].trigger);
>> +		goto end;
>
> 5. Better wait anyway. Because more replicas may appear soon and confirm
> everything. During the timeout wait. Although it is probably not possible
> if we bind triggers to individual relays. I tried to fix it with a global
> trigger in my diff.

Ok, your variant looks good.
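
For the record, the way I understand the global-trigger variant is roughly the
following toy model (plain standalone C, not the real trigger/fiber API;
quorum_waiter and on_ack() are stand-ins, and the real box_quorum_on_ack_f
additionally keeps a vclock of the acked sources): every ack that moves a
replica past the target LSN bumps one shared counter, and the waiter is woken
once the counter reaches the quorum, regardless of how many replicas were
connected when the wait started.

/* Standalone sketch, not Tarantool code: one shared ack counter. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct quorum_waiter {
	int64_t target_lsn;	/* LSN every counted ack must reach */
	int quorum;		/* how many acks are needed */
	int ack_count;		/* acks collected so far */
	bool woken;		/* stand-in for fiber_wakeup() on the waiter */
};

/* Called for every incoming ack; 'acked_lsn' is what the replica confirmed. */
static void
on_ack(struct quorum_waiter *w, int64_t acked_lsn)
{
	if (w->woken || acked_lsn < w->target_lsn)
		return;
	/* The real trigger counts each replica only once via its vclock. */
	if (++w->ack_count >= w->quorum)
		w->woken = true;
}

int
main(void)
{
	/* ack_count starts at 1: the local node is counted immediately. */
	struct quorum_waiter w = {.target_lsn = 100, .quorum = 2, .ack_count = 1};
	on_ack(&w, 99);		/* too old, ignored */
	on_ack(&w, 100);	/* reaches the target, quorum of 2 collected */
	printf("woken = %d, acks = %d\n", (int)w.woken, w.ack_count);
	return 0;
}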

>> +	}
>> +
>> +	if (trigger_count > 0) {
>> +		/* Allow to interrupt the function when it takes too long. */
>> +		bool cancellable = fiber_set_cancellable(true);
>> +		fiber_sleep(replication_synchro_timeout);
>> +		fiber_set_cancellable(cancellable);
>
> 6. I think a sleep is enough. If a user made the fiber non-cancellable, better
> leave it as is. We only change it to 'false', and only when do some operation
> when a wakeup is not legal, such as WAL write.

Okay.

>> +	}
>> +
>> +	for (int i = 0; i < trigger_count; i++)
>> +		trigger_clear(&triggers[i].trigger);
>> +
>> +	/*
>> +	 * No point to proceed after cancellation even if got the quorum.
>> +	 * Emptying the limbo involves a pair of blocking WAL writes,
>> +	 * making the fiber sleep even longer, which isn't appropriate
>> +	 * when it's cancelled.
>> +	 */
>> +	if (fiber_is_cancelled()) {
>> +		say_info("clear_synchro_queue interrupted by the fiber "
>> +			 "cancellation.");
>> +		goto end;
>> +	}
>>
>> -	txn_limbo_force_empty(&txn_limbo, confirm_lsn);
>> -	assert(txn_limbo_is_empty(&txn_limbo));
>> +	if (ack_count < replication_synchro_quorum) {
>> +		say_warn("clear_synchro_queue timed out after %.2f "
>> +			 "seconds. Collected %d acks, quorum is %d. ",
>> +			 replication_synchro_timeout, ack_count,
>> +			 replication_synchro_quorum);
>> +		goto end;
>
> 7. Why don't you return an error? The queue couldn't be cleared, so it
> looks like an error, no?
>
> I added diag_set() in my diff, but didn't change it to return -1 so far.

Ok, let's return an error then.
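
That is also why the raft.c part of my diff below became a retry loop. A
standalone sketch of the calling pattern (plain C; clear_queue(),
ERR_QUORUM_WAIT and cancelled() are stand-ins for box_clear_synchro_queue(),
ER_QUORUM_WAIT and fiber_is_cancelled()): the caller keeps retrying only while
the failure is the retryable "quorum wait" kind and the fiber isn't cancelled.

/* Standalone sketch, not Tarantool code: retry only on the retryable error. */
#include <stdbool.h>
#include <stdio.h>

enum err_code { ERR_NONE, ERR_QUORUM_WAIT, ERR_OTHER };

static int attempts;

/* Stand-in for box_clear_synchro_queue(): fails twice, then succeeds. */
static int
clear_queue(enum err_code *code)
{
	if (++attempts < 3) {
		*code = ERR_QUORUM_WAIT;	/* retryable: quorum changed etc. */
		return -1;
	}
	*code = ERR_NONE;
	return 0;
}

/* Stand-in for fiber_is_cancelled(). */
static bool
cancelled(void)
{
	return false;
}

int
main(void)
{
	int rc;
	enum err_code code;
	do {
		rc = clear_queue(&code);
		if (rc != 0)
			fprintf(stderr, "clear_synchro_queue failed, code %d\n",
				(int)code);
	} while (rc != 0 && code == ERR_QUORUM_WAIT && !cancelled());
	printf("attempts: %d, rc: %d\n", attempts, rc);
	return rc;
}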

>>  	}
>>
>> +	txn_limbo_force_empty(&txn_limbo, wait_lsn);
>> +	assert(txn_limbo_is_empty(&txn_limbo));
>> +end:
>>  	in_clear_synchro_queue = false;
>>  	return 0;
>>  }
>> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>> new file mode 100644
>> index 000000000..e806d9d53
>> --- /dev/null
>> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>
> 8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very
> useful that I can run all qsync tests with a small command
> `python test-run.py qsync`.

No problem, done.

My diff's below:

===============================================

diff --git a/src/box/box.cc b/src/box/box.cc
index 90c07c342..22e3057f8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1035,9 +1035,10 @@ box_quorum_on_ack_f(struct trigger *trigger, void *event)
     vclock_follow(&t->vclock, ack->source, new_lsn);
     ++t->ack_count;
-    if (t->ack_count >= t->quorum)
+    if (t->ack_count >= t->quorum) {
         fiber_wakeup(t->waiter);
-    trigger_clear(trigger);
+        trigger_clear(trigger);
+    }
     return 0;
 }
@@ -1153,16 +1154,25 @@ box_clear_synchro_queue(bool try_wait)
     int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
     assert(wait_lsn > 0);
-    if (box_wait_quorum(former_leader_id, wait_lsn,
-                replication_synchro_quorum,
-                replication_synchro_timeout) == 0) {
-        txn_limbo_force_empty(&txn_limbo, wait_lsn);
-        assert(txn_limbo_is_empty(&txn_limbo));
-    } else {
-        diag_log();
+    int quorum = replication_synchro_quorum;
+    int rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
+                 replication_synchro_timeout);
+    if (rc == 0) {
+        if (quorum < replication_synchro_quorum) {
+            diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+                 "quorum was increased while waiting");
+            rc = -1;
+        } else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+            diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+                 "new synchronous transactions appeared");
+            rc = -1;
+        } else {
+            txn_limbo_force_empty(&txn_limbo, wait_lsn);
+            assert(txn_limbo_is_empty(&txn_limbo));
+        }
     }
     in_clear_synchro_queue = false;
-    return 0;
+    return rc;
 }
 
 void
diff --git a/src/box/raft.c b/src/box/raft.c
index 1942df952..634740570 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -94,8 +94,17 @@ box_raft_update_synchro_queue(struct raft *raft)
      * simply log a warning.
      */
     if (raft->state == RAFT_STATE_LEADER) {
-        if (box_clear_synchro_queue(false) != 0)
-            diag_log();
+        int rc = 0;
+        uint32_t errcode = 0;
+        do {
+            rc = box_clear_synchro_queue(false);
+            if (rc) {
+                struct error *err = diag_last_error(diag_get());
+                errcode = box_error_code(err);
+                diag_log();
+            }
+        } while (rc != 0 && errcode == ER_QUORUM_WAIT &&
+               !fiber_is_cancelled());
     }
 }
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.result
rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua

-- 
Serge Petrenko