<HTML><BODY><div> </div><div>Hi! Thanks for the patch!<br><br>LGTM.</div><div> </div><div> </div><div> </div><div> </div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;">Суббота, 15 августа 2020, 16:24 +03:00 от Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:<br> <div id=""><div class="js-helper js-readmsg-msg"><style type="text/css"></style><div><div id="style_15974978531009400933_BODY">Hi! Thanks for the review!<br><br>On 14.08.2020 00:17, Cyrill Gorcunov wrote:<div class="mail-quote-collapse">> On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote:<br>> ...<br>>> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)<br>>> {<br>>> assert(lsn > 0);<br>>><br>>> + struct synchro_request req;<br>>> + req.type = type;<br>>> + req.replica_id = limbo->instance_id;<br>>> + req.lsn = lsn;<br>>> +<br>><br>> Vlad, while you're at this code, could we please use designated<br>> initialization from the very beginning, ie<br>><br>> struct synchro_request req = {<br>> .type = type,<br>> .replica_id = limbo->instance_id,<br>> .lsn = lsn,<br>> };<br>><br>> (the alignment is up to you though). Such initialization won't<br>> allow a bug to happen when we get a structure extension and<br>> other non updated fields will be zeroified.</div><br>Ok, done:<br><br>====================<br>@@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)<br> {<br> assert(lsn > 0);<br> <br>- struct synchro_request req;<br>- req.type = type;<br>- req.replica_id = limbo->instance_id;<br>- req.lsn = lsn;<br>+ struct synchro_request req = {<br>+ .type = type,<br>+ .replica_id = limbo->instance_id,<br>+ .lsn = lsn,<br>+ };<br> <br>====================<br><br>New complete patch:<br><br>====================<br>diff --git a/src/box/applier.cc b/src/box/applier.cc<br>index a953d293e..98fb87375 100644<br>--- a/src/box/applier.cc<br>+++ b/src/box/applier.cc<br>@@ -275,26 +275,14 @@ process_nop(struct request *request)<br> * or rollback some of its entries.<br> */<br> static int<br>-process_confirm_rollback(struct request *request, bool is_confirm)<br>+process_synchro_row(struct request *request)<br> {<br> assert(iproto_type_is_synchro_request(request->header->type));<br>- uint32_t replica_id;<br> struct txn *txn = in_txn();<br>- int64_t lsn = 0;<br> <br>- int res = 0;<br>- if (is_confirm)<br>- res = xrow_decode_confirm(request->header, &replica_id, &lsn);<br>- else<br>- res = xrow_decode_rollback(request->header, &replica_id, &lsn);<br>- if (res == -1)<br>- return -1;<br>-<br>- if (replica_id != txn_limbo.instance_id) {<br>- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,<br>- txn_limbo.instance_id);<br>+ struct synchro_request syn_req;<br>+ if (xrow_decode_synchro(request->header, &syn_req) != 0)<br> return -1;<br>- }<br> assert(txn->n_applier_rows == 0);<br> /*<br> * This is not really a transaction. It just uses txn API<br>@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)<br> <br> if (txn_begin_stmt(txn, NULL) != 0)<br> return -1;<br>-<br>- if (txn_commit_stmt(txn, request) == 0) {<br>- if (is_confirm)<br>- txn_limbo_read_confirm(&txn_limbo, lsn);<br>- else<br>- txn_limbo_read_rollback(&txn_limbo, lsn);<br>- return 0;<br>- } else {<br>+ if (txn_commit_stmt(txn, request) != 0)<br> return -1;<br>- }<br>+ return txn_limbo_process(&txn_limbo, &syn_req);<br> }<br> <br> static int<br>@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)<br> struct request request;<br> if (iproto_type_is_synchro_request(row->type)) {<br> request.header = row;<br>- return process_confirm_rollback(&request,<br>- row->type == IPROTO_CONFIRM);<br>+ return process_synchro_row(&request);<br> }<br> if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)<br> return -1;<br>diff --git a/src/box/box.cc b/src/box/box.cc<br>index 83eef5d98..8e811e9c1 100644<br>--- a/src/box/box.cc<br>+++ b/src/box/box.cc<br>@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)<br> {<br> struct request request;<br> if (iproto_type_is_synchro_request(row->type)) {<br>- uint32_t replica_id;<br>- int64_t lsn;<br>- switch(row->type) {<br>- case IPROTO_CONFIRM:<br>- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)<br>- diag_raise();<br>- assert(txn_limbo.instance_id == replica_id);<br>- txn_limbo_read_confirm(&txn_limbo, lsn);<br>- break;<br>- case IPROTO_ROLLBACK:<br>- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)<br>- diag_raise();<br>- assert(txn_limbo.instance_id == replica_id);<br>- txn_limbo_read_rollback(&txn_limbo, lsn);<br>- break;<br>- }<br>+ struct synchro_request syn_req;<br>+ if (xrow_decode_synchro(row, &syn_req) != 0)<br>+ diag_raise();<br>+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0)<br>+ diag_raise();<br> return;<br> }<br> xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));<br>diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c<br>index a2043c17a..c6a4e5efc 100644<br>--- a/src/box/txn_limbo.c<br>+++ b/src/box/txn_limbo.c<br>@@ -31,6 +31,7 @@<br> #include "txn.h"<br> #include "txn_limbo.h"<br> #include "replication.h"<br>+#include "iproto_constants.h"<br> <br> struct txn_limbo txn_limbo;<br> <br>@@ -272,11 +273,16 @@ complete:<br> }<br> <br> static void<br>-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,<br>- bool is_confirm)<br>+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)<br> {<br> assert(lsn > 0);<br> <br>+ struct synchro_request req = {<br>+ .type = type,<br>+ .replica_id = limbo->instance_id,<br>+ .lsn = lsn,<br>+ };<br>+<br> struct xrow_header row;<br> struct request request = {<br> .header = &row,<br>@@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,<br> if (txn == NULL)<br> goto rollback;<br> <br>- int res = 0;<br>- if (is_confirm) {<br>- res = xrow_encode_confirm(&row, &txn->region,<br>- limbo->instance_id, lsn);<br>- } else {<br>- /*<br>- * This LSN is the first to be rolled back, so<br>- * the last "safe" lsn is lsn - 1.<br>- */<br>- res = xrow_encode_rollback(&row, &txn->region,<br>- limbo->instance_id, lsn);<br>- }<br>- if (res == -1)<br>+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0)<br> goto rollback;<br> /*<br> * This is not really a transaction. It just uses txn API<br>@@ -325,7 +319,7 @@ rollback:<br> * problems are fixed. Or retry automatically with some period.<br> */<br> panic("Could not write a synchro request to WAL: lsn = %lld, type = "<br>- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");<br>+ "%s\n", lsn, iproto_type_name(type));<br> }<br> <br> /**<br>@@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)<br> assert(lsn > limbo->confirmed_lsn);<br> assert(!limbo->is_in_rollback);<br> limbo->confirmed_lsn = lsn;<br>- txn_limbo_write_confirm_rollback(limbo, lsn, true);<br>+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);<br> }<br> <br>-void<br>+/** Confirm all the entries <= @a lsn. */<br>+static void<br> txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)<br> {<br> assert(limbo->instance_id != REPLICA_ID_NIL);<br>@@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)<br> assert(lsn > limbo->confirmed_lsn);<br> assert(!limbo->is_in_rollback);<br> limbo->is_in_rollback = true;<br>- txn_limbo_write_confirm_rollback(limbo, lsn, false);<br>+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);<br> limbo->is_in_rollback = false;<br> }<br> <br>-void<br>+/** Rollback all the entries >= @a lsn. */<br>+static void<br> txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)<br> {<br> assert(limbo->instance_id != REPLICA_ID_NIL);<br>@@ -577,6 +573,27 @@ complete:<br> return 0;<br> }<br> <br>+int<br>+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)<br>+{<br>+ if (req->replica_id != limbo->instance_id) {<br>+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,<br>+ limbo->instance_id);<br>+ return -1;<br>+ }<br>+ switch (req->type) {<br>+ case IPROTO_CONFIRM:<br>+ txn_limbo_read_confirm(limbo, req->lsn);<br>+ break;<br>+ case IPROTO_ROLLBACK:<br>+ txn_limbo_read_rollback(limbo, req->lsn);<br>+ break;<br>+ default:<br>+ unreachable();<br>+ }<br>+ return 0;<br>+}<br>+<br> void<br> txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)<br> {<br>diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h<br>index 04ee7ea5c..eaf662987 100644<br>--- a/src/box/txn_limbo.h<br>+++ b/src/box/txn_limbo.h<br>@@ -39,6 +39,7 @@ extern "C" {<br> #endif /* defined(__cplusplus) */<br> <br> struct txn;<br>+struct synchro_request;<br> <br> /**<br> * Transaction and its quorum metadata, to be stored in limbo.<br>@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);<br> int<br> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);<br> <br>-/**<br>- * Confirm all the entries up to the given master's LSN.<br>- */<br>-void<br>-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);<br>-<br>-/**<br>- * Rollback all the entries starting with given master's LSN.<br>- */<br>-void<br>-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);<br>+/** Execute a synchronous replication request. */<br>+int<br>+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);<br> <br> /**<br> * Waiting for confirmation of all "sync" transactions<br>diff --git a/src/box/xrow.c b/src/box/xrow.c<br>index 0c797a9d5..bf174c701 100644<br>--- a/src/box/xrow.c<br>+++ b/src/box/xrow.c<br>@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,<br> return iovcnt;<br> }<br> <br>-static int<br>-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,<br>- uint32_t replica_id, int64_t lsn, int type)<br>+int<br>+xrow_encode_synchro(struct xrow_header *row, struct region *region,<br>+ const struct synchro_request *req)<br> {<br> size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +<br>- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +<br>- mp_sizeof_uint(lsn);<br>+ mp_sizeof_uint(req->replica_id) +<br>+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);<br> char *buf = (char *)region_alloc(region, len);<br> if (buf == NULL) {<br> diag_set(OutOfMemory, len, "region_alloc", "buf");<br>@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,<br> <br> pos = mp_encode_map(pos, 2);<br> pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);<br>- pos = mp_encode_uint(pos, replica_id);<br>+ pos = mp_encode_uint(pos, req->replica_id);<br> pos = mp_encode_uint(pos, IPROTO_LSN);<br>- pos = mp_encode_uint(pos, lsn);<br>+ pos = mp_encode_uint(pos, req->lsn);<br> <br> memset(row, 0, sizeof(*row));<br> <br>@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,<br> row->body[0].iov_len = len;<br> row->bodycnt = 1;<br> <br>- row->type = type;<br>+ row->type = req->type;<br> <br> return 0;<br> }<br> <br> int<br>-xrow_encode_confirm(struct xrow_header *row, struct region *region,<br>- uint32_t replica_id, int64_t lsn)<br>-{<br>- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,<br>- IPROTO_CONFIRM);<br>-}<br>-<br>-int<br>-xrow_encode_rollback(struct xrow_header *row, struct region *region,<br>- uint32_t replica_id, int64_t lsn)<br>-{<br>- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,<br>- IPROTO_ROLLBACK);<br>-}<br>-<br>-static int<br>-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,<br>- int64_t *lsn)<br>+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)<br> {<br> if (row->bodycnt == 0) {<br> diag_set(ClientError, ER_INVALID_MSGPACK, "request body");<br>@@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,<br> return -1;<br> }<br> <br>+ memset(req, 0, sizeof(*req));<br> d = data;<br> uint32_t map_size = mp_decode_map(&d);<br> for (uint32_t i = 0; i < map_size; i++) {<br>@@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,<br> }<br> switch (key) {<br> case IPROTO_REPLICA_ID:<br>- *replica_id = mp_decode_uint(&d);<br>+ req->replica_id = mp_decode_uint(&d);<br> break;<br> case IPROTO_LSN:<br>- *lsn = mp_decode_uint(&d);<br>+ req->lsn = mp_decode_uint(&d);<br> break;<br> default:<br> mp_next(&d);<br> }<br> }<br>+ req->type = row->type;<br> return 0;<br> }<br> <br>-int<br>-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)<br>-{<br>- return xrow_decode_confirm_rollback(row, replica_id, lsn);<br>-}<br>-<br>-int<br>-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)<br>-{<br>- return xrow_decode_confirm_rollback(row, replica_id, lsn);<br>-}<br>-<br> int<br> xrow_to_iovec(const struct xrow_header *row, struct iovec *out)<br> {<br>diff --git a/src/box/xrow.h b/src/box/xrow.h<br>index e21ede5a3..02dca74e5 100644<br>--- a/src/box/xrow.h<br>+++ b/src/box/xrow.h<br>@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,<br> struct iovec *iov);<br> <br> /**<br>- * Encode the CONFIRM to row body and set row type to<br>- * IPROTO_CONFIRM.<br>- * @param row xrow header.<br>- * @param region Region to use to encode the confirmation body.<br>- * @param replica_id master's instance id.<br>- * @param lsn last confirmed lsn.<br>- * @retval -1 on error.<br>- * @retval 0 success.<br>+ * Synchronous replication request - confirmation or rollback of<br>+ * pending synchronous transactions.<br> */<br>-int<br>-xrow_encode_confirm(struct xrow_header *row, struct region *region,<br>- uint32_t replica_id, int64_t lsn);<br>+struct synchro_request {<br>+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */<br>+ uint32_t type;<br>+ /**<br>+ * ID of the instance owning the pending transactions.<br>+ * Note, it may be not the same instance, who created this<br>+ * request. An instance can make an operation on foreign<br>+ * synchronous transactions in case a new master tries to<br>+ * finish transactions of an old master.<br>+ */<br>+ uint32_t replica_id;<br>+ /**<br>+ * Operation LSN.<br>+ * In case of CONFIRM it means 'confirm all<br>+ * transactions with lsn <= this value'.<br>+ * In case of ROLLBACK it means 'rollback all transactions<br>+ * with lsn >= this value'.<br>+ */<br>+ int64_t lsn;<br>+};<br> <br> /**<br>- * Decode the CONFIRM request body.<br>+ * Encode synchronous replication request.<br> * @param row xrow header.<br>- * @param[out] replica_id master's instance id.<br>- * @param[out] lsn last confirmed lsn.<br>+ * @param region Region to use to encode the confirmation body.<br>+ * @param req Request parameters.<br> * @retval -1 on error.<br> * @retval 0 success.<br> */<br> int<br>-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);<br>-<br>-/**<br>- * Encode the ROLLBACK row body and set row type to<br>- * IPROTO_ROLLBACK.<br>- * @param row xrow header.<br>- * @param region Region to use to encode the rollback body.<br>- * @param replica_id master's instance id.<br>- * @param lsn lsn to rollback from, including it.<br>- * @retval -1 on error.<br>- * @retval 0 success.<br>- */<br>-int<br>-xrow_encode_rollback(struct xrow_header *row, struct region *region,<br>- uint32_t replica_id, int64_t lsn);<br>+xrow_encode_synchro(struct xrow_header *row, struct region *region,<br>+ const struct synchro_request *req);<br> <br> /**<br>- * Decode the ROLLBACK row body.<br>+ * Decode synchronous replication request.<br> * @param row xrow header.<br>- * @param[out] replica_id master's instance id.<br>- * @param[out] lsn lsn to rollback from, including it.<br>+ * @param[out] req Request parameters.<br> * @retval -1 on error.<br> * @retval 0 success.<br> */<br> int<br>-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);<br>+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);<br> <br> /**<br> * CALL/EVAL request.<br> </div></div></div></div></blockquote><div> </div><div>--<br>Serge Petrenko</div></BODY></HTML>