[Tarantool-patches] [PATCH v2 10/19] txn_limbo: add ROLLBACK processing

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Jun 30 02:15:11 MSK 2020


From: Serge Petrenko <sergepetrenko at tarantool.org>

Now txn_limbo writes a ROLLBACK entry to WAL when one of the limbo
entries fails to gather a quorum within txn_limbo_confirm_timeout.
All the limbo entries, starting with the failed one, are rolled back in
reverse order.

Closes #4848
---
 src/box/applier.cc    |  38 +++++++++----
 src/box/box.cc        |   2 +-
 src/box/errcode.h     |   2 +
 src/box/relay.cc      |   2 +-
 src/box/txn.c         |  19 +++++--
 src/box/txn_limbo.c   | 124 ++++++++++++++++++++++++++++++++++++++----
 src/box/txn_limbo.h   |  12 +++-
 test/box/error.result |   2 +
 8 files changed, 173 insertions(+), 28 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1b9ea2f71..fbb452dc0 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -256,19 +256,25 @@ process_nop(struct request *request)
 }
 
 /*
- * CONFIRM rows aren't dml requests  and require special
+ * CONFIRM/ROLLBACK rows aren't dml requests and require special
  * handling: instead of performing some operations on spaces,
- * processing these requests required txn_limbo to confirm some
- * of its entries.
+ * processing these requests requires txn_limbo to either confirm
+ * or rollback some of its entries.
  */
 static int
-process_confirm(struct request *request)
+process_confirm_rollback(struct request *request, bool is_confirm)
 {
-	assert(request->header->type == IPROTO_CONFIRM);
+	assert(iproto_type_is_synchro_request(request->header->type));
 	uint32_t replica_id;
 	struct txn *txn = in_txn();
 	int64_t lsn = 0;
-	if (xrow_decode_confirm(request->header, &replica_id, &lsn) != 0)
+
+	int res = 0;
+	if (is_confirm)
+		res = xrow_decode_confirm(request->header, &replica_id, &lsn);
+	else
+		res = xrow_decode_rollback(request->header, &replica_id, &lsn);
+	if (res == -1)
 		return -1;
 
 	if (replica_id != txn_limbo.instance_id) {
@@ -281,7 +287,10 @@ process_confirm(struct request *request)
 		return -1;
 
 	if (txn_commit_stmt(txn, request) == 0) {
-		txn_limbo_read_confirm(&txn_limbo, lsn);
+		if (is_confirm)
+			txn_limbo_read_confirm(&txn_limbo, lsn);
+		else
+			txn_limbo_read_rollback(&txn_limbo, lsn);
 		return 0;
 	} else {
 		return -1;
@@ -292,9 +301,10 @@ static int
 apply_row(struct xrow_header *row)
 {
 	struct request request;
-	if (row->type == IPROTO_CONFIRM) {
+	if (iproto_type_is_synchro_request(row->type)) {
 		request.header = row;
-		return process_confirm(&request);
+		return process_confirm_rollback(&request,
+						row->type == IPROTO_CONFIRM);
 	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
@@ -317,7 +327,7 @@ apply_final_join_row(struct xrow_header *row)
 	 * Confirms are ignored during join. All the data master
 	 * sends us is valid.
 	 */
-	if (row->type == IPROTO_CONFIRM)
+	if (iproto_type_is_synchro_request(row->type))
 		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
@@ -746,6 +756,14 @@ static int
 applier_txn_rollback_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
+	struct txn *txn = (struct txn *) event;
+	/*
+	 * Synchronous transaction rollback due to receiving a
+	 * ROLLBACK entry is a normal event and requires no
+	 * special handling.
+	 */
+	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
+		return 0;
 
 	/*
 	 * Setup shared applier diagnostic area.
diff --git a/src/box/box.cc b/src/box/box.cc
index ba7347367..d6ef6351b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -343,7 +343,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct request request;
 	// TODO: process confirmation during recovery.
-	if (row->type == IPROTO_CONFIRM)
+	if (iproto_type_is_synchro_request(row->type))
 		return;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3ba6866e5..ea521aa07 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -268,6 +268,8 @@ struct errcode_record {
 	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
 	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
 	/*215 */_(ER_SYNC_MASTER_MISMATCH,	"CONFIRM message arrived for an unknown master id %d, expected %d") \
+        /*216 */_(ER_SYNC_QUORUM_TIMEOUT,       "Quorum collection for a synchronous transaction is timed out") \
+        /*217 */_(ER_SYNC_ROLLBACK,             "A rollback for a synchronous transaction is received") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0adc9fc98..29588b6ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
 	assert(iproto_type_is_dml(packet->type) ||
-	       packet->type == IPROTO_CONFIRM);
+	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index 612cd19bc..37955752a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -83,10 +83,10 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 	struct space *space = stmt->space;
 	row->group_id = space != NULL ? space_group_id(space) : 0;
 	/*
-	 * IPROTO_CONFIRM entries are supplementary and aren't
-	 * valid dml requests. They're encoded manually.
+	 * Synchronous replication entries are supplementary and
+	 * aren't valid dml requests. They're encoded manually.
 	 */
-	if (likely(row->type != IPROTO_CONFIRM))
+	if (likely(!iproto_type_is_synchro_request(row->type)))
 		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
 	if (row->bodycnt < 0)
 		return -1;
@@ -490,6 +490,14 @@ void
 txn_complete_async(struct journal_entry *entry)
 {
 	struct txn *txn = entry->complete_data;
+	/*
+	 * txn_limbo has already rolled the tx back, so we just
+	 * have to free it.
+	 */
+	if (txn->signature < TXN_SIGNATURE_ROLLBACK) {
+		txn_free(txn);
+		return;
+	}
 	txn->signature = entry->res;
 	/*
 	 * Some commit/rollback triggers require for in_txn fiber
@@ -765,7 +773,10 @@ txn_commit(struct txn *txn)
 	if (is_sync) {
 		txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
 				     req->rows[req->n_rows - 1]->lsn);
-		txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
+			txn_free(txn);
+			return -1;
+		}
 	}
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
 		txn->signature = req->res;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index ac57fd1bd..680e81d3d 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	rlist_del_entry(entry, in_queue);
 }
 
+static inline void
+txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	assert(!rlist_empty(&entry->in_queue));
+	assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry,
+				 in_queue) == entry);
+	(void) limbo;
+	rlist_del_entry(entry, in_queue);
+}
+
 void
 txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
@@ -118,7 +128,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return entry->is_commit;
 }
 
-void
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+			 struct txn_limbo_entry *entry);
+
+int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	struct txn *txn = entry->txn;
@@ -127,33 +141,64 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	assert(txn_has_flag(txn, TXN_WAIT_ACK));
 	if (txn_limbo_check_complete(limbo, entry)) {
 		txn_limbo_remove(limbo, entry);
-		return;
+		return 0;
 	}
 	bool cancellable = fiber_set_cancellable(false);
 	bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
 	fiber_set_cancellable(cancellable);
 	if (timed_out) {
-		// TODO: implement rollback.
-		entry->is_rollback = true;
+		txn_limbo_write_rollback(limbo, entry);
+		struct txn_limbo_entry *e, *tmp;
+		rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+						 in_queue, tmp) {
+			e->is_rollback = true;
+			e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
+			txn_limbo_pop(limbo, e);
+			txn_clear_flag(e->txn, TXN_WAIT_ACK);
+			txn_complete(e->txn);
+			if (e == entry)
+				break;
+			fiber_wakeup(e->txn->fiber);
+		}
+		diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+		return -1;
 	}
 	assert(txn_limbo_entry_is_complete(entry));
+	/*
+	 * The first tx to be rolled back already performed all
+	 * the necessary cleanups for us.
+	 */
+	if (entry->is_rollback) {
+		diag_set(ClientError, ER_SYNC_ROLLBACK);
+		return -1;
+	}
 	txn_limbo_remove(limbo, entry);
 	txn_clear_flag(txn, TXN_WAIT_ACK);
+	return 0;
 }
 
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
 static int
-txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
+				 struct txn_limbo_entry *entry,
+				 bool is_confirm)
 {
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
 	};
 
-	if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0)
+	int res = 0;
+	if (is_confirm) {
+		res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+	} else {
+		/*
+		 * This entry is the first to be rolled back, so
+		 * the last "safe" lsn is entry->lsn - 1.
+		 */
+		res = xrow_encode_rollback(&row, limbo->instance_id,
+					   entry->lsn - 1);
+	}
+	if (res == -1)
 		return -1;
 
 	struct txn *txn = txn_begin();
@@ -171,6 +216,17 @@ rollback:
 	return -1;
 }
 
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo,
+			struct txn_limbo_entry *entry)
+{
+	return txn_limbo_write_confirm_rollback(limbo, entry, true);
+}
+
 void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
@@ -194,6 +250,49 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 	}
 }
 
