From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@dev.tarantool.org, gorcunov@gmail.com, sergepetrenko@tarantool.org Subject: [Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request Date: Thu, 13 Aug 2020 23:58:20 +0200 [thread overview] Message-ID: <c7e18f7cf0be76d28983d036b8c6cb58062499cc.1597355824.git.v.shpilevoy@tarantool.org> (raw) All requests saved to the WAL and transmitted over the network have their own request structure with parameters: - struct request for DML; - struct call_request for CALL/EVAL; - struct auth_request for AUTH; - struct ballot for VOTE; - struct sql_request for SQL; - struct greeting for greeting. This is done for a reason: instead of passing all the request parameters into each function one by one, they are managed all at once. For the synchronous requests IPROTO_CONFIRM and IPROTO_ROLLBACK this was not done, because so far it was not too hard to carry just 2 parameters from their body: lsn and replica_id. But this will change in #5129, because in fact these requests have more parameters; until now they were filled in by the txn module, since synchro requests were saved to the WAL via transactions (due to the lack of an alternative API to access the WAL). After #5129 it will be necessary to save the LSN and replica_id of the request author. This patch introduces struct synchro_request to simplify extension of the synchro parameters. 
Closes #5151 Needed for #5129 --- Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-5151-synchro_request Issue: https://github.com/tarantool/tarantool/issues/5151 src/box/applier.cc | 32 +++++------------------ src/box/box.cc | 21 ++++----------- src/box/txn_limbo.c | 56 ++++++++++++++++++++++++++-------------- src/box/txn_limbo.h | 15 +++-------- src/box/xrow.c | 52 +++++++++---------------------------- src/box/xrow.h | 63 +++++++++++++++++++++------------------------ 6 files changed, 93 insertions(+), 146 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index a953d293e..98fb87375 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -275,26 +275,14 @@ process_nop(struct request *request) * or rollback some of its entries. */ static int -process_confirm_rollback(struct request *request, bool is_confirm) +process_synchro_row(struct request *request) { assert(iproto_type_is_synchro_request(request->header->type)); - uint32_t replica_id; struct txn *txn = in_txn(); - int64_t lsn = 0; - int res = 0; - if (is_confirm) - res = xrow_decode_confirm(request->header, &replica_id, &lsn); - else - res = xrow_decode_rollback(request->header, &replica_id, &lsn); - if (res == -1) - return -1; - - if (replica_id != txn_limbo.instance_id) { - diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, - txn_limbo.instance_id); + struct synchro_request syn_req; + if (xrow_decode_synchro(request->header, &syn_req) != 0) return -1; - } assert(txn->n_applier_rows == 0); /* * This is not really a transaction. 
It just uses txn API @@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm) if (txn_begin_stmt(txn, NULL) != 0) return -1; - - if (txn_commit_stmt(txn, request) == 0) { - if (is_confirm) - txn_limbo_read_confirm(&txn_limbo, lsn); - else - txn_limbo_read_rollback(&txn_limbo, lsn); - return 0; - } else { + if (txn_commit_stmt(txn, request) != 0) return -1; - } + return txn_limbo_process(&txn_limbo, &syn_req); } static int @@ -324,8 +305,7 @@ apply_row(struct xrow_header *row) struct request request; if (iproto_type_is_synchro_request(row->type)) { request.header = row; - return process_confirm_rollback(&request, - row->type == IPROTO_CONFIRM); + return process_synchro_row(&request); } if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; diff --git a/src/box/box.cc b/src/box/box.cc index 83eef5d98..8e811e9c1 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) { struct request request; if (iproto_type_is_synchro_request(row->type)) { - uint32_t replica_id; - int64_t lsn; - switch(row->type) { - case IPROTO_CONFIRM: - if (xrow_decode_confirm(row, &replica_id, &lsn) < 0) - diag_raise(); - assert(txn_limbo.instance_id == replica_id); - txn_limbo_read_confirm(&txn_limbo, lsn); - break; - case IPROTO_ROLLBACK: - if (xrow_decode_rollback(row, &replica_id, &lsn) < 0) - diag_raise(); - assert(txn_limbo.instance_id == replica_id); - txn_limbo_read_rollback(&txn_limbo, lsn); - break; - } + struct synchro_request syn_req; + if (xrow_decode_synchro(row, &syn_req) != 0) + diag_raise(); + if (txn_limbo_process(&txn_limbo, &syn_req) != 0) + diag_raise(); return; } xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index a2043c17a..944161c30 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -31,6 +31,7 @@ #include "txn.h" #include "txn_limbo.h" #include 
"replication.h" +#include "iproto_constants.h" struct txn_limbo txn_limbo; @@ -272,11 +273,15 @@ complete: } static void -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, - bool is_confirm) +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) { assert(lsn > 0); + struct synchro_request req; + req.type = type; + req.replica_id = limbo->instance_id; + req.lsn = lsn; + struct xrow_header row; struct request request = { .header = &row, @@ -286,19 +291,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, if (txn == NULL) goto rollback; - int res = 0; - if (is_confirm) { - res = xrow_encode_confirm(&row, &txn->region, - limbo->instance_id, lsn); - } else { - /* - * This LSN is the first to be rolled back, so - * the last "safe" lsn is lsn - 1. - */ - res = xrow_encode_rollback(&row, &txn->region, - limbo->instance_id, lsn); - } - if (res == -1) + if (xrow_encode_synchro(&row, &txn->region, &req) != 0) goto rollback; /* * This is not really a transaction. It just uses txn API @@ -325,7 +318,7 @@ rollback: * problems are fixed. Or retry automatically with some period. */ panic("Could not write a synchro request to WAL: lsn = %lld, type = " - "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK"); + "%s\n", lsn, iproto_type_name(type)); } /** @@ -338,10 +331,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->confirmed_lsn = lsn; - txn_limbo_write_confirm_rollback(limbo, lsn, true); + txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn); } -void +/** Confirm all the entries <= @a lsn. 
*/ +static void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) { assert(limbo->instance_id != REPLICA_ID_NIL); @@ -390,11 +384,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->is_in_rollback = true; - txn_limbo_write_confirm_rollback(limbo, lsn, false); + txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn); limbo->is_in_rollback = false; } -void +/** Rollback all the entries >= @a lsn. */ +static void txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) { assert(limbo->instance_id != REPLICA_ID_NIL); @@ -577,6 +572,27 @@ complete: return 0; } +int +txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) +{ + if (req->replica_id != limbo->instance_id) { + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id, + limbo->instance_id); + return -1; + } + switch (req->type) { + case IPROTO_CONFIRM: + txn_limbo_read_confirm(limbo, req->lsn); + break; + case IPROTO_ROLLBACK: + txn_limbo_read_rollback(limbo, req->lsn); + break; + default: + unreachable(); + } + return 0; +} + void txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 04ee7ea5c..eaf662987 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -39,6 +39,7 @@ extern "C" { #endif /* defined(__cplusplus) */ struct txn; +struct synchro_request; /** * Transaction and its quorum metadata, to be stored in limbo. @@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); -/** - * Confirm all the entries up to the given master's LSN. - */ -void -txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); - -/** - * Rollback all the entries starting with given master's LSN. 
- */ -void -txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn); +/** Execute a synchronous replication request. */ +int +txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); /** * Waiting for confirmation of all "sync" transactions diff --git a/src/box/xrow.c b/src/box/xrow.c index 0c797a9d5..4b5d4356f 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region, return iovcnt; } -static int -xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn, int type) +int +xrow_encode_synchro(struct xrow_header *row, struct region *region, + const struct synchro_request *req) { size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) + - mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) + - mp_sizeof_uint(lsn); + mp_sizeof_uint(req->replica_id) + + mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn); char *buf = (char *)region_alloc(region, len); if (buf == NULL) { diag_set(OutOfMemory, len, "region_alloc", "buf"); @@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, pos = mp_encode_map(pos, 2); pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); - pos = mp_encode_uint(pos, replica_id); + pos = mp_encode_uint(pos, req->replica_id); pos = mp_encode_uint(pos, IPROTO_LSN); - pos = mp_encode_uint(pos, lsn); + pos = mp_encode_uint(pos, req->lsn); memset(row, 0, sizeof(*row)); @@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, row->body[0].iov_len = len; row->bodycnt = 1; - row->type = type; + row->type = req->type; return 0; } int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_CONFIRM); -} - -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - 
uint32_t replica_id, int64_t lsn) -{ - return xrow_encode_confirm_rollback(row, region, replica_id, lsn, - IPROTO_ROLLBACK); -} - -static int -xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, - int64_t *lsn) +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -977,30 +960,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, } switch (key) { case IPROTO_REPLICA_ID: - *replica_id = mp_decode_uint(&d); + req->replica_id = mp_decode_uint(&d); break; case IPROTO_LSN: - *lsn = mp_decode_uint(&d); + req->lsn = mp_decode_uint(&d); break; default: mp_next(&d); } } + req->type = row->type; return 0; } -int -xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) -{ - return xrow_decode_confirm_rollback(row, replica_id, lsn); -} - -int -xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) -{ - return xrow_decode_confirm_rollback(row, replica_id, lsn); -} - int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { diff --git a/src/box/xrow.h b/src/box/xrow.h index e21ede5a3..02dca74e5 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region, struct iovec *iov); /** - * Encode the CONFIRM to row body and set row type to - * IPROTO_CONFIRM. - * @param row xrow header. - * @param region Region to use to encode the confirmation body. - * @param replica_id master's instance id. - * @param lsn last confirmed lsn. - * @retval -1 on error. - * @retval 0 success. + * Synchronous replication request - confirmation or rollback of + * pending synchronous transactions. */ -int -xrow_encode_confirm(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); +struct synchro_request { + /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. 
*/ + uint32_t type; + /** + * ID of the instance owning the pending transactions. + * Note, it may be not the same instance, who created this + * request. An instance can make an operation on foreign + * synchronous transactions in case a new master tries to + * finish transactions of an old master. + */ + uint32_t replica_id; + /** + * Operation LSN. + * In case of CONFIRM it means 'confirm all + * transactions with lsn <= this value'. + * In case of ROLLBACK it means 'rollback all transactions + * with lsn >= this value'. + */ + int64_t lsn; +}; /** - * Decode the CONFIRM request body. + * Encode synchronous replication request. * @param row xrow header. - * @param[out] replica_id master's instance id. - * @param[out] lsn last confirmed lsn. + * @param region Region to use to encode the confirmation body. + * @param req Request parameters. * @retval -1 on error. * @retval 0 success. */ int -xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); - -/** - * Encode the ROLLBACK row body and set row type to - * IPROTO_ROLLBACK. - * @param row xrow header. - * @param region Region to use to encode the rollback body. - * @param replica_id master's instance id. - * @param lsn lsn to rollback from, including it. - * @retval -1 on error. - * @retval 0 success. - */ -int -xrow_encode_rollback(struct xrow_header *row, struct region *region, - uint32_t replica_id, int64_t lsn); +xrow_encode_synchro(struct xrow_header *row, struct region *region, + const struct synchro_request *req); /** - * Decode the ROLLBACK row body. + * Decode synchronous replication request. * @param row xrow header. - * @param[out] replica_id master's instance id. - * @param[out] lsn lsn to rollback from, including it. + * @param[out] req Request parameters. * @retval -1 on error. * @retval 0 success. 
*/ int -xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); /** * CALL/EVAL request. -- 2.21.1 (Apple Git-122.3)
next reply other threads:[~2020-08-13 21:58 UTC|newest] Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-08-13 21:58 Vladislav Shpilevoy [this message] 2020-08-13 22:17 ` Cyrill Gorcunov 2020-08-15 13:24 ` Vladislav Shpilevoy 2020-08-15 13:38 ` Cyrill Gorcunov 2020-08-17 15:51 ` Serge Petrenko 2020-08-17 20:22 ` Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=c7e18f7cf0be76d28983d036b8c6cb58062499cc.1597355824.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=gorcunov@gmail.com \ --cc=sergepetrenko@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox