[Tarantool-patches] [PATCH v2 4/9] box: make clear_synchro_queue() write a PROMOTE entry instead of CONFIRM + ROLLBACK

Serge Petrenko sergepetrenko at tarantool.org
Mon Apr 12 22:40:17 MSK 2021


A successful box_clear_synchro_queue() call results in writing a
CONFIRM(N) + ROLLBACK(N+1) pair, where N is the confirmed lsn.

Let's write a single PROMOTE(N) entry instead. It'll have the same
meaning as CONFIRM + ROLLBACK, and later it will also give followers some
additional information regarding the leader state change.

Part of #5445
---
 src/box/applier.cc         |  4 +-
 src/box/box.cc             | 14 ++++++-
 src/box/iproto_constants.h |  5 +++
 src/box/txn_limbo.c        | 79 +++++++++++++++++++++-----------------
 src/box/txn_limbo.h        | 10 ++++-
 5 files changed, 72 insertions(+), 40 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 971b2e64c..e8cbbe27a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -763,7 +763,7 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 
 struct synchro_entry {
 	/** Encoded form of a synchro record. */
-	struct synchro_body_bin	body_bin;
+	struct promote_body_bin body_bin;
 
 	/** xrow to write, used by the journal engine. */
 	struct xrow_header row;
@@ -822,7 +822,7 @@ synchro_entry_new(struct xrow_header *applier_row,
 	}
 
 	struct journal_entry *journal_entry = &entry->journal_entry;
-	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct synchro_body_bin *body_bin = &entry->body_bin.base;
 	struct xrow_header *row = &entry->row;
 
 	journal_entry->rows[0] = row;
diff --git a/src/box/box.cc b/src/box/box.cc
index b846ba8f5..8aba051a2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1556,7 +1556,19 @@ box_clear_synchro_queue(bool try_wait)
 				 "new synchronous transactions appeared");
 			rc = -1;
 		} else {
-			txn_limbo_force_empty(&txn_limbo, wait_lsn);
+			/*
+			 * Term parameter is unused now. We'll pass
+			 * box_raft()->term there later.
+			 */
+			txn_limbo_write_promote(&txn_limbo, wait_lsn, 0);
+			struct synchro_request req = {
+				.type = 0, /* unused */
+				.replica_id = 0, /* unused */
+				.origin_id = instance_id,
+				.lsn = wait_lsn,
+				.term = 0, /* unused */
+			};
+			txn_limbo_read_promote(&txn_limbo, &req);
 			assert(txn_limbo_is_empty(&txn_limbo));
 		}
 	}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 816a308d8..da78ac4d4 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -362,6 +362,11 @@ iproto_type_is_synchro_request(uint32_t type)
 	       type == IPROTO_PROMOTE;
 }
 
+static inline bool
+iproto_type_is_promote_request(uint32_t type)
+{
+	return type == IPROTO_PROMOTE;
+}
 static inline bool
 iproto_type_is_raft_request(uint32_t type)
 {
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a22e0861a..f119c35b6 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -317,21 +317,25 @@ txn_limbo_write_cb(struct journal_entry *entry)
 }
 
 static void
-txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn,
+			uint64_t term)
 {
-	assert(lsn > 0);
+	assert(lsn >= 0);
 
 	struct synchro_request req = {
 		.type		= type,
 		.replica_id	= limbo->owner_id,
 		.lsn		= lsn,
+		.term		= term,
 	};
 
 	/*
-	 * This is a synchronous commit so we can
-	 * allocate everything on a stack.
+	 * This is a synchronous commit so we can allocate everything on a
+	 * stack. Promote body includes synchro body.
 	 */
-	struct synchro_body_bin body;
+	struct promote_body_bin body;
+	struct synchro_body_bin *base = &body.base;
+
 	struct xrow_header row;
 	char buf[sizeof(struct journal_entry) +
 		 sizeof(struct xrow_header *)];
@@ -339,7 +343,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 	struct journal_entry *entry = (struct journal_entry *)buf;
 	entry->rows[0] = &row;
 
-	xrow_encode_synchro(&row, &body, &req);
+	xrow_encode_synchro(&row, base, &req);
 
 	journal_entry_create(entry, 1, xrow_approx_len(&row),
 			     txn_limbo_write_cb, fiber());
@@ -371,14 +375,14 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->confirmed_lsn = lsn;
-	txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
+	txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn, 0);
 }
 
 /** Confirm all the entries <= @a lsn. */
 static void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
