[Tarantool-patches] [PATCH 4/4] txn_limbo: add ROLLBACK processing

Serge Petrenko sergepetrenko at tarantool.org
Thu Jun 18 15:14:03 MSK 2020


txn_limbo now writes a ROLLBACK entry to WAL when one of the limbo
entries fails to gather a quorum within txn_limbo_confirm_timeout.
All the limbo entries, starting with the failed one, are rolled back
in reverse order.

Closes #4848
---
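A note for reviewers: here is a minimal standalone sketch of the
rollback order this patch implements. The names (fake_entry,
fake_limbo_rollback) are hypothetical and only illustrate the idea:
once an entry misses its quorum, it and every entry queued after it
are rolled back, newest first, while the older entries stay in the
limbo. It is not the real limbo code and skips WAL and fiber handling
entirely.

#include <stdio.h>
#include <stdbool.h>

/* Hypothetical stand-in for a limbo entry: just an lsn and a flag. */
struct fake_entry {
	long long lsn;
	bool is_rollback;
};

/*
 * Roll back every entry with lsn >= rollback_lsn, walking the queue
 * from the newest entry backwards, in the same order
 * txn_limbo_wait_complete() uses after writing the ROLLBACK row.
 */
static void
fake_limbo_rollback(struct fake_entry *queue, int count,
		    long long rollback_lsn)
{
	for (int i = count - 1; i >= 0; i--) {
		if (queue[i].lsn < rollback_lsn)
			break;
		queue[i].is_rollback = true;
		printf("rolled back lsn %lld\n", queue[i].lsn);
	}
}

int
main(void)
{
	struct fake_entry queue[] = {
		{101, false}, {102, false}, {103, false}, {104, false},
	};
	/* Suppose the entry with lsn 103 failed to gather a quorum. */
	fake_limbo_rollback(queue, 4, 103);
	/* Prints lsn 104, then 103; entries 101 and 102 survive. */
	return 0;
}

Any C99 compiler builds this; the real code of course operates on
struct txn_limbo_entry, wakes up the waiting fibers and completes the
rolled back txns with a negative signature.
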
 src/box/applier.cc  |  40 ++++++++++++----
 src/box/relay.cc    |   2 +-
 src/box/txn.c       |   5 +-
 src/box/txn_limbo.c | 110 +++++++++++++++++++++++++++++++++++++++-----
 src/box/txn_limbo.h |  12 ++++-
 5 files changed, 146 insertions(+), 23 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ad2ee18a5..872372f62 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -260,7 +260,7 @@ process_nop(struct request *request)
  * Confirms some of the txs waiting in txn_limbo.
  */
 static int
-applier_on_confirm(struct trigger *trig, void *data)
+applier_on_confirm_written(struct trigger *trig, void *data)
 {
 	(void) trig;
 	int64_t lsn = *(int64_t *)data;
@@ -268,10 +268,23 @@ applier_on_confirm(struct trigger *trig, void *data)
 	return 0;
 }
 
+/*
+ * An on_commit trigger set on a txn containing a ROLLBACK entry.
+ * Rolls back some of the txs waiting in txn_limbo.
+ */
 static int
-process_confirm(struct request *request)
+applier_on_rollback_written(struct trigger *trig, void *data)
 {
-	assert(request->header->type == IPROTO_CONFIRM);
+	(void) trig;
+	int64_t lsn = *(int64_t *)data;
+	txn_limbo_read_rollback(&txn_limbo, lsn);
+	return 0;
+}
+
+static int
+process_confirm_rollback(struct request *request, bool is_confirm)
+{
+	assert(iproto_type_is_synchro_request(request->header->type));
 	uint32_t replica_id;
 	struct txn *txn = in_txn();
 	size_t size;
@@ -280,8 +293,14 @@ process_confirm(struct request *request)
 		diag_set(OutOfMemory, size, "region_alloc_object", "lsn");
 		return -1;
 	}
-	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
+	int res = 0;
+	if (is_confirm)
+		res = xrow_decode_confirm(request->header, &replica_id, lsn);
+	else
+		res = xrow_decode_rollback(request->header, &replica_id, lsn);
+	if (res == -1)
 		return -1;
+
 	/*
 	 * on_commit trigger failure is not allowed, so check for
 	 * instance id early.
@@ -294,7 +313,7 @@ process_confirm(struct request *request)
 
 	/*
 	 * Set an on_commit trigger which will perform the actual
-	 * confirmation processing.
+	 * confirmation/rollback processing.
 	 */
 	struct trigger *trig = region_alloc_object(&txn->region, typeof(*trig),
 						   &size);
@@ -302,7 +321,9 @@ process_confirm(struct request *request)
 		diag_set(OutOfMemory, size, "region_alloc_object", "trig");
 		return -1;
 	}
-	trigger_create(trig, applier_on_confirm, lsn, NULL);
+	trigger_create(trig, is_confirm ? applier_on_confirm_written :
+					  applier_on_rollback_written,
+		       lsn, NULL);
 
 	if (txn_begin_stmt(txn, NULL) != 0)
 		return -1;
@@ -319,9 +340,10 @@ static int
 apply_row(struct xrow_header *row)
 {
 	struct request request;
-	if (row->type == IPROTO_CONFIRM) {
+	if (iproto_type_is_synchro_request(row->type)) {
 		request.header = row;
-		return process_confirm(&request);
+		return process_confirm_rollback(&request,
+						row->type == IPROTO_CONFIRM);
 	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
@@ -344,7 +366,7 @@ apply_final_join_row(struct xrow_header *row)
 	 * Confirms are ignored during join. All the data master
 	 * sends us is valid.
 	 */
-	if (row->type == IPROTO_CONFIRM)
+	if (iproto_type_is_synchro_request(row->type))
 		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0adc9fc98..29588b6ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
 	assert(iproto_type_is_dml(packet->type) ||
-	       packet->type == IPROTO_CONFIRM);
+	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index 4f787db79..484b822db 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -751,7 +751,10 @@ txn_commit(struct txn *txn)
 	if (is_sync) {
 		txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
 				     req->rows[req->n_rows - 1]->lsn);
-		txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
+			txn_free(txn);
+			return -1;
+		}
 	}
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
 		txn->signature = req->res;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a715a136e..c55f5bda1 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	rlist_del_entry(entry, in_queue);
 }
 
+static inline void
+txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	assert(!rlist_empty(&entry->in_queue));
+	assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry,
+				 in_queue) == entry);
+	(void) limbo;
+	rlist_del_entry(entry, in_queue);
+}
+
 void
 txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
@@ -116,7 +126,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return entry->is_commit;
 }
 
-void
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+			 struct txn_limbo_entry *entry);
+
+int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	struct txn *txn = entry->txn;
@@ -125,33 +139,61 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	assert(txn_has_flag(txn, TXN_WAIT_ACK));
 	if (txn_limbo_check_complete(limbo, entry)) {
 		txn_limbo_remove(limbo, entry);
-		return;
+		return 0;
 	}
 	bool cancellable = fiber_set_cancellable(false);
 	bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
 	fiber_set_cancellable(cancellable);
 	if (timed_out) {
-		// TODO: implement rollback.
-		entry->is_rollback = true;
+		txn_limbo_write_rollback(limbo, entry);
+		struct txn_limbo_entry *e, *tmp;
+		rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+						 in_queue, tmp) {
+			e->is_rollback = true;
+			e->txn->signature = -1;
+			txn_limbo_pop(limbo, e);
+			txn_clear_flag(e->txn, TXN_WAIT_ACK);
+			txn_complete(e->txn);
+			if (e == entry)
+				break;
+			fiber_wakeup(e->txn->fiber);
+		}
+		return -1;
 	}
 	assert(txn_limbo_entry_is_complete(entry));
+	/*
+	 * The first tx to be rolled back already performed all
+	 * the necessary cleanups for us.
+	 */
+	if (entry->is_rollback)
+		return -1;
 	txn_limbo_remove(limbo, entry);
 	txn_clear_flag(txn, TXN_WAIT_ACK);
+	return 0;
 }
 
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
 static int
-txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
+				 struct txn_limbo_entry *entry,
+				 bool is_confirm)
 {
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
 	};
 
-	if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0)
+	int res = 0;
+	if (is_confirm) {
+		res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+	} else {
+		/*
+		 * This entry is the first to be rolled back, so
+		 * the last "safe" lsn is entry->lsn - 1.
+		 */
+		res = xrow_encode_rollback(&row, limbo->instance_id,
+					   entry->lsn);
+	}
+	if (res == -1)
 		return -1;
 
 	struct txn *txn = txn_begin();
@@ -169,6 +211,17 @@ rollback:
 	return -1;
 }
 
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo,
+			struct txn_limbo_entry *entry)
+{
+	return txn_limbo_write_confirm_rollback(limbo, entry, true);
+}
+
 void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
@@ -191,6 +244,38 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 	}
 }
 
+/**
+ * Write a rollback message to WAL. After it's written,
+ * all the transactions following the current one and waiting
+ * for confirmation must be rolled back.
+ */
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+			 struct txn_limbo_entry *entry)
+{
+	return txn_limbo_write_confirm_rollback(limbo, entry, false);
+}
+
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
+{
+	assert(limbo->instance_id != REPLICA_ID_NIL &&
+	       limbo->instance_id != instance_id);
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
+		if (e->lsn < lsn)
+			break;
+		assert(e->txn->fiber == NULL);
+		e->is_rollback = true;
+		txn_limbo_pop(limbo, e);
+		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+
+		/* Rollback the transaction. */
+		e->txn->signature = -1;
+		txn_complete(e->txn);
+	}
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -214,7 +299,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	}
 	if (last_quorum != NULL) {
 		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
-			// TODO: rollback.
+			// TODO: what to do here?
+			// We already failed writing the CONFIRM
+			// message. What are the chances we'll be
+			// able to write ROLLBACK?
 			return;
 		}
 		/*
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 23019e5d9..987cf9271 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 /**
  * Block the current fiber until the transaction in the limbo
  * entry is either committed or rolled back.
+ * If the timeout is reached before acks are collected, the tx
+ * is rolled back, as are all the txs following it in the limbo.
+ * Returns -1 when a rollback was performed and the tx has to
+ * be freed, 0 when tx processing can go on.
  */
-void
+int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
 /**
@@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
 
+/**
+ * Roll back all the entries starting with the given master's LSN.
+ */
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+
 /**
  * Return TRUE if limbo is empty.
  */
-- 
2.24.3 (Apple Git-128)
