[Tarantool-patches] [PATCH v2 07/19] replication: write and read CONFIRM entries

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Jun 30 02:15:26 MSK 2020


From: Serge Petrenko <sergepetrenko at tarantool.org>

Make txn_limbo write a CONFIRM entry as soon as a batch of entries
receives its acks. The CONFIRM entry is written to WAL and later
replicated to all the replicas.

Now replicas put synchronous transactions into txn_limbo and wait for
corresponding confirmation entries to arrive and end up in their WAL
before committing the transactions.

Closes #4847
---
 src/box/applier.cc    | 54 ++++++++++++++++++++++++++++
 src/box/box.cc        |  3 ++
 src/box/errcode.h     |  1 +
 src/box/relay.cc      |  6 ++--
 src/box/txn.c         | 77 ++++++++++++++++++++++++++++++++++-----
 src/box/txn.h         | 13 +++++++
 src/box/txn_limbo.c   | 84 +++++++++++++++++++++++++++++++++++++------
 src/box/txn_limbo.h   |  6 ++++
 test/box/error.result |  1 +
 9 files changed, 224 insertions(+), 21 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index df48b4796..1b9ea2f71 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,6 +51,7 @@
 #include "txn.h"
 #include "box.h"
 #include "scoped_guard.h"
+#include "txn_limbo.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	/*
+	 * Do not wait for confirmation when fetching a snapshot.
+	 * Master only sends confirmed rows during join.
+	 */
+	txn_set_flag(txn, TXN_FORCE_ASYNC);
 	if (txn_begin_stmt(txn, space) != 0)
 		goto rollback;
 	/* no access checks here - applier always works with admin privs */
@@ -249,10 +255,47 @@ process_nop(struct request *request)
 	return txn_commit_stmt(txn, request);
 }
 