-	assert(limbo->owner_id != REPLICA_ID_NIL);
+	assert(limbo->owner_id != REPLICA_ID_NIL || txn_limbo_is_empty(limbo));
 	assert(limbo == &txn_limbo);
 	struct txn_limbo_entry *e, *tmp;
 	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
@@ -434,7 +438,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->is_in_rollback = true;
-	txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
+	txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn, 0);
 	limbo->is_in_rollback = false;
 }
 
@@ -442,7 +446,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 static void
 txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
-	assert(limbo->owner_id != REPLICA_ID_NIL);
+	assert(limbo->owner_id != REPLICA_ID_NIL || txn_limbo_is_empty(limbo));
 	assert(limbo == &txn_limbo);
 	struct txn_limbo_entry *e, *tmp;
 	struct txn_limbo_entry *last_rollback = NULL;
@@ -490,6 +494,32 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 		box_update_ro_summary();
 }
 
+void
+txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
+{
+	limbo->confirmed_lsn = lsn;
+	/*
+	 * We make sure that promote is only written once everything this
+	 * instance has may be confirmed.
+	 */
+	struct txn_limbo_entry *e = txn_limbo_last_synchro_entry(limbo);
+	assert(e == NULL || e->lsn <= lsn);
+	(void) e;
+	txn_limbo_write_synchro(limbo, IPROTO_PROMOTE, lsn, term);
+	limbo->is_in_rollback = false;
+}
+
+void
+txn_limbo_read_promote(struct txn_limbo *limbo,
+		       const struct synchro_request *req)
+{
+	txn_limbo_read_confirm(limbo, req->lsn);
+	txn_limbo_read_rollback(limbo, req->lsn + 1);
+	assert(txn_limbo_is_empty(&txn_limbo));
+	limbo->owner_id = req->origin_id;
+	limbo->confirmed_lsn = 0;
+}
+
 void
 txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 {
@@ -652,38 +682,15 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 	case IPROTO_ROLLBACK:
 		txn_limbo_read_rollback(limbo, req->lsn);
 		break;
+	case IPROTO_PROMOTE:
+		txn_limbo_read_promote(limbo, req);
+		break;
 	default:
 		unreachable();
 	}
 	return;
 }
 
-void
-txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
-{
-	struct txn_limbo_entry *e, *last_quorum = NULL;
-	struct txn_limbo_entry *rollback = NULL;
-	rlist_foreach_entry(e, &limbo->queue, in_queue) {
-		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			if (e->lsn <= confirm_lsn) {
-				last_quorum = e;
-			} else {
-				rollback = e;
-				break;
-			}
-		}
-	}
-
-	if (last_quorum != NULL) {
-		txn_limbo_write_confirm(limbo, last_quorum->lsn);
-		txn_limbo_read_confirm(limbo, last_quorum->lsn);
-	}
-	if (rollback != NULL) {
-		txn_limbo_write_rollback(limbo, rollback->lsn);
-		txn_limbo_read_rollback(limbo, rollback->lsn);
-	}
-}
-
 void
 txn_limbo_on_parameters_change(struct txn_limbo *limbo)
 {
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index f2a98c8bb..10db4fc2d 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -279,7 +279,15 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
  * immediately.
  */
 void
-txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
+txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term);
+
+/**
+ * Process a PROMOTE request, i.e. confirm all entries <= @lsn and rollback all
+ * entries > @lsn.
+ */
+void
+txn_limbo_read_promote(struct txn_limbo *limbo,
+		       const struct synchro_request *req);
 
 /**
  * Update qsync parameters dynamically.
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list