[Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Aug 15 16:24:11 MSK 2020


Hi! Thanks for the review!

On 14.08.2020 00:17, Cyrill Gorcunov wrote:
> On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote:
> ...
>> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>>  {
>>  	assert(lsn > 0);
>>  
>> +	struct synchro_request req;
>> +	req.type = type;
>> +	req.replica_id = limbo->instance_id;
>> +	req.lsn = lsn;
>> +
> 
> Vlad, while you're at this code, could we please use designated
> initialization from the very beginning, ie
> 
> 	struct synchro_request req = {
> 		.type		= type,
> 		.replica_id	= limbo->instance_id,
> 		.lsn		= lsn,
> 	};
> 
> (the alignment is up to you though). Such initialization prevents
> bugs when the structure gets extended: any fields which are not
> set explicitly will be zero-initialized.

Ok, done:

====================
@@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 {
 	assert(lsn > 0);
 
-	struct synchro_request req;
-	req.type = type;
-	req.replica_id = limbo->instance_id;
-	req.lsn = lsn;
+	struct synchro_request req = {
+		.type		= type,
+		.replica_id	= limbo->instance_id,
+		.lsn		= lsn,
+	};
 
====================

New complete patch:

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a953d293e..98fb87375 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -275,26 +275,14 @@ process_nop(struct request *request)
  * or rollback some of its entries.
  */
 static int
-process_confirm_rollback(struct request *request, bool is_confirm)
+process_synchro_row(struct request *request)
 {
 	assert(iproto_type_is_synchro_request(request->header->type));
-	uint32_t replica_id;
 	struct txn *txn = in_txn();
-	int64_t lsn = 0;
 
-	int res = 0;
-	if (is_confirm)
-		res = xrow_decode_confirm(request->header, &replica_id, &lsn);
-	else
-		res = xrow_decode_rollback(request->header, &replica_id, &lsn);
-	if (res == -1)
-		return -1;
-
-	if (replica_id != txn_limbo.instance_id) {
-		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
-			 txn_limbo.instance_id);
+	struct synchro_request syn_req;
+	if (xrow_decode_synchro(request->header, &syn_req) != 0)
 		return -1;
-	}
 	assert(txn->n_applier_rows == 0);
 	/*
 	 * This is not really a transaction. It just uses txn API
@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)
 
 	if (txn_begin_stmt(txn, NULL) != 0)
 		return -1;
-
-	if (txn_commit_stmt(txn, request) == 0) {
-		if (is_confirm)
-			txn_limbo_read_confirm(&txn_limbo, lsn);
-		else
-			txn_limbo_read_rollback(&txn_limbo, lsn);
-		return 0;
-	} else {
+	if (txn_commit_stmt(txn, request) != 0)
 		return -1;
-	}
+	return txn_limbo_process(&txn_limbo, &syn_req);
 }
 
 static int
@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)
 	struct request request;
 	if (iproto_type_is_synchro_request(row->type)) {
 		request.header = row;
-		return process_confirm_rollback(&request,
-						row->type == IPROTO_CONFIRM);
+		return process_synchro_row(&request);
 	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..8e811e9c1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
 	struct request request;
 	if (iproto_type_is_synchro_request(row->type)) {
-		uint32_t replica_id;
-		int64_t lsn;
-		switch(row->type) {
-		case IPROTO_CONFIRM:
-			if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)
-				diag_raise();
-			assert(txn_limbo.instance_id == replica_id);
-			txn_limbo_read_confirm(&txn_limbo, lsn);
-			break;
-		case IPROTO_ROLLBACK:
-			if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)
-				diag_raise();
-			assert(txn_limbo.instance_id == replica_id);
-			txn_limbo_read_rollback(&txn_limbo, lsn);
-			break;
-		}
+		struct synchro_request syn_req;
+		if (xrow_decode_synchro(row, &syn_req) != 0)
+			diag_raise();
+		if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
+			diag_raise();
 		return;
 	}
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a2043c17a..c6a4e5efc 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -31,6 +31,7 @@
 #include "txn.h"
 #include "txn_limbo.h"
 #include "replication.h"
+#include "iproto_constants.h"
 
 struct txn_limbo txn_limbo;
 
@@ -272,11 +273,16 @@ complete:
 }
 
 static void
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
-				 bool is_confirm)
+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 {
 	assert(lsn > 0);
 
+	struct synchro_request req = {
+		.type		= type,
+		.replica_id	= limbo->instance_id,
+		.lsn		= lsn,
+	};
+
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
@@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
 	if (txn == NULL)
 		goto rollback;
 
-	int res = 0;
-	if (is_confirm) {
-		res = xrow_encode_confirm(&row, &txn->region,
-					  limbo->instance_id, lsn);
-	} else {
-		/*
-		 * This LSN is the first to be rolled back, so
-		 * the last "safe" lsn is lsn - 1.
-		 */
-		res = xrow_encode_rollback(&row, &txn->region,
-					   limbo->instance_id, lsn);
-	}
-	if (res == -1)
+	if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
 		goto rollback;
 	/*
 	 * This is not really a transaction. It just uses txn API
@@ -325,7 +319,7 @@ rollback:
 	 * problems are fixed. Or retry automatically with some period.
 	 */
 	panic("Could not write a synchro request to WAL: lsn = %lld, type = "
-	      "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");
+	      "%s\n", lsn, iproto_type_name(type));
 }
 
 /**
@@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->confirmed_lsn = lsn;
-	txn_limbo_write_confirm_rollback(limbo, lsn, true);
+	txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
 }
 
-void
+/** Confirm all the entries <= @a lsn. */
+static void
 txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
 	assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->is_in_rollback = true;