+/**
+ * Write a rollback message to WAL. After it's written
+ * all the transactions following the current one and waiting
+ * for confirmation must be rolled back.
+ */
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+			 struct txn_limbo_entry *entry)
+{
+	return txn_limbo_write_confirm_rollback(limbo, entry, false);
+}
+
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
+{
+	assert(limbo->instance_id != REPLICA_ID_NIL &&
+	       limbo->instance_id != instance_id);
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
+		if (e->lsn <= lsn)
+			break;
+		e->is_rollback = true;
+		txn_limbo_pop(limbo, e);
+		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+		if (e->txn->signature >= 0) {
+			/* Rollback the transaction. */
+			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
+			txn_complete(e->txn);
+		} else {
+			/*
+			 * Rollback the transaction, but don't
+			 * free it yet. txn_complete_async() will
+			 * free it.
+			 */
+			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
+			struct fiber *fiber = e->txn->fiber;
+			e->txn->fiber = fiber();
+			txn_complete(e->txn);
+			e->txn->fiber = fiber;
+		}
+	}
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -217,7 +316,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	}
 	if (last_quorum != NULL) {
 		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
-			// TODO: rollback.
+			// TODO: what to do here?
+			// We already failed writing the CONFIRM
+			// message. What are the chances we'll be
+			// able to write ROLLBACK?
 			return;
 		}
 		/*
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 94f224131..138093c7c 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 /**
  * Block the current fiber until the transaction in the limbo
  * entry is either committed or rolled back.
+ * If timeout is reached before acks are collected, the tx is
+ * rolled back as well as all the txs in the limbo following it.
+ * Returns -1 when rollback was performed and tx has to be freed.
+ *          0 when tx processing can go on.
  */
-void
+int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
 /**
@@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
 
+/**
+ * Roll back all the entries whose LSN is greater than the given master LSN.
+ */
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+
 /**
  * Return TRUE if limbo is empty.
  */
diff --git a/test/box/error.result b/test/box/error.result
index 34ded3930..8241ec1a8 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -434,6 +434,8 @@ t;
  |   213: box.error.NO_SUCH_SESSION_SETTING
  |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
  |   215: box.error.SYNC_MASTER_MISMATCH
+ |   216: box.error.SYNC_QUORUM_TIMEOUT
+ |   217: box.error.SYNC_ROLLBACK
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list