From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: Cyrill Gorcunov <gorcunov@gmail.com> Cc: tarantool-patches@dev.tarantool.org Subject: Re: [Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request Date: Sat, 15 Aug 2020 15:24:11 +0200 [thread overview] Message-ID: <eb8e07a3-66c0-8ef8-ce99-606118ba507a@tarantool.org> (raw) In-Reply-To: <20200813221757.GB2074@grain> Hi! Thanks for the review! On 14.08.2020 00:17, Cyrill Gorcunov wrote: > On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote: > ... >> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) >> { >> assert(lsn > 0); >> >> + struct synchro_request req; >> + req.type = type; >> + req.replica_id = limbo->instance_id; >> + req.lsn = lsn; >> + > > Vlad, while you're at this code, could we please use designated > initialization from the very beginning, ie > > struct synchro_request req = { > .type = type, > .replica_id = limbo->instance_id, > .lsn = lsn, > }; > > (the alignment is up to you though). Such initialization won't > allow a bug to happen when we get a structure extension and > other non updated fields will be zeroified. Ok, done: ==================== @@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) { assert(lsn > 0); - struct synchro_request req; - req.type = type; - req.replica_id = limbo->instance_id; - req.lsn = lsn; + struct synchro_request req = { + .type = type, + .replica_id = limbo->instance_id, + .lsn = lsn, + }; ==================== New complete patch: ==================== diff --git a/src/box/applier.cc b/src/box/applier.cc index a953d293e..98fb87375 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -275,26 +275,14 @@ process_nop(struct request *request) * or rollback some of its entries. 
*/ static int -process_confirm_rollback(struct request *request, bool is_confirm) +process_synchro_row(struct request *request) { assert(iproto_type_is_synchro_request(request->header->type)); - uint32_t replica_id; struct txn *txn = in_txn(); - int64_t lsn = 0; - int res = 0; - if (is_confirm) - res = xrow_decode_confirm(request->header, &replica_id, &lsn); - else - res = xrow_decode_rollback(request->header, &replica_id, &lsn); - if (res == -1) - return -1; - - if (replica_id != txn_limbo.instance_id) { - diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, - txn_limbo.instance_id); + struct synchro_request syn_req; + if (xrow_decode_synchro(request->header, &syn_req) != 0) return -1; - } assert(txn->n_applier_rows == 0); /* * This is not really a transaction. It just uses txn API @@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm) if (txn_begin_stmt(txn, NULL) != 0) return -1; - - if (txn_commit_stmt(txn, request) == 0) { - if (is_confirm) - txn_limbo_read_confirm(&txn_limbo, lsn); - else - txn_limbo_read_rollback(&txn_limbo, lsn); - return 0; - } else { + if (txn_commit_stmt(txn, request) != 0) return -1; - } + return txn_limbo_process(&txn_limbo, &syn_req); } static int @@ -324,8 +305,7 @@ apply_row(struct xrow_header *row) struct request request; if (iproto_type_is_synchro_request(row->type)) { request.header = row; - return process_confirm_rollback(&request, - row->type == IPROTO_CONFIRM); + return process_synchro_row(&request); } if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; diff --git a/src/box/box.cc b/src/box/box.cc index 83eef5d98..8e811e9c1 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) { struct request request; if (iproto_type_is_synchro_request(row->type)) { - uint32_t replica_id; - int64_t lsn; - switch(row->type) { - case IPROTO_CONFIRM: - if (xrow_decode_confirm(row, &replica_id, 
&lsn) < 0) - diag_raise(); - assert(txn_limbo.instance_id == replica_id); - txn_limbo_read_confirm(&txn_limbo, lsn); - break; - case IPROTO_ROLLBACK: - if (xrow_decode_rollback(row, &replica_id, &lsn) < 0) - diag_raise(); - assert(txn_limbo.instance_id == replica_id); - txn_limbo_read_rollback(&txn_limbo, lsn); - break; - } + struct synchro_request syn_req; + if (xrow_decode_synchro(row, &syn_req) != 0) + diag_raise(); + if (txn_limbo_process(&txn_limbo, &syn_req) != 0) + diag_raise(); return; } xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index a2043c17a..c6a4e5efc 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -31,6 +31,7 @@ #include "txn.h" #include "txn_limbo.h" #include "replication.h" +#include "iproto_constants.h" struct txn_limbo txn_limbo; @@ -272,11 +273,16 @@ complete: } static void -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, - bool is_confirm) +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) { assert(lsn > 0); + struct synchro_request req = { + .type = type, + .replica_id = limbo->instance_id, + .lsn = lsn, + }; + struct xrow_header row; struct request request = { .header = &row, @@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, if (txn == NULL) goto rollback; - int res = 0; - if (is_confirm) { - res = xrow_encode_confirm(&row, &txn->region, - limbo->instance_id, lsn); - } else { - /* - * This LSN is the first to be rolled back, so - * the last "safe" lsn is lsn - 1. - */ - res = xrow_encode_rollback(&row, &txn->region, - limbo->instance_id, lsn); - } - if (res == -1) + if (xrow_encode_synchro(&row, &txn->region, &req) != 0) goto rollback; /* * This is not really a transaction. It just uses txn API @@ -325,7 +319,7 @@ rollback: * problems are fixed. Or retry automatically with some period. 
*/ panic("Could not write a synchro request to WAL: lsn = %lld, type = " - "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK"); + "%s\n", lsn, iproto_type_name(type)); } /** @@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->confirmed_lsn = lsn; - txn_limbo_write_confirm_rollback(limbo, lsn, true); + txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn); } -void +/** Confirm all the entries <= @a lsn. */ +static void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) { assert(limbo->instance_id != REPLICA_ID_NIL); @@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->is_in_rollback = true; - txn_limbo_write_confirm_rollback(limbo, lsn, false); + txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn); limbo->is_in_rollback = false; } -void +/** Rollback all the entries >= @a lsn. 
*/ +static void txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) { assert(limbo->instance_id != REPLICA_ID_NIL); @@ -577,6 +573,27 @@ complete: return 0; } +int +txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) +{ + if (req->replica_id != limbo->instance_id) { + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id, + limbo->instance_id); + return -1; + } + switch (req->type) { + case IPROTO_CONFIRM: + txn_limbo_read_confirm(limbo, req->lsn); + break; + case IPROTO_ROLLBACK: + txn_limbo_read_rollback(limbo, req->lsn); + break; + default: + unreachable(); + } + return 0; +} + void txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 04ee7ea5c..eaf662987 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -39,6 +39,7 @@ extern "C" { #endif /* defined(__cplusplus) */ struct txn; +struct synchro_request; /** * Transaction and its quorum metadata, to be stored in limbo. @@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); -/** - * Confirm all the entries up to the given master's LSN. - */ -void -txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); - -/** - * Rollback all the entries starting with given master's LSN. - */ -void -txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn); +/** Execute a synchronous replication request. 
*/ +int +txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); /** * Waiting for confirmation of all "sync" transactions diff --git a/src/box/xrow.c b/src/box/xrow.c index 0c797a9d5..bf174c701 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region, return iovcnt; } -static int -xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn, int type) +int +xrow_encode_synchro(struct xrow_header *row, struct region *region, + const struct synchro_request *req) { size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) + - mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) + - mp_sizeof_uint(lsn); + mp_sizeof_uint(req->replica_id) + + mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn); char *buf = (char *)region_alloc(region, len); if (buf == NULL) { diag_set(OutOfMemory, len, "region_alloc", "buf"); @@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, pos = mp_encode_map(pos, 2); pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); - pos = mp_encode_uint(pos, replica_id); + pos = mp_encode_uint(pos, req->replica_id); pos = mp_encode_uint(pos, IPROTO_LSN); - pos = mp_encode_uint(pos, lsn); + pos = mp_encode_uint(pos, req->lsn); memset(row, 0, sizeof(*row)); @@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, row->body[0].iov_len = len; row->bodycnt = 1; - row->type = type; + row->type = req->type; return 0; } int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_CONFIRM); -} - -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_ROLLBACK); -} - 
-static int -xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, - int64_t *lsn) +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, return -1; } + memset(req, 0, sizeof(*req)); d = data; uint32_t map_size = mp_decode_map(&d); for (uint32_t i = 0; i < map_size; i++) { @@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, } switch (key) { case IPROTO_REPLICA_ID: - *replica_id = mp_decode_uint(&d); + req->replica_id = mp_decode_uint(&d); break; case IPROTO_LSN: - *lsn = mp_decode_uint(&d); + req->lsn = mp_decode_uint(&d); break; default: mp_next(&d); } } + req->type = row->type; return 0; } -int -xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) -{ - return xrow_decode_confirm_rollback(row, replica_id, lsn); -} - -int -xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) -{ - return xrow_decode_confirm_rollback(row, replica_id, lsn); -} - int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { diff --git a/src/box/xrow.h b/src/box/xrow.h index e21ede5a3..02dca74e5 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region, struct iovec *iov); /** - * Encode the CONFIRM to row body and set row type to - * IPROTO_CONFIRM. - * @param row xrow header. - * @param region Region to use to encode the confirmation body. - * @param replica_id master's instance id. - * @param lsn last confirmed lsn. - * @retval -1 on error. - * @retval 0 success. + * Synchronous replication request - confirmation or rollback of + * pending synchronous transactions. 
*/ -int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); +struct synchro_request { + /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */ + uint32_t type; + /** + * ID of the instance owning the pending transactions. + * Note, it may be not the same instance, who created this + * request. An instance can make an operation on foreign + * synchronous transactions in case a new master tries to + * finish transactions of an old master. + */ + uint32_t replica_id; + /** + * Operation LSN. + * In case of CONFIRM it means 'confirm all + * transactions with lsn <= this value'. + * In case of ROLLBACK it means 'rollback all transactions + * with lsn >= this value'. + */ + int64_t lsn; +}; /** - * Decode the CONFIRM request body. + * Encode synchronous replication request. * @param row xrow header. - * @param[out] replica_id master's instance id. - * @param[out] lsn last confirmed lsn. + * @param region Region to use to encode the confirmation body. + * @param req Request parameters. * @retval -1 on error. * @retval 0 success. */ int -xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); - -/** - * Encode the ROLLBACK row body and set row type to - * IPROTO_ROLLBACK. - * @param row xrow header. - * @param region Region to use to encode the rollback body. - * @param replica_id master's instance id. - * @param lsn lsn to rollback from, including it. - * @retval -1 on error. - * @retval 0 success. - */ -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); +xrow_encode_synchro(struct xrow_header *row, struct region *region, + const struct synchro_request *req); /** - * Decode the ROLLBACK row body. + * Decode synchronous replication request. * @param row xrow header. - * @param[out] replica_id master's instance id. - * @param[out] lsn lsn to rollback from, including it. + * @param[out] req Request parameters. * @retval -1 on error. 
* @retval 0 success. */ int -xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); /** * CALL/EVAL request.
next prev parent reply other threads:[~2020-08-15 13:24 UTC|newest] Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-08-13 21:58 Vladislav Shpilevoy 2020-08-13 22:17 ` Cyrill Gorcunov 2020-08-15 13:24 ` Vladislav Shpilevoy [this message] 2020-08-15 13:38 ` Cyrill Gorcunov 2020-08-17 15:51 ` Serge Petrenko 2020-08-17 20:22 ` Vladislav Shpilevoy
Reply instructions:

You may reply publicly to this message via plain-text email using any one of the following methods:

* Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1):

  git send-email \
    --in-reply-to=eb8e07a3-66c0-8ef8-ce99-606118ba507a@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.