[Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything
Serge Petrenko
sergepetrenko at tarantool.org
Thu Dec 24 19:12:21 MSK 2020
23.12.2020 20:28, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> See 9 comments below, my diff in the end of the email, and in
> the branch as a separate commit.
Thanks for your review & the patch!
I've applied your diff with a couple of changes.
Please find my answers and diff below.
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 2d403fc9a..38bf4034e 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
>>
>> }
>>
>> +struct ack_trigger_data {
>> + bool fired;
> 1. This member can be deleted, if you would call
> trigger_clear(trigger) instead of setting the flag.
True.
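Something like this, I suppose (a sketch on top of this patch's current
data layout, before the other restructurings you propose below):

static int
ack_trigger_f(struct trigger *trigger, void *event)
{
	struct relay *relay = (struct relay *)event;
	struct ack_trigger_data *data =
		(struct ack_trigger_data *)trigger->data;
	if (*data->target_lsn <= vclock_get(relay_vclock(relay),
					    *data->replica_id)) {
		/* Detach the trigger instead of remembering that it fired. */
		trigger_clear(trigger);
		++*data->ack_count;
		if (*data->ack_count >= *data->quorum)
			fiber_wakeup(data->waiter);
	}
	return 0;
}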
>> + int64_t *target_lsn;
>> + uint32_t *replica_id;
>> + int *quorum;
>> + int *ack_count;
> 2. Most of these members can be stored by value, see my diff.
> Even if the idea in my diff will look bad, still some minor
> details could be taken.
I store quorum by reference so that the trigger picks up the new value
when the quorum is changed. This would work at least when the quorum is
increased.
Speaking of the other values, I thought it might be useful to change them
on the fly one day, even though this isn't used now.
Anyway, let's just retry when the quorum is changed.
>> + struct fiber *waiter;
>> +};
>> +
>> +struct ack_trigger {
>> + struct ack_trigger_data data;
>> + struct trigger trigger;
> 3. You can merge data right into the trigger object.
>
>> +};
>> +
>> +static int ack_trigger_f(struct trigger *trigger, void *event)
> 4. Normally we put return type on a separate line.
Sure. My bad.
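I.e.:

static int
ack_trigger_f(struct trigger *trigger, void *event)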
>> +{
>> + struct relay *relay = (struct relay *)event;
>> + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
>> + if (data->fired)
>> + return 0;
>> + if (*data->target_lsn <= vclock_get(relay_vclock(relay),
>> + *data->replica_id)) {
>> + ++*data->ack_count;
>> + data->fired = true;
>> + if (*data->ack_count >= *data->quorum)
>> + fiber_wakeup(data->waiter);
>> + }
>> + return 0;
>> +}
>> @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
>> break;
>> fiber_sleep(0.001);
>> }
>> + /*
>> + * Our mission was to clear the limbo from former leader's
>> + * transactions. Exit in case someone did that for us.
>> + */
>> + if (txn_limbo_is_empty(&txn_limbo) ||
>> + former_leader_id != txn_limbo.owner_id) {
>> + in_clear_synchro_queue = false;
>> + return 0;
>> + }
>> }
>>
>> - if (!txn_limbo_is_empty(&txn_limbo)) {
>> - int64_t lsns[VCLOCK_MAX];
>> - int len = 0;
>> - const struct vclock *vclock;
>> - replicaset_foreach(replica) {
>> - if (replica->relay != NULL &&
>> - relay_get_state(replica->relay) != RELAY_OFF &&
>> - !replica->anon) {
>> - assert(!tt_uuid_is_equal(&INSTANCE_UUID,
>> - &replica->uuid));
>> - vclock = relay_vclock(replica->relay);
>> - int64_t lsn = vclock_get(vclock,
>> - former_leader_id);
>> - lsns[len++] = lsn;
>> - }
>> - }
>> - lsns[len++] = vclock_get(box_vclock, former_leader_id);
>> - assert(len < VCLOCK_MAX);
>> + /*
>> + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
>> + * in the limbo must've come through the applier meaning they already
>> + * have an lsn assigned, even if their WAL write hasn't finished yet.
>> + */
>> + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
>> + assert(wait_lsn > 0);
>> +
>> + struct ack_trigger triggers[VCLOCK_MAX];
>> +
>> + /* Take this node into account immediately. */
>> + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
>> + int trigger_count = 0;
>> +
>> + replicaset_foreach(replica) {
>> + if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
>> + replica->anon)
>> + continue;
>> +
>> + assert(replica->id != REPLICA_ID_NIL);
>> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
>>
>> - int64_t confirm_lsn = 0;
>> - if (len >= replication_synchro_quorum) {
>> - qsort(lsns, len, sizeof(int64_t), cmp_i64);
>> - confirm_lsn = lsns[len - replication_synchro_quorum];
>> + if (vclock_get(relay_vclock(replica->relay),
>> + former_leader_id) >= wait_lsn) {
>> + ack_count++;
>> + continue;
>> }
>> + int i = trigger_count++;
>> + triggers[i].data = {
>> + .fired = false,
>> + .target_lsn = &wait_lsn,
>> + .replica_id = &former_leader_id,
>> + .quorum = &replication_synchro_quorum,
>> + .ack_count = &ack_count,
>> + .waiter = fiber(),
>> + };
>> + trigger_create(&triggers[i].trigger, ack_trigger_f,
>> + &triggers[i].data, NULL);
>> + relay_on_status_update(replica->relay, &triggers[i].trigger);
>> + }
>> +
>> + assert(trigger_count <= VCLOCK_MAX);
>> +
>> + if (ack_count + trigger_count < replication_synchro_quorum) {
>> + /* Don't even bother waiting when not enough replicas. */
>> + say_warn("clear_synchro_queue cannot gather quorum. "
>> + "There're only %d replicas (including this one), while"
>> + "quorum should be %d.", ack_count + trigger_count,
>> + replication_synchro_quorum);
>> + for (int i = 0; i < trigger_count; i++)
>> + trigger_clear(&triggers[i].trigger);
>> + goto end;
> 5. Better wait anyway. Because more replicas may appear soon and confirm
> everything. During the timeout wait. Although it is probably not possible
> if we bind triggers to individual relays. I tried to fix it with a global
> trigger in my diff.
Ok, your variant looks good.
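Just to restate the single-trigger idea for the thread (a sketch only:
the ack event layout and the place where the trigger gets registered are
my assumptions here, the real code is in your diff below and on the
branch):

/* Assumed shape of the ack event for this sketch. */
struct replication_ack {
	/* Replica which sent the ack. */
	uint32_t source;
	/* Vclock the replica has reached. */
	const struct vclock *vclock;
};

struct box_quorum_trigger {
	struct trigger base;
	/* Replica id whose LSNs the ACKs are counted against. */
	uint32_t replica_id;
	/* The LSN to wait for. */
	int64_t target_lsn;
	/* Number of ACKs needed to reach the quorum. */
	int quorum;
	/* Number of ACKs collected so far. */
	int ack_count;
	/* Which LSN each replica has ACKed, to count everyone once. */
	struct vclock vclock;
	/* The fiber waiting for the quorum. */
	struct fiber *waiter;
};

static int
box_quorum_on_ack_f(struct trigger *trigger, void *event)
{
	struct replication_ack *ack = (struct replication_ack *)event;
	struct box_quorum_trigger *t = (struct box_quorum_trigger *)trigger;
	int64_t new_lsn = vclock_get(ack->vclock, t->replica_id);
	int64_t old_lsn = vclock_get(&t->vclock, ack->source);
	/* Count each replica once, and only when it reaches the target. */
	if (new_lsn < t->target_lsn || old_lsn >= t->target_lsn)
		return 0;
	vclock_follow(&t->vclock, ack->source, new_lsn);
	++t->ack_count;
	if (t->ack_count >= t->quorum) {
		fiber_wakeup(t->waiter);
		trigger_clear(trigger);
	}
	return 0;
}

This way a single trigger serves all the relays, including the ones that
appear while we are waiting.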
>> + }
>> +
>> + if (trigger_count > 0) {
>> + /* Allow to interrupt the function when it takes too long. */
>> + bool cancellable = fiber_set_cancellable(true);
>> + fiber_sleep(replication_synchro_timeout);
>> + fiber_set_cancellable(cancellable);
> 6. I think a sleep is enough. If a user made the fiber non-cancellable, better
> leave it as is. We only change it to 'false', and only when do some operation
> when a wakeup is not legal, such as WAL write.
Okay.
>> + }
>> +
>> + for (int i = 0; i < trigger_count; i++)
>> + trigger_clear(&triggers[i].trigger);
>> +
>> + /*
>> + * No point to proceed after cancellation even if got the quorum.
>> + * Emptying the limbo involves a pair of blocking WAL writes,
>> + * making the fiber sleep even longer, which isn't appropriate
>> + * when it's cancelled.
>> + */
>> + if (fiber_is_cancelled()) {
>> + say_info("clear_synchro_queue interrupted by the fiber "
>> + "cancellation.");
>> + goto end;
>> + }
>>
>> - txn_limbo_force_empty(&txn_limbo, confirm_lsn);
>> - assert(txn_limbo_is_empty(&txn_limbo));
>> + if (ack_count < replication_synchro_quorum) {
>> + say_warn("clear_synchro_queue timed out after %.2f "
>> + "seconds. Collected %d acks, quorum is %d. ",
>> + replication_synchro_timeout, ack_count,
>> + replication_synchro_quorum);
>> + goto end;
> 7. Why don't you return an error? The queue couldn't be cleared, so it
> looks like an error, no?
>
> I added diag_set() in my diff, but didn't change it to return -1 so far.
Ok, let's return an error then.
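E.g. the timeout path of box_wait_quorum() may then end with something
like this (a fragment, not the exact code; ER_QUORUM_WAIT is the error
code your diff below relies on):

	if (ack_count < quorum) {
		diag_set(ClientError, ER_QUORUM_WAIT, quorum,
			 tt_sprintf("timed out, collected %d acks",
				    ack_count));
		return -1;
	}
	return 0;

And the caller decides whether to log the error or propagate it further.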
>> }
>>
>> + txn_limbo_force_empty(&txn_limbo, wait_lsn);
>> + assert(txn_limbo_is_empty(&txn_limbo));
>> +end:
>> in_clear_synchro_queue = false;
>> return 0;
>> }
>> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>> new file mode 100644
>> index 000000000..e806d9d53
>> --- /dev/null
>> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> 8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very
> useful that I can run all qsync tests with a small command
> `python test-run.py qsync`.
No problem, done.
My diff's below:
===============================================
diff --git a/src/box/box.cc b/src/box/box.cc
index 90c07c342..22e3057f8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1035,9 +1035,10 @@ box_quorum_on_ack_f(struct trigger *trigger, void *event)
vclock_follow(&t->vclock, ack->source, new_lsn);
++t->ack_count;
- if (t->ack_count >= t->quorum)
+ if (t->ack_count >= t->quorum) {
fiber_wakeup(t->waiter);
- trigger_clear(trigger);
+ trigger_clear(trigger);
+ }
return 0;
}
@@ -1153,16 +1154,25 @@ box_clear_synchro_queue(bool try_wait)
int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
assert(wait_lsn > 0);
- if (box_wait_quorum(former_leader_id, wait_lsn,
- replication_synchro_quorum,
- replication_synchro_timeout) == 0) {
- txn_limbo_force_empty(&txn_limbo, wait_lsn);
- assert(txn_limbo_is_empty(&txn_limbo));
- } else {
- diag_log();
+ int quorum = replication_synchro_quorum;
+ int rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
+ replication_synchro_timeout);
+ if (rc == 0) {
+ if (quorum < replication_synchro_quorum) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "quorum was increased while waiting");
+ rc = -1;
+ } else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "new synchronous transactions appeared");
+ rc = -1;
+ } else {
+ txn_limbo_force_empty(&txn_limbo, wait_lsn);
+ assert(txn_limbo_is_empty(&txn_limbo));
+ }
}
in_clear_synchro_queue = false;
- return 0;
+ return rc;
}
void
diff --git a/src/box/raft.c b/src/box/raft.c
index 1942df952..634740570 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -94,8 +94,17 @@ box_raft_update_synchro_queue(struct raft *raft)
* simply log a warning.
*/
if (raft->state == RAFT_STATE_LEADER) {
- if (box_clear_synchro_queue(false) != 0)
- diag_log();
+ int rc = 0;
+ uint32_t errcode = 0;
+ do {
+ rc = box_clear_synchro_queue(false);
+ if (rc) {
+ struct error *err = diag_last_error(diag_get());
+ errcode = box_error_code(err);
+ diag_log();
+ }
+ } while (rc != 0 && errcode == ER_QUORUM_WAIT &&
+ !fiber_is_cancelled());
}
}
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.result
rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua
--
Serge Petrenko