[Tarantool-patches] [PATCH v2 19/19] replication: block async transactions when not empty limbo

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Jun 30 02:15:20 MSK 2020


When there is an uncommitted synchronous transaction, any attempt
to commit the next transaction should be suspended, even if it is
an async transaction.

This restriction comes from a theoretically possible dependency
of what is written in the async transactions on what was written
in the preceding sync transactions.

For that, there is a new txn flag: TXN_WAIT_SYNC. Previously, the
only synchronous replication flag was TXN_WAIT_ACK. Now a
transaction can wait for the preceding sync transactions
(TXN_WAIT_SYNC) without waiting for ACKs itself (TXN_WAIT_ACK).

In particular, if a transaction:

- Is synchronous, then it has TXN_WAIT_SYNC (it is sync) and
  TXN_WAIT_ACK (it needs to collect ACKs or get a CONFIRM);

- Is asynchronous, and the limbo was empty at the moment of
  commit, then it has neither of these flags and is committed
  as before;

- Is asynchronous, and the limbo was not empty at the moment of
  commit, then it has only TXN_WAIT_SYNC. So it will be
  finished right after all the previous sync transactions are
  done. Note: *without waiting for ACKs* - the transaction is
  still asynchronous in the sense that it does not need to wait
  for quorum replication.

Follow-up #4845
---
 src/box/applier.cc                            |  8 ++
 src/box/txn.c                                 | 16 ++--
 src/box/txn.h                                 |  7 ++
 src/box/txn_limbo.c                           | 49 +++++++++---
 .../sync_replication_sanity.result            | 75 +++++++++++++++++++
 .../sync_replication_sanity.test.lua          | 26 +++++++
 test/unit/snap_quorum_delay.cc                |  6 +-
 7 files changed, 172 insertions(+), 15 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 7e63dc544..7e70211b7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -280,6 +280,14 @@ process_confirm_rollback(struct request *request, bool is_confirm)
 			 txn_limbo.instance_id);
 		return -1;
 	}
+	assert(txn->n_applier_rows == 0);
+	/*
+	 * This is not really a transaction. It just uses txn API
+	 * to put the data into WAL. And obviously it should not
+	 * go to the limbo and block on the very same sync
+	 * transaction which it tries to confirm now.
+	 */
+	txn_set_flag(txn, TXN_FORCE_ASYNC);
 
 	if (txn_begin_stmt(txn, NULL) != 0)
 		return -1;
diff --git a/src/box/txn.c b/src/box/txn.c
index 37955752a..bc2bb8e11 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -442,7 +442,7 @@ txn_complete(struct txn *txn)
 			engine_rollback(txn->engine, txn);
 		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
 			txn_run_rollback_triggers(txn, &txn->on_rollback);
-	} else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
+	} else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
 		/* Commit the transaction. */
 		if (txn->engine != NULL)
 			engine_commit(txn->engine, txn);
@@ -552,8 +552,14 @@ txn_journal_entry_new(struct txn *txn)
 	 * space can't be synchronous. So if there is at least one
 	 * synchronous space, the transaction is not local.
 	 */
-	if (is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC))
-		txn_set_flag(txn, TXN_WAIT_ACK);
+	if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) {
+		if (is_sync) {
+			txn_set_flag(txn, TXN_WAIT_SYNC);
+			txn_set_flag(txn, TXN_WAIT_ACK);
+		} else if (!txn_limbo_is_empty(&txn_limbo)) {
+			txn_set_flag(txn, TXN_WAIT_SYNC);
+		}
+	}
 
 	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
@@ -662,7 +668,7 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
-	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
 	struct txn_limbo_entry *limbo_entry;
 	if (is_sync) {
 		/*
@@ -737,7 +743,7 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
-	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
 	if (is_sync) {
 		/*
 		 * Remote rows, if any, come before local rows, so
diff --git a/src/box/txn.h b/src/box/txn.h
index c631d7033..c484fcb56 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -66,11 +66,18 @@ enum txn_flag {
 	TXN_CAN_YIELD,
 	/** on_commit and/or on_rollback list is not empty. */
 	TXN_HAS_TRIGGERS,
+	/**
+	 * A transaction is either synchronous itself and needs to
+	 * be synced with replicas, or it is async, but is blocked
+	 * by a not yet finished synchronous transaction.
+	 */
+	TXN_WAIT_SYNC,
 	/**
 	 * Transaction, touched sync spaces, enters 'waiting for
 	 * acks' state before commit. In this state it waits until
 	 * it is replicated onto a quorum of replicas, and only
 	 * then finishes commit and returns success to a user.
+	 * TXN_WAIT_SYNC is always set, if TXN_WAIT_ACK is set.
 	 */
 	TXN_WAIT_ACK,
 	/**
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index fbe4dcecf..bfb404e8e 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -47,7 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
 struct txn_limbo_entry *
 txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
 {
-	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
 	if (id == 0)
 		id = instance_id;
 	if (limbo->instance_id != id) {
@@ -143,7 +143,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	struct txn *txn = entry->txn;
 	assert(entry->lsn > 0);
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
-	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
 	if (txn_limbo_check_complete(limbo, entry)) {
 		txn_limbo_remove(limbo, entry);
 		return 0;
@@ -160,6 +160,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 			e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
 			txn_limbo_pop(limbo, e);
 			txn_clear_flag(e->txn, TXN_WAIT_ACK);
+			txn_clear_flag(e->txn, TXN_WAIT_SYNC);
 			txn_complete(e->txn);
 			if (e == entry)
 				break;
@@ -179,6 +180,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	}
 	txn_limbo_remove(limbo, entry);
 	txn_clear_flag(txn, TXN_WAIT_ACK);
+	txn_clear_flag(txn, TXN_WAIT_SYNC);
 	return 0;
 }
 
@@ -209,6 +211,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	/*
+	 * This is not really a transaction. It just uses txn API
+	 * to put the data into WAL. And obviously it should not
+	 * go to the limbo and block on the very same sync
+	 * transaction which it tries to confirm now.
+	 */
+	txn_set_flag(txn, TXN_FORCE_ASYNC);
 
 	if (txn_begin_stmt(txn, NULL) != 0)
 		goto rollback;
@@ -238,11 +247,21 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	struct txn_limbo_entry *e, *tmp;
 	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
-		if (e->lsn > lsn)
+		/*
+		 * Confirm a transaction if
+		 * - it is a sync transaction covered by the
+		 *   confirmation LSN;
+		 * - it is an async transaction, and it is the
+		 *   last in the queue. So it does not depend on
+		 *   a not finished sync transaction anymore and
+		 *   can be confirmed too.
+		 */
+		if (e->lsn > lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
 			break;
 		e->is_commit = true;
 		txn_limbo_remove(limbo, e);
 		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
 		/*
 		 * If  txn_complete_async() was already called,
 		 * finish tx processing. Otherwise just clear the
@@ -277,6 +296,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 		e->is_rollback = true;
 		txn_limbo_pop(limbo, e);
 		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
 		if (e->txn->signature >= 0) {
 			/* Rollback the transaction. */
 			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
@@ -307,15 +327,26 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	struct txn_limbo_entry *e;
 	struct txn_limbo_entry *last_quorum = NULL;
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
-		if (e->lsn <= prev_lsn)
-			continue;
 		if (e->lsn > lsn)
 			break;
-		if (++e->ack_count >= replication_synchro_quorum) {
-			e->is_commit = true;
-			last_quorum = e;
-		}
+		if (e->lsn <= prev_lsn)
+			continue;
 		assert(e->ack_count <= VCLOCK_MAX);
+		/*
+		 * Sync transactions need to collect acks. Async
+		 * transactions are automatically committed right
+		 * after all the previous sync transactions are.
+		 */
+		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+			if (++e->ack_count < replication_synchro_quorum)
+				continue;
+		} else {
+			assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
+			if (last_quorum == NULL)
+				continue;
+		}
+		e->is_commit = true;
+		last_quorum = e;
 	}
 	if (last_quorum != NULL) {
 		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
index 8b37ba6f5..f713d4b08 100644
--- a/test/replication/sync_replication_sanity.result
+++ b/test/replication/sync_replication_sanity.result
@@ -224,6 +224,81 @@ box.space.sync:select{4}
  | - - [4]
  | ...
 
+--
+-- Async transactions should wait for existing sync transactions
+-- finish.
+--
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Start 2 fibers, which will execute one right after the other
+-- in the same event loop iteration.
+f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
+ | ---
+ | ...
+f:status()
+ | ---
+ | - dead
+ | ...
+s:select{5}
+ | ---
+ | - - [5]
+ | ...
+box.space.sync:select{5}
+ | ---
+ | - - [5]
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.test:select{5}
+ | ---
+ | - - [5]
+ | ...
+box.space.sync:select{5}
+ | ---
+ | - - [5]
+ | ...
+-- Ensure sync rollback will affect all pending async transactions
+-- too.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+ | ---
+ | ...
+f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+f:status()
+ | ---
+ | - dead
+ | ...
+s:select{6}
+ | ---
+ | - []
+ | ...
+box.space.sync:select{6}
+ | ---
+ | - []
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.test:select{6}
+ | ---
+ | - []
+ | ...
+box.space.sync:select{6}
+ | ---
+ | - []
+ | ...
+
 -- Cleanup.
 test_run:cmd('switch default')
  | ---
diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
index b0326fd4b..f84b6ee19 100644
--- a/test/replication/sync_replication_sanity.test.lua
+++ b/test/replication/sync_replication_sanity.test.lua
@@ -92,6 +92,32 @@ box.space.sync:replace{4}
 test_run:switch('replica')
 box.space.sync:select{4}
 
+--
+-- Async transactions should wait for existing sync transactions
+-- finish.
+--
+test_run:switch('default')
+-- Start 2 fibers, which will execute one right after the other
+-- in the same event loop iteration.
+f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
+f:status()
+s:select{5}
+box.space.sync:select{5}
+test_run:switch('replica')
+box.space.test:select{5}
+box.space.sync:select{5}
+-- Ensure sync rollback will affect all pending async transactions
+-- too.
+test_run:switch('default')
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
+f:status()
+s:select{6}
+box.space.sync:select{6}
+test_run:switch('replica')
+box.space.test:select{6}
+box.space.sync:select{6}
+
 -- Cleanup.
 test_run:cmd('switch default')
 
diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
index 7a200673a..e6cf381bf 100644
--- a/test/unit/snap_quorum_delay.cc
+++ b/test/unit/snap_quorum_delay.cc
@@ -97,8 +97,12 @@ txn_process_func(va_list ap)
 	enum process_type process_type = (enum process_type)va_arg(ap, int);
 	struct txn *txn = txn_begin();
 	txn->fiber = fiber();
-	/* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/
+	/*
+	 * Set the TXN_WAIT_ACK + SYNC flags to simulate a sync
+	 * transaction.
+	 */
 	txn_set_flag(txn, TXN_WAIT_ACK);
+	txn_set_flag(txn, TXN_WAIT_SYNC);
 	/*
 	 * The true way to push the transaction to limbo is to call
 	 * txn_commit() for sync transaction. But, if txn_commit()
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list