[Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request
Cyrill Gorcunov
gorcunov at gmail.com
Mon Aug 17 16:39:10 MSK 2020
From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
All requests saved to WAL and transmitted through network have
their own request structure with parameters:
- struct request for DML;
- struct call_request for CALL/EVAL;
- struct auth_request for AUTH;
- struct ballot for VOTE;
- struct sql_request for SQL;
- struct greeting for greeting.
This is done for a reason: to avoid passing all the request
parameters into each function one by one, and to manage them all
at once instead.
For the synchronous requests IPROTO_CONFIRM and IPROTO_ROLLBACK this
was not done, because so far it was not too hard to carry just 2
parameters from their body: lsn and replica_id.
But this will change in #5129, because in fact these requests
have more parameters, which were filled in by the txn module, since
synchro requests were saved to WAL via transactions (due to the lack
of an alternative API to access WAL).
After #5129 it will be necessary to save LSN and replica_id of the
request author. This patch introduces struct synchro_request to
simplify extension of the synchro parameters.
Closes #5151
Needed for #5129
---
src/box/applier.cc | 32 +++++------------------
src/box/box.cc | 21 ++++-----------
src/box/txn_limbo.c | 56 ++++++++++++++++++++++++++--------------
src/box/txn_limbo.h | 15 +++--------
src/box/xrow.c | 52 +++++++++----------------------------
src/box/xrow.h | 63 +++++++++++++++++++++------------------------
6 files changed, 93 insertions(+), 146 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a953d293e..98fb87375 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -275,26 +275,14 @@ process_nop(struct request *request)
* or rollback some of its entries.
*/
static int
-process_confirm_rollback(struct request *request, bool is_confirm)
+process_synchro_row(struct request *request)
{
assert(iproto_type_is_synchro_request(request->header->type));
- uint32_t replica_id;
struct txn *txn = in_txn();
- int64_t lsn = 0;
- int res = 0;
- if (is_confirm)
- res = xrow_decode_confirm(request->header, &replica_id, &lsn);
- else
- res = xrow_decode_rollback(request->header, &replica_id, &lsn);
- if (res == -1)
- return -1;
-
- if (replica_id != txn_limbo.instance_id) {
- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
- txn_limbo.instance_id);
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(request->header, &syn_req) != 0)
return -1;
- }
assert(txn->n_applier_rows == 0);
/*
* This is not really a transaction. It just uses txn API
@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)
if (txn_begin_stmt(txn, NULL) != 0)
return -1;
-
- if (txn_commit_stmt(txn, request) == 0) {
- if (is_confirm)
- txn_limbo_read_confirm(&txn_limbo, lsn);
- else
- txn_limbo_read_rollback(&txn_limbo, lsn);
- return 0;
- } else {
+ if (txn_commit_stmt(txn, request) != 0)
return -1;
- }
+ return txn_limbo_process(&txn_limbo, &syn_req);
}
static int
@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
- return process_confirm_rollback(&request,
- row->type == IPROTO_CONFIRM);
+ return process_synchro_row(&request);
}
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..8e811e9c1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
- uint32_t replica_id;
- int64_t lsn;
- switch(row->type) {
- case IPROTO_CONFIRM:
- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_confirm(&txn_limbo, lsn);
- break;
- case IPROTO_ROLLBACK:
- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_rollback(&txn_limbo, lsn);
- break;
- }
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(row, &syn_req) != 0)
+ diag_raise();
+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
+ diag_raise();
return;
}
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a2043c17a..944161c30 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -31,6 +31,7 @@
#include "txn.h"
#include "txn_limbo.h"
#include "replication.h"
+#include "iproto_constants.h"
struct txn_limbo txn_limbo;
@@ -272,11 +273,15 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
}
static void
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
- bool is_confirm)
+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
assert(lsn > 0);
+ struct synchro_request req;
+ req.type = type;
+ req.replica_id = limbo->instance_id;
+ req.lsn = lsn;
+
struct xrow_header row;
struct request request = {
.header = &row,
@@ -286,19 +291,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
if (txn == NULL)
goto rollback;
- int res = 0;
- if (is_confirm) {
- res = xrow_encode_confirm(&row, &txn->region,
- limbo->instance_id, lsn);
- } else {
- /*
- * This LSN is the first to be rolled back, so
- * the last "safe" lsn is lsn - 1.
- */
- res = xrow_encode_rollback(&row, &txn->region,
- limbo->instance_id, lsn);
- }
- if (res == -1)
+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
goto rollback;
/*
* This is not really a transaction. It just uses txn API
@@ -325,7 +318,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
* problems are fixed. Or retry automatically with some period.
*/
panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");
+ "%s\n", lsn, iproto_type_name(type));
}
/**
@@ -338,10 +331,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->confirmed_lsn = lsn;
- txn_limbo_write_confirm_rollback(limbo, lsn, true);
+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
}
-void
+/** Confirm all the entries <= @a lsn. */
+static void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -390,11 +384,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->is_in_rollback = true;
- txn_limbo_write_confirm_rollback(limbo, lsn, false);
+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
limbo->is_in_rollback = false;
}
-void
+/** Rollback all the entries >= @a lsn. */
+static void
txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -577,6 +572,27 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
return 0;
}
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+ if (req->replica_id != limbo->instance_id) {
+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,
+ limbo->instance_id);
+ return -1;
+ }
+ switch (req->type) {
+ case IPROTO_CONFIRM:
+ txn_limbo_read_confirm(limbo, req->lsn);
+ break;
+ case IPROTO_ROLLBACK:
+ txn_limbo_read_rollback(limbo, req->lsn);
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
void
txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 04ee7ea5c..eaf662987 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -39,6 +39,7 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct txn;
+struct synchro_request;
/**
* Transaction and its quorum metadata, to be stored in limbo.
@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
-/**
- * Confirm all the entries up to the given master's LSN.
- */
-void
-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
-
-/**
- * Rollback all the entries starting with given master's LSN.
- */
-void
-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+/** Execute a synchronous replication request. */
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
/**
* Waiting for confirmation of all "sync" transactions
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 0c797a9d5..4b5d4356f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
-static int
-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn, int type)
+int
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req)
{
size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
- mp_sizeof_uint(lsn);
+ mp_sizeof_uint(req->replica_id) +
+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
char *buf = (char *)region_alloc(region, len);
if (buf == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "buf");
@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
pos = mp_encode_map(pos, 2);
pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
- pos = mp_encode_uint(pos, replica_id);
+ pos = mp_encode_uint(pos, req->replica_id);
pos = mp_encode_uint(pos, IPROTO_LSN);
- pos = mp_encode_uint(pos, lsn);
+ pos = mp_encode_uint(pos, req->lsn);
memset(row, 0, sizeof(*row));
@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
row->body[0].iov_len = len;
row->bodycnt = 1;
- row->type = type;
+ row->type = req->type;
return 0;
}
int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_CONFIRM);
-}
-
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_ROLLBACK);
-}
-
-static int
-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
- int64_t *lsn)
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -977,30 +960,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
}
switch (key) {
case IPROTO_REPLICA_ID:
- *replica_id = mp_decode_uint(&d);
+ req->replica_id = mp_decode_uint(&d);
break;
case IPROTO_LSN:
- *lsn = mp_decode_uint(&d);
+ req->lsn = mp_decode_uint(&d);
break;
default:
mp_next(&d);
}
}
+ req->type = row->type;
return 0;
}
-int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
-int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index e21ede5a3..02dca74e5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,
struct iovec *iov);
/**
- * Encode the CONFIRM to row body and set row type to
- * IPROTO_CONFIRM.
- * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
- * @param replica_id master's instance id.
- * @param lsn last confirmed lsn.
- * @retval -1 on error.
- * @retval 0 success.
+ * Synchronous replication request - confirmation or rollback of
+ * pending synchronous transactions.
*/
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+struct synchro_request {
+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
+ uint32_t type;
+ /**
+ * ID of the instance owning the pending transactions.
+ * Note, it may not be the same instance that created this
+ * request. An instance can make an operation on foreign
+ * synchronous transactions in case a new master tries to
+ * finish transactions of an old master.
+ */
+ uint32_t replica_id;
+ /**
+ * Operation LSN.
+ * In case of CONFIRM it means 'confirm all
+ * transactions with lsn <= this value'.
+ * In case of ROLLBACK it means 'rollback all transactions
+ * with lsn >= this value'.
+ */
+ int64_t lsn;
+};
/**
- * Decode the CONFIRM request body.
+ * Encode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn last confirmed lsn.
+ * @param region Region to use to encode the request body.
+ * @param req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
-
-/**
- * Encode the ROLLBACK row body and set row type to
- * IPROTO_ROLLBACK.
- * @param row xrow header.
- * @param region Region to use to encode the rollback body.
- * @param replica_id master's instance id.
- * @param lsn lsn to rollback from, including it.
- * @retval -1 on error.
- * @retval 0 success.
- */
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req);
/**
- * Decode the ROLLBACK row body.
+ * Decode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn lsn to rollback from, including it.
+ * @param[out] req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
/**
* CALL/EVAL request.
--
2.26.2
More information about the Tarantool-patches
mailing list