[Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
Serge Petrenko
sergepetrenko at tarantool.org
Sun Jul 5 14:09:36 MSK 2020
On 05.07.2020 02:18, Vladislav Shpilevoy wrote:
> Here is also a general problem - having this as box.cfg option
> means, that the selected leader should stay selected regardless
> of what happens in the cluster. In particular, it should reject
> any attempts to add an entry into the limbo, not originated from
> this instance.
>
> Currently this is not guaranteed, see comment below.
Thanks for the answer!
>
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ca24b98ca..087710383 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -78,6 +78,7 @@
>> #include "sequence.h"
>> #include "sql_stmt_cache.h"
>> #include "msgpack.h"
>> +#include "trivia/util.h"
>>
>> static char status[64] = "unknown";
>>
>> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>>
>> }
>>
>> +void
>> +box_set_replication_synchro_leader(void)
>> +{
>> + bool is_leader = cfg_geti("replication_synchro_leader");
>> + /*
>> + * For now no actions required when an instance stops
>> + * being a leader. We should probably wait until txn_limbo
>> + * becomes empty.
>> + */
>> + if (!is_leader)
>> + return;
>> + uint32_t former_leader_id = txn_limbo.instance_id;
>> + if (former_leader_id == REPLICA_ID_NIL ||
>> + former_leader_id == instance_id) {
> When limbo is empty, it will change its instance id to whatever
> entry will be added next. So it can happen, that I gave replication_synchro_leader
> to 2 instances, and if they will create transactions one at a
> time, this will work. But looks wrong.
Good catch.
>
> Perhaps it would be better to add a box.ctl function to do this
> 'limbo cleanup'? Without persisting any leader role in a config.
> Until we have a better understanding how leader-read_only-master
> roles coexist.
Agree.
I updated the patch according to your comments.
I'm posting it here.
Subject: [PATCH] box.ctl: introduce clear_synchro_queue function
Introduce a new function to the box.ctl API: box.ctl.clear_synchro_queue().
The function makes sure that, after it's executed, the txn_limbo is free
of any transactions issued on a remote instance.
In order to achieve this goal, the instance first waits for 2
replication_synchro_timeouts so that confirmations and rollbacks from
the remote instance reach it.
If the limbo remains non-empty, the instance starts figuring out which
transactions should be confirmed and which should be rolled back. To do
so, the instance scans the vclocks of all the instances that replicate
from it and determines the last lsn of the old leader that has been
reached by replication_synchro_quorum instances (counting itself).
Then the instance writes appropriate CONFIRM and ROLLBACK entries.
After these actions the limbo must be empty.
Closes #4849
---
src/box/box.cc | 50 +++++++++++++++++++++++++++++++++++++++++++++
src/box/box.h | 2 ++
src/box/lua/ctl.c | 9 ++++++++
src/box/txn_limbo.c | 26 +++++++++++++++++++++++
src/box/txn_limbo.h | 10 +++++++++
5 files changed, 97 insertions(+)
diff --git a/src/box/box.cc b/src/box/box.cc
index ca24b98ca..749c96ca1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
+#include "trivia/util.h"
static char status[64] = "unknown";
@@ -945,6 +946,55 @@ box_set_replication_anon(void)
}
+/**
+ * Find the greatest lsn of @a leader_id which is known to have
+ * been received by at least replication_synchro_quorum instances.
+ * This instance and every non-anonymous replica with a running
+ * relay are counted. Return 0 when fewer than
+ * replication_synchro_quorum instances can be counted.
+ */
+static int64_t
+box_synchro_quorum_lsn(uint32_t leader_id)
+{
+	int64_t lsns[VCLOCK_MAX];
+	int len = 0;
+	replicaset_foreach(replica) {
+		if (replica->relay != NULL &&
+		    relay_get_state(replica->relay) != RELAY_OFF &&
+		    !replica->anon) {
+			assert(!tt_uuid_is_equal(&INSTANCE_UUID,
+						 &replica->uuid));
+			/* Check the bound before writing, not after. */
+			assert(len < VCLOCK_MAX);
+			const struct vclock *vclock =
+				relay_vclock(replica->relay);
+			lsns[len++] = vclock_get(vclock, leader_id);
+		}
+	}
+	assert(len < VCLOCK_MAX);
+	/* This instance's own view of the leader's lsn. */
+	lsns[len++] = vclock_get(box_vclock, leader_id);
+
+	assert(replication_synchro_quorum > 0);
+	if (len < replication_synchro_quorum)
+		return 0;
+	/*
+	 * After an ascending sort at least quorum values are
+	 * >= lsns[len - quorum], i.e. quorum instances reached it.
+	 */
+	qsort(lsns, len, sizeof(lsns[0]), cmp_i64);
+	return lsns[len - replication_synchro_quorum];
+}
+
+/**
+ * Implementation of box.ctl.clear_synchro_queue().
+ *
+ * If the limbo holds transactions of a remote (former) leader,
+ * first give that leader 2 * txn_limbo_confirm_timeout() to
+ * deliver its CONFIRM/ROLLBACK entries. Whatever is still left
+ * afterwards is confirmed up to the lsn reached by a quorum of
+ * instances and rolled back past it via txn_limbo_force_empty().
+ *
+ * Does nothing when box is not configured, the limbo is empty,
+ * or the limbo belongs to this instance. If the limbo owner
+ * changes while waiting, returns without touching the new
+ * owner's entries.
+ */
+void
+box_clear_synchro_queue(void)
+{
+	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
+		return;
+	uint32_t former_leader_id = txn_limbo.instance_id;
+	assert(former_leader_id != REPLICA_ID_NIL);
+	if (former_leader_id == instance_id)
+		return;
+
+	/* Wait until pending confirmations/rollbacks reach us. */
+	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
+	double start_tm = fiber_time();
+	while (!txn_limbo_is_empty(&txn_limbo)) {
+		if (fiber_time() - start_tm > timeout)
+			break;
+		fiber_sleep(0.001);
+	}
+
+	if (txn_limbo_is_empty(&txn_limbo))
+		return;
+	/*
+	 * While this fiber slept, the limbo could have been drained
+	 * and then refilled by another instance (an empty limbo
+	 * takes the instance id of the next added entry). The
+	 * quorum lsn below is meaningful for the former leader
+	 * only, so applying it to somebody else's entries would
+	 * confirm or roll back the wrong transactions.
+	 */
+	if (txn_limbo.instance_id != former_leader_id)
+		return;
+
+	int64_t confirm_lsn = box_synchro_quorum_lsn(former_leader_id);
+	txn_limbo_force_empty(&txn_limbo, confirm_lsn);
+	assert(txn_limbo_is_empty(&txn_limbo));
+}
+
void
box_listen(void)
{
diff --git a/src/box/box.h b/src/box/box.h
index f9789154e..5c4a5ed78 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -258,6 +258,8 @@ extern "C" {
typedef struct tuple box_tuple_t;
+/**
+ * Implementation of box.ctl.clear_synchro_queue(): make sure the
+ * limbo contains no transactions issued by a remote instance.
+ * Waits for the former leader's CONFIRM/ROLLBACK first, then
+ * confirms or rolls back whatever remains based on the lsns
+ * reached by a quorum of instances.
+ */
+void
+box_clear_synchro_queue(void);
+
/* box_select is private and used only by FFI */
API_EXPORT int
box_select(uint32_t space_id, uint32_t index_id,
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 85ed30c50..2017ddc18 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -78,11 +78,20 @@ lbox_ctl_on_schema_init(struct lua_State *L)
return lbox_trigger_reset(L, 2, &on_schema_init, NULL, NULL);
}
+/**
+ * Lua binding for box.ctl.clear_synchro_queue(). Lua arguments
+ * are ignored and no values are returned to Lua.
+ */
+static int
+lbox_ctl_clear_synchro_queue(struct lua_State *L)
+{
+	box_clear_synchro_queue();
+	(void)L;
+	return 0;
+}
+
static const struct luaL_Reg lbox_ctl_lib[] = {
{"wait_ro", lbox_ctl_wait_ro},
{"wait_rw", lbox_ctl_wait_rw},
{"on_shutdown", lbox_ctl_on_shutdown},
{"on_schema_init", lbox_ctl_on_schema_init},
+ /* box.ctl.clear_synchro_queue() -> box_clear_synchro_queue(). */
+ {"clear_synchro_queue", lbox_ctl_clear_synchro_queue},
{NULL, NULL}
};
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 44a0c7273..9603d3eb3 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -482,6 +482,32 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
return 0;
}
+/**
+ * Locate the last entry waiting for acks whose lsn is covered by
+ * confirm_lsn and the first such entry which is not covered. Then
+ * write and immediately apply CONFIRM for the former and ROLLBACK
+ * for the latter.
+ */
+void
+txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
+{
+	struct txn_limbo_entry *confirm_upto = NULL;
+	struct txn_limbo_entry *rollback_from = NULL;
+	struct txn_limbo_entry *entry;
+	rlist_foreach_entry(entry, &limbo->queue, in_queue) {
+		/* Only entries waiting for acks set the boundaries. */
+		if (!txn_has_flag(entry->txn, TXN_WAIT_ACK))
+			continue;
+		if (entry->lsn > confirm_lsn) {
+			rollback_from = entry;
+			break;
+		}
+		confirm_upto = entry;
+	}
+
+	if (confirm_upto != NULL) {
+		int64_t lsn = confirm_upto->lsn;
+		txn_limbo_write_confirm(limbo, lsn);
+		txn_limbo_read_confirm(limbo, lsn);
+	}
+	if (rollback_from != NULL) {
+		int64_t lsn = rollback_from->lsn;
+		txn_limbo_write_rollback(limbo, lsn);
+		txn_limbo_read_rollback(limbo, lsn - 1);
+	}
+}
+
void
txn_limbo_init(void)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 3abbe9e85..1c945f21f 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -237,6 +237,16 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo);
int
txn_limbo_wait_confirm(struct txn_limbo *limbo);
+/**
+ * Forcibly empty the limbo: confirm all the entries with lsn
+ * less than or equal to @a confirm_lsn, and roll back all the
+ * following entries.
+ * The function makes txn_limbo write CONFIRM and ROLLBACK
+ * messages for the appropriate lsns, and then processes the
+ * messages immediately.
+ *
+ * @param limbo Limbo to empty.
+ * @param confirm_lsn Greatest lsn to confirm; entries with a
+ *        greater lsn are rolled back.
+ */
+void
+txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn);
+
void
txn_limbo_init();
--
2.24.3 (Apple Git-128)
--
Serge Petrenko
More information about the Tarantool-patches
mailing list