[Tarantool-patches] [PATCH 4/5] [tosquash] replication: rework how local transactions wait sync

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Jul 3 02:40:29 MSK 2020


There was a bug in how async transactions are blocked by a
non-empty limbo. It concerned fully local transactions. The
problem is that they don't have an LSN in the needed vclock
component - it is always 0. This means their limbo entry can't
get a valid LSN by any means. Even copying the previous sync
transaction's LSN won't work, because the latter may not be
written to WAL yet.
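
For illustration, this is the relevant fragment of the old
txn_commit() path (simplified from the hunk below). The last
row's LSN was taken unconditionally, which for a fully local
transaction yields nothing usable:

	/* Old code, simplified. The last row of a fully local
	 * transaction has no LSN in the vclock component tracked
	 * by the limbo, so there is nothing valid to assign. */
	int64_t lsn = req->rows[req->n_rows - 1]->lsn;
	txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
	/* Local WAL write is a first 'ACK'. */
	txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);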

This patch makes async transactions always keep lsn -1 in their
limbo entry, because the LSN does not matter for them anyway: it
is needed neither to collect ACKs nor to propagate the limbo's
vclock.
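
With the patch the assignment becomes conditional (see the
txn_commit() hunk below), so an async entry keeps the -1 it was
initialized with in txn_limbo_append():

	if (txn_has_flag(txn, TXN_WAIT_ACK)) {
		int64_t lsn = req->rows[req->n_rows - 1]->lsn;
		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
		/* Local WAL write is a first 'ACK'. */
		txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
	}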

Now even a fully local transaction can be blocked by a pending
sync transaction.
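
This works because of the new commit rule in txn_limbo_ack(),
condensed here from the hunk below: an entry with lsn == -1 is
never confirmed on its own, it is committed only once a
preceding sync entry collects a quorum:

	rlist_foreach_entry(e, &limbo->queue, in_queue) {
		if (e->lsn > lsn)
			break;
		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
			/* Async entry, lsn == -1. It commits only
			 * after a preceding sync entry did. */
			if (last_quorum == NULL)
				continue;
		} else if (e->lsn <= prev_lsn) {
			/* This replica already ACKed the row. */
			continue;
		} else if (++e->ack_count < replication_synchro_quorum) {
			continue;
		} else {
			confirm_lsn = e->lsn;
		}
		e->is_commit = true;
		last_quorum = e;
	}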

Note that this does not cover the case when the transaction is
not fully local, but its last row is.
---
 src/box/txn.c                         |  18 ++--
 src/box/txn_limbo.c                   |  60 +++++++------
 test/replication/qsync_basic.result   | 120 ++++++++++++++++++++++++++
 test/replication/qsync_basic.test.lua |  45 ++++++++++
 4 files changed, 208 insertions(+), 35 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 6c333cbed..fe0591197 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -688,15 +688,15 @@ txn_commit_async(struct txn *txn)
 
 		/* See txn_commit(). */
 		uint32_t origin_id = req->rows[0]->replica_id;
-		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
 		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
 		if (limbo_entry == NULL) {
 			txn_rollback(txn);
 			return -1;
 		}
-		assert(lsn > 0);
-		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
-
+		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
+			int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
+			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+		}
 		/*
 		 * Set a trigger to abort waiting for confirm on
 		 * WAL write failure.
@@ -779,10 +779,12 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 	if (is_sync) {
-		int64_t lsn = req->rows[req->n_rows - 1]->lsn;
-		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
-		/* Local WAL write is a first 'ACK'. */
-		txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
+		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
+			int64_t lsn = req->rows[req->n_rows - 1]->lsn;
+			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+			/* Local WAL write is a first 'ACK'. */
+			txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
+		}
 		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
 			txn_free(txn);
 			return -1;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 387cfd337..44a0c7273 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -120,6 +120,7 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	assert(entry->lsn == -1);
 	assert(lsn > 0);
+	assert(txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	(void) limbo;
 	entry->lsn = lsn;
 }
@@ -129,6 +130,12 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	if (txn_limbo_entry_is_complete(entry))
 		return true;
+	/*
+	 * Async transaction can't complete itself. It is always
+	 * completed by a previous sync transaction.
+	 */
+	if (!txn_has_flag(entry->txn, TXN_WAIT_ACK))
+		return false;
 	struct vclock_iterator iter;
 	vclock_iterator_init(&iter, &limbo->vclock);
 	int ack_count = 0;
@@ -142,14 +149,13 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 }
 
 static int
-txn_limbo_write_rollback(struct txn_limbo *limbo,
-			 struct txn_limbo_entry *entry);
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
 
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	struct txn *txn = entry->txn;
-	assert(entry->lsn > 0);
+	assert(entry->lsn > 0 || !txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
 	if (txn_limbo_check_complete(limbo, entry))
@@ -176,7 +182,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 			goto complete;
 		}
 
-		txn_limbo_write_rollback(limbo, entry);
+		txn_limbo_write_rollback(limbo, entry->lsn);
 		struct txn_limbo_entry *e, *tmp;
 		rlist_foreach_entry_safe_reverse(e, &limbo->queue,
 						 in_queue, tmp) {
@@ -210,10 +216,11 @@ complete:
 }
 
 static int
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
-				 struct txn_limbo_entry *entry,
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
 				 bool is_confirm)
 {
+	assert(lsn > 0);
+
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
@@ -221,14 +228,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
 
 	int res = 0;
 	if (is_confirm) {
-		res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+		res = xrow_encode_confirm(&row, limbo->instance_id, lsn);
 	} else {
 		/*
 		 * This entry is the first to be rolled back, so
-		 * the last "safe" lsn is entry->lsn - 1.
+		 * the last "safe" lsn is lsn - 1.
 		 */
-		res = xrow_encode_rollback(&row, limbo->instance_id,
-					   entry->lsn - 1);
+		res = xrow_encode_rollback(&row, limbo->instance_id, lsn - 1);
 	}
 	if (res == -1)
 		return -1;
@@ -260,10 +266,9 @@ rollback:
  * transactions waiting for confirmation may be finished.
  */
 static int
-txn_limbo_write_confirm(struct txn_limbo *limbo,
-			struct txn_limbo_entry *entry)
+txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, entry, true);
+	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
 }
 
 void
@@ -300,14 +305,13 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 
 /**
  * Write a rollback message to WAL. After it's written
- * all the tarnsactions following the current one and waiting
+ * all the transactions following the current one and waiting
  * for confirmation must be rolled back.
  */
 static int
-txn_limbo_write_rollback(struct txn_limbo *limbo,
-			 struct txn_limbo_entry *entry)
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, entry, false);
+	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
 }
 
 void
@@ -316,7 +320,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	struct txn_limbo_entry *e, *tmp;
 	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
-		if (e->lsn <= lsn)
+		if (e->lsn <= lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
 			break;
 		e->is_rollback = true;
 		txn_limbo_pop(limbo, e);
@@ -350,31 +354,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
 	struct txn_limbo_entry *e, *last_quorum = NULL;
+	int64_t confirm_lsn = -1;
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
+		assert(e->ack_count <= VCLOCK_MAX);
 		if (e->lsn > lsn)
 			break;
-		if (e->lsn <= prev_lsn)
-			continue;
-		assert(e->ack_count <= VCLOCK_MAX);
 		/*
 		 * Sync transactions need to collect acks. Async
 		 * transactions are automatically committed right
 		 * after all the previous sync transactions are.
 		 */
-		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			if (++e->ack_count < replication_synchro_quorum)
-				continue;
-		} else {
-			assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
+		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+			assert(e->lsn == -1);
 			if (last_quorum == NULL)
 				continue;
+		} else if (e->lsn <= prev_lsn) {
+			continue;
+		} else if (++e->ack_count < replication_synchro_quorum) {
+			continue;
+		} else {
+			confirm_lsn = e->lsn;
 		}
 		e->is_commit = true;
 		last_quorum = e;
 	}
 	if (last_quorum == NULL)
 		return;
-	if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
+	if (txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
 		// TODO: what to do here?.
 		// We already failed writing the CONFIRM
 		// message. What are the chances we'll be
diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
index 32deb2ac3..339fc0e33 100644
--- a/test/replication/qsync_basic.result
+++ b/test/replication/qsync_basic.result
@@ -299,6 +299,126 @@ box.space.sync:select{6}
  | - []
  | ...
 
+--
+-- Fully local async transaction also waits for existing sync txn.
+--
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
+ | ---
+ | ...
+_ = box.schema.create_space('locallocal', {is_local = true})
+ | ---
+ | ...
+_ = _:create_index('pk')
+ | ---
+ | ...
+-- Propagate local vclock to some insane value to ensure it won't
+-- affect anything.
+box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
+ | ---
+ | ...
+do                                                                              \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})              \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8})  \
+    box.space.test:replace{8}                                                   \
+end
+ | ---
+ | ...
+f1:status()
+ | ---
+ | - dead
+ | ...
+f2:status()
+ | ---
+ | - dead
+ | ...
+box.space.sync:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.locallocal:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.test:select{8}
+ | ---
+ | - - [8]
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.locallocal:select{8}
+ | ---
+ | - []
+ | ...
+box.space.test:select{8}
+ | ---
+ | - - [8]
+ | ...
+
+-- Ensure sync rollback will affect all pending fully local async
+-- transactions too.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+ | ---
+ | ...
+do                                                                              \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})              \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9})  \
+    box.space.test:replace{9}                                                   \
+end
+ | ---
+ | - error: A rollback for a synchronous transaction is received
+ | ...
+f1:status()
+ | ---
+ | - dead
+ | ...
+f2:status()
+ | ---
+ | - dead
+ | ...
+box.space.sync:select{9}
+ | ---
+ | - []
+ | ...
+box.space.locallocal:select{9}
+ | ---
+ | - []
+ | ...
+box.space.test:select{9}
+ | ---
+ | - []
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{9}
+ | ---
+ | - []
+ | ...
+box.space.locallocal:select{9}
+ | ---
+ | - []
+ | ...
+box.space.test:select{9}
+ | ---
+ | - []
+ | ...
+
 --
 -- gh-5123: quorum 1 still should write CONFIRM.
 --
diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
index 361f22bc3..6e40131bf 100644
--- a/test/replication/qsync_basic.test.lua
+++ b/test/replication/qsync_basic.test.lua
@@ -118,6 +118,51 @@ test_run:switch('replica')
 box.space.test:select{6}
 box.space.sync:select{6}
 
+--
+-- Fully local async transaction also waits for existing sync txn.
+--
+test_run:switch('default')
+box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
+_ = box.schema.create_space('locallocal', {is_local = true})
+_ = _:create_index('pk')
+-- Propagate local vclock to some insane value to ensure it won't
+-- affect anything.
+box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
+do                                                                              \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})              \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8})  \
+    box.space.test:replace{8}                                                   \
+end
+f1:status()
+f2:status()
+box.space.sync:select{8}
+box.space.locallocal:select{8}
+box.space.test:select{8}
+
+test_run:switch('replica')
+box.space.sync:select{8}
+box.space.locallocal:select{8}
+box.space.test:select{8}
+
+-- Ensure sync rollback will affect all pending fully local async
+-- transactions too.
+test_run:switch('default')
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+do                                                                              \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})              \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9})  \
+    box.space.test:replace{9}                                                   \
+end
+f1:status()
+f2:status()
+box.space.sync:select{9}
+box.space.locallocal:select{9}
+box.space.test:select{9}
+test_run:switch('replica')
+box.space.sync:select{9}
+box.space.locallocal:select{9}
+box.space.test:select{9}
+
 --
 -- gh-5123: quorum 1 still should write CONFIRM.
 --
-- 
2.21.1 (Apple Git-122.3)