-	txn_limbo_write_confirm_rollback(limbo, lsn, false);
+	txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
 	limbo->is_in_rollback = false;
 }
 
-void
+/** Rollback all the entries >= @a lsn. */
+static void
 txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
 	assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -577,6 +573,27 @@ complete:
 	return 0;
 }
 
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+	if (req->replica_id != limbo->instance_id) {
+		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,
+			 limbo->instance_id);
+		return -1;
+	}
+	switch (req->type) {
+	case IPROTO_CONFIRM:
+		txn_limbo_read_confirm(limbo, req->lsn);
+		break;
+	case IPROTO_ROLLBACK:
+		txn_limbo_read_rollback(limbo, req->lsn);
+		break;
+	default:
+		unreachable();
+	}
+	return 0;
+}
+
 void
 txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
 {
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 04ee7ea5c..eaf662987 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -39,6 +39,7 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct txn;
+struct synchro_request;
 
 /**
  * Transaction and its quorum metadata, to be stored in limbo.
@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
-/**
- * Confirm all the entries up to the given master's LSN.
- */
-void
-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
-
-/**
- * Rollback all the entries  starting with given master's LSN.
- */
-void
-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+/** Execute a synchronous replication request. */
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
 
 /**
  * Waiting for confirmation of all "sync" transactions
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 0c797a9d5..bf174c701 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,
 	return iovcnt;
 }
 
-static int
-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
-			     uint32_t replica_id, int64_t lsn, int type)
+int
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+		    const struct synchro_request *req)
 {
 	size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
-		     mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
-		     mp_sizeof_uint(lsn);
+		     mp_sizeof_uint(req->replica_id) +
+		     mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
 	char *buf = (char *)region_alloc(region, len);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, len, "region_alloc", "buf");
@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
 
 	pos = mp_encode_map(pos, 2);
 	pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
-	pos = mp_encode_uint(pos, replica_id);
+	pos = mp_encode_uint(pos, req->replica_id);
 	pos = mp_encode_uint(pos, IPROTO_LSN);
-	pos = mp_encode_uint(pos, lsn);
+	pos = mp_encode_uint(pos, req->lsn);
 
 	memset(row, 0, sizeof(*row));
 
@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
 	row->body[0].iov_len = len;
 	row->bodycnt = 1;
 
-	row->type = type;
+	row->type = req->type;
 
 	return 0;
 }
 
 int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
-		    uint32_t replica_id, int64_t lsn)
-{
-	return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
-					    IPROTO_CONFIRM);
-}
-
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
-		     uint32_t replica_id, int64_t lsn)
-{
-	return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
-					    IPROTO_ROLLBACK);
-}
-
-static int
-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
-			     int64_t *lsn)
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
 		return -1;
 	}
 
+	memset(req, 0, sizeof(*req));
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
 		}
 		switch (key) {
 		case IPROTO_REPLICA_ID:
-			*replica_id = mp_decode_uint(&d);
+			req->replica_id = mp_decode_uint(&d);
 			break;
 		case IPROTO_LSN:
-			*lsn = mp_decode_uint(&d);
+			req->lsn = mp_decode_uint(&d);
 			break;
 		default:
 			mp_next(&d);
 		}
 	}
+	req->type = row->type;
 	return 0;
 }
 
-int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
-	return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
-int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
-	return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index e21ede5a3..02dca74e5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,
 		struct iovec *iov);
 
 /**
- * Encode the CONFIRM to row body and set row type to
- * IPROTO_CONFIRM.
- * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
- * @param replica_id master's instance id.
- * @param lsn last confirmed lsn.
- * @retval -1 on error.
- * @retval 0 success.
+ * Synchronous replication request - confirmation or rollback of
+ * pending synchronous transactions.
  */
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
-		    uint32_t replica_id, int64_t lsn);
+struct synchro_request {
+	/** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
+	uint32_t type;
+	/**
+	 * ID of the instance owning the pending transactions.
+	 * Note, it may differ from the instance that created this
+	 * request. An instance can make an operation on foreign
+	 * synchronous transactions in case a new master tries to
+	 * finish transactions of an old master.
+	 */
+	uint32_t replica_id;
+	/**
+	 * Operation LSN.
+	 * In case of CONFIRM it means 'confirm all
+	 * transactions with lsn <= this value'.
+	 * In case of ROLLBACK it means 'rollback all transactions
+	 * with lsn >= this value'.
+	 */
+	int64_t lsn;
+};
 
 /**
- * Decode the CONFIRM request body.
+ * Encode synchronous replication request.
  * @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn last confirmed lsn.
+ * @param region Region to use to encode the request body.
+ * @param req Request parameters.
  * @retval -1 on error.
  * @retval 0 success.
  */
 int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
-
-/**
- * Encode the ROLLBACK row body and set row type to
- * IPROTO_ROLLBACK.
- * @param row xrow header.
- * @param region Region to use to encode the rollback body.
- * @param replica_id master's instance id.
- * @param lsn lsn to rollback from, including it.
- * @retval -1  on error.
- * @retval 0 success.
- */
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
-		     uint32_t replica_id, int64_t lsn);
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+		    const struct synchro_request *req);
 
 /**
- * Decode the ROLLBACK row body.
+ * Decode synchronous replication request.
  * @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn lsn to rollback from, including it.
+ * @param[out] req Request parameters.
  * @retval -1 on error.
  * @retval 0 success.
  */
 int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
 /**
  * CALL/EVAL request.



More information about the Tarantool-patches mailing list