* [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything
@ 2020-12-10 20:55 Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
` (5 more replies)
0 siblings, 6 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-10 20:55 UTC (permalink / raw)
To: v.shpilevoy, cyrillos; +Cc: tarantool-patches
The patchset fails the `replication/election_qsync_stress.test.lua` test.
I haven't found the reason yet, but I think the series can still be
reviewed while I'm figuring out where the test failures come from.
Serge Petrenko (4):
box: add a single execution guard to clear_synchro_queue
relay: rename is_raft_enabled message to relay_is_running
relay: introduce relay_lsn_watcher
box: rework clear_synchro_queue to commit everything
src/box/box.cc | 143 +++++++++++++++---
src/box/box.h | 2 +-
src/box/lua/ctl.c | 4 +-
src/box/relay.cc | 120 +++++++++++----
src/box/relay.h | 44 ++++++
test/replication/election_replica.lua | 5 +-
...5435-clear-synchro-queue-commit-all.result | 137 +++++++++++++++++
...35-clear-synchro-queue-commit-all.test.lua | 60 ++++++++
test/replication/suite.cfg | 1 +
9 files changed, 460 insertions(+), 56 deletions(-)
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
--
2.24.3 (Apple Git-128)
^ permalink raw reply [flat|nested] 17+ messages in thread
* [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
@ 2020-12-10 20:55 ` Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko
` (4 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-12-10 20:55 UTC (permalink / raw)
To: v.shpilevoy, cyrillos; +Cc: tarantool-patches
clear_synchro_queue() isn't meant to be run concurrently on a single
instance.
Multiple simultaneous invocations of clear_synchro_queue() shouldn't
hurt now, since clear_synchro_queue() simply exits on an empty limbo, but
they may become harmful in the future, when clear_synchro_queue() is
reworked.
Prohibit such misuse by introducing an execution guard and raising an
error once a duplicate invocation is detected.
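A minimal sketch of such a guard in plain C, for illustration only: clear_queue(), yield_f and call_again() are hypothetical names, and the fiber yield during which a second caller could get in is simulated with a callback. Unlike the patch, which resets the flag with a C++ scoped guard, the sketch resets it explicitly before returning.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a point where the function may yield. */
typedef void (*yield_f)(void);

/*
 * Returns 0 on success and -1 if another invocation is already in
 * progress, mirroring the ER_UNSUPPORTED error raised by the patch.
 */
static int
clear_queue(yield_f on_yield)
{
	/* A guard to block multiple simultaneous invocations. */
	static bool in_clear_queue = false;
	if (in_clear_queue)
		return -1;
	in_clear_queue = true;
	/* While "yielded", other code (e.g. another fiber) may call us. */
	if (on_yield != NULL)
		on_yield();
	/* Reset the guard on exit, like the scoped guard in the patch. */
	in_clear_queue = false;
	return 0;
}

/* Records the result of a nested call made during the simulated yield. */
static int nested_rc = 0;

static void
call_again(void)
{
	nested_rc = clear_queue(NULL);
}
```

Calling clear_queue(call_again) returns 0 while the nested call made from inside it is rejected with -1; once the outer call returns, the guard is released and the next call succeeds.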
Prerequisite #5435
---
src/box/box.cc | 17 ++++++++++++++---
src/box/box.h | 2 +-
src/box/lua/ctl.c | 4 ++--
3 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index a8bc3471d..8e0c9a160 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,15 +1001,25 @@ box_set_replication_anon(void)
}
-void
+int
box_clear_synchro_queue(bool try_wait)
{
+ /* A guard to block multiple simultaneous function invocations. */
+ static bool in_clear_synchro_queue = false;
+ if (in_clear_synchro_queue) {
+ diag_set(ClientError, ER_UNSUPPORTED, "clear_synchro_queue",
+ "simultaneous invocations");
+ return -1;
+ }
if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
- return;
+ return 0;
uint32_t former_leader_id = txn_limbo.owner_id;
assert(former_leader_id != REPLICA_ID_NIL);
if (former_leader_id == instance_id)
- return;
+ return 0;
+
+ in_clear_synchro_queue = true;
+ auto guard = make_scoped_guard([&] { in_clear_synchro_queue = false; });
if (try_wait) {
/* Wait until pending confirmations/rollbacks reach us. */
@@ -1050,6 +1060,7 @@ box_clear_synchro_queue(bool try_wait)
txn_limbo_force_empty(&txn_limbo, confirm_lsn);
assert(txn_limbo_is_empty(&txn_limbo));
}
+ return 0;
}
void
diff --git a/src/box/box.h b/src/box/box.h
index b47a220b7..95336688b 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -266,7 +266,7 @@ extern "C" {
typedef struct tuple box_tuple_t;
-void
+int
box_clear_synchro_queue(bool try_wait);
/* box_select is private and used only by FFI */
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index bf26465e6..a3447f3e7 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -81,8 +81,8 @@ lbox_ctl_on_schema_init(struct lua_State *L)
static int
lbox_ctl_clear_synchro_queue(struct lua_State *L)
{
- (void) L;
- box_clear_synchro_queue(true);
+ if (box_clear_synchro_queue(true) != 0)
+ return luaT_error(L);
return 0;
}
--
2.24.3 (Apple Git-128)
* [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
@ 2020-12-10 20:55 ` Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Serge Petrenko
` (3 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-12-10 20:55 UTC (permalink / raw)
To: v.shpilevoy, cyrillos; +Cc: tarantool-patches
Relay sends a raft enabler/disabler message on start and stop. This
message would also be useful to perform some additional work related to
relay initialization/deinitialization done in the tx thread.
For example, the message will be used to notify the tx thread that
there's no point in waiting until a relay replicates a specific row,
since the relay is exiting. This is the case in #5435.
So rename the message and all the related functions to reflect its
extended functionality.
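A simplified, hypothetical model of the renamed two-hop message: the real code routes the hops through cbus pipes between the relay and tx threads, while here they are just called in order. pending_waiters is an assumption standing in for the tx-side waiters that a stopping relay should release (that part only arrives later in the series).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Tx-side state of a relay (simplified, hypothetical). */
struct relay_model {
	bool is_anon;
	bool is_raft_enabled;
	int pending_waiters;
};

struct running_msg;
typedef void (*hop_f)(struct running_msg *msg);

struct running_msg {
	hop_f route[2];
	struct relay_model *relay;
	bool is_running;
	bool is_finished;
};

/* First hop, executed in the tx thread. */
static void
tx_on_running(struct running_msg *msg)
{
	/* Never subscribe anonymous replicas to raft updates. */
	if (!msg->relay->is_anon)
		msg->relay->is_raft_enabled = msg->is_running;
	/* A stopping relay will never ack anything again: drop waiters. */
	if (!msg->is_running)
		msg->relay->pending_waiters = 0;
}

/* Second hop, executed back in the relay thread. */
static void
relay_on_running(struct running_msg *msg)
{
	msg->is_finished = true;
}

static void
send_is_running(struct relay_model *relay, struct running_msg *msg,
		bool is_running)
{
	msg->route[0] = tx_on_running;
	msg->route[1] = relay_on_running;
	msg->relay = relay;
	msg->is_running = is_running;
	msg->is_finished = false;
	/* Stand-in for cpipe_push() and cbus delivery of both hops. */
	for (int i = 0; i < 2; i++)
		msg->route[i](msg);
}
```

The anonymous-replica check moves from the relay side into the tx hop, which is what lets the message be sent unconditionally on relay start and stop.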
Prerequisite #5435
---
src/box/relay.cc | 47 +++++++++++++++++++++++------------------------
1 file changed, 23 insertions(+), 24 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index e16ac5a6b..f342a79dd 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -579,8 +579,8 @@ relay_send_heartbeat(struct relay *relay)
}
}
-/** A message to set Raft enabled flag in TX thread from a relay thread. */
-struct relay_is_raft_enabled_msg {
+/** A message to notify the TX thread that relay is operational. */
+struct relay_is_running_msg {
/** Base cbus message. */
struct cmsg base;
/**
@@ -590,44 +590,45 @@ struct relay_is_raft_enabled_msg {
struct cmsg_hop route[2];
/** Relay pointer to set the flag in. */
struct relay *relay;
- /** New flag value. */
- bool value;
+ /** Relay running status. */
+ bool is_running;
/** Flag to wait for the flag being set, in a relay thread. */
bool is_finished;
};
-/** TX thread part of the Raft flag setting, first hop. */
+/** TX thread part of the relay is running notification, first hop. */
static void
-tx_set_is_raft_enabled(struct cmsg *base)
+tx_notify_is_relay_running(struct cmsg *base)
{
- struct relay_is_raft_enabled_msg *msg =
- (struct relay_is_raft_enabled_msg *)base;
- msg->relay->tx.is_raft_enabled = msg->value;
+ struct relay_is_running_msg *msg = (struct relay_is_running_msg *)base;
+ /* Never subscribe anonymous replicas to raft updates. */
+ if (!msg->relay->replica->anon)
+ msg->relay->tx.is_raft_enabled = msg->is_running;
}
-/** Relay thread part of the Raft flag setting, second hop. */
+/** Relay thread part of the relay is running notification, second hop. */
static void
-relay_set_is_raft_enabled(struct cmsg *base)
+relay_notify_is_relay_running(struct cmsg *base)
{
- struct relay_is_raft_enabled_msg *msg =
- (struct relay_is_raft_enabled_msg *)base;
+ struct relay_is_running_msg *msg = (struct relay_is_running_msg *)base;
msg->is_finished = true;
}
/**
- * Set relay Raft enabled flag from a relay thread to be accessed by the TX
+ * Notify the TX thread that the relay is operational.
+ * For now this will only set relay Raft enabled flag to be accessed by the TX
* thread.
*/
static void
-relay_send_is_raft_enabled(struct relay *relay,
- struct relay_is_raft_enabled_msg *msg, bool value)
+relay_send_is_running(struct relay *relay, struct relay_is_running_msg *msg,
+ bool is_running)
{
- msg->route[0].f = tx_set_is_raft_enabled;
+ msg->route[0].f = tx_notify_is_relay_running;
msg->route[0].pipe = &relay->relay_pipe;
- msg->route[1].f = relay_set_is_raft_enabled;
+ msg->route[1].f = relay_notify_is_relay_running;
msg->route[1].pipe = NULL;
msg->relay = relay;
- msg->value = value;
+ msg->is_running = is_running;
msg->is_finished = false;
cmsg_init(&msg->base, msg->route);
cpipe_push(&relay->tx_pipe, &msg->base);
@@ -667,9 +668,8 @@ relay_subscribe_f(va_list ap)
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- struct relay_is_raft_enabled_msg raft_enabler;
- if (!relay->replica->anon)
- relay_send_is_raft_enabled(relay, &raft_enabler, true);
+ struct relay_is_running_msg relay_is_running_msg;
+ relay_send_is_running(relay, &relay_is_running_msg, true);
/*
* Setup garbage collection trigger.
@@ -750,8 +750,7 @@ relay_subscribe_f(va_list ap)
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
- if (!relay->replica->anon)
- relay_send_is_raft_enabled(relay, &raft_enabler, false);
+ relay_send_is_running(relay, &relay_is_running_msg, false);
/*
* Clear garbage collector trigger and WAL watcher.
--
2.24.3 (Apple Git-128)
* [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko
@ 2020-12-10 20:55 ` Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko
` (2 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-12-10 20:55 UTC (permalink / raw)
To: v.shpilevoy, cyrillos; +Cc: tarantool-patches
Add a list of lsn watchers to relay.
struct relay_lsn_watcher lives in the tx thread and monitors the
replica's progress in applying rows coming from a given replica id.
The watcher fires once the relay reports that the replica has acked the
target lsn for the given replica id.
The watcher is owned by some tx fiber, which typically sleeps until the
watcher wakes it up. Besides waking the waiting fiber up, the watcher may
perform some work as dictated by the notify function.
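The watcher lifecycle described above, reduced to a single-threaded sketch under these assumptions: a hand-rolled doubly linked list replaces small/rlist, a vclock is a plain array indexed by replica id, and fiber wakeups are left out. All names are hypothetical; wait_state, on_notify() and on_destroy() only exist to show how a caller might count acks and live watchers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define VCLOCK_MAX 32

typedef void (*watcher_f)(void *data);

struct lsn_watcher {
	uint32_t replica_id;
	int64_t target_lsn;
	watcher_f notify;
	watcher_f destroy;
	void *data;
	struct lsn_watcher *prev, *next;
};

/* A list with a sentinel head, like an rlist. */
struct watcher_list {
	struct lsn_watcher head;
};

static void
watcher_list_create(struct watcher_list *l)
{
	l->head.prev = l->head.next = &l->head;
}

static void
watcher_attach(struct watcher_list *l, struct lsn_watcher *w)
{
	w->prev = l->head.prev;
	w->next = &l->head;
	l->head.prev->next = w;
	l->head.prev = w;
}

static void
watcher_detach(struct lsn_watcher *w)
{
	w->prev->next = w->next;
	w->next->prev = w->prev;
	w->prev = w->next = w;
}

/* Called on every status update (ack) received from the replica. */
static void
watchers_on_ack(struct watcher_list *l, const int64_t vclock[VCLOCK_MAX])
{
	struct lsn_watcher *w = l->head.next;
	while (w != &l->head) {
		/* Save the successor: w may be detached below. */
		struct lsn_watcher *next = w->next;
		if (vclock[w->replica_id] >= w->target_lsn) {
			w->notify(w->data);
			w->destroy(w->data);
			watcher_detach(w);
		}
		w = next;
	}
}

/* Called once the relay stops: nothing will be acked anymore. */
static void
watchers_on_relay_stop(struct watcher_list *l)
{
	while (l->head.next != &l->head) {
		struct lsn_watcher *w = l->head.next;
		w->destroy(w->data);
		watcher_detach(w);
	}
}

/* Example callbacks: count acks and live watchers. */
struct wait_state {
	int acks;
	int watchers;
};

static void
on_notify(void *data)
{
	((struct wait_state *)data)->acks++;
}

static void
on_destroy(void *data)
{
	((struct wait_state *)data)->watchers--;
}
```

A watcher that reaches its target fires notify and then destroy; a watcher still pending when the relay stops only gets destroy, so the waiter can tell "acked" apart from "gave up".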
Prerequisite #5435
---
src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
src/box/relay.h | 44 +++++++++++++++++++++++++++++
2 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index f342a79dd..4014792a6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -152,6 +152,8 @@ struct relay {
* anonymous replica, for example.
*/
bool is_raft_enabled;
+ /** A list of lsn watchers registered for this relay. */
+ struct rlist lsn_watchers;
} tx;
};
@@ -200,6 +202,7 @@ relay_new(struct replica *replica)
fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
stailq_create(&relay->pending_gc);
+ rlist_create(&relay->tx.lsn_watchers);
relay->state = RELAY_OFF;
return relay;
}
@@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
});
}
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
+{
+ rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
+}
+
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+ int64_t target_lsn, relay_lsn_watcher_f notify,
+ relay_lsn_watcher_f destroy, void *data)
+{
+ watcher->replica_id = replica_id;
+ watcher->target_lsn = target_lsn;
+ watcher->notify = notify;
+ watcher->destroy = destroy;
+ watcher->data = data;
+ watcher->waiter = fiber();
+}
+
+/**
+ * Destroy the watcher.
+ * Wake the waiting fiber up, fire the on destroy callback and remove the
+ * watcher from the relay's watcher list.
+ */
+static void
+relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
+{
+ watcher->destroy(watcher->data);
+ fiber_wakeup(watcher->waiter);
+ rlist_del_entry(watcher, in_list);
+}
+
+/**
+ * Notify the watcher that the replica has advanced to the given vclock.
+ * In case target_lsn is hit for watcher's replica_id, fire the notify
+ * callback and destroy the watcher.
+ */
+static void
+relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
+ struct vclock *vclock)
+{
+ int64_t lsn = vclock_get(vclock, watcher->replica_id);
+ if (lsn >= watcher->target_lsn) {
+ watcher->notify(watcher->data);
+ relay_lsn_watcher_destroy(watcher);
+ }
+}
+
/**
* The message which updated tx thread with a new vclock has returned back
* to the relay.
@@ -411,7 +462,8 @@ static void
tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
- vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ struct relay *relay = status->relay;
+ vclock_copy(&relay->tx.vclock, &status->vclock);
/*
* Let pending synchronous transactions know, which of
* them were successfully sent to the replica. Acks are
@@ -420,14 +472,18 @@ tx_status_update(struct cmsg *msg)
* for master's CONFIRM message instead.
*/
if (txn_limbo.owner_id == instance_id) {
- txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+ txn_limbo_ack(&txn_limbo, relay->replica->id,
vclock_get(&status->vclock, instance_id));
}
+ struct relay_lsn_watcher *watcher, *tmp;
+ rlist_foreach_entry_safe(watcher, &relay->tx.lsn_watchers, in_list, tmp) {
+ relay_lsn_watcher_notify(watcher, &status->vclock);
+ }
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
cmsg_init(msg, route);
- cpipe_push(&status->relay->relay_pipe, msg);
+ cpipe_push(&relay->relay_pipe, msg);
}
/**
@@ -604,6 +660,17 @@ tx_notify_is_relay_running(struct cmsg *base)
/* Never subscribe anonymous replicas to raft updates. */
if (!msg->relay->replica->anon)
msg->relay->tx.is_raft_enabled = msg->is_running;
+ /*
+ * Notify everyone waiting for a specific row to be replicated through
+ * this relay that there's nothing to wait for anymore.
+ */
+ if (!msg->is_running) {
+ struct relay_lsn_watcher *watcher, *tmp;
+ rlist_foreach_entry_safe(watcher, &msg->relay->tx.lsn_watchers,
+ in_list, tmp) {
+ relay_lsn_watcher_destroy(watcher);
+ }
+ }
}
/** Relay thread part of the relay is running notification, second hop. */
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..da2e3498a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
+#include "small/rlist.h"
#if defined(__cplusplus)
extern "C" {
@@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_vclock, uint32_t replica_version_id,
uint32_t replica_id_filter);
+/**
+ * A callback invoked once lsn watcher sees the replica has reached the target
+ * lsn for the given replica id.
+ */
+typedef void (*relay_lsn_watcher_f)(void *data);
+
+/**
+ * A relay watcher which notifies tx via calling a given function once the
+ * replica confirms it has received the rows from replica_id up to target_lsn.
+ */
+struct relay_lsn_watcher {
+ /** A replica_id to monitor rows from. */
+ uint32_t replica_id;
+ /** The lsn to wait for. */
+ int64_t target_lsn;
+ /**
+ * A callback invoked in the tx thread once the watcher sees that
+ * replica has reached the target lsn.
+ */
+ relay_lsn_watcher_f notify;
+ /**
+ * A callback fired once the relay thread exits to notify the waiter
+ * there's nothing to wait for anymore.
+ */
+ relay_lsn_watcher_f destroy;
+ /** Data passed to \a notify and \a destroy. */
+ void *data;
+ /** A fiber waiting for this watcher to fire. */
+ struct fiber *waiter;
+ /** A link in relay's watcher list. */
+ struct rlist in_list;
+};
+
+/** Initialize a pre-allocated lsn watcher. */
+void
+relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
+ int64_t target_lsn, relay_lsn_watcher_f notify,
+ relay_lsn_watcher_f destroy, void *data);
+
+/** Attach the given lsn watcher to the relay. */
+void
+relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher);
+
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
--
2.24.3 (Apple Git-128)
* [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
` (2 preceding siblings ...)
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Serge Petrenko
@ 2020-12-10 20:55 ` Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-11 7:15 ` [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue " Serge Petrenko
2020-12-11 9:19 ` Serge Petrenko
5 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-12-10 20:55 UTC (permalink / raw)
To: v.shpilevoy, cyrillos; +Cc: tarantool-patches
It is possible that a new leader (elected either via raft or manually or
via some user-written election algorithm) loses the data that the old
leader has successfully committed and confirmed.
Consider the following situation: there are N nodes in a replicaset, and
the old leader, denoted A, tries to apply some synchronous transaction. It
is written on the leader itself and N/2 other nodes, one of which is B.
The transaction has thus gathered a quorum of N/2 + 1 acks.
Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader and it
sees that A's last transaction is present on only N/2 nodes, so it doesn't
have a quorum (A was one of the N/2 + 1).
The current `clear_synchro_queue()` implementation makes B roll back the
transaction, leading to rollback after commit, which is unacceptable.
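To make the failure concrete, here is a sketch of how the pre-patch code picked the lsn to confirm: sort the former leader's lsns as seen by this node and its connected replicas, then take the quorum-th largest value. Take N = 5 and replication_synchro_quorum = 3, with A's last transaction at lsn 10 written on A, B and C. B sees {10, 10, 9, 9} for itself, C, D and E, so the computed confirm lsn is 9 and lsn 10 gets rolled back. old_confirm_lsn() is a hypothetical name, and cmp_i64() is defined locally rather than taken from Tarantool.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Ascending comparator for int64_t values. */
static int
cmp_i64(const void *a, const void *b)
{
	int64_t x = *(const int64_t *)a;
	int64_t y = *(const int64_t *)b;
	return (x > y) - (x < y);
}

/*
 * The largest lsn present on at least `quorum` nodes, or 0 when fewer
 * than `quorum` nodes are known. Limbo entries above it are rolled back.
 */
static int64_t
old_confirm_lsn(int64_t *lsns, int len, int quorum)
{
	if (len < quorum)
		return 0;
	qsort(lsns, (size_t)len, sizeof(int64_t), cmp_i64);
	return lsns[len - quorum];
}
```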
To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader gather `replication_synchro_quorum` acks.
In case any new rows come via replication while waiting for acks,
wait for their confirmation as well.
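The per-iteration decision of the new wait loop can be modelled as follows. This is a simplification: watchers, fiber yields and the restart when new limbo entries arrive are left out. This node counts as an ack if it has already written wait_lsn, each following non-anonymous replica that has already received it counts too, and every other following replica would get a watcher. Continuing the N = 5 example, once D or E acks lsn 10, B has 3 acks and confirms the entry instead of rolling it back. check_quorum(), struct replica_view and enum wait_status are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum wait_status {
	WAIT_CONFIRMED, /* quorum gathered: confirm up to wait_lsn */
	WAIT_FOR_ACKS,  /* quorum still reachable: keep yielding */
	WAIT_NO_QUORUM, /* too few replicas: sleep and retry */
};

struct replica_view {
	/* Relay is in RELAY_FOLLOW state and the replica isn't anonymous. */
	bool is_following;
	/* Former leader's lsn acked by this replica. */
	int64_t lsn;
};

static enum wait_status
check_quorum(int64_t self_lsn, const struct replica_view *replicas, int n,
	     int64_t wait_lsn, int quorum)
{
	/* Take this node into account immediately. */
	int count = self_lsn >= wait_lsn;
	int watcher_count = 0;
	for (int i = 0; i < n; i++) {
		if (!replicas[i].is_following)
			continue;
		if (replicas[i].lsn >= wait_lsn)
			count++;
		else
			watcher_count++;
	}
	if (count >= quorum)
		return WAIT_CONFIRMED;
	if (count + watcher_count >= quorum)
		return WAIT_FOR_ACKS;
	return WAIT_NO_QUORUM;
}
```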
Closes #5435
---
src/box/box.cc | 126 +++++++++++++---
test/replication/election_replica.lua | 5 +-
...5435-clear-synchro-queue-commit-all.result | 137 ++++++++++++++++++
...35-clear-synchro-queue-commit-all.test.lua | 60 ++++++++
test/replication/suite.cfg | 1 +
5 files changed, 306 insertions(+), 23 deletions(-)
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..fb9167977 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
}
+struct lsn_watcher_data {
+ int *ack_count;
+ int *watcher_count;
+};
+
+static void
+count_confirm_f(void *data)
+{
+ struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+ (*d->ack_count)++;
+}
+
+static void
+watcher_destroy_f(void *data)
+{
+ struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+ (*d->watcher_count)--;
+}
+
int
box_clear_synchro_queue(bool try_wait)
{
@@ -1011,6 +1030,10 @@ box_clear_synchro_queue(bool try_wait)
"simultaneous invocations");
return -1;
}
+ /*
+ * XXX: we may want to write confirm + rollback even when the limbo is
+ * empty for the sake of limbo ownership transition.
+ */
if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
return 0;
uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
}
}
- if (!txn_limbo_is_empty(&txn_limbo)) {
- int64_t lsns[VCLOCK_MAX];
- int len = 0;
- const struct vclock *vclock;
+ if (txn_limbo_is_empty(&txn_limbo))
+ return 0;
+
+ /*
+ * Allocate the watchers statically to not bother with alloc/free.
+ * This is fine since we have a single execution guard.
+ */
+ static struct relay_lsn_watcher watchers[VCLOCK_MAX];
+ for (int i = 1; i < VCLOCK_MAX; i++)
+ rlist_create(&watchers[i].in_list);
+
+ int64_t wait_lsn = 0;
+ bool restart = false;
+ do {
+ wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
+ /*
+ * Take this node into account immediately.
+ * clear_synchro_queue() is a no-op on the limbo owner for now,
+ * so all the rows in the limbo must've come through the applier
+ * and so they already have an lsn assigned, even if their wal
+ * write isn't finished yet.
+ */
+ assert(wait_lsn > 0);
+ int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+ int watcher_count = 0;
+ struct lsn_watcher_data data = {
+ .ack_count = &count,
+ .watcher_count = &watcher_count,
+ };
+
replicaset_foreach(replica) {
- if (replica->relay != NULL &&
- relay_get_state(replica->relay) != RELAY_OFF &&
- !replica->anon) {
- assert(!tt_uuid_is_equal(&INSTANCE_UUID,
- &replica->uuid));
- vclock = relay_vclock(replica->relay);
- int64_t lsn = vclock_get(vclock,
- former_leader_id);
- lsns[len++] = lsn;
+ if (replica->anon || replica->relay == NULL ||
+ relay_get_state(replica->relay) != RELAY_FOLLOW)
+ continue;
+ assert(replica->id != 0);
+ assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+ if (vclock_get(relay_vclock(replica->relay),
+ former_leader_id) >= wait_lsn) {
+ count++;
+ continue;
}
+
+ relay_lsn_watcher_create(&watchers[replica->id],
+ former_leader_id, wait_lsn,
+ count_confirm_f,
+ watcher_destroy_f, &data);
+ relay_set_lsn_watcher(replica->relay,
+ &watchers[replica->id]);
+ watcher_count++;
}
- lsns[len++] = vclock_get(box_vclock, former_leader_id);
- assert(len < VCLOCK_MAX);
- int64_t confirm_lsn = 0;
- if (len >= replication_synchro_quorum) {
- qsort(lsns, len, sizeof(int64_t), cmp_i64);
- confirm_lsn = lsns[len - replication_synchro_quorum];
+
+ while (count < replication_synchro_quorum &&
+ count + watcher_count >= replication_synchro_quorum) {
+ fiber_yield();
}
- txn_limbo_force_empty(&txn_limbo, confirm_lsn);
- assert(txn_limbo_is_empty(&txn_limbo));
- }
+ /*
+ * In case some new limbo entries arrived, confirm them as well.
+ */
+ restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
+
+ /*
+ * Not enough replicas connected. Give user some time to
+ * reconfigure quorum and replicas some time to reconnect, then
+ * restart the watchers.
+ */
+ if (count + watcher_count < replication_synchro_quorum) {
+ say_info("clear_synchro_queue cannot collect quorum: "
+ "number of connected replicas (%d) is less "
+ "than replication_synchro_quorum (%d). "
+ "Will retry in %.2f seconds",
+ count + watcher_count,
+ replication_synchro_quorum,
+ replication_timeout);
+ fiber_sleep(replication_timeout);
+ restart = true;
+ }
+
+ /* Detach the watchers that haven't fired. */
+ for (int i = 1; i < VCLOCK_MAX; i++)
+ rlist_del_entry(&watchers[i], in_list);
+ } while (restart);
+
+ txn_limbo_force_empty(&txn_limbo, wait_lsn);
+ assert(txn_limbo_is_empty(&txn_limbo));
return 0;
}
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
local SOCKET_DIR = require('fio').cwd()
local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
local function instance_uri(instance_id)
return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
instance_uri(3),
},
replication_timeout = 0.1,
- election_mode = 'candidate',
+ replication_connect_quorum = CONNECT_QUORUM,
+ election_mode = ELECTION_MODE,
election_timeout = ELECTION_TIMEOUT,
replication_synchro_quorum = SYNCHRO_QUORUM,
replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e633f9e60
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,137 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..6cf616671
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,60 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
"gh-4730-applier-rollback.test.lua": {},
"gh-4928-tx-boundaries.test.lua": {},
"gh-5440-qsync-ro.test.lua": {},
+ "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.24.3 (Apple Git-128)
* Re: [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
` (3 preceding siblings ...)
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko
@ 2020-12-11 7:15 ` Serge Petrenko
2020-12-11 9:19 ` Serge Petrenko
5 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-11 7:15 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tarantool-patches
cc Cyrill Gorcunov.
Sorry, Cyrill, I misspelled your address.
10.12.2020 23:55, Serge Petrenko wrote:
> The patchset fails `replication/election_qsync_stress.test.lua` test.
> I haven't found the reason for this yet, but I still think it may be reviewed
> while I'm trying to figure out where the test failures come from.
>
> Serge Petrenko (4):
> box: add a single execution guard to clear_synchro_queue
> relay: rename is_raft_enabled message to relay_is_running
> relay: introduce relay_lsn_watcher
> box: rework clear_synchro_queue to commit everything
>
> src/box/box.cc | 143 +++++++++++++++---
> src/box/box.h | 2 +-
> src/box/lua/ctl.c | 4 +-
> src/box/relay.cc | 120 +++++++++++----
> src/box/relay.h | 44 ++++++
> test/replication/election_replica.lua | 5 +-
> ...5435-clear-synchro-queue-commit-all.result | 137 +++++++++++++++++
> ...35-clear-synchro-queue-commit-all.test.lua | 60 ++++++++
> test/replication/suite.cfg | 1 +
> 9 files changed, 460 insertions(+), 56 deletions(-)
> create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
> create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
>
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
` (4 preceding siblings ...)
2020-12-11 7:15 ` [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue " Serge Petrenko
@ 2020-12-11 9:19 ` Serge Petrenko
5 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-11 9:19 UTC (permalink / raw)
To: v.shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
10.12.2020 23:55, Serge Petrenko wrote:
> The patchset fails `replication/election_qsync_stress.test.lua` test.
> I haven't found the reason for this yet, but I still think it may be reviewed
> while I'm trying to figure out where the test failures come from.
>
> Serge Petrenko (4):
> box: add a single execution guard to clear_synchro_queue
> relay: rename is_raft_enabled message to relay_is_running
> relay: introduce relay_lsn_watcher
> box: rework clear_synchro_queue to commit everything
>
> src/box/box.cc | 143 +++++++++++++++---
> src/box/box.h | 2 +-
> src/box/lua/ctl.c | 4 +-
> src/box/relay.cc | 120 +++++++++++----
> src/box/relay.h | 44 ++++++
> test/replication/election_replica.lua | 5 +-
> ...5435-clear-synchro-queue-commit-all.result | 137 +++++++++++++++++
> ...35-clear-synchro-queue-commit-all.test.lua | 60 ++++++++
> test/replication/suite.cfg | 1 +
> 9 files changed, 460 insertions(+), 56 deletions(-)
> create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
> create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
>
https://github.com/tarantool/tarantool/issues/5435
sp/gh-5435-new-leader-commit-all
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
@ 2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-21 10:18 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-17 21:43 UTC (permalink / raw)
To: Serge Petrenko, cyrillos; +Cc: tarantool-patches
Hi! Thanks for the patch!
Looks fine. Only 2 notes below.
> diff --git a/src/box/box.cc b/src/box/box.cc
> index a8bc3471d..8e0c9a160 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1001,15 +1001,25 @@ box_set_replication_anon(void)
>
> }
>
> -void
> +int
> box_clear_synchro_queue(bool try_wait)
> {
> + /* A guard to block multiple simultaneous function invocations. */
> + static bool in_clear_synchro_queue = false;
> + if (in_clear_synchro_queue) {
> + diag_set(ClientError, ER_UNSUPPORTED, "clear_synchro_queue",
> + "simultaneous invocations");
> + return -1;
> + }
> if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
> - return;
> + return 0;
> uint32_t former_leader_id = txn_limbo.owner_id;
> assert(former_leader_id != REPLICA_ID_NIL);
> if (former_leader_id == instance_id)
> - return;
> + return 0;
> +
> + in_clear_synchro_queue = true;
> + auto guard = make_scoped_guard([&] { in_clear_synchro_queue = false; });
I would rather not use C++ here, because guards were
introduced only for protection against exceptions.
But I don't mind having this guard here if you want it.
Only my thoughts.
> if (try_wait) {
> /* Wait until pending confirmations/rollbacks reach us. */
> diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
> index bf26465e6..a3447f3e7 100644
> --- a/src/box/lua/ctl.c
> +++ b/src/box/lua/ctl.c
> @@ -81,8 +81,8 @@ lbox_ctl_on_schema_init(struct lua_State *L)
> static int
> lbox_ctl_clear_synchro_queue(struct lua_State *L)
> {
> - (void) L;
> - box_clear_synchro_queue(true);
> + if (box_clear_synchro_queue(true) != 0)
> + return luaT_error(L);
Maybe it's better to use the nil + error object return way? I thought
we still use it in new code.
* Re: [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko
@ 2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-23 12:01 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-17 21:43 UTC (permalink / raw)
To: Serge Petrenko, cyrillos; +Cc: tarantool-patches
Thanks for the patch!
I have an idea how to do all this a bit more simply, probably, though it
could take a few more commits.
AFAIU, here is what we need, looking at it from above.
We need box_clear_synchro_queue() to wait until a quorum of replicas has
received the changes of the instance owning the limbo.
The limbo already collects the LSNs on the node owning the limbo, through
txn_limbo_ack(). This function populates a vclock object with the owner's
LSN as it is seen on each replica.
If we collected LSNs into that vclock on the replica nodes too, it
would be quite simple to check whether a quorum is reached: just do the
same as txn_limbo_assign_local_lsn() does, i.e. scan the vclock and check
if there is a quorum.
If there is no quorum, the replica must wait for it. We can add a
trigger to the limbo (on_ack()) or directly to tx_status_update(),
and wait for the moment when the quorum is collected.
Another option: copy the limbo's vclock onto the stack in box_clear_synchro_queue(),
check the current number of confirmed replicas, then hang a trigger on
tx_status_update() and wait. This is even simpler than the first option.
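A minimal sketch of the quorum check described above, in plain C so it stands alone. It assumes the limbo's vclock can be modelled as an array indexed by replica id that holds the owner's LSN as acked by each replica (slot 0 unused); `VCLOCK_SLOTS`, `acked[]` and `limbo_quorum_reached()` are hypothetical names, not Tarantool's `struct vclock` API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical size standing in for VCLOCK_MAX. */
enum { VCLOCK_SLOTS = 32 };

/*
 * acked[id] is the limbo owner's LSN as last acknowledged by replica `id`
 * (0 if nothing is acked yet). Returns true when at least `quorum`
 * replicas, this node included, have reached `target_lsn`.
 */
static bool
limbo_quorum_reached(const int64_t acked[VCLOCK_SLOTS], int64_t target_lsn,
		     int quorum)
{
	assert(target_lsn > 0 && quorum > 0);
	int count = 0;
	for (int id = 1; id < VCLOCK_SLOTS; id++) {
		if (acked[id] >= target_lsn)
			count++;
	}
	return count >= quorum;
}
```

With such a helper, a trigger on tx_status_update() (or on_ack()) would only need to update one slot and re-run the check before waking the waiting fiber; that wiring is not shown here.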
The reason why I don't like (or don't understand yet; maybe it is good?) the
approach through the relays is, firstly, that I want is_raft_enabled to be
deleted instead of extended. It is an ugly crutch, which should probably
be removed by this issue:
https://github.com/tarantool/tarantool/issues/5438.
Secondly, any changes in relay not related to just reading and writing
network scare the shit out of me. It is a complex thing running in a
separate thread, so we had better avoid adding any more complex logic to it.
It should be as dumb as possible, at least until all relays are moved to one
thread (WAL) with network and disk IO served by worker threads.
What I see in the last commit now looks extremely complex.
This is a discussion proposal. I may be wrong here and miss something.
On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
> Relay sends a raft enabler/disabler message on start and stop. This
> message would also be useful to perform some additional work related to relay
> initialization/deinitialization done in the tx thread.
>
> For example, the message will be used to notify tx thread that there's no
> point in waiting until a relay replicates a specific row, since the relay
> is exiting. This is the case in #5435.
Why do we need such a notification? Why can't we let box_clear_synchro_queue()
simply wait until a timeout? What if the timeout is 10 sec, and we gave up
immediately because there was no point, but in 5 sec the relays would start,
deliver everything, and the queue could be cleared?
P.S. I read the last commit and now I see that you don't wait for a timeout.
But you still wait infinitely. So does it matter whether a relay has exited
or not anyway?
* Re: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Serge Petrenko
@ 2020-12-17 21:43 ` Vladislav Shpilevoy
[not found] ` <4b7f4fc1-6d48-4332-c432-1eeb0b28c016@tarantool.org>
0 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-17 21:43 UTC (permalink / raw)
To: Serge Petrenko, cyrillos; +Cc: tarantool-patches
Thanks for the patch and for the design!
See 3 comments below.
On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
> Add a list of lsn watchers to relay.
>
> Relay_lsn_watcher is a structure that lives in tx thread and monitors
> replica's progress in applying rows coming from a given replica id.
>
> The watcher fires once relay reports that replica has acked the target lsn
> for the given replica id.
>
> The watcher is owned by some tx fiber, which typically sleeps until the
> watcher wakes it up. Besides waking the waiting fiber up, the watcher may
> perform some work as dictated by the notify function.
>
> Prerequisite #5435
> ---
> src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
> src/box/relay.h | 44 +++++++++++++++++++++++++++++
> 2 files changed, 114 insertions(+), 3 deletions(-)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f342a79dd..4014792a6 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
> });
> }
>
> +void
> +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
> +{
> + rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
> +}
> +
> +void
> +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
> + int64_t target_lsn, relay_lsn_watcher_f notify,
> + relay_lsn_watcher_f destroy, void *data)
> +{
> + watcher->replica_id = replica_id;
> + watcher->target_lsn = target_lsn;
> + watcher->notify = notify;
> + watcher->destroy = destroy;
> + watcher->data = data;
> + watcher->waiter = fiber();
> +}
1. The notify + destroy + invocation of the watchers looks very similar
to what the trigger.h API offers. Can you try to implement this as triggers?
I mean as a list of struct trigger objects.
> +
> +/**
> + * Destroy the watcher.
> + * Wake the waiting fiber up, fire the on destroy callback and remove the
> + * watcher from the relay's watcher list.
> + */
> +static void
> +relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
> +{
> + watcher->destroy(watcher->data);
> + fiber_wakeup(watcher->waiter);
> + rlist_del_entry(watcher, in_list);
> +}
> +
> +/**
> + * Notify the watcher that the replica has advanced to the given vclock.
> + * In case target_lsn is hit for watcher's replica_id, fire the notify
> + * callback and destroy the watcher.
> + */
> +static void
> +relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
> + struct vclock *vclock)
> +{
> + int64_t lsn = vclock_get(vclock, watcher->replica_id);
> + if (lsn >= watcher->target_lsn) {
> + watcher->notify(watcher->data);
> + relay_lsn_watcher_destroy(watcher);
> + }
> +}
2. This all seems too intricate. Why does the watcher destroy itself?
Wouldn't it be easier to let the watcher decide whether it wants to die, or
change the LSN and wait more, or whatever else?
I have a feeling that once you try to switch to struct trigger,
it will all come together automatically, since the triggers API is notably
hard to misuse.
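To illustrate comments 1 and 2, here is a self-contained C sketch of a trigger-style watcher list in which the callback, not the list, decides whether to stay attached (keep waiting, possibly with an updated target) or detach. It deliberately does not reproduce Tarantool's `trigger.h`; `struct lsn_trigger`, `lsn_trigger_f`, `lsn_triggers_run()` and `count_ack_f()` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct lsn_trigger;

/* Returns true to stay attached, false to be detached from the list. */
typedef bool (*lsn_trigger_f)(struct lsn_trigger *trigger, int64_t lsn);

struct lsn_trigger {
	lsn_trigger_f run;
	void *data;
	struct lsn_trigger *next;
};

/* Push a trigger onto the head of a singly linked list. */
static void
lsn_trigger_add(struct lsn_trigger **list, struct lsn_trigger *trigger)
{
	assert(trigger->run != NULL);
	trigger->next = *list;
	*list = trigger;
}

/* Run every trigger; unlink those whose callback asks to be detached. */
static void
lsn_triggers_run(struct lsn_trigger **list, int64_t lsn)
{
	struct lsn_trigger **link = list;
	while (*link != NULL) {
		struct lsn_trigger *t = *link;
		if (t->run(t, lsn)) {
			link = &t->next;
		} else {
			*link = t->next;
			t->next = NULL;
		}
	}
}

/* Example callback: count one ack once the target LSN is reached. */
struct ack_state {
	int64_t target_lsn;
	int acks;
};

static bool
count_ack_f(struct lsn_trigger *trigger, int64_t lsn)
{
	struct ack_state *s = trigger->data;
	if (lsn < s->target_lsn)
		return true;	/* keep waiting */
	++s->acks;
	return false;		/* target reached: detach */
}
```

Since detaching is just the callback's return value, there is no separate "destroy" path the list has to get right, which is roughly the misuse-resistance the comment refers to.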
> +
> /**
> * The message which updated tx thread with a new vclock has returned back
> * to the relay.
> diff --git a/src/box/relay.h b/src/box/relay.h
> index b32e2ea2a..da2e3498a 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> struct vclock *replica_vclock, uint32_t replica_version_id,
> uint32_t replica_id_filter);
>
> +/**
> + * A callback invoked once lsn watcher sees the replica has reached the target
> + * lsn for the given replica id.
> + */
> +typedef void (*relay_lsn_watcher_f)(void *data);
> +
> +/**
> + * A relay watcher which notifies tx via calling a given function once the
> + * replica confirms it has received the rows from replica_id up to target_lsn.
> + */
> +struct relay_lsn_watcher {
> + /** A replica_id to monitor rows from. */
> + uint32_t replica_id;
> + /** The lsn to wait for. */
> + int64_t target_lsn;
> + /**
> + * A callback invoked in the tx thread once the watcher sees that
> + * replica has reached the target lsn.
> + */
> + relay_lsn_watcher_f notify;
> + /**
> + * A callback fired once the relay thread exits to notify the waiter
> + * there's nothing to wait for anymore.
> + */
> + relay_lsn_watcher_f destroy;
> + /** Data passed to \a notify and \a destroy. */
3. Let's better use @a, to be consistent with all the new code (except
cases when a whole file uses tons of \a).
> + void *data;
> + /** A fiber waiting for this watcher to fire. */
> + struct fiber *waiter;
> + /** A link in relay's watcher list. */
> + struct rlist in_list;
> +};
* Re: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko
@ 2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-23 12:04 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-17 21:43 UTC (permalink / raw)
To: Serge Petrenko, cyrillos; +Cc: tarantool-patches
Thanks for the patch!
See 7 comments below.
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 8e0c9a160..fb9167977 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
>
> }
>
> +struct lsn_watcher_data {
> + int *ack_count;
> + int *watcher_count;
> +};
> +
> +static void
> +count_confirm_f(void *data)
> +{
> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> + (*d->ack_count)++;
> +}
> +
> +static void
> +watcher_destroy_f(void *data)
> +{
> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
> + (*d->watcher_count)--;
An ultra-pro-master hint: you wouldn't need the () if you used
prefix -- and ++. But it's your call, no rules.
> +}
> +
> int
> box_clear_synchro_queue(bool try_wait)
> {
> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
> }
> }
>
> - if (!txn_limbo_is_empty(&txn_limbo)) {
> - int64_t lsns[VCLOCK_MAX];
> - int len = 0;
> - const struct vclock *vclock;
> + if (txn_limbo_is_empty(&txn_limbo))
> + return 0;
> +
> + /*
> + * Allocate the watchers statically to not bother with alloc/free.
> + * This is fine since we have a single execution guard.
> + */
> + static struct relay_lsn_watcher watchers[VCLOCK_MAX];
> + for (int i = 1; i < VCLOCK_MAX; i++)
> + rlist_create(&watchers[i].in_list);
> +
> + int64_t wait_lsn = 0;
> + bool restart = false;
> + do {
> + wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
1. What if the last transaction is asynchronous? Asynchronous transactions
have lsn -1 in the limbo.
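One possible way to handle this is to scan the limbo from the tail and skip entries that have no LSN assigned. The sketch below models the limbo as a plain array of LSNs; `last_sync_lsn()` is a hypothetical helper, not part of txn_limbo, and whether skipping such entries is semantically right for the limbo remains the open question raised in the comment.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Scan limbo entry LSNs from the tail and return the last positive one,
 * skipping entries without an assigned LSN (lsn <= 0, such as the -1 of
 * asynchronous transactions). Returns 0 if no entry has an LSN.
 */
static int64_t
last_sync_lsn(const int64_t *lsns, size_t count)
{
	assert(lsns != NULL || count == 0);
	for (size_t i = count; i > 0; i--) {
		if (lsns[i - 1] > 0)
			return lsns[i - 1];
	}
	return 0;
}
```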
> + /*
> + * Take this node into account immediately.
> + * clear_synchro_queue() is a no-op on the limbo owner for now,
> + * so all the rows in the limbo must've come through the applier
> + * and so they already have an lsn assigned, even if their wal
> + * write isn't finished yet.
> + */
> + assert(wait_lsn > 0);
> + int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
> + int watcher_count = 0;
> + struct lsn_watcher_data data = {
> + .ack_count = &count,
> + .watcher_count = &watcher_count,
> + };
> +
> replicaset_foreach(replica) {
> - if (replica->relay != NULL &&
> - relay_get_state(replica->relay) != RELAY_OFF &&
> - !replica->anon) {
> - assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> - &replica->uuid));
> - vclock = relay_vclock(replica->relay);
> - int64_t lsn = vclock_get(vclock,
> - former_leader_id);
> - lsns[len++] = lsn;
> + if (replica->anon || replica->relay == NULL ||
2. replica->relay is never NULL AFAIR.
> + relay_get_state(replica->relay) != RELAY_FOLLOW)
> + continue;
> + assert(replica->id != 0);
3. Maybe better to use REPLICA_ID_NIL.
> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
> +
> + if (vclock_get(relay_vclock(replica->relay),
> + former_leader_id) >= wait_lsn) {
> + count++;
> + continue;
> }
> +
> + relay_lsn_watcher_create(&watchers[replica->id],
> + former_leader_id, wait_lsn,
> + count_confirm_f,
> + watcher_destroy_f, &data);
> + relay_set_lsn_watcher(replica->relay,
> + &watchers[replica->id]);
> + watcher_count++;
> }
> - lsns[len++] = vclock_get(box_vclock, former_leader_id);
> - assert(len < VCLOCK_MAX);
>
> - int64_t confirm_lsn = 0;
> - if (len >= replication_synchro_quorum) {
> - qsort(lsns, len, sizeof(int64_t), cmp_i64);
> - confirm_lsn = lsns[len - replication_synchro_quorum];
> +
> + while (count < replication_synchro_quorum &&
> + count + watcher_count >= replication_synchro_quorum) {
> + fiber_yield();
4. Does it mean the fiber hangs infinitely? I was thinking we could wait
for the timeout and return an error when we fail to wait.
Or maybe at least add a fiber cancellation check here so that this function
could be stopped somehow. Currently the function is basically immortal and
never returns if there is no quorum for too long.
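A sketch of the exit conditions this comment asks for, written as a pure C function so it can be checked without fibers. It keeps the patch's rule (wait while `count < quorum && count + watcher_count >= quorum`) and adds a deadline and a cancellation flag. `enum wait_status`, `clear_queue_wait_status()` and the `now`/`deadline` parameters are hypothetical; real code would presumably yield between calls and turn the timeout and cancellation outcomes into an error.

```c
#include <assert.h>
#include <stdbool.h>

enum wait_status {
	WAIT_QUORUM_REACHED,	/* enough acks: safe to confirm */
	WAIT_KEEP_WAITING,	/* quorum still reachable: yield, re-check */
	WAIT_NO_QUORUM,		/* too few live watchers left */
	WAIT_TIMED_OUT,		/* deadline passed */
	WAIT_CANCELLED,		/* the waiting fiber was cancelled */
};

static enum wait_status
clear_queue_wait_status(int count, int watcher_count, int quorum,
			double now, double deadline, bool is_cancelled)
{
	assert(count >= 0 && watcher_count >= 0 && quorum > 0);
	if (count >= quorum)
		return WAIT_QUORUM_REACHED;
	if (is_cancelled)
		return WAIT_CANCELLED;
	if (count + watcher_count < quorum)
		return WAIT_NO_QUORUM;
	if (now >= deadline)
		return WAIT_TIMED_OUT;
	return WAIT_KEEP_WAITING;
}
```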
> }
>
> - txn_limbo_force_empty(&txn_limbo, confirm_lsn);
> - assert(txn_limbo_is_empty(&txn_limbo));
> - }
> + /*
> + * In case some new limbo entries arrived, confirm them as well.
> + */
> + restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
> +
> + /*
> + * Not enough replicas connected. Give user some time to
> + * reconfigure quorum and replicas some time to reconnect, then
> + * restart the watchers.
> + */
> + if (count + watcher_count < replication_synchro_quorum) {
> + say_info("clear_synchro_queue cannot collect quorum: "
> + "number of connected replicas (%d) is less "
> + "than replication_synchro_quorum (%d). "
> + "Will retry in %.2f seconds",
> + count + watcher_count,
> + replication_synchro_quorum,
> + replication_timeout);
> + fiber_sleep(replication_timeout);
5. The fixed-time sleep doesn't look much better than the polling in
the old version of box_clear_synchro_queue(). Can we wait for an event
instead of just sleeping?
> + restart = true;
> + }
> +
> + /* Detach the watchers that haven't fired. */
> + for (int i = 1; i < VCLOCK_MAX; i++)
> + rlist_del_entry(&watchers[i], in_list);
> + } while (restart);
> +
> + txn_limbo_force_empty(&txn_limbo, wait_lsn);
> + assert(txn_limbo_is_empty(&txn_limbo));
> return 0;
> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> new file mode 100644
> index 000000000..e633f9e60
> --- /dev/null
> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> @@ -0,0 +1,137 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- gh-5435: make sure the new limbo owner commits everything there is left in
> +-- the limbo from an old owner.
> +--
> +
> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
> + | ---
> + | ...
> +
> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
> + | ---
> + | ...
> +test_run:wait_fullmesh(SERVERS)
> + | ---
> + | ...
> +
> +-- Force election_replica1 to become leader.
> +test_run:switch('election_replica2')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +test_run:switch('election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{election_mode='voter'}
> + | ---
> + | ...
> +
> +test_run:switch('election_replica1')
> + | ---
> + | - true
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +_ = box.schema.space.create('test', {is_sync=true})
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
> +test_run:cmd('stop server election_replica3')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum=3}
> + | ---
> + | ...
> +
> +lsn = box.info.lsn
> + | ---
> + | ...
> +
> +for i=1,10 do\
> + require('fiber').create(function() box.space.test:insert{i} end)\
> +end
> + | ---
> + | ...
> +
> +-- Wait for WAL write and replication.
> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('election_replica2', 'election_replica1')
> + | ---
> + | ...
> +
> +test_run:cmd('switch election_replica2')
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('stop server election_replica1')
> + | ---
> + | - true
> + | ...
> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
> +-- It will vote for 3, however, since 3 has newer data, and start replication
> +-- once 3 becomes the leader.
6. It should be 'vote for 2', '2 has newer data', '2 becomes the leader'.
Otherwise I don't understand.
Here you are on node 2, and below you make it a candidate + wait_rw(). So it
becomes the leader, not 3.
> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
> + | ---
> + | - true
> + | ...
> +
> +box.cfg{election_mode='candidate'}
> + | ---
> + | ...
> +box.ctl.wait_rw()
> + | ---
> + | ...
> +
> +-- If 2 decided whether to keep the rows or not right on becoming the leader,
> +-- it would roll them all back. Make sure 2 waits till the rows are replicated
> +-- to 3.
> +box.space.test:select{}
> + | ---
> + | - - [1]
> + | - [2]
> + | - [3]
> + | - [4]
> + | - [5]
> + | - [6]
> + | - [7]
> + | - [8]
> + | - [9]
> + | - [10]
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +-- To silence the QA warning. The 1st replica is already stopped.
> +SERVERS[1] = nil
> + | ---
> + | ...
> +test_run:cmd('delete server election_replica1')
> + | ---
> + | - true
> + | ...
> +test_run:drop_cluster(SERVERS)
> + | ---
> + | ...
7. The test fails when I try to run multiple instances in parallel
like this:
python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear-
It shows that the new leader rolled the data back.
* Re: [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue
2020-12-17 21:43 ` Vladislav Shpilevoy
@ 2020-12-21 10:18 ` Serge Petrenko
2020-12-21 17:11 ` Vladislav Shpilevoy
0 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-12-21 10:18 UTC (permalink / raw)
To: Vladislav Shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
18.12.2020 00:43, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch! Looks fine. Only 2 notes below.
Thanks for the review!
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index a8bc3471d..8e0c9a160 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1001,15 +1001,25 @@ box_set_replication_anon(void)
>>
>> }
>>
>> -void
>> +int
>> box_clear_synchro_queue(bool try_wait)
>> {
>> + /* A guard to block multiple simultaneous function invocations. */
>> + static bool in_clear_synchro_queue = false;
>> + if (in_clear_synchro_queue) {
>> + diag_set(ClientError, ER_UNSUPPORTED, "clear_synchro_queue",
>> + "simultaneous invocations");
>> + return -1;
>> + }
>> if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
>> - return;
>> + return 0;
>> uint32_t former_leader_id = txn_limbo.owner_id;
>> assert(former_leader_id != REPLICA_ID_NIL);
>> if (former_leader_id == instance_id)
>> - return;
>> + return 0;
>> +
>> + in_clear_synchro_queue = true;
>> + auto guard = make_scoped_guard([&] { in_clear_synchro_queue = false; });
> I would rather not use C++ here, because guards were introduced only
> for protection against exceptions.
I agree. I thought there would be multiple returns below,
so I introduced the guard to avoid writing
```
in_clear_synchro_queue = false;
return 0;
```
every time.
It turns out there are only 2 such places, and I can still use
`goto end` to avoid repeating `in_clear_synchro_queue = false`.
The diff for this commit is below.
> But I don't mind having this guard here if you want it. Only my thoughts.
>> if (try_wait) {
>> /* Wait until pending confirmations/rollbacks reach us. */
>> diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
>> index bf26465e6..a3447f3e7 100644
>> --- a/src/box/lua/ctl.c
>> +++ b/src/box/lua/ctl.c
>> @@ -81,8 +81,8 @@ lbox_ctl_on_schema_init(struct lua_State *L)
>> static int
>> lbox_ctl_clear_synchro_queue(struct lua_State *L)
>> {
>> - (void) L;
>> - box_clear_synchro_queue(true);
>> + if (box_clear_synchro_queue(true) != 0)
>> + return luaT_error(L);
> Maybe it's better to use the nil + error object return way? I thought we still use
> it in new code.
Hm, I haven't seen us do that in Lua/C.
As far as I know, every box.* method throws a Lua error in case of failure.
I may be missing something. Is there a reason for returning nil + error instead of
throwing?
==============================================
```
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..6f7a89d8d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1019,7 +1019,6 @@ box_clear_synchro_queue(bool try_wait)
return 0;
in_clear_synchro_queue = true;
- auto guard = make_scoped_guard([&] { in_clear_synchro_queue = false; });
if (try_wait) {
/* Wait until pending confirmations/rollbacks reach us. */
@@ -1060,6 +1059,8 @@ box_clear_synchro_queue(bool try_wait)
txn_limbo_force_empty(&txn_limbo, confirm_lsn);
assert(txn_limbo_is_empty(&txn_limbo));
}
+
+ in_clear_synchro_queue = false;
return 0;
}
```
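For reference, the `goto end` variant mentioned above might look roughly like the following standalone C sketch, where every exit taken after the flag is set goes through one label that resets it. `clear_queue()`, `queue_is_empty` and the `work` callback are hypothetical stand-ins for box_clear_synchro_queue() and its body; the callback lets the sketch simulate a second invocation arriving while the first is still running, as happens when a fiber yields.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

static bool queue_is_empty = false;

/* Returns 0 on success, -1 if already running or if work() fails. */
static int
clear_queue(int (*work)(void))
{
	/* A guard to block multiple simultaneous invocations. */
	static bool in_clear_queue = false;
	if (in_clear_queue)
		return -1;
	if (queue_is_empty)
		return 0;

	int rc = 0;
	in_clear_queue = true;
	if (work != NULL && work() != 0) {
		rc = -1;
		goto end;
	}
	queue_is_empty = true;
end:
	/* The single exit point always resets the guard. */
	in_clear_queue = false;
	return rc;
}

/* Simulates a concurrent caller arriving while clear_queue() runs. */
static int nested_rc = 1;

static int
reenter(void)
{
	nested_rc = clear_queue(NULL);
	/* The nested invocation must be rejected by the guard. */
	assert(nested_rc == -1);
	return 0;
}
```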
>
-- Serge Petrenko
* Re: [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue
2020-12-21 10:18 ` Serge Petrenko
@ 2020-12-21 17:11 ` Vladislav Shpilevoy
2020-12-23 12:01 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-21 17:11 UTC (permalink / raw)
To: Serge Petrenko, Cyrill Gorcunov; +Cc: tarantool-patches
>> But I don't mind having this guard here if you want it. Only my thoughts.
>>> if (try_wait) {
>>> /* Wait until pending confirmations/rollbacks reach us. */
>>> diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
>>> index bf26465e6..a3447f3e7 100644
>>> --- a/src/box/lua/ctl.c
>>> +++ b/src/box/lua/ctl.c
>>> @@ -81,8 +81,8 @@ lbox_ctl_on_schema_init(struct lua_State *L)
>>> static int
>>> lbox_ctl_clear_synchro_queue(struct lua_State *L)
>>> {
>>> - (void) L;
>>> - box_clear_synchro_queue(true);
>>> + if (box_clear_synchro_queue(true) != 0)
>>> + return luaT_error(L);
>> Maybe it's better to use the nil + error object return way? I thought we still use it in new code.
>
> Hm, I haven't seen us do that in Lua/C.
> As far as I know, every box.* method throws a Lua error in case of failure.
> I may be missing something. Is there a reason for returning nil + error instead of
> throwing?
The only reason is that it is our new rule for writing Lua code.
http://www.tarantool.io/en/doc/latest/dev_guide/lua_style_guide/#error-handling
Not every box function, really.
Some examples I can remember are box.session.push(),
box.unprepare, box.execute, box.prepare.
There are also examples for non-box Lua methods. You
can grep for luaT_push_nil_and_error() to find almost all
usages in the Lua C API. To find usages in .lua files you
can try to look for `return nil, err*`, but it won't
show everything.
For me nil, err looks better, but I understand that there
is a lot of old code still using exceptions. And maybe we
had better stick to them. I suggest asking in the chat
if in doubt.
* Re: [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue
2020-12-21 17:11 ` Vladislav Shpilevoy
@ 2020-12-23 12:01 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-23 12:01 UTC (permalink / raw)
To: Vladislav Shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
21.12.2020 20:11, Vladislav Shpilevoy wrote:
>>> But I don't mind having this guard here if you want it. Only my thoughts.
>>>> if (try_wait) {
>>>> /* Wait until pending confirmations/rollbacks reach us. */
>>>> diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
>>>> index bf26465e6..a3447f3e7 100644
>>>> --- a/src/box/lua/ctl.c
>>>> +++ b/src/box/lua/ctl.c
>>>> @@ -81,8 +81,8 @@ lbox_ctl_on_schema_init(struct lua_State *L)
>>>> static int
>>>> lbox_ctl_clear_synchro_queue(struct lua_State *L)
>>>> {
>>>> - (void) L;
>>>> - box_clear_synchro_queue(true);
>>>> + if (box_clear_synchro_queue(true) != 0)
>>>> + return luaT_error(L);
>>> Maybe it's better to use the nil + error object return way? I thought we still use it in new code.
>> Hm, I haven't seen us do that in Lua/C.
>> As far as I know, every box.* method throws a Lua error in case of failure.
>> I may be missing something. Is there a reason for returning nil + error instead of
>> throwing?
> The only reason is that it is our new rule for writing Lua code.
> http://www.tarantool.io/en/doc/latest/dev_guide/lua_style_guide/#error-handling
>
> Not every box function, really.
>
> Some examples I can remember are box.session.push(),
> box.unprepare, box.execute, box.prepare.
>
> There are also examples for non-box Lua methods. You
> can grep for luaT_push_nil_and_error() to find almost all
> usages in the Lua C API. To find usages in .lua files you
> can try to look for `return nil, err*`, but it won't
> show everything.
>
> For me nil, err looks better, but I understand that there
> is a lot of old code still using exceptions. And maybe we
> had better stick to them. I suggest asking in the chat
> if in doubt.
Thanks for the explanation.
I'm still not sure whether we should change it to nil, err for
`clear_synchro_queue` or not.
It looks like, after the chat discussion, we should leave it as is (throw an
error).
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running
2020-12-17 21:43 ` Vladislav Shpilevoy
@ 2020-12-23 12:01 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-23 12:01 UTC (permalink / raw)
To: Vladislav Shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
18.12.2020 00:43, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> I have an idea how to do all this a bit more simply, probably, though it
> could take a few more commits.
>
> AFAIU, here is what we need, looking at it from above.
>
> We need box_clear_synchro_queue() to wait until a quorum of replicas has
> received the changes of the instance owning the limbo.
>
> The limbo already collects the LSNs on the node owning the limbo, through
> txn_limbo_ack(). This function populates a vclock object with the owner's
> LSN as it is seen on each replica.
>
> If we collected LSNs into that vclock on the replica nodes too, it
> would be quite simple to check whether a quorum is reached: just do the
> same as txn_limbo_assign_local_lsn() does, i.e. scan the vclock and check
> if there is a quorum.
> If there is no quorum, the replica must wait for it. We can add a
> trigger to the limbo (on_ack()) or directly to tx_status_update(),
> and wait for the moment when the quorum is collected.
Thanks for your comments!
Please see v2 of this patch.
Below are my answers, which are mostly of historical interest now, since
a lot has changed in the new patch.
Yes, your proposal looks fine and it's rather easy to implement.
I didn't look at the limbo code much and simply decided that
it'd be too complex to rewrite txn_limbo_ack() that way.
Looks like it's not that hard, after all.
> Another option: copy the limbo's vclock onto the stack in box_clear_synchro_queue(),
> check the current number of confirmed replicas, then hang a trigger on
> tx_status_update() and wait. This is even simpler than the first option.
> The reason why I don't like (or don't understand yet; maybe it is good?) the
> approach through the relays is, firstly, that I want is_raft_enabled to be
> deleted instead of extended. It is an ugly crutch, which should probably
> be removed by this issue:
> https://github.com/tarantool/tarantool/issues/5438.
Ok, I see. I think, though, that the mechanism itself is not that bad.
Maybe we can remove the is_raft_enabled message, which is really
a crutch, but keep the tx notification mechanism. It looks useful
to know when the relay is operational and when it's not.
What do you think?
P.S. I ended up hanging a trigger on tx_status_update and making this
trigger perform roughly the same work as in the first version of
the patch.
> Secondly, any changes in relay not related to just reading and writing
> network scare the shit out of me. It is a complex thing running in a
> separate thread, so we had better avoid adding any more complex logic to it.
> It should be as dumb as possible, at least until all relays are moved to one
> thread (WAL) with network and disk IO served by worker threads.
> What I see in the last commit now looks extremely complex.
Ok, I see your point.
> This is a discussion proposal. I may be wrong here and miss something.
>
> On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
>> Relay sends a raft enabler/disabler message on start and stop. This
>> message would also be useful to perform some additional work related to relay
>> initialization/deinitialization done in the tx thread.
>>
>> For example, the message will be used to notify tx thread that there's no
>> point in waiting until a relay replicates a specific row, since the relay
>> is exiting. This is the case in #5435.
> Why do we need such a notification? Why can't we let box_clear_synchro_queue()
> simply wait until timeout? What if the timeout is 10 sec, and we gave up
> immediately because there was no point, but in 5 sec the relays would start,
> deliver everything, and the queue could be cleared?
As far as I understood, we need to wait for the quorum infinitely,
not for some timeout. However, it's easy to add such a timeout.
As I understand it, once the node reaches the timeout, it stays in the
leader state (in case Raft is enabled) but remains read-only? In order
to make it operational again, the user has to call
box.ctl.clear_synchro_queue() manually. Is this correct?
> P.S. I read the last commit and now I see that you don't wait for timeout.
> But still you wait infinitely. So does it matter if a relay has exited or
> not anyway?
We need the relay to notify the tx thread when it's exiting, so that the
watchers can be destroyed.
Here's what I do. I do wait infinitely, but only while it makes sense:
I start a watcher for every connected relay. While there are enough running
watchers to gather a quorum, I sleep and get woken up by each firing watcher.
The relay exit notification is needed to stop waiting once there are not
enough connected replicas anymore. Every time a relay thread exits, the fiber
executing clear_synchro_queue gets notified and sees that the watcher count
has decreased.
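A minimal model of that bookkeeping, without fibers or relays, assuming each event (a watcher firing, a relay exiting) arrives as a plain function call. It mirrors the loop condition `count < replication_synchro_quorum && count + watcher_count >= replication_synchro_quorum` from patch 4/4; `struct quorum_wait` and the helper names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical state shared by the waiting fiber and its watchers. */
struct quorum_wait {
	int ack_count;     /* replicas (this node included) at wait_lsn */
	int watcher_count; /* watchers still attached to running relays */
	int quorum;        /* replication_synchro_quorum */
};

/* A watcher fired: its replica acked wait_lsn, so it stops pending. */
static void
on_watcher_fired(struct quorum_wait *w)
{
	assert(w->watcher_count > 0);
	--w->watcher_count;
	++w->ack_count;
}

/* A relay exited before its replica acked: the watcher is dropped. */
static void
on_relay_exit(struct quorum_wait *w)
{
	assert(w->watcher_count > 0);
	--w->watcher_count;
}

static bool
quorum_reached(const struct quorum_wait *w)
{
	return w->ack_count >= w->quorum;
}

/* Keep sleeping only while the quorum is missing but still reachable. */
static bool
should_keep_waiting(const struct quorum_wait *w)
{
	return !quorum_reached(w) &&
	       w->ack_count + w->watcher_count >= w->quorum;
}
```

A firing watcher moves one unit from `watcher_count` to `ack_count`, so their sum shrinks only when a relay exits; this is why the exit notification is what lets the fiber stop waiting.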
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher
[not found] ` <4b7f4fc1-6d48-4332-c432-1eeb0b28c016@tarantool.org>
@ 2020-12-23 12:03 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-23 12:03 UTC (permalink / raw)
To: Vladislav Shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
On 21.12.2020 18:31, Serge Petrenko wrote:
>
>
> On 18.12.2020 00:43, Vladislav Shpilevoy wrote:
>> Thanks for the patch and for the design!
Thanks for the review!
>>
>> See 3 comments below.
>>
>> On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
>>> Add a list of lsn watchers to relay.
>>>
>>> Relay_lsn_watcher is a structure that lives in tx thread and monitors
>>> replica's progress in applying rows coming from a given replica id.
>>>
>>> The watcher fires once relay reports that replica has acked the target lsn
>>> for the given replica id.
>>>
>>> The watcher is owned by some tx fiber, which typically sleeps until the
>>> watcher wakes it up. Besides waking the waiting fiber up, the watcher may
>>> perform some work as dictated by the notify function.
>>>
>>> Prerequisite #5435
>>> ---
>>> src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
>>> src/box/relay.h | 44 +++++++++++++++++++++++++++++
>>> 2 files changed, 114 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>>> index f342a79dd..4014792a6 100644
>>> --- a/src/box/relay.cc
>>> +++ b/src/box/relay.cc
>>> @@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>>> });
>>> }
>>>
>>> +void
>>> +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
>>> +{
>>> + rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
>>> +}
>>> +
>>> +void
>>> +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
>>> + int64_t target_lsn, relay_lsn_watcher_f notify,
>>> + relay_lsn_watcher_f destroy, void *data)
>>> +{
>>> + watcher->replica_id = replica_id;
>>> + watcher->target_lsn = target_lsn;
>>> + watcher->notify = notify;
>>> + watcher->destroy = destroy;
>>> + watcher->data = data;
>>> + watcher->waiter = fiber();
>>> +}
>> 1. The notify + destroy + invocation of the watchers looks very similar
>> to what trigger.h API offers. Can you try to implement this as triggers?
>> I mean as a list of struct trigger objects.
>
> Yes, it's possible to implement this as a trigger. It'll be called on
> every tx_status_update() invocation then.
P.S. I ended up implementing this watcher as a trigger.
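A minimal, self-contained sketch of the trigger-style design discussed here, where each callback decides by itself whether to stay attached (as Vlad suggests in comment 2). The types and names (`ack_trigger`, `ack_trigger_run_all`, `count_ack_once`) are hypothetical and deliberately do not reproduce Tarantool's trigger.h signatures; a plain singly linked list stands in for rlist.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

struct ack_trigger;

/*
 * Hypothetical callback: gets the lsn acked for the watched replica id
 * and returns true to stay attached, false to detach itself.
 */
typedef bool (*ack_trigger_f)(struct ack_trigger *trigger, int64_t acked_lsn);

struct ack_trigger {
	ack_trigger_f run;
	void *data;
	struct ack_trigger *next;
};

static void
ack_trigger_add(struct ack_trigger **list, struct ack_trigger *trigger)
{
	assert(trigger->run != NULL);
	trigger->next = *list;
	*list = trigger;
}

/* Run every trigger once, unlinking those that ask to be detached. */
static void
ack_trigger_run_all(struct ack_trigger **list, int64_t acked_lsn)
{
	struct ack_trigger **link = list;
	while (*link != NULL) {
		struct ack_trigger *trigger = *link;
		if (trigger->run(trigger, acked_lsn))
			link = &trigger->next;
		else
			*link = trigger->next;
	}
}

/* Example payload: wait for target_lsn and count a single ack. */
struct lsn_wait {
	int64_t target_lsn;
	int ack_count;
};

static bool
count_ack_once(struct ack_trigger *trigger, int64_t acked_lsn)
{
	struct lsn_wait *w = (struct lsn_wait *)trigger->data;
	if (acked_lsn < w->target_lsn)
		return true;
	++w->ack_count;
	return false;
}
```

Because the callback itself returns whether to stay, the list runner never has to know about "notify" versus "destroy"; a callback could equally update its target and keep waiting.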
>
>>> +
>>> +/**
>>> + * Destroy the watcher.
>>> + * Wake the waiting fiber up, fire the on destroy callback and remove the
>>> + * watcher from the relay's watcher list.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
>>> +{
>>> + watcher->destroy(watcher->data);
>>> + fiber_wakeup(watcher->waiter);
>>> + rlist_del_entry(watcher, in_list);
>>> +}
>>> +
>>> +/**
>>> + * Notify the watcher that the replica has advanced to the given vclock.
>>> + * In case target_lsn is hit for watcher's replica_id, fire the notify
>>> + * callback and destroy the watcher.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
>>> + struct vclock *vclock)
>>> +{
>>> + int64_t lsn = vclock_get(vclock, watcher->replica_id);
>>> + if (lsn >= watcher->target_lsn) {
>>> + watcher->notify(watcher->data);
>>> + relay_lsn_watcher_destroy(watcher);
>>> + }
>>> +}
>> 2. This all seems too intricate. Why does the watcher destroys itself?
>> Wouldn't it be easier to let the watcher decide if he wants to die, or
>> change LSN and wait more, or whatever else?
>>
>> I have a feeling that once you will try to switch to struct triggers,
>> it will all come together automatically. Since triggers API is notably
>> hard to misuse.
>
> Sounds good.
>
>>> +
>>> /**
>>> * The message which updated tx thread with a new vclock has returned back
>>> * to the relay.
>>> diff --git a/src/box/relay.h b/src/box/relay.h
>>> index b32e2ea2a..da2e3498a 100644
>>> --- a/src/box/relay.h
>>> +++ b/src/box/relay.h
>>> @@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>>> struct vclock *replica_vclock, uint32_t replica_version_id,
>>> uint32_t replica_id_filter);
>>>
>>> +/**
>>> + * A callback invoked once lsn watcher sees the replica has reached the target
>>> + * lsn for the given replica id.
>>> + */
>>> +typedef void (*relay_lsn_watcher_f)(void *data);
>>> +
>>> +/**
>>> + * A relay watcher which notifies tx via calling a given function once the
>>> + * replica confirms it has received the rows from replica_id up to target_lsn.
>>> + */
>>> +struct relay_lsn_watcher {
>>> + /** A replica_id to monitor rows from. */
>>> + uint32_t replica_id;
>>> + /** The lsn to wait for. */
>>> + int64_t target_lsn;
>>> + /**
>>> + * A callback invoked in the tx thread once the watcher sees that
>>> + * replica has reached the target lsn.
>>> + */
>>> + relay_lsn_watcher_f notify;
>>> + /**
>>> + * A callback fired once the relay thread exits to notify the waiter
>>> + * there's nothing to wait for anymore.
>>> + */
>>> + relay_lsn_watcher_f destroy;
>>> + /** Data passed to \a notify and \a destroy. */
>> 3. Lets better use @a. To be consistent with all the new code (except
>> cases when a whole file uses tons of \a).
>
> Ok, thanks for pointing this out!
>>> + void *data;
>>> + /** A fiber waiting for this watcher to fire. */
>>> + struct fiber *waiter;
>>> + /** A link in relay's watcher list. */
>>> + struct rlist in_list;
>>> +};
>
> --
> Serge Petrenko
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
2020-12-17 21:43 ` Vladislav Shpilevoy
@ 2020-12-23 12:04 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-12-23 12:04 UTC (permalink / raw)
To: Vladislav Shpilevoy, Cyrill Gorcunov; +Cc: tarantool-patches
On 18.12.2020 00:43, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> See 7 comments below.
Hi! Thanks for the review!
There's a v2 of the patchset where everything is different,
so my answers don't matter much, but here they are anyway.
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 8e0c9a160..fb9167977 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
>>
>> }
>>
>> +struct lsn_watcher_data {
>> + int *ack_count;
>> + int *watcher_count;
>> +};
>> +
>> +static void
>> +count_confirm_f(void *data)
>> +{
>> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
>> + (*d->ack_count)++;
>> +}
>> +
>> +static void
>> +watcher_destroy_f(void *data)
>> +{
>> + struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
>> + (*d->watcher_count)--;
> An ultra-pro-master hint - you wouldn't need () if you would use
> prefix -- and ++. But you decide, no rules.
Wow, cool. I didn't think of this.
>> +}
>> +
>> int
>> box_clear_synchro_queue(bool try_wait)
>> {
>> @@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
>> }
>> }
>>
>> - if (!txn_limbo_is_empty(&txn_limbo)) {
>> - int64_t lsns[VCLOCK_MAX];
>> - int len = 0;
>> - const struct vclock *vclock;
>> + if (txn_limbo_is_empty(&txn_limbo))
>> + return 0;
>> +
>> + /*
>> + * Allocate the watchers statically to not bother with alloc/free.
>> + * This is fine since we have a single execution guard.
>> + */
>> + static struct relay_lsn_watcher watchers[VCLOCK_MAX];
>> + for (int i = 1; i < VCLOCK_MAX; i++)
>> + rlist_create(&watchers[i].in_list);
>> +
>> + int64_t wait_lsn = 0;
>> + bool restart = false;
>> + do {
>> + wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
> 1. What if the last transaction is asynchronous? They have -1 lsn in
> the limbo.
Oops, you're correct. I thought every limbo entry coming from a remote
instance had an lsn assigned.
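Since asynchronous entries sit in the limbo with lsn -1, taking the last entry's lsn is not enough. A minimal sketch of one way to pick a wait target that the `assert(wait_lsn > 0)` above would accept, assuming the limbo is modeled as a plain array rather than the real limbo queue; `struct limbo_entry_sketch` and `last_sync_lsn` are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a limbo entry. */
struct limbo_entry_sketch {
	int64_t lsn; /* -1 for asynchronous transactions */
};

/* Lsn of the newest entry that has one, or 0 if no entry has an lsn. */
static int64_t
last_sync_lsn(const struct limbo_entry_sketch *entries, int len)
{
	assert(len >= 0);
	for (int i = len - 1; i >= 0; --i) {
		if (entries[i].lsn > 0)
			return entries[i].lsn;
	}
	return 0;
}
```

A return value of 0 signals that there is nothing with an lsn to wait for, so a caller could skip the waiting step instead of asserting.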
>> + /*
>> + * Take this node into account immediately.
>> + * clear_synchro_queue() is a no-op on the limbo owner for now,
>> + * so all the rows in the limbo must've come through the applier
>> + * and so they already have an lsn assigned, even if their wal
>> + * write isn't finished yet.
>> + */
>> + assert(wait_lsn > 0);
>> + int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
>> + int watcher_count = 0;
>> + struct lsn_watcher_data data = {
>> + .ack_count = &count,
>> + .watcher_count = &watcher_count,
>> + };
>> +
>> replicaset_foreach(replica) {
>> - if (replica->relay != NULL &&
>> - relay_get_state(replica->relay) != RELAY_OFF &&
>> - !replica->anon) {
>> - assert(!tt_uuid_is_equal(&INSTANCE_UUID,
>> - &replica->uuid));
>> - vclock = relay_vclock(replica->relay);
>> - int64_t lsn = vclock_get(vclock,
>> - former_leader_id);
>> - lsns[len++] = lsn;
>> + if (replica->anon || replica->relay == NULL ||
> 2. replica->relay is never NULL AFAIR.
Yes, you're correct. I never noticed that. Thanks for pointing this out.
>> + relay_get_state(replica->relay) != RELAY_FOLLOW)
>> + continue;
>> + assert(replica->id != 0);
> 3. Maybe better use REPLICA_ID_NIL.
I'm asserting here that watchers[0] is never set or used. A check against
REPLICA_ID_NIL would hide that intent.
Is it possible that REPLICA_ID_NIL becomes, say, -1 one day? Then
the assertion would lose its meaning.
Anyway, by looking at `id != REPLICA_ID_NIL` you get no info on whether
watchers[0] should be set or not.
P.S. This part is also reworked. And you were correct, as always.
>> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
>> +
>> + if (vclock_get(relay_vclock(replica->relay),
>> + former_leader_id) >= wait_lsn) {
>> + count++;
>> + continue;
>> }
>> +
>> + relay_lsn_watcher_create(&watchers[replica->id],
>> + former_leader_id, wait_lsn,
>> + count_confirm_f,
>> + watcher_destroy_f, &data);
>> + relay_set_lsn_watcher(replica->relay,
>> + &watchers[replica->id]);
>> + watcher_count++;
>> }
>> - lsns[len++] = vclock_get(box_vclock, former_leader_id);
>> - assert(len < VCLOCK_MAX);
>>
>> - int64_t confirm_lsn = 0;
>> - if (len >= replication_synchro_quorum) {
>> - qsort(lsns, len, sizeof(int64_t), cmp_i64);
>> - confirm_lsn = lsns[len - replication_synchro_quorum];
>> +
>> + while (count < replication_synchro_quorum &&
>> + count + watcher_count >= replication_synchro_quorum) {
>> + fiber_yield();
> 4. Does it mean the fiber hangs infinitely? I was thinking we could wait
> for the timeout and return an error when we failed to wait.
Well, the fiber hangs only while it is still possible to collect a quorum.
Each time a relay exits, which should happen after a replication timeout
when the replica is unresponsive, the corresponding watcher gets destroyed.
On destruction, it decreases the watcher count and wakes the fiber up,
just like it does when it notifies the fiber that `ack_count` has increased.
So, once there are not enough watchers (relays) left to collect a quorum,
all the watchers are destroyed and the fiber sleeps for the replication
timeout. Then all the watchers are recreated and the process repeats.
I'll add either a timeout here or a cancellation check.
P.S. Added both.
> Or maybe at least add a fiber cancellation check here so as this function
> could be stopped somehow. Currently the function is basically immortal and
> infinite if no quorum for too long time.
>
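The v2 code is not part of this thread, so the following is only a hypothetical sketch of how a wake-up of the waiting fiber could decide what to do once a timeout and a cancellation check are added and the retry is dropped. In real code, `now` and `is_cancelled` would come from the fiber layer (for instance fiber_clock() and fiber_is_cancelled()); here they are plain parameters, and `wait_step` and its outcomes are invented for illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcome of one wake-up of the clear_synchro_queue fiber. */
enum wait_outcome {
	WAIT_CONTINUE,  /* quorum missing but reachable: yield again */
	WAIT_DONE,      /* quorum collected: confirm the limbo */
	WAIT_NO_QUORUM, /* too few relays left: give up, user retries */
	WAIT_TIMED_OUT, /* deadline passed */
	WAIT_CANCELLED, /* fiber was cancelled */
};

static enum wait_outcome
wait_step(int ack_count, int watcher_count, int quorum,
	  double now, double deadline, bool is_cancelled)
{
	assert(quorum > 0);
	/* A collected quorum wins even if the deadline has just passed. */
	if (ack_count >= quorum)
		return WAIT_DONE;
	if (is_cancelled)
		return WAIT_CANCELLED;
	if (now >= deadline)
		return WAIT_TIMED_OUT;
	if (ack_count + watcher_count < quorum)
		return WAIT_NO_QUORUM;
	return WAIT_CONTINUE;
}
```

Checking for success first is a design choice: it avoids rolling back entries that reached quorum on the very wake-up where the deadline expired.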
>> }
>>
>> - txn_limbo_force_empty(&txn_limbo, confirm_lsn);
>> - assert(txn_limbo_is_empty(&txn_limbo));
>> - }
>> + /*
>> + * In case some new limbo entries arrived, confirm them as well.
>> + */
>> + restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
>> +
>> + /*
>> + * Not enough replicas connected. Give user some time to
>> + * reconfigure quorum and replicas some time to reconnect, then
>> + * restart the watchers.
>> + */
>> + if (count + watcher_count < replication_synchro_quorum) {
>> + say_info("clear_synchro_queue cannot collect quorum: "
>> + "number of connected replicas (%d) is less "
>> + "than replication_synchro_quorum (%d). "
>> + "Will retry in %.2f seconds",
>> + count + watcher_count,
>> + replication_synchro_quorum,
>> + replication_timeout);
>> + fiber_sleep(replication_timeout);
> 5. The fixed time sleep looks not much better than the polling in
> the old version of box_clear_synchro_queue(). Can we not just sleep
> but wait for an event?
Yes, that'd be better. Wait for a replica to connect, then?
P.S. I decided to drop the 'retry' thing altogether.
Let the user restart `clear_synchro_queue` manually if there are not
enough replicas.
>> + restart = true;
>> + }
>> +
>> + /* Detach the watchers that haven't fired. */
>> + for (int i = 1; i < VCLOCK_MAX; i++)
>> + rlist_del_entry(&watchers[i], in_list);
>> + } while (restart);
>> +
>> + txn_limbo_force_empty(&txn_limbo, wait_lsn);
>> + assert(txn_limbo_is_empty(&txn_limbo));
>> return 0;
>> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>> new file mode 100644
>> index 000000000..e633f9e60
>> --- /dev/null
>> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>> @@ -0,0 +1,137 @@
>> +-- test-run result file version 2
>> +test_run = require('test_run').new()
>> + | ---
>> + | ...
>> +
>> +--
>> +-- gh-5435: make sure the new limbo owner commits everything there is left in
>> +-- the limbo from an old owner.
>> +--
>> +
>> +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
>> + | ---
>> + | ...
>> +
>> +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
>> + | ---
>> + | ...
>> +test_run:wait_fullmesh(SERVERS)
>> + | ---
>> + | ...
>> +
>> +-- Force election_replica1 to become leader.
>> +test_run:switch('election_replica2')
>> + | ---
>> + | - true
>> + | ...
>> +box.cfg{election_mode='voter'}
>> + | ---
>> + | ...
>> +test_run:switch('election_replica3')
>> + | ---
>> + | - true
>> + | ...
>> +box.cfg{election_mode='voter'}
>> + | ---
>> + | ...
>> +
>> +test_run:switch('election_replica1')
>> + | ---
>> + | - true
>> + | ...
>> +box.ctl.wait_rw()
>> + | ---
>> + | ...
>> +
>> +_ = box.schema.space.create('test', {is_sync=true})
>> + | ---
>> + | ...
>> +_ = box.space.test:create_index('pk')
>> + | ---
>> + | ...
>> +
>> +-- Fill the limbo with pending entries. 3 mustn't receive them yet.
>> +test_run:cmd('stop server election_replica3')
>> + | ---
>> + | - true
>> + | ...
>> +box.cfg{replication_synchro_quorum=3}
>> + | ---
>> + | ...
>> +
>> +lsn = box.info.lsn
>> + | ---
>> + | ...
>> +
>> +for i=1,10 do\
>> + require('fiber').create(function() box.space.test:insert{i} end)\
>> +end
>> + | ---
>> + | ...
>> +
>> +-- Wait for WAL write and replication.
>> +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
>> + | ---
>> + | - true
>> + | ...
>> +test_run:wait_lsn('election_replica2', 'election_replica1')
>> + | ---
>> + | ...
>> +
>> +test_run:cmd('switch election_replica2')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +test_run:cmd('stop server election_replica1')
>> + | ---
>> + | - true
>> + | ...
>> +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
>> +-- It will vote for 3, however, since 3 has newer data, and start replication
>> +-- once 3 becomes the leader.
> 6. 'vote for 2', '2 has newer data', '2 becomes the leader'. Otherwise I don't
> understand.
>
> Here you are at node 2, and below you make it candidate + wait_rw(). So it
> becomes a leader, not 3.
Thanks! You're correct.
>> +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +box.cfg{election_mode='candidate'}
>> + | ---
>> + | ...
>> +box.ctl.wait_rw()
>> + | ---
>> + | ...
>> +
>> +-- If 2 decided whether to keep the rows or not right on becoming the leader,
>> +-- it would roll them all back. Make sure 2 waits till the rows are replicated
>> +-- to 3.
>> +box.space.test:select{}
>> + | ---
>> + | - - [1]
>> + | - [2]
>> + | - [3]
>> + | - [4]
>> + | - [5]
>> + | - [6]
>> + | - [7]
>> + | - [8]
>> + | - [9]
>> + | - [10]
>> + | ...
>> +
>> +test_run:cmd('switch default')
>> + | ---
>> + | - true
>> + | ...
>> +-- To silence the QA warning. The 1st replica is already stopped.
>> +SERVERS[1] = nil
>> + | ---
>> + | ...
>> +test_run:cmd('delete server election_replica1')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:drop_cluster(SERVERS)
>> + | ---
>> + | ...
> 7. The test fails when I try to run multiple instances in parallel
> like this:
>
> python test-run.py gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear- gh-5435-clear-
>
> It shows that the new leader rolled the data back.
Ok, will re-check.
I've fixed the test, here's the diff:
=================================================
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
index e633f9e60..e806d9d53 100644
--- a/test/replication/gh-5435-clear-synchro-queue-commit-all.result
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -55,7 +55,7 @@ test_run:cmd('stop server election_replica3')
| ---
| - true
| ...
-box.cfg{replication_synchro_quorum=3}
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
| ---
| ...
@@ -88,13 +88,20 @@ test_run:cmd('stop server election_replica1')
| - true
| ...
-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
--- It will vote for 3, however, since 3 has newer data, and start replication
--- once 3 becomes the leader.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
| ---
| - true
| ...
+-- Set a huge timeout for 2 reasons.
+-- First, this guards us from the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+ | ---
+ | ...
box.cfg{election_mode='candidate'}
| ---
| ...
--
Serge Petrenko
end of thread, other threads:[~2020-12-23 12:04 UTC | newest]
Thread overview: 17+ messages
2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-21 10:18 ` Serge Petrenko
2020-12-21 17:11 ` Vladislav Shpilevoy
2020-12-23 12:01 ` Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-23 12:01 ` Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
[not found] ` <4b7f4fc1-6d48-4332-c432-1eeb0b28c016@tarantool.org>
2020-12-23 12:03 ` Serge Petrenko
2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko
2020-12-17 21:43 ` Vladislav Shpilevoy
2020-12-23 12:04 ` Serge Petrenko
2020-12-11 7:15 ` [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue " Serge Petrenko
2020-12-11 9:19 ` Serge Petrenko