Hi! Thanks for the review!
On 14.08.2020 00:17, Cyrill Gorcunov wrote:
> On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote:
> ...
>> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>> {
>> assert(lsn > 0);
>>
>> + struct synchro_request req;
>> + req.type = type;
>> + req.replica_id = limbo->instance_id;
>> + req.lsn = lsn;
>> +
>
> Vlad, while you're at this code, could we please use designated
> initialization from the very beginning, ie
>
> struct synchro_request req = {
> .type = type,
> .replica_id = limbo->instance_id,
> .lsn = lsn,
> };
>
> (the alignment is up to you though). Such initialization prevents
> bugs when the structure gets extended: any fields not set
> explicitly are zero-initialized.
Ok, done:
====================
@@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
assert(lsn > 0);
- struct synchro_request req;
- req.type = type;
- req.replica_id = limbo->instance_id;
- req.lsn = lsn;
+ struct synchro_request req = {
+ .type = type,
+ .replica_id = limbo->instance_id,
+ .lsn = lsn,
+ };
====================
New complete patch:
====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a953d293e..98fb87375 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -275,26 +275,14 @@ process_nop(struct request *request)
* or rollback some of its entries.
*/
static int
-process_confirm_rollback(struct request *request, bool is_confirm)
+process_synchro_row(struct request *request)
{
assert(iproto_type_is_synchro_request(request->header->type));
- uint32_t replica_id;
struct txn *txn = in_txn();
- int64_t lsn = 0;
- int res = 0;
- if (is_confirm)
- res = xrow_decode_confirm(request->header, &replica_id, &lsn);
- else
- res = xrow_decode_rollback(request->header, &replica_id, &lsn);
- if (res == -1)
- return -1;
-
- if (replica_id != txn_limbo.instance_id) {
- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
- txn_limbo.instance_id);
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(request->header, &syn_req) != 0)
return -1;
- }
assert(txn->n_applier_rows == 0);
/*
* This is not really a transaction. It just uses txn API
@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)
if (txn_begin_stmt(txn, NULL) != 0)
return -1;
-
- if (txn_commit_stmt(txn, request) == 0) {
- if (is_confirm)
- txn_limbo_read_confirm(&txn_limbo, lsn);
- else
- txn_limbo_read_rollback(&txn_limbo, lsn);
- return 0;
- } else {
+ if (txn_commit_stmt(txn, request) != 0)
return -1;
- }
+ return txn_limbo_process(&txn_limbo, &syn_req);
}
static int
@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
- return process_confirm_rollback(&request,
- row->type == IPROTO_CONFIRM);
+ return process_synchro_row(&request);
}
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..8e811e9c1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
- uint32_t replica_id;
- int64_t lsn;
- switch(row->type) {
- case IPROTO_CONFIRM:
- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_confirm(&txn_limbo, lsn);
- break;
- case IPROTO_ROLLBACK:
- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_rollback(&txn_limbo, lsn);
- break;
- }
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(row, &syn_req) != 0)
+ diag_raise();
+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
+ diag_raise();
return;
}
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a2043c17a..c6a4e5efc 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -31,6 +31,7 @@
#include "txn.h"
#include "txn_limbo.h"
#include "replication.h"
+#include "iproto_constants.h"
struct txn_limbo txn_limbo;
@@ -272,11 +273,16 @@ complete:
}
static void
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
- bool is_confirm)
+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
assert(lsn > 0);
+ struct synchro_request req = {
+ .type = type,
+ .replica_id = limbo->instance_id,
+ .lsn = lsn,
+ };
+
struct xrow_header row;
struct request request = {
.header = &row,
@@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
if (txn == NULL)
goto rollback;
- int res = 0;
- if (is_confirm) {
- res = xrow_encode_confirm(&row, &txn->region,
- limbo->instance_id, lsn);
- } else {
- /*
- * This LSN is the first to be rolled back, so
- * the last "safe" lsn is lsn - 1.
- */
- res = xrow_encode_rollback(&row, &txn->region,
- limbo->instance_id, lsn);
- }
- if (res == -1)
+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
goto rollback;
/*
* This is not really a transaction. It just uses txn API
@@ -325,7 +319,7 @@ rollback:
* problems are fixed. Or retry automatically with some period.
*/
panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");
+ "%s\n", lsn, iproto_type_name(type));
}
/**
@@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->confirmed_lsn = lsn;
- txn_limbo_write_confirm_rollback(limbo, lsn, true);
+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
}
-void
+/** Confirm all the entries <= @a lsn. */
+static void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->is_in_rollback = true;
- txn_limbo_write_confirm_rollback(limbo, lsn, false);
+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
limbo->is_in_rollback = false;
}
-void
+/** Rollback all the entries >= @a lsn. */
+static void
txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -577,6 +573,27 @@ complete:
return 0;
}
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+ if (req->replica_id != limbo->instance_id) {
+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,
+ limbo->instance_id);
+ return -1;
+ }
+ switch (req->type) {
+ case IPROTO_CONFIRM:
+ txn_limbo_read_confirm(limbo, req->lsn);
+ break;
+ case IPROTO_ROLLBACK:
+ txn_limbo_read_rollback(limbo, req->lsn);
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
void
txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 04ee7ea5c..eaf662987 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -39,6 +39,7 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct txn;
+struct synchro_request;
/**
* Transaction and its quorum metadata, to be stored in limbo.
@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
-/**
- * Confirm all the entries up to the given master's LSN.
- */
-void
-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
-
-/**
- * Rollback all the entries starting with given master's LSN.
- */
-void
-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+/** Execute a synchronous replication request. */
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
/**
* Waiting for confirmation of all "sync" transactions
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 0c797a9d5..bf174c701 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
-static int
-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn, int type)
+int
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req)
{
size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
- mp_sizeof_uint(lsn);
+ mp_sizeof_uint(req->replica_id) +
+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
char *buf = (char *)region_alloc(region, len);
if (buf == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "buf");
@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
pos = mp_encode_map(pos, 2);
pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
- pos = mp_encode_uint(pos, replica_id);
+ pos = mp_encode_uint(pos, req->replica_id);
pos = mp_encode_uint(pos, IPROTO_LSN);
- pos = mp_encode_uint(pos, lsn);
+ pos = mp_encode_uint(pos, req->lsn);
memset(row, 0, sizeof(*row));
@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
row->body[0].iov_len = len;
row->bodycnt = 1;
- row->type = type;
+ row->type = req->type;
return 0;
}
int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_CONFIRM);
-}
-
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_ROLLBACK);
-}
-
-static int
-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
- int64_t *lsn)
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
return -1;
}
+ memset(req, 0, sizeof(*req));
d = data;
uint32_t map_size = mp_decode_map(&d);
for (uint32_t i = 0; i < map_size; i++) {
@@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
}
switch (key) {
case IPROTO_REPLICA_ID:
- *replica_id = mp_decode_uint(&d);
+ req->replica_id = mp_decode_uint(&d);
break;
case IPROTO_LSN:
- *lsn = mp_decode_uint(&d);
+ req->lsn = mp_decode_uint(&d);
break;
default:
mp_next(&d);
}
}
+ req->type = row->type;
return 0;
}
-int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
-int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index e21ede5a3..02dca74e5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,
struct iovec *iov);
/**
- * Encode the CONFIRM to row body and set row type to
- * IPROTO_CONFIRM.
- * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
- * @param replica_id master's instance id.
- * @param lsn last confirmed lsn.
- * @retval -1 on error.
- * @retval 0 success.
+ * Synchronous replication request - confirmation or rollback of
+ * pending synchronous transactions.
*/
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+struct synchro_request {
+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
+ uint32_t type;
+ /**
+ * ID of the instance owning the pending transactions.
+ * Note, it may not be the same instance that created this
+ * request. An instance can make an operation on foreign
+ * synchronous transactions in case a new master tries to
+ * finish transactions of an old master.
+ */
+ uint32_t replica_id;
+ /**
+ * Operation LSN.
+ * In case of CONFIRM it means 'confirm all
+ * transactions with lsn <= this value'.
+ * In case of ROLLBACK it means 'rollback all transactions
+ * with lsn >= this value'.
+ */
+ int64_t lsn;
+};
/**
- * Decode the CONFIRM request body.
+ * Encode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn last confirmed lsn.
+ * @param region Region to use to encode the request body.
+ * @param req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
-
-/**
- * Encode the ROLLBACK row body and set row type to
- * IPROTO_ROLLBACK.
- * @param row xrow header.
- * @param region Region to use to encode the rollback body.
- * @param replica_id master's instance id.
- * @param lsn lsn to rollback from, including it.
- * @retval -1 on error.
- * @retval 0 success.
- */
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req);
/**
- * Decode the ROLLBACK row body.
+ * Decode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn lsn to rollback from, including it.
+ * @param[out] req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
/**
* CALL/EVAL request.