+/*
+ * CONFIRM rows aren't DML requests and require special
+ * handling: instead of performing some operations on spaces,
+ * processing these requests requires txn_limbo to confirm some
+ * of its entries.
+ */
+static int
+process_confirm(struct request *request)
+{
+	assert(request->header->type == IPROTO_CONFIRM);
+	uint32_t replica_id;
+	struct txn *txn = in_txn();
+	int64_t lsn = 0;
+	if (xrow_decode_confirm(request->header, &replica_id, &lsn) != 0)
+		return -1;
+
+	if (replica_id != txn_limbo.instance_id) {
+		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
+			 txn_limbo.instance_id);
+		return -1;
+	}
+
+	if (txn_begin_stmt(txn, NULL) != 0)
+		return -1;
+	if (txn_commit_stmt(txn, request) != 0)
+		return -1;
+	/*
+	 * NOTE(review): runs before this row is written to WAL; intended?
+	 */
+	txn_limbo_read_confirm(&txn_limbo, lsn);
+	return 0;
+}
+
 static int
 apply_row(struct xrow_header *row)
 {
 	struct request request;
+	if (row->type == IPROTO_CONFIRM) {
+		request.header = row;
+		return process_confirm(&request);
+	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
 	if (request.type == IPROTO_NOP)
@@ -270,9 +313,20 @@ apply_row(struct xrow_header *row)
 static int
 apply_final_join_row(struct xrow_header *row)
 {
+	/*
+	 * Confirms are ignored during join. All the data master
+	 * sends us is valid.
+	 */
+	if (row->type == IPROTO_CONFIRM)
+		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	/*
+	 * Do not wait for confirmation while processing final
+	 * join rows. See apply_snapshot_row().
+	 */
+	txn_set_flag(txn, TXN_FORCE_ASYNC);
 	if (apply_row(row) != 0) {
 		txn_rollback(txn);
 		fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index 02088ba01..ba7347367 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -342,6 +342,9 @@ static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct request request;
+	// TODO: process confirmation during recovery.
+	if (row->type == IPROTO_CONFIRM)
+		return;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 019c582af..3ba6866e5 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -267,6 +267,7 @@ struct errcode_record {
 	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
 	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
 	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
+	/*215 */_(ER_SYNC_MASTER_MISMATCH,	"CONFIRM message arrived for an unknown master id %u, expected %u") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 36fc14b8c..0adc9fc98 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -404,7 +404,8 @@ tx_status_update(struct cmsg *msg)
 	 * Let pending synchronous transactions know, which of
 	 * them were successfully sent to the replica. Acks are
 	 * collected only by the transactions originator (which is
-	 * the single master in 100% so far).
+	 * the single master in 100% so far). Other instances wait
+	 * for master's CONFIRM message instead.
 	 */
 	if (txn_limbo.instance_id == instance_id) {
 		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
@@ -770,7 +771,8 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	assert(iproto_type_is_dml(packet->type) ||
+	       packet->type == IPROTO_CONFIRM);
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index 9de72461b..612cd19bc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -36,6 +36,7 @@
 #include <fiber.h>
 #include "xrow.h"
 #include "errinj.h"
+#include "iproto_constants.h"
 
 double too_long_threshold;
 
@@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 	 */
 	struct space *space = stmt->space;
 	row->group_id = space != NULL ? space_group_id(space) : 0;
-	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
+	/*
+	 * IPROTO_CONFIRM entries are supplementary and aren't
+	 * valid dml requests. They're encoded manually.
+	 */
+	if (likely(row->type != IPROTO_CONFIRM))
+		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
 	if (row->bodycnt < 0)
 		return -1;
 	stmt->row = row;
@@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 */
 	struct txn_stmt *stmt = txn_current_stmt(txn);
 
-	/* Create WAL record for the write requests in non-temporary spaces.
-	 * stmt->space can be NULL for IRPOTO_NOP.
+	/*
+	 * Create WAL record for the write requests in
+	 * non-temporary spaces. stmt->space can be NULL for
+	 * IRPOTO_NOP or IPROTO_CONFIRM.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(txn, stmt, request) != 0)
@@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers)
 /**
  * Complete transaction processing.
  */
-static void
+void
 txn_complete(struct txn *txn)
 {
 	/*
 	 * Note, engine can be NULL if transaction contains
-	 * IPROTO_NOP statements only.
+	 * IPROTO_NOP or IPROTO_CONFIRM statements.
 	 */
 	if (txn->signature < 0) {
 		/* Undo the transaction. */
@@ -536,7 +544,7 @@ txn_journal_entry_new(struct txn *txn)
 	 * space can't be synchronous. So if there is at least one
 	 * synchronous space, the transaction is not local.
 	 */
-	if (is_sync)
+	if (is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC))
 		txn_set_flag(txn, TXN_WAIT_ACK);
 
 	assert(remote_row == req->rows + txn->n_applier_rows);
@@ -598,6 +606,23 @@ txn_commit_nop(struct txn *txn)
 	return false;
 }
 
+/*
+ * A trigger called on tx rollback due to a failed WAL write,
+ * when tx is waiting for confirmation. Passes the tx's limbo
+ * entry (the trigger data) to txn_limbo_abort().
+ */
+static int
+txn_limbo_on_rollback(struct trigger *trig, void *event)
+{
+	struct txn *txn = (struct txn *) event;
+	/* Check whether limbo has performed the cleanup. */
+	if (txn->signature != TXN_SIGNATURE_ROLLBACK)
+		return 0;
+	struct txn_limbo_entry *entry = (struct txn_limbo_entry *) trig->data;
+	txn_limbo_abort(&txn_limbo, entry);
+	return 0;
+}
+
 int
 txn_commit_async(struct txn *txn)
 {
@@ -629,16 +654,52 @@ txn_commit_async(struct txn *txn)
 		return -1;
 	}
 
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	struct txn_limbo_entry *limbo_entry;
+	if (is_sync) {
+		/*
+		 * We'll need this trigger for sync transactions later,
+		 * but allocation failure is inappropriate after the entry
+		 * is sent to journal, so allocate early.
+		 */
+		size_t size;
+		struct trigger *trig =
+			region_alloc_object(&txn->region, typeof(*trig), &size);
+		if (trig == NULL) {
+			txn_rollback(txn);
+			diag_set(OutOfMemory, size, "region_alloc_object",
+				 "trig");
+			return -1;
+		}
+
+		/* See txn_commit(). */
+		uint32_t origin_id = req->rows[0]->replica_id;
+		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
+		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+		if (limbo_entry == NULL) {
+			txn_rollback(txn);
+			return -1;
+		}
+		assert(lsn > 0);
+		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+
+		/*
+		 * Set a trigger to abort waiting for confirm on
+		 * WAL write failure.
+		 */
+		trigger_create(trig, txn_limbo_on_rollback,
+			       limbo_entry, NULL);
+		txn_on_rollback(txn, trig);
+	}
+
 	fiber_set_txn(fiber(), NULL);
 	if (journal_write_async(req) != 0) {
 		fiber_set_txn(fiber(), txn);
 		txn_rollback(txn);
-
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 		return -1;
 	}
-
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index 8ec4a248c..c631d7033 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -73,6 +73,13 @@ enum txn_flag {
 	 * then finishes commit and returns success to a user.
 	 */
 	TXN_WAIT_ACK,
+	/**
+	 * A transaction mustn't wait for confirmation, even if it
+	 * touches synchronous spaces. Needed for join stage on
+	 * replica, when all the data coming from the master is
+	 * already confirmed by design.
+	 */
+	TXN_FORCE_ASYNC,
 };
 
 enum {
@@ -301,6 +308,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
 struct txn *
 txn_begin(void);
 
+/**
+ * Complete transaction processing. Exported for txn_limbo.
+ */
+void
+txn_complete(struct txn *txn);
+
 /**
  * Commit a transaction.
  * @pre txn == in_txn()
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 9de91db93..b38d82e4f 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -134,12 +134,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		fiber_yield();
 	fiber_set_cancellable(cancellable);
 	// TODO: implement rollback.
-	// TODO: implement confirm.
 	assert(!entry->is_rollback);
+	assert(entry->is_commit);
 	txn_limbo_remove(limbo, entry);
 	txn_clear_flag(txn, TXN_WAIT_ACK);
 }
 
+/**
+ * Write a CONFIRM entry to WAL. Once it is written, all the
+ * transactions up to @a entry's LSN may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct xrow_header row;
+	if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0)
+		return -1;
+	struct request request = {
+		.header = &row,
+	};
+
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
+
+	/*
+	 * The row belongs to no space, hence NULL. If either step
+	 * fails, the transaction is rolled back below.
+	 */
+	if (txn_begin_stmt(txn, NULL) == 0 &&
+	    txn_commit_stmt(txn, &request) == 0)
+		return txn_commit(txn);
+
+	txn_rollback(txn);
+	return -1;
+}
+
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
+{
+	assert(limbo->instance_id != REPLICA_ID_NIL &&
+	       limbo->instance_id != instance_id);
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
+		if (e->lsn > lsn)
+			break;
+		e->is_commit = true;
+		txn_limbo_remove(limbo, e);
+		txn_clear_flag(e->txn, TXN_WAIT_ACK);
+		/*
+		 * If txn_complete_async() has already run (signature
+		 * is non-negative), finish tx processing now.
+		 * Otherwise only WAIT_ACK is cleared above; tx
+		 * processing will finish once it is written to WAL.
+		 */
+		if (e->txn->signature >= 0)
+			txn_complete(e->txn);
+	}
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -148,25 +201,34 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
-	struct txn_limbo_entry *e, *tmp;
-	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
+	struct txn_limbo_entry *e;
+	struct txn_limbo_entry *last_quorum = NULL;
+	rlist_foreach_entry(e, &limbo->queue, in_queue) {
 		if (e->lsn <= prev_lsn)
 			continue;
 		if (e->lsn > lsn)
 			break;
 		if (++e->ack_count >= replication_synchro_quorum) {
-			// TODO: better call complete() right
-			// here. Appliers use async transactions,
-			// and their txns don't have fibers to
-			// wake up. That becomes actual, when
-			// appliers will be supposed to wait for
-			// 'confirm' message.
 			e->is_commit = true;
-			rlist_del_entry(e, in_queue);
-			fiber_wakeup(e->txn->fiber);
+			last_quorum = e;
 		}
 		assert(e->ack_count <= VCLOCK_MAX);
 	}
+	if (last_quorum != NULL) {
+		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
+			// TODO: rollback.
+			return;
+		}
+		/*
+		 * Wakeup all the entries in direct order as soon
+		 * as confirmation message is written to WAL.
+		 */
+		rlist_foreach_entry(e, &limbo->queue, in_queue) {
+			fiber_wakeup(e->txn->fiber);
+			if (e == last_quorum)
+				break;
+		}
+	}
 }
 
 void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 1ad1c567a..de415cd97 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 void
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Confirm all the entries up to and including the master's @a lsn.
+ */
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
+
 void
 txn_limbo_init();
 
diff --git a/test/box/error.result b/test/box/error.result
index 69c471085..34ded3930 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -433,6 +433,7 @@ t;
  |   212: box.error.SEQUENCE_NOT_STARTED
  |   213: box.error.NO_SUCH_SESSION_SETTING
  |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
+ |   215: box.error.SYNC_MASTER_MISMATCH
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list