Tarantool development patches archive
* [Tarantool-patches] [PATCH 0/2] introduce actions on leader election
@ 2020-07-04 21:55 Serge Petrenko
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
  0 siblings, 2 replies; 8+ messages in thread
From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches

Branch: gh-4842-sync-replication
Issue: https://github.com/tarantool/tarantool/issues/4849

Serge Petrenko (2):
  util: move cmp_i64 from xlog.c to util.h
  box: introduce a cfg handle to become syncro leader

 src/box/box.cc           | 79 ++++++++++++++++++++++++++++++++++++++++
 src/box/box.h            |  1 +
 src/box/lua/cfg.cc       |  9 +++++
 src/box/lua/load_cfg.lua |  4 ++
 src/box/txn_limbo.c      | 16 +-------
 src/box/txn_limbo.h      | 15 ++++++++
 src/box/xlog.c           | 10 +----
 src/trivia/util.h        | 11 ++++++
 8 files changed, 122 insertions(+), 23 deletions(-)

-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h
  2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko
@ 2020-07-04 21:55 ` Serge Petrenko
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
  1 sibling, 0 replies; 8+ messages in thread
From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches

The comparator will be needed in other files too, e.g. box.cc

Prerequisite #4849
---
 src/box/xlog.c    | 10 +---------
 src/trivia/util.h | 11 +++++++++++
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/box/xlog.c b/src/box/xlog.c
index b5b082a20..05f8c2e29 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -46,6 +46,7 @@
 #include "xrow.h"
 #include "iproto_constants.h"
 #include "errinj.h"
+#include "trivia/util.h"
 
 /*
  * FALLOC_FL_KEEP_SIZE flag has existed since fallocate() was
@@ -475,15 +476,6 @@ xdir_open_cursor(struct xdir *dir, int64_t signature,
 	return 0;
 }
 
-static int
-cmp_i64(const void *_a, const void *_b)
-{
-	const int64_t *a = (const int64_t *) _a, *b = (const int64_t *) _b;
-	if (*a == *b)
-		return 0;
-	return (*a > *b) ? 1 : -1;
-}
-
 /**
  * Scan (or rescan) a directory with snapshot or write ahead logs.
  * Read all files matching a pattern from the directory -
diff --git a/src/trivia/util.h b/src/trivia/util.h
index 29c7f0194..b344af303 100644
--- a/src/trivia/util.h
+++ b/src/trivia/util.h
@@ -534,6 +534,17 @@ double_compare_int64(double lhs, int64_t rhs, int k)
 	return double_compare_nint64(lhs, rhs, k);
 }
 
+/**
+ * Compare two operands as int64_t.
+ * Needed for qsort.
+ */
+static inline int
+cmp_i64(const void *_a, const void *_b)
+{
+	const int64_t *a = (const int64_t *) _a, *b = (const int64_t *) _b;
+	return COMPARE_RESULT(*a, *b);
+}
+
 /**
  * Put the current thread in sleep for the given number of
  * seconds.
-- 
2.24.3 (Apple Git-128)


* [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko
@ 2020-07-04 21:55 ` Serge Petrenko
  2020-07-04 23:03   ` Vladislav Shpilevoy
                     ` (2 more replies)
  1 sibling, 3 replies; 8+ messages in thread
From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches

Introduce replication_synchro_leader option to box.cfg.

Once an instance is promoted to leader, it makes sure that txn_limbo is
free of the previous leader's transactions. To achieve this, the
instance first waits for twice replication_synchro_timeout so that
confirmations and rollbacks from the former leader can reach it.

If the limbo remains non-empty, the new leader works out which
transactions should be confirmed and which should be rolled back. To do
so, the instance scans the vclocks of all the instances that replicate
from it and determines the last LSN of the former leader that has
reached replication_synchro_quorum replicas.
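
The quorum LSN selection described above can be sketched in isolation. This is a minimal standalone sketch, not the patch itself: `quorum_lsn` is a hypothetical helper name, and it assumes the caller has already collected one LSN per instance (the replicas plus the local instance), all taken from the former leader's component of the vclock.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Three-way comparator for qsort(); avoids the overflow of "a - b". */
static int
cmp_i64(const void *a, const void *b)
{
	int64_t x = *(const int64_t *)a;
	int64_t y = *(const int64_t *)b;
	return (x > y) - (x < y);
}

/*
 * Hypothetical helper: return the highest LSN that at least
 * `quorum` of the `len` instances have reached, or 0 when fewer
 * than `quorum` instances are known. Sorts `lsns` in place.
 */
static int64_t
quorum_lsn(int64_t *lsns, int len, int quorum)
{
	assert(lsns != NULL && len >= 0);
	if (quorum <= 0 || len < quorum)
		return 0;
	qsort(lsns, (size_t)len, sizeof(*lsns), cmp_i64);
	/* The quorum-th largest value is reached by `quorum` instances. */
	return lsns[len - quorum];
}
```

With the illustrative LSNs {12, 7, 10} and a quorum of 2, the sorted array is {7, 10, 12} and the helper returns 10: two instances (those at 10 and 12) hold everything up to LSN 10.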

Then the instance writes appropriate CONFIRM and ROLLBACK entries.
After these actions the limbo must be empty, and the instance may
proceed with appending its own entries to the limbo.

Closes #4849
---
 src/box/box.cc           | 79 ++++++++++++++++++++++++++++++++++++++++
 src/box/box.h            |  1 +
 src/box/lua/cfg.cc       |  9 +++++
 src/box/lua/load_cfg.lua |  4 ++
 src/box/txn_limbo.c      | 16 +-------
 src/box/txn_limbo.h      | 15 ++++++++
 6 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ca24b98ca..087710383 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
 #include "sequence.h"
 #include "sql_stmt_cache.h"
 #include "msgpack.h"
+#include "trivia/util.h"
 
 static char status[64] = "unknown";
 
@@ -945,6 +946,84 @@ box_set_replication_anon(void)
 
 }
 
+void
+box_set_replication_synchro_leader(void)
+{
+	bool is_leader = cfg_geti("replication_synchro_leader");
+	/*
+	 * For now no actions required when an instance stops
+	 * being a leader. We should probably wait until txn_limbo
+	 * becomes empty.
+	 */
+	if (!is_leader)
+		return;
+	uint32_t former_leader_id = txn_limbo.instance_id;
+	if (former_leader_id == REPLICA_ID_NIL ||
+	    former_leader_id == instance_id) {
+		return;
+	}
+
+	/* Wait until pending confirmations/rollbacks reach us. */
+	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
+	double start_tm = fiber_time();
+	while (!txn_limbo_is_empty(&txn_limbo)) {
+		if (fiber_time() - start_tm > timeout)
+			break;
+		fiber_sleep(0.001);
+	}
+
+	if (!txn_limbo_is_empty(&txn_limbo)) {
+		int64_t lsns[VCLOCK_MAX];
+		int len = 0;
+		const struct vclock  *vclock;
+		replicaset_foreach(replica) {
+			if (replica->relay != NULL &&
+			    relay_get_state(replica->relay) != RELAY_OFF &&
+			    !replica->anon) {
+				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
+							 &replica->uuid));
+				vclock = relay_vclock(replica->relay);
+				int64_t lsn = vclock_get(vclock,
+							 former_leader_id);
+				lsns[len++] = lsn;
+			}
+		}
+		lsns[len++] = vclock_get(box_vclock, former_leader_id);
+		assert(len < VCLOCK_MAX);
+
+		int64_t confirm_lsn = 0;
+		if (len >= replication_synchro_quorum) {
+			qsort(lsns, len, sizeof(int64_t), cmp_i64);
+			confirm_lsn = lsns[len - replication_synchro_quorum];
+		}
+
+		struct txn_limbo_entry *e, *last_quorum = NULL;
+		struct txn_limbo_entry *rollback = NULL;
+		rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
+			if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+				if (e->lsn <= confirm_lsn) {
+					last_quorum = e;
+				} else {
+					rollback = e;
+					break;
+				}
+			}
+		}
+
+		if (last_quorum != NULL) {
+			confirm_lsn = last_quorum->lsn;
+			txn_limbo_write_confirm(&txn_limbo, confirm_lsn);
+			txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
+		}
+		if (rollback != NULL) {
+			txn_limbo_write_rollback(&txn_limbo, rollback->lsn);
+			txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
+		}
+
+		assert(txn_limbo_is_empty(&txn_limbo));
+	}
+}
+
 void
 box_listen(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index f9789154e..565b0ebce 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void);
 void box_set_replication_sync_lag(void);
 int box_set_replication_synchro_quorum(void);
 int box_set_replication_synchro_timeout(void);
+void box_set_replication_synchro_leader(void);
 void box_set_replication_sync_timeout(void);
 void box_set_replication_skip_conflict(void);
 void box_set_replication_anon(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index d481155cd..adc1fcf3f 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
+{
+	(void) L;
+	box_set_replication_synchro_leader();
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
 {
@@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
 		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
 		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
+		{"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
 		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
 		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 107bc1582..9a968f30e 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -91,6 +91,7 @@ local default_cfg = {
     replication_sync_timeout = 300,
     replication_synchro_quorum = 1,
     replication_synchro_timeout = 5,
+    replication_synchro_leader = false,
     replication_connect_timeout = 30,
     replication_connect_quorum = nil, -- connect all
     replication_skip_conflict = false,
@@ -168,6 +169,7 @@ local template_cfg = {
     replication_sync_timeout = 'number',
     replication_synchro_quorum = 'number',
     replication_synchro_timeout = 'number',
+    replication_synchro_leader = 'boolean',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
     replication_skip_conflict = 'boolean',
@@ -286,6 +288,7 @@ local dynamic_cfg = {
     replication_sync_timeout = private.cfg_set_replication_sync_timeout,
     replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
     replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
+    replication_synchro_leader = private.cfg_set_replication_synchro_leader,
     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
     replication_anon        = private.cfg_set_replication_anon,
     instance_uuid           = check_instance_uuid,
@@ -333,6 +336,7 @@ local dynamic_cfg_order = {
     -- the new one. This should be fixed when box.cfg is able to
     -- apply some parameters together and atomically.
     replication_anon        = 250,
+    replication_synchro_leader = 250,
 }
 
 local function sort_cfg_cb(l, r)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 44a0c7273..992115ad1 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return entry->is_commit;
 }
 
-static int
-txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
-
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
@@ -261,11 +258,7 @@ rollback:
 	return -1;
 }
 
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
-static int
+int
 txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
 	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
@@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 	}
 }
 
-/**
- * Write a rollback message to WAL. After it's written
- * all the transactions following the current one and waiting
- * for confirmation must be rolled back.
- */
-static int
+int
 txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
 	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 3abbe9e85..5bf5827ac 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+int
+txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
+
+/**
+ * Write a rollback message to WAL. After it's written
+ * all the transactions following the current one and waiting
+ * for confirmation must be rolled back.
+ */
+int
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
+
 /**
  * Confirm all the entries up to the given master's LSN.
  */
-- 
2.24.3 (Apple Git-128)


* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
@ 2020-07-04 23:03   ` Vladislav Shpilevoy
  2020-07-04 23:18   ` Vladislav Shpilevoy
  2020-07-09 22:03   ` Leonid Vasiliev
  2 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-04 23:03 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches

Hi! Thanks for the patch!

I haven't reviewed it properly yet, so just one comment for now.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index ca24b98ca..087710383 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>  
>  }
>  
> +void
> +box_set_replication_synchro_leader(void)
> +{
> +	bool is_leader = cfg_geti("replication_synchro_leader");
> +	/*
> +	 * For now no actions required when an instance stops
> +	 * being a leader. We should probably wait until txn_limbo
> +	 * becomes empty.
> +	 */
> +	if (!is_leader)
> +		return;
> +	uint32_t former_leader_id = txn_limbo.instance_id;
> +	if (former_leader_id == REPLICA_ID_NIL ||
> +	    former_leader_id == instance_id) {
> +		return;
> +	}
> +
> +	/* Wait until pending confirmations/rollbacks reach us. */
> +	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
> +	double start_tm = fiber_time();
> +	while (!txn_limbo_is_empty(&txn_limbo)) {
> +		if (fiber_time() - start_tm > timeout)
> +			break;
> +		fiber_sleep(0.001);
> +	}
> +
> +	if (!txn_limbo_is_empty(&txn_limbo)) {
> +		int64_t lsns[VCLOCK_MAX];
> +		int len = 0;
> +		const struct vclock  *vclock;
> +		replicaset_foreach(replica) {
> +			if (replica->relay != NULL &&
> +			    relay_get_state(replica->relay) != RELAY_OFF &&
> +			    !replica->anon) {
> +				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> +							 &replica->uuid));
> +				vclock = relay_vclock(replica->relay);
> +				int64_t lsn = vclock_get(vclock,
> +							 former_leader_id);
> +				lsns[len++] = lsn;
> +			}
> +		}
> +		lsns[len++] = vclock_get(box_vclock, former_leader_id);
> +		assert(len < VCLOCK_MAX);
> +
> +		int64_t confirm_lsn = 0;
> +		if (len >= replication_synchro_quorum) {
> +			qsort(lsns, len, sizeof(int64_t), cmp_i64);
> +			confirm_lsn = lsns[len - replication_synchro_quorum];
> +		}
> +

Can the code below be moved to txn_limbo.c somehow? It doesn't look
right to touch the queue directly here.

> +		struct txn_limbo_entry *e, *last_quorum = NULL;
> +		struct txn_limbo_entry *rollback = NULL;
> +		rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
> +			if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +				if (e->lsn <= confirm_lsn) {
> +					last_quorum = e;
> +				} else {
> +					rollback = e;
> +					break;
> +				}
> +			}
> +		}
> +
> +		if (last_quorum != NULL) {
> +			confirm_lsn = last_quorum->lsn;
> +			txn_limbo_write_confirm(&txn_limbo, confirm_lsn);
> +			txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
> +		}
> +		if (rollback != NULL) {
> +			txn_limbo_write_rollback(&txn_limbo, rollback->lsn);
> +			txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
> +		}
> +
> +		assert(txn_limbo_is_empty(&txn_limbo));
> +	}
> +}


* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
  2020-07-04 23:03   ` Vladislav Shpilevoy
@ 2020-07-04 23:18   ` Vladislav Shpilevoy
  2020-07-05 11:09     ` Serge Petrenko
  2020-07-09 22:03   ` Leonid Vasiliev
  2 siblings, 1 reply; 8+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-04 23:18 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches

There is also a general problem: having this as a box.cfg option
means that the selected leader should stay selected regardless
of what happens in the cluster. In particular, it should reject
any attempt to add an entry to the limbo that did not originate
from this instance.

Currently this is not guaranteed; see the comment below.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index ca24b98ca..087710383 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
>  #include "sequence.h"
>  #include "sql_stmt_cache.h"
>  #include "msgpack.h"
> +#include "trivia/util.h"
>  
>  static char status[64] = "unknown";
>  
> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>  
>  }
>  
> +void
> +box_set_replication_synchro_leader(void)
> +{
> +	bool is_leader = cfg_geti("replication_synchro_leader");
> +	/*
> +	 * For now no actions required when an instance stops
> +	 * being a leader. We should probably wait until txn_limbo
> +	 * becomes empty.
> +	 */
> +	if (!is_leader)
> +		return;
> +	uint32_t former_leader_id = txn_limbo.instance_id;
> +	if (former_leader_id == REPLICA_ID_NIL ||
> +	    former_leader_id == instance_id) {

When the limbo is empty, it changes its instance id to that of
whichever entry is added next. So if I set replication_synchro_leader
on 2 instances and they create transactions one at a time, this will
work, but it looks wrong.

Perhaps it would be better to add a box.ctl function to do this
'limbo cleanup', without persisting any leader role in the config,
until we have a better understanding of how the leader, read_only
and master roles coexist.
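
The ownership concern can be illustrated with a toy model. This is a sketch under stated assumptions, not Tarantool code: `toy_limbo`, `toy_limbo_append` and `toy_limbo_drain` are hypothetical names, and the model only encodes the behaviour described in this message (an empty queue adopts the origin of the next entry) plus the assumption that a non-empty queue refuses entries from another origin.

```c
#include <stdbool.h>
#include <stdint.h>

enum { TOY_OWNER_NONE = 0 };

/* Toy stand-in for the limbo: an owner id and a pending-entry count. */
struct toy_limbo {
	uint32_t owner_id;
	int pending;
};

/*
 * Try to queue an entry originating from `origin_id`.
 * An empty queue silently switches owner; a non-empty queue
 * (assumed behaviour) refuses entries from anyone else.
 */
static bool
toy_limbo_append(struct toy_limbo *limbo, uint32_t origin_id)
{
	if (limbo->pending == 0)
		limbo->owner_id = origin_id;
	else if (limbo->owner_id != origin_id)
		return false;
	limbo->pending++;
	return true;
}

/* Pretend every pending entry was confirmed or rolled back. */
static void
toy_limbo_drain(struct toy_limbo *limbo)
{
	limbo->pending = 0;
}
```

In this model, instance 1 appends an entry and instance 2 is refused while it is pending. Once the queue drains, instance 2 can append and becomes the owner, even though nothing ever told instance 1 that it stopped being the leader. That is the "two leaders taking turns" situation described above.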


* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-04 23:18   ` Vladislav Shpilevoy
@ 2020-07-05 11:09     ` Serge Petrenko
  0 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko @ 2020-07-05 11:09 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov, sergos; +Cc: tarantool-patches


On 05.07.2020 02:18, Vladislav Shpilevoy wrote:
> Here is also a general problem - having this as box.cfg option
> means, that the selected leader should stay selected regardless
> of what happens in the cluster. In particular, it should reject
> any attempts to add an entry into the limbo, not originated from
> this instance.
>
> Currently this is not guaranteed, see comment below.


Thanks for the answer!

>
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ca24b98ca..087710383 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -78,6 +78,7 @@
>>   #include "sequence.h"
>>   #include "sql_stmt_cache.h"
>>   #include "msgpack.h"
>> +#include "trivia/util.h"
>>   
>>   static char status[64] = "unknown";
>>   
>> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>>   
>>   }
>>   
>> +void
>> +box_set_replication_synchro_leader(void)
>> +{
>> +	bool is_leader = cfg_geti("replication_synchro_leader");
>> +	/*
>> +	 * For now no actions required when an instance stops
>> +	 * being a leader. We should probably wait until txn_limbo
>> +	 * becomes empty.
>> +	 */
>> +	if (!is_leader)
>> +		return;
>> +	uint32_t former_leader_id = txn_limbo.instance_id;
>> +	if (former_leader_id == REPLICA_ID_NIL ||
>> +	    former_leader_id == instance_id) {
> When limbo is empty, it will change its instance id to whatever
> entry will be added next. So it can happen, that I gave replication_synchro_leader
> to 2 instances, and if they will create transactions one at a
> time, this will work. But looks wrong.
Good catch.
>
> Perhaps it would be better to add a box.ctl function to do this
> 'limbo cleanup'? Without persisting any leader role in a config.
> Until we have a better understanding how leader-read_only-master
> roles coexist.
Agree.

I updated the patch according to your comments.

I'm posting it here.

Subject: [PATCH] box.ctl: introduce clear_synchro_queue function

Introduce a new function in the box.ctl API: box.ctl.clear_synchro_queue().
The function makes sure that, once it has executed, the txn_limbo is
free of any transactions issued on a remote instance.
To achieve this, the instance first waits for twice
replication_synchro_timeout so that confirmations and rollbacks from
the remote instance can reach it.

If the limbo remains non-empty, the instance works out which
transactions should be confirmed and which should be rolled back. To do
so, the instance scans the vclocks of all the instances that replicate
from it and determines the last LSN of the old leader that has reached
replication_synchro_quorum replicas.

Then the instance writes appropriate CONFIRM and ROLLBACK entries.
After these actions the limbo must be empty.
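
How a single quorum LSN splits the queue into a confirmed prefix and a rolled-back tail can be sketched on a plain array. This is an illustrative sketch only: `limbo_split` and `split_queue` are hypothetical names, the queue is assumed to be ordered by LSN, and -1 is used here as a "nothing to write" marker, which is not something the patch does.

```c
#include <stdint.h>

/* Result of splitting the queue; -1 means "no such entry". */
struct limbo_split {
	int64_t confirm_lsn;  /* last LSN to confirm */
	int64_t rollback_lsn; /* first LSN to roll back */
};

/*
 * Walk an LSN-ordered queue of pending entries. Entries at or
 * below `quorum_lsn` are confirmed; the first entry above it
 * starts the rollback, which covers every later entry as well.
 */
static struct limbo_split
split_queue(const int64_t *queue, int len, int64_t quorum_lsn)
{
	struct limbo_split res = { -1, -1 };
	for (int i = 0; i < len; i++) {
		if (queue[i] <= quorum_lsn) {
			res.confirm_lsn = queue[i];
		} else {
			res.rollback_lsn = queue[i];
			break;
		}
	}
	return res;
}
```

For a queue of LSNs {5, 6, 7, 8} and a quorum LSN of 6, the sketch yields a CONFIRM for 6 and a ROLLBACK starting at 7. With a quorum LSN of 0 nothing is confirmed and the rollback starts at 5.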

Closes #4849
---
  src/box/box.cc      | 50 +++++++++++++++++++++++++++++++++++++++++++++
  src/box/box.h       |  2 ++
  src/box/lua/ctl.c   |  9 ++++++++
  src/box/txn_limbo.c | 26 +++++++++++++++++++++++
  src/box/txn_limbo.h | 10 +++++++++
  5 files changed, 97 insertions(+)

diff --git a/src/box/box.cc b/src/box/box.cc
index ca24b98ca..749c96ca1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
  #include "sequence.h"
  #include "sql_stmt_cache.h"
  #include "msgpack.h"
+#include "trivia/util.h"

  static char status[64] = "unknown";

@@ -945,6 +946,55 @@ box_set_replication_anon(void)

  }

+void
+box_clear_synchro_queue(void)
+{
+    if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
+        return;
+    uint32_t former_leader_id = txn_limbo.instance_id;
+    assert(former_leader_id != REPLICA_ID_NIL);
+    if (former_leader_id == instance_id)
+        return;
+
+    /* Wait until pending confirmations/rollbacks reach us. */
+    double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
+    double start_tm = fiber_time();
+    while (!txn_limbo_is_empty(&txn_limbo)) {
+        if (fiber_time() - start_tm > timeout)
+            break;
+        fiber_sleep(0.001);
+    }
+
+    if (!txn_limbo_is_empty(&txn_limbo)) {
+        int64_t lsns[VCLOCK_MAX];
+        int len = 0;
+        const struct vclock  *vclock;
+        replicaset_foreach(replica) {
+            if (replica->relay != NULL &&
+                relay_get_state(replica->relay) != RELAY_OFF &&
+                !replica->anon) {
+                assert(!tt_uuid_is_equal(&INSTANCE_UUID,
+                             &replica->uuid));
+                vclock = relay_vclock(replica->relay);
+                int64_t lsn = vclock_get(vclock,
+                             former_leader_id);
+                lsns[len++] = lsn;
+            }
+        }
+        lsns[len++] = vclock_get(box_vclock, former_leader_id);
+        assert(len < VCLOCK_MAX);
+
+        int64_t confirm_lsn = 0;
+        if (len >= replication_synchro_quorum) {
+            qsort(lsns, len, sizeof(int64_t), cmp_i64);
+            confirm_lsn = lsns[len - replication_synchro_quorum];
+        }
+
+        txn_limbo_force_empty(&txn_limbo, confirm_lsn);
+        assert(txn_limbo_is_empty(&txn_limbo));
+    }
+}
+
  void
  box_listen(void)
  {
diff --git a/src/box/box.h b/src/box/box.h
index f9789154e..5c4a5ed78 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -258,6 +258,8 @@ extern "C" {

  typedef struct tuple box_tuple_t;

+void box_clear_synchro_queue(void);
+
  /* box_select is private and used only by FFI */
  API_EXPORT int
  box_select(uint32_t space_id, uint32_t index_id,
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 85ed30c50..2017ddc18 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -78,11 +78,20 @@ lbox_ctl_on_schema_init(struct lua_State *L)
      return lbox_trigger_reset(L, 2, &on_schema_init, NULL, NULL);
  }

+static int
+lbox_ctl_clear_synchro_queue(struct lua_State *L)
+{
+    (void) L;
+    box_clear_synchro_queue();
+    return 0;
+}
+
  static const struct luaL_Reg lbox_ctl_lib[] = {
      {"wait_ro", lbox_ctl_wait_ro},
      {"wait_rw", lbox_ctl_wait_rw},
      {"on_shutdown", lbox_ctl_on_shutdown},
      {"on_schema_init", lbox_ctl_on_schema_init},
+    {"clear_synchro_queue", lbox_ctl_clear_synchro_queue},
      {NULL, NULL}
  };

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 44a0c7273..9603d3eb3 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -482,6 +482,32 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
      return 0;
  }

+void
+txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
+{
+    struct txn_limbo_entry *e, *last_quorum = NULL;
+    struct txn_limbo_entry *rollback = NULL;
+    rlist_foreach_entry(e, &limbo->queue, in_queue) {
+        if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+            if (e->lsn <= confirm_lsn) {
+                last_quorum = e;
+            } else {
+                rollback = e;
+                break;
+            }
+        }
+    }
+
+    if (last_quorum != NULL) {
+        txn_limbo_write_confirm(limbo, last_quorum->lsn);
+        txn_limbo_read_confirm(limbo, last_quorum->lsn);
+    }
+    if (rollback != NULL) {
+        txn_limbo_write_rollback(limbo, rollback->lsn);
+        txn_limbo_read_rollback(limbo, rollback->lsn - 1);
+    }
+}
+
  void
  txn_limbo_init(void)
  {
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 3abbe9e85..1c945f21f 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -237,6 +237,16 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo);
  int
  txn_limbo_wait_confirm(struct txn_limbo *limbo);

+/**
+ * Make txn_limbo confirm all the entries with lsn less than or
+ * equal to the given one, and rollback all the following entries.
+ * The function makes txn_limbo write CONFIRM and ROLLBACK
+ * messages for appropriate lsns, and then process the messages
+ * immediately.
+ */
+void
+txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn);
+
  void
  txn_limbo_init();

-- 
2.24.3 (Apple Git-128)

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
  2020-07-04 23:03   ` Vladislav Shpilevoy
  2020-07-04 23:18   ` Vladislav Shpilevoy
@ 2020-07-09 22:03   ` Leonid Vasiliev
  2020-07-20 21:13     ` Vladislav Shpilevoy
  2 siblings, 1 reply; 8+ messages in thread
From: Leonid Vasiliev @ 2020-07-09 22:03 UTC (permalink / raw)
  To: Serge Petrenko, v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches

Hi! Thank you for the patch.
Sergey is in the army now, so my comment is addressed to Vlad.

On 05.07.2020 00:55, Serge Petrenko wrote:
> Introduce replication_synchro_leader option to box.cfg.
> 
> Once an instance is promoted to leader, it makes sure that txn_limbo is
> free of previous leader's transactions. In order to achieve this goal,
> the instance first waits for 2 replication_synchro_timeouts  so that
> confirmations and rollbacks from the former leader reach it.
> 
> If the limbo remains non-empty, the new leader starts figuring out which
> transactions should be confirmed and which should be rolled back. In
> order to do so the instance scans through vclocks of all the instances
> that replicate from it and defines which former leader's lsn is the last
> reached by replication_synchro_quorum of replicas.
> 
> Then the instance writes appropriate CONFIRM and ROLLBACK entries.
> After these actions the limbo must be empty, and the instance may
> proceed with appending its own entries to the limbo.
> 
> Closes #4849
> ---
>   src/box/box.cc           | 79 ++++++++++++++++++++++++++++++++++++++++
>   src/box/box.h            |  1 +
>   src/box/lua/cfg.cc       |  9 +++++
>   src/box/lua/load_cfg.lua |  4 ++
>   src/box/txn_limbo.c      | 16 +-------
>   src/box/txn_limbo.h      | 15 ++++++++
>   6 files changed, 110 insertions(+), 14 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ca24b98ca..087710383 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
>   #include "sequence.h"
>   #include "sql_stmt_cache.h"
>   #include "msgpack.h"
> +#include "trivia/util.h"
>   
>   static char status[64] = "unknown";
>   
> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>   
>   }
>   
> +void
> +box_set_replication_synchro_leader(void)
> +{
> +	bool is_leader = cfg_geti("replication_synchro_leader");
> +	/*
> +	 * For now no actions required when an instance stops
> +	 * being a leader. We should probably wait until txn_limbo
> +	 * becomes empty.
> +	 */
> +	if (!is_leader)
> +		return;
> +	uint32_t former_leader_id = txn_limbo.instance_id;
> +	if (former_leader_id == REPLICA_ID_NIL ||
> +	    former_leader_id == instance_id) {
> +		return;
> +	}
> +
> +	/* Wait until pending confirmations/rollbacks reach us. */
> +	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
> +	double start_tm = fiber_time();
> +	while (!txn_limbo_is_empty(&txn_limbo)) {
> +		if (fiber_time() - start_tm > timeout)

fiber_clock() should be used here instead of fiber_time().
From the commit message of d988d7fb92fe1dda4b64218fb06813e93eb56ed1:
"
...use fiber_clock() instead of fiber_time() for timeouts

fiber_time() reports real time, which shouldn't be used for calculating
timeouts as it is affected by system time changes. Add fiber_clock()
based on ev_monotonic_now(), export it to Lua, and use it instead.
".
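
The same idea outside Tarantool can be sketched with POSIX calls. This is a minimal sketch under stated assumptions: `monotonic_now`, `wait_until` and `countdown_done` are hypothetical names, and `clock_gettime(CLOCK_MONOTONIC)` plus `nanosleep()` stand in for fiber_clock() and fiber_sleep(), which are not used here.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Seconds on a clock that is unaffected by system time changes. */
static double
monotonic_now(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (double)ts.tv_sec + (double)ts.tv_nsec / 1e9;
}

/*
 * Poll `is_done(arg)` roughly every millisecond until it returns
 * true or `timeout` seconds pass. Returns false on timeout.
 */
static bool
wait_until(bool (*is_done)(void *), void *arg, double timeout)
{
	double deadline = monotonic_now() + timeout;
	while (!is_done(arg)) {
		if (monotonic_now() >= deadline)
			return false;
		struct timespec pause = { 0, 1000000 }; /* 1 ms */
		nanosleep(&pause, NULL);
	}
	return true;
}

/* Example predicate: counts an int down, done once it hits zero. */
static bool
countdown_done(void *arg)
{
	int *left = arg;
	if (*left > 0) {
		(*left)--;
		return false;
	}
	return true;
}
```

Because the deadline is computed from a monotonic clock, adjusting the wall clock while the loop is running can neither cut the wait short nor stretch it.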

> +			break;
> +		fiber_sleep(0.001);
> +	}
> +
> +	if (!txn_limbo_is_empty(&txn_limbo)) {
> +		int64_t lsns[VCLOCK_MAX];
> +		int len = 0;
> +		const struct vclock  *vclock;
> +		replicaset_foreach(replica) {
> +			if (replica->relay != NULL &&
> +			    relay_get_state(replica->relay) != RELAY_OFF &&
> +			    !replica->anon) {
> +				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> +							 &replica->uuid));
> +				vclock = relay_vclock(replica->relay);
> +				int64_t lsn = vclock_get(vclock,
> +							 former_leader_id);
> +				lsns[len++] = lsn;
> +			}
> +		}
> +		lsns[len++] = vclock_get(box_vclock, former_leader_id);
> +		assert(len < VCLOCK_MAX);
> +
> +		int64_t confirm_lsn = 0;
> +		if (len >= replication_synchro_quorum) {
> +			qsort(lsns, len, sizeof(int64_t), cmp_i64);
> +			confirm_lsn = lsns[len - replication_synchro_quorum];
> +		}
> +
> +		struct txn_limbo_entry *e, *last_quorum = NULL;
> +		struct txn_limbo_entry *rollback = NULL;
> +		rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
> +			if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +				if (e->lsn <= confirm_lsn) {
> +					last_quorum = e;
> +				} else {
> +					rollback = e;
> +					break;
> +				}
> +			}
> +		}
> +
> +		if (last_quorum != NULL) {
> +			confirm_lsn = last_quorum->lsn;
> +			txn_limbo_write_confirm(&txn_limbo, confirm_lsn);
> +			txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
> +		}
> +		if (rollback != NULL) {
> +			txn_limbo_write_rollback(&txn_limbo, rollback->lsn);
> +			txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
> +		}
> +
> +		assert(txn_limbo_is_empty(&txn_limbo));
> +	}
> +}
> +
>   void
>   box_listen(void)
>   {
> diff --git a/src/box/box.h b/src/box/box.h
> index f9789154e..565b0ebce 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void);
>   void box_set_replication_sync_lag(void);
>   int box_set_replication_synchro_quorum(void);
>   int box_set_replication_synchro_timeout(void);
> +void box_set_replication_synchro_leader(void);
>   void box_set_replication_sync_timeout(void);
>   void box_set_replication_skip_conflict(void);
>   void box_set_replication_anon(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index d481155cd..adc1fcf3f 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
>   	return 0;
>   }
>   
> +static int
> +lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
> +{
> +	(void) L;
> +	box_set_replication_synchro_leader();
> +	return 0;
> +}
> +
>   static int
>   lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
>   {
> @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L)
>   		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
>   		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
>   		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
> +		{"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
>   		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
>   		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
>   		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 107bc1582..9a968f30e 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -91,6 +91,7 @@ local default_cfg = {
>       replication_sync_timeout = 300,
>       replication_synchro_quorum = 1,
>       replication_synchro_timeout = 5,
> +    replication_synchro_leader = false,
>       replication_connect_timeout = 30,
>       replication_connect_quorum = nil, -- connect all
>       replication_skip_conflict = false,
> @@ -168,6 +169,7 @@ local template_cfg = {
>       replication_sync_timeout = 'number',
>       replication_synchro_quorum = 'number',
>       replication_synchro_timeout = 'number',
> +    replication_synchro_leader = 'boolean',
>       replication_connect_timeout = 'number',
>       replication_connect_quorum = 'number',
>       replication_skip_conflict = 'boolean',
> @@ -286,6 +288,7 @@ local dynamic_cfg = {
>       replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>       replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
>       replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
> +    replication_synchro_leader = private.cfg_set_replication_synchro_leader,
>       replication_skip_conflict = private.cfg_set_replication_skip_conflict,
>       replication_anon        = private.cfg_set_replication_anon,
>       instance_uuid           = check_instance_uuid,
> @@ -333,6 +336,7 @@ local dynamic_cfg_order = {
>       -- the new one. This should be fixed when box.cfg is able to
>       -- apply some parameters together and atomically.
>       replication_anon        = 250,
> +    replication_synchro_leader = 250,
>   }
>   
>   local function sort_cfg_cb(l, r)
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 44a0c7273..992115ad1 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   	return entry->is_commit;
>   }
>   
> -static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> -
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   {
> @@ -261,11 +258,7 @@ rollback:
>   	return -1;
>   }
>   
> -/**
> - * Write a confirmation entry to WAL. After it's written all the
> - * transactions waiting for confirmation may be finished.
> - */
> -static int
> +int
>   txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
>   {
>   	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
> @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>   	}
>   }
>   
> -/**
> - * Write a rollback message to WAL. After it's written
> - * all the transactions following the current one and waiting
> - * for confirmation must be rolled back.
> - */
> -static int
> +int
>   txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
>   {
>   	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 3abbe9e85..5bf5827ac 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
> +/**
> + * Write a rollback message to WAL. After it's written
> + * all the transactions following the current one and waiting
> + * for confirmation must be rolled back.
> + */
> +int
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> +
>   /**
>    * Confirm all the entries up to the given master's LSN.
>    */
> 
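
For readers following the confirm LSN selection in the quoted box_set_replication_synchro_leader() hunk: it collects one LSN per node, sorts them in ascending order, and takes the element at index len - quorum, which is the largest LSN that at least quorum nodes have reached. Below is a minimal standalone sketch of just that step. The names cmp_i64_sketch and quorum_lsn_sketch are hypothetical and are not part of the patch; the comparator only assumes that cmp_i64 orders values ascending.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Ascending comparator for int64_t (assumed to match cmp_i64's order). */
static int
cmp_i64_sketch(const void *a, const void *b)
{
	int64_t x = *(const int64_t *)a;
	int64_t y = *(const int64_t *)b;
	return (x > y) - (x < y);
}

/*
 * Return the largest LSN reached by at least `quorum` of the `len`
 * nodes, or 0 when fewer than `quorum` nodes reported an LSN.
 * Sorts `lsns` in place.
 */
static int64_t
quorum_lsn_sketch(int64_t *lsns, int len, int quorum)
{
	assert(quorum > 0);
	if (len < quorum)
		return 0;
	qsort(lsns, (size_t)len, sizeof(lsns[0]), cmp_i64_sketch);
	/* Everything from this index to the end is >= the result. */
	return lsns[len - quorum];
}
```

With LSNs {10, 7, 12} and a quorum of 2, the sorted array is {7, 10, 12} and the result is 10: two nodes have reached LSN 10, so limbo entries up to it can be confirmed and later ones are rolled back.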


* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-09 22:03   ` Leonid Vasiliev
@ 2020-07-20 21:13     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-20 21:13 UTC (permalink / raw)
  To: Leonid Vasiliev, Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches

Hi! Thanks for the review!

>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ca24b98ca..087710383 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -78,6 +78,7 @@
>>   #include "sequence.h"
>>   #include "sql_stmt_cache.h"
>>   #include "msgpack.h"
>> +#include "trivia/util.h"
>>
>>   static char status[64] = "unknown";
>>
>> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>>
>>   }
>>
>> +void
>> +box_set_replication_synchro_leader(void)
>> +{
>> +    bool is_leader = cfg_geti("replication_synchro_leader");
>> +    /*
>> +     * For now no actions required when an instance stops
>> +     * being a leader. We should probably wait until txn_limbo
>> +     * becomes empty.
>> +     */
>> +    if (!is_leader)
>> +        return;
>> +    uint32_t former_leader_id = txn_limbo.instance_id;
>> +    if (former_leader_id == REPLICA_ID_NIL ||
>> +        former_leader_id == instance_id) {
>> +        return;
>> +    }
>> +
>> +    /* Wait until pending confirmations/rollbacks reach us. */
>> +    double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
>> +    double start_tm = fiber_time();
>> +    while (!txn_limbo_is_empty(&txn_limbo)) {
>> +        if (fiber_time() - start_tm > timeout)
> 
> From the d988d7fb92fe1dda4b64218fb06813e93eb56ed1 commit comment:
> "
> ...use fiber_clock() instead of fiber_time() for timeouts
> 
> fiber_time() reports real time, which shouldn't be used for calculating
> timeouts as it is affected by system time changes. Add fiber_clock()
> based on ev_monotonic_now(), export it to Lua, and use it instead.
> ".

Indeed. I've sent a patch fixing it.
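
To illustrate the point about timeouts, here is a minimal standalone sketch of a polling loop whose deadline is computed from a monotonic clock, so a wall-clock adjustment cannot shorten or extend the wait. It is not the follow-up patch: fiber_clock() is internal to Tarantool, so the sketch assumes POSIX clock_gettime(CLOCK_MONOTONIC) instead, and every name ending in _sketch is hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdbool.h>
#include <time.h>

/* Seconds from a clock that is not affected by system time changes. */
static double
monotonic_now_sketch(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (double)ts.tv_sec + (double)ts.tv_nsec / 1e9;
}

/*
 * Poll is_done(arg) until it returns true or `timeout` seconds pass.
 * Returns true if the condition was met, false on timeout.
 */
static bool
wait_until_sketch(bool (*is_done)(void *), void *arg, double timeout)
{
	double deadline = monotonic_now_sketch() + timeout;
	const struct timespec pause = { 0, 1000000 }; /* 1 ms */
	while (!is_done(arg)) {
		if (monotonic_now_sketch() >= deadline)
			return false;
		nanosleep(&pause, NULL);
	}
	return true;
}

/* Example predicate: true once the counter behind `arg` reaches zero. */
static bool
countdown_done_sketch(void *arg)
{
	int *left = arg;
	if (*left == 0)
		return true;
	--*left;
	return false;
}
```

In the quoted loop the same shape would apply with the limbo emptiness check as the predicate and fiber_sleep() in place of nanosleep(); only the clock source changes.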


Thread overview: 8+ messages
2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko
2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko
2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
2020-07-04 23:03   ` Vladislav Shpilevoy
2020-07-04 23:18   ` Vladislav Shpilevoy
2020-07-05 11:09     ` Serge Petrenko
2020-07-09 22:03   ` Leonid Vasiliev
2020-07-20 21:13     ` Vladislav Shpilevoy
