[Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Dec 23 20:28:03 MSK 2020
Thanks for the patch!
See 9 comments below, my diff in the end of the email, and in
the branch as a separate commit.
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 2d403fc9a..38bf4034e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
>
> }
>
> +struct ack_trigger_data {
> + bool fired;
1. This member can be deleted if you call
trigger_clear(trigger) instead of setting the flag.
> + int64_t *target_lsn;
> + uint32_t *replica_id;
> + int *quorum;
> + int *ack_count;
2. Most of these members can be stored by value, see my diff.
Even if the idea in my diff will look bad, still some minor
details could be taken.
> + struct fiber *waiter;
> +};
> +
> +struct ack_trigger {
> + struct ack_trigger_data data;
> + struct trigger trigger;
3. You can merge data right into the trigger object.
> +};
> +
> +static int ack_trigger_f(struct trigger *trigger, void *event)
4. Normally we put return type on a separate line.
> +{
> + struct relay *relay = (struct relay *)event;
> + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
> + if (data->fired)
> + return 0;
> + if (*data->target_lsn <= vclock_get(relay_vclock(relay),
> + *data->replica_id)) {
> + ++*data->ack_count;
> + data->fired = true;
> + if (*data->ack_count >= *data->quorum)
> + fiber_wakeup(data->waiter);
> + }
> + return 0;
> +}
> @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
> break;
> fiber_sleep(0.001);
> }
> + /*
> + * Our mission was to clear the limbo from former leader's
> + * transactions. Exit in case someone did that for us.
> + */
> + if (txn_limbo_is_empty(&txn_limbo) ||
> + former_leader_id != txn_limbo.owner_id) {
> + in_clear_synchro_queue = false;
> + return 0;
> + }
> }
>
> - if (!txn_limbo_is_empty(&txn_limbo)) {
> - int64_t lsns[VCLOCK_MAX];
> - int len = 0;
> - const struct vclock *vclock;
> - replicaset_foreach(replica) {
> - if (replica->relay != NULL &&
> - relay_get_state(replica->relay) != RELAY_OFF &&
> - !replica->anon) {
> - assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> - &replica->uuid));
> - vclock = relay_vclock(replica->relay);
> - int64_t lsn = vclock_get(vclock,
> - former_leader_id);
> - lsns[len++] = lsn;
> - }
> - }
> - lsns[len++] = vclock_get(box_vclock, former_leader_id);
> - assert(len < VCLOCK_MAX);
> + /*
> + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
> + * in the limbo must've come through the applier meaning they already
> + * have an lsn assigned, even if their WAL write hasn't finished yet.
> + */
> + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
> + assert(wait_lsn > 0);
> +
> + struct ack_trigger triggers[VCLOCK_MAX];
> +
> + /* Take this node into account immediately. */
> + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
> + int trigger_count = 0;
> +
> + replicaset_foreach(replica) {
> + if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
> + replica->anon)
> + continue;
> +
> + assert(replica->id != REPLICA_ID_NIL);
> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
>
> - int64_t confirm_lsn = 0;
> - if (len >= replication_synchro_quorum) {
> - qsort(lsns, len, sizeof(int64_t), cmp_i64);
> - confirm_lsn = lsns[len - replication_synchro_quorum];
> + if (vclock_get(relay_vclock(replica->relay),
> + former_leader_id) >= wait_lsn) {
> + ack_count++;
> + continue;
> }
> + int i = trigger_count++;
> + triggers[i].data = {
> + .fired = false,
> + .target_lsn = &wait_lsn,
> + .replica_id = &former_leader_id,
> + .quorum = &replication_synchro_quorum,
> + .ack_count = &ack_count,
> + .waiter = fiber(),
> + };
> + trigger_create(&triggers[i].trigger, ack_trigger_f,
> + &triggers[i].data, NULL);
> + relay_on_status_update(replica->relay, &triggers[i].trigger);
> + }
> +
> + assert(trigger_count <= VCLOCK_MAX);
> +
> + if (ack_count + trigger_count < replication_synchro_quorum) {
> + /* Don't even bother waiting when not enough replicas. */
> + say_warn("clear_synchro_queue cannot gather quorum. "
> + "There're only %d replicas (including this one), while"
> + "quorum should be %d.", ack_count + trigger_count,
> + replication_synchro_quorum);
> + for (int i = 0; i < trigger_count; i++)
> + trigger_clear(&triggers[i].trigger);
> + goto end;
5. Better wait anyway, because more replicas may appear soon and confirm
everything during the timeout wait. Although that is probably not possible
if we bind triggers to individual relays. I tried to fix it with a global
trigger in my diff.
> + }
> +
> + if (trigger_count > 0) {
> + /* Allow to interrupt the function when it takes too long. */
> + bool cancellable = fiber_set_cancellable(true);
> + fiber_sleep(replication_synchro_timeout);
> + fiber_set_cancellable(cancellable);
6. I think a plain sleep is enough. If a user made the fiber non-cancellable,
better leave it as is. We only change the flag to 'false', and only when doing
an operation during which a wakeup is not legal, such as a WAL write.
> + }
> +
> + for (int i = 0; i < trigger_count; i++)
> + trigger_clear(&triggers[i].trigger);
> +
> + /*
> + * No point to proceed after cancellation even if got the quorum.
> + * Emptying the limbo involves a pair of blocking WAL writes,
> + * making the fiber sleep even longer, which isn't appropriate
> + * when it's cancelled.
> + */
> + if (fiber_is_cancelled()) {
> + say_info("clear_synchro_queue interrupted by the fiber "
> + "cancellation.");
> + goto end;
> + }
>
> - txn_limbo_force_empty(&txn_limbo, confirm_lsn);
> - assert(txn_limbo_is_empty(&txn_limbo));
> + if (ack_count < replication_synchro_quorum) {
> + say_warn("clear_synchro_queue timed out after %.2f "
> + "seconds. Collected %d acks, quorum is %d. ",
> + replication_synchro_timeout, ack_count,
> + replication_synchro_quorum);
> + goto end;
7. Why don't you return an error? The queue couldn't be cleared, so it
looks like an error, no?
I added diag_set() in my diff, but didn't change it to return -1 so far.
> }
>
> + txn_limbo_force_empty(&txn_limbo, wait_lsn);
> + assert(txn_limbo_is_empty(&txn_limbo));
> +end:
> in_clear_synchro_queue = false;
> return 0;
> }
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> new file mode 100644
> index 000000000..e806d9d53
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very
useful that I can run all qsync tests with a small command
`python test-run.py qsync`.
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
> new file mode 100644
> index 000000000..da218624b
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
> @@ -0,0 +1,65 @@
> +test_run = require('test_run').new()
> +
> +--
> +-- gh-5435: make sure the new limbo owner commits everything there is left in
> +-- the limbo from an old owner.
> +--
> +
> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
> +
> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
> +test_run:wait_fullmesh(SERVERS)
> +
> +-- Force election_replica1 to become leader.
> +test_run:switch('election_replica2')
> +box.cfg{election_mode='voter'}
> +test_run:switch('election_replica3')
> +box.cfg{election_mode='voter'}
> +
> +test_run:switch('election_replica1')
> +box.ctl.wait_rw()
> +
> +_ = box.schema.space.create('test', {is_sync=true})
> +_ = box.space.test:create_index('pk')
> +
> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
> +test_run:cmd('stop server election_replica3')
> +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
> +
> +lsn = box.info.lsn
> +
> +for i=1,10 do\
> + require('fiber').create(function() box.space.test:insert{i} end)\
> +end
> +
> +-- Wait for WAL write and replication.
> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
> +test_run:wait_lsn('election_replica2', 'election_replica1')
> +
> +test_run:cmd('switch election_replica2')
> +
> +test_run:cmd('stop server election_replica1')
> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
> +-- It will vote for 2, however, since 2 has newer data, and start replication
> +-- once 2 becomes the leader.
> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
> +
> +-- Set a huge timeout for 2 reasons.
> +-- First, this guards us from the intance leaving clear_synchro_queue too early
9. intance -> instance.
See my changes below and on the branch in a separate commit.
====================
diff --git a/src/box/box.cc b/src/box/box.cc
index 38bf4034e..90c07c342 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1002,32 +1002,102 @@ box_set_replication_anon(void)
}
-struct ack_trigger_data {
- bool fired;
- int64_t *target_lsn;
- uint32_t *replica_id;
- int *quorum;
- int *ack_count;
+/** Trigger to catch ACKs from all nodes when need to wait for quorum. */
+struct box_quorum_trigger {
+ /** Inherit trigger. */
+ struct trigger base;
+ /** Minimal number of nodes who should confirm the target LSN. */
+ int quorum;
+ /** Target LSN to wait for. */
+ int64_t target_lsn;
+ /** Replica ID whose LSN is being waited. */
+ uint32_t replica_id;
+ /**
+ * All versions of the given replica's LSN as seen by other nodes. The
+ * same as in the txn limbo.
+ */
+ struct vclock vclock;
+ /** Number of nodes who confirmed the LSN. */
+ int ack_count;
+ /** Fiber to wakeup when quorum is reached. */
struct fiber *waiter;
};
-struct ack_trigger {
- struct ack_trigger_data data;
- struct trigger trigger;
-};
-
-static int ack_trigger_f(struct trigger *trigger, void *event)
+static int
+box_quorum_on_ack_f(struct trigger *trigger, void *event)
{
- struct relay *relay = (struct relay *)event;
- struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
- if (data->fired)
+ struct replication_ack *ack = (struct replication_ack *)event;
+ struct box_quorum_trigger *t = (struct box_quorum_trigger *)trigger;
+ int64_t new_lsn = vclock_get(ack->vclock, t->replica_id);
+ int64_t old_lsn = vclock_get(&t->vclock, ack->source);
+ if (new_lsn < t->target_lsn || old_lsn >= t->target_lsn)
return 0;
- if (*data->target_lsn <= vclock_get(relay_vclock(relay),
- *data->replica_id)) {
- ++*data->ack_count;
- data->fired = true;
- if (*data->ack_count >= *data->quorum)
- fiber_wakeup(data->waiter);
+
+ vclock_follow(&t->vclock, ack->source, new_lsn);
+ ++t->ack_count;
+ if (t->ack_count >= t->quorum)
+ fiber_wakeup(t->waiter);
+ trigger_clear(trigger);
+ return 0;
+}
+
+/**
+ * Wait until at least @a quorum of nodes confirm @a target_lsn from the node
+ * with id @a lead_id.
+ */
+static int
+box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum,
+ double timeout)
+{
+ struct box_quorum_trigger t;
+ memset(&t, 0, sizeof(t));
+ vclock_create(&t.vclock);
+
+ /* Take this node into account immediately. */
+ int ack_count = vclock_get(box_vclock, lead_id) >= target_lsn;
+ replicaset_foreach(replica) {
+ if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
+ replica->anon)
+ continue;
+
+ assert(replica->id != REPLICA_ID_NIL);
+ assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+ int64_t lsn = vclock_get(relay_vclock(replica->relay), lead_id);
+ vclock_follow(&t.vclock, replica->id, lsn);
+ if (lsn >= target_lsn) {
+ ack_count++;
+ continue;
+ }
+ }
+ if (ack_count < quorum) {
+ t.quorum = quorum;
+ t.target_lsn = target_lsn;
+ t.replica_id = lead_id;
+ t.ack_count = ack_count;
+ t.waiter = fiber();
+ trigger_create(&t.base, box_quorum_on_ack_f, NULL, NULL);
+ trigger_add(&replicaset.on_ack, &t.base);
+ fiber_sleep(timeout);
+ trigger_clear(&t.base);
+ ack_count = t.ack_count;
+ }
+ /*
+ * No point to proceed after cancellation even if got the quorum. The
+ * quorum is waited by limbo clear function. Emptying the limbo involves
+ * a pair of blocking WAL writes, making the fiber sleep even longer,
+ * which isn't appropriate when it's canceled.
+ */
+ if (fiber_is_cancelled()) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "fiber is canceled");
+ return -1;
+ }
+ if (ack_count < quorum) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum, tt_sprintf(
+ "timeout after %.2lf seconds, collected %d acks with "
+ "%d quorum", timeout, ack_count, quorum));
+ return -1;
}
return 0;
}
@@ -1083,85 +1153,14 @@ box_clear_synchro_queue(bool try_wait)
int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
assert(wait_lsn > 0);
- struct ack_trigger triggers[VCLOCK_MAX];
-
- /* Take this node into account immediately. */
- int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
- int trigger_count = 0;
-
- replicaset_foreach(replica) {
- if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
- replica->anon)
- continue;
-
- assert(replica->id != REPLICA_ID_NIL);
- assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
-
- if (vclock_get(relay_vclock(replica->relay),
- former_leader_id) >= wait_lsn) {
- ack_count++;
- continue;
- }
- int i = trigger_count++;
- triggers[i].data = {
- .fired = false,
- .target_lsn = &wait_lsn,
- .replica_id = &former_leader_id,
- .quorum = &replication_synchro_quorum,
- .ack_count = &ack_count,
- .waiter = fiber(),
- };
- trigger_create(&triggers[i].trigger, ack_trigger_f,
- &triggers[i].data, NULL);
- relay_on_status_update(replica->relay, &triggers[i].trigger);
- }
-
- assert(trigger_count <= VCLOCK_MAX);
-
- if (ack_count + trigger_count < replication_synchro_quorum) {
- /* Don't even bother waiting when not enough replicas. */
- say_warn("clear_synchro_queue cannot gather quorum. "
- "There're only %d replicas (including this one), while"
- "quorum should be %d.", ack_count + trigger_count,
- replication_synchro_quorum);
- for (int i = 0; i < trigger_count; i++)
- trigger_clear(&triggers[i].trigger);
- goto end;
- }
-
- if (trigger_count > 0) {
- /* Allow to interrupt the function when it takes too long. */
- bool cancellable = fiber_set_cancellable(true);
- fiber_sleep(replication_synchro_timeout);
- fiber_set_cancellable(cancellable);
- }
-
- for (int i = 0; i < trigger_count; i++)
- trigger_clear(&triggers[i].trigger);
-
- /*
- * No point to proceed after cancellation even if got the quorum.
- * Emptying the limbo involves a pair of blocking WAL writes,
- * making the fiber sleep even longer, which isn't appropriate
- * when it's cancelled.
- */
- if (fiber_is_cancelled()) {
- say_info("clear_synchro_queue interrupted by the fiber "
- "cancellation.");
- goto end;
- }
-
- if (ack_count < replication_synchro_quorum) {
- say_warn("clear_synchro_queue timed out after %.2f "
- "seconds. Collected %d acks, quorum is %d. ",
- replication_synchro_timeout, ack_count,
- replication_synchro_quorum);
- goto end;
+ if (box_wait_quorum(former_leader_id, wait_lsn,
+ replication_synchro_quorum,
+ replication_synchro_timeout) == 0) {
+ txn_limbo_force_empty(&txn_limbo, wait_lsn);
+ assert(txn_limbo_is_empty(&txn_limbo));
+ } else {
+ diag_log();
}
-
- txn_limbo_force_empty(&txn_limbo, wait_lsn);
- assert(txn_limbo_is_empty(&txn_limbo));
-end:
in_clear_synchro_queue = false;
return 0;
}
diff --git a/src/box/errcode.h b/src/box/errcode.h
index cfea5bcf2..56573688e 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -274,6 +274,7 @@ struct errcode_record {
/*219 */_(ER_XLOG_GAP, "%s") \
/*220 */_(ER_TOO_EARLY_SUBSCRIBE, "Can't subscribe non-anonymous replica %s until join is done") \
/*221 */_(ER_SQL_CANT_ADD_AUTOINC, "Can't add AUTOINCREMENT: space %s can't feature more than one AUTOINCREMENT field") \
+ /*222 */_(ER_QUORUM_WAIT, "Couldn't wait for quorum %d: %s") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dbad8a680..df04f8198 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -146,8 +146,6 @@ struct relay {
alignas(CACHELINE_SIZE)
/** Known relay vclock. */
struct vclock vclock;
- /** Triggers fired once tx knows the new relay vclock. */
- struct rlist on_status_update;
/**
* True if the relay needs Raft updates. It can live fine
* without sending Raft updates, if it is a relay to an
@@ -175,12 +173,6 @@ relay_vclock(const struct relay *relay)
return &relay->tx.vclock;
}
-void
-relay_on_status_update(struct relay *relay, struct trigger *trigger)
-{
- trigger_add(&relay->tx.on_status_update, trigger);
-}
-
double
relay_last_row_time(const struct relay *relay)
{
@@ -209,7 +201,6 @@ relay_new(struct replica *replica)
diag_create(&relay->diag);
stailq_create(&relay->pending_gc);
relay->state = RELAY_OFF;
- rlist_create(&relay->tx.on_status_update);
return relay;
}
@@ -421,6 +412,9 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ struct replication_ack ack;
+ ack.source = status->relay->replica->id;
+ ack.vclock = &status->vclock;
/*
* Let pending synchronous transactions know, which of
* them were successfully sent to the replica. Acks are
@@ -429,11 +423,10 @@ tx_status_update(struct cmsg *msg)
* for master's CONFIRM message instead.
*/
if (txn_limbo.owner_id == instance_id) {
- txn_limbo_ack(&txn_limbo, status->relay->replica->id,
- vclock_get(&status->vclock, instance_id));
+ txn_limbo_ack(&txn_limbo, ack.source,
+ vclock_get(ack.vclock, instance_id));
}
-
- trigger_run(&status->relay->tx.on_status_update, status->relay);
+ trigger_run(&replicaset.on_ack, &ack);
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
diff --git a/src/box/relay.h b/src/box/relay.h
index 4e6ecaffb..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,10 +93,6 @@ relay_vclock(const struct relay *relay);
double
relay_last_row_time(const struct relay *relay);
-/** Set a trigger on relay vclock update as seen by tx. */
-void
-relay_on_status_update(struct relay *relay, struct trigger *trigger);
-
/**
* Send a Raft update request to the relay channel. It is not
* guaranteed that it will be delivered. The connection may break.
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 931c73a37..980109597 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -98,6 +98,8 @@ replication_init(void)
rlist_create(&replicaset.applier.on_rollback);
rlist_create(&replicaset.applier.on_wal_write);
+ rlist_create(&replicaset.on_ack);
+
diag_create(&replicaset.applier.diag);
}
@@ -113,6 +115,7 @@ replication_free(void)
relay_cancel(replica->relay);
diag_destroy(&replicaset.applier.diag);
+ trigger_destroy(&replicaset.on_ack);
}
int
diff --git a/src/box/replication.h b/src/box/replication.h
index e57912848..2ad1cbf66 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -190,6 +190,14 @@ extern struct tt_uuid REPLICASET_UUID;
typedef rb_tree(struct replica) replica_hash_t;
+/** Ack which is passed to on ack triggers. */
+struct replication_ack {
+ /** Replica ID of the ACK source. */
+ uint32_t source;
+ /** Confirmed vclock. */
+ const struct vclock *vclock;
+};
+
/**
* Replica set state.
*
@@ -271,6 +279,8 @@ struct replicaset {
/* Shared applier diagnostic area. */
struct diag diag;
} applier;
+ /** Triggers are invoked on each ACK from each replica. */
+ struct rlist on_ack;
/** Map of all known replica_id's to correspponding replica's. */
struct replica *replica_by_id[VCLOCK_MAX];
};
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
index e806d9d53..2699231e5 100644
--- a/test/replication/gh-5435-clear-synchro-queue-commit-all.result
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -96,7 +96,7 @@ test_run:cmd('start server election_replica3 with wait=False, wait_load=False, a
| ...
-- Set a huge timeout for 2 reasons.
--- First, this guards us from the intance leaving clear_synchro_queue too early
+-- First, this guards us from the instance leaving clear_synchro_queue too early
-- and confirming nothing.
-- Second, it lets us test that the instance doesn't wait for the full timeout.
box.cfg{replication_synchro_timeout=1000}
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
index da218624b..03705d96c 100644
--- a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -45,7 +45,7 @@ test_run:cmd('stop server election_replica1')
test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
-- Set a huge timeout for 2 reasons.
--- First, this guards us from the intance leaving clear_synchro_queue too early
+-- First, this guards us from the instance leaving clear_synchro_queue too early
-- and confirming nothing.
-- Second, it lets us test that the instance doesn't wait for the full timeout.
box.cfg{replication_synchro_timeout=1000}
More information about the Tarantool-patches
mailing list