From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org Subject: [Tarantool-patches] [PATCH v2 10/19] txn_limbo: add ROLLBACK processing Date: Tue, 30 Jun 2020 01:15:11 +0200 [thread overview] Message-ID: <d6cf4d6c5382b0248fb86e470d05dece2e292360.1593472477.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1593472477.git.v.shpilevoy@tarantool.org> From: Serge Petrenko <sergepetrenko@tarantool.org> Now txn_limbo writes a ROLLBACK entry to WAL when one of the limbo entries fails to gather quorum during a txn_limbo_confirm_timeout. All the limbo entries, starting with the failed one, are rolled back in reverse order. Closes #4848 --- src/box/applier.cc | 38 +++++++++---- src/box/box.cc | 2 +- src/box/errcode.h | 2 + src/box/relay.cc | 2 +- src/box/txn.c | 19 +++++-- src/box/txn_limbo.c | 124 ++++++++++++++++++++++++++++++++++++++---- src/box/txn_limbo.h | 12 +++- test/box/error.result | 2 + 8 files changed, 173 insertions(+), 28 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 1b9ea2f71..fbb452dc0 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -256,19 +256,25 @@ process_nop(struct request *request) } /* - * CONFIRM rows aren't dml requests and require special + * CONFIRM/ROLLBACK rows aren't dml requests and require special * handling: instead of performing some operations on spaces, - * processing these requests required txn_limbo to confirm some - * of its entries. + * processing these requests requires txn_limbo to either confirm + * or rollback some of its entries. 
*/ static int -process_confirm(struct request *request) +process_confirm_rollback(struct request *request, bool is_confirm) { - assert(request->header->type == IPROTO_CONFIRM); + assert(iproto_type_is_synchro_request(request->header->type)); uint32_t replica_id; struct txn *txn = in_txn(); int64_t lsn = 0; - if (xrow_decode_confirm(request->header, &replica_id, &lsn) != 0) + + int res = 0; + if (is_confirm) + res = xrow_decode_confirm(request->header, &replica_id, &lsn); + else + res = xrow_decode_rollback(request->header, &replica_id, &lsn); + if (res == -1) return -1; if (replica_id != txn_limbo.instance_id) { @@ -281,7 +287,10 @@ process_confirm(struct request *request) return -1; if (txn_commit_stmt(txn, request) == 0) { - txn_limbo_read_confirm(&txn_limbo, lsn); + if (is_confirm) + txn_limbo_read_confirm(&txn_limbo, lsn); + else + txn_limbo_read_rollback(&txn_limbo, lsn); return 0; } else { return -1; @@ -292,9 +301,10 @@ static int apply_row(struct xrow_header *row) { struct request request; - if (row->type == IPROTO_CONFIRM) { + if (iproto_type_is_synchro_request(row->type)) { request.header = row; - return process_confirm(&request); + return process_confirm_rollback(&request, + row->type == IPROTO_CONFIRM); } if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; @@ -317,7 +327,7 @@ apply_final_join_row(struct xrow_header *row) * Confirms are ignored during join. All the data master * sends us is valid. */ - if (row->type == IPROTO_CONFIRM) + if (iproto_type_is_synchro_request(row->type)) return 0; struct txn *txn = txn_begin(); if (txn == NULL) @@ -746,6 +756,14 @@ static int applier_txn_rollback_cb(struct trigger *trigger, void *event) { (void) trigger; + struct txn *txn = (struct txn *) event; + /* + * Synchronous transaction rollback due to receiving a + * ROLLBACK entry is a normal event and requires no + * special handling. 
+ */ + if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK) + return 0; /* * Setup shared applier diagnostic area. diff --git a/src/box/box.cc b/src/box/box.cc index ba7347367..d6ef6351b 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -343,7 +343,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) { struct request request; // TODO: process confirmation during recovery. - if (row->type == IPROTO_CONFIRM) + if (iproto_type_is_synchro_request(row->type)) return; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type != IPROTO_NOP) { diff --git a/src/box/errcode.h b/src/box/errcode.h index 3ba6866e5..ea521aa07 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -268,6 +268,8 @@ struct errcode_record { /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ /*215 */_(ER_SYNC_MASTER_MISMATCH, "CONFIRM message arrived for an unknown master id %d, expected %d") \ + /*216 */_(ER_SYNC_QUORUM_TIMEOUT, "Quorum collection for a synchronous transaction is timed out") \ + /*217 */_(ER_SYNC_ROLLBACK, "A rollback for a synchronous transaction is received") \ /* * !IMPORTANT! 
Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index 0adc9fc98..29588b6ca 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); assert(iproto_type_is_dml(packet->type) || - packet->type == IPROTO_CONFIRM); + iproto_type_is_synchro_request(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other diff --git a/src/box/txn.c b/src/box/txn.c index 612cd19bc..37955752a 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -83,10 +83,10 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request) struct space *space = stmt->space; row->group_id = space != NULL ? space_group_id(space) : 0; /* - * IPROTO_CONFIRM entries are supplementary and aren't - * valid dml requests. They're encoded manually. + * Synchronous replication entries are supplementary and + * aren't valid dml requests. They're encoded manually. */ - if (likely(row->type != IPROTO_CONFIRM)) + if (likely(!iproto_type_is_synchro_request(row->type))) row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); if (row->bodycnt < 0) return -1; @@ -490,6 +490,14 @@ void txn_complete_async(struct journal_entry *entry) { struct txn *txn = entry->complete_data; + /* + * txn_limbo has already rolled the tx back, so we just + * have to free it. 
+ */ + if (txn->signature < TXN_SIGNATURE_ROLLBACK) { + txn_free(txn); + return; + } txn->signature = entry->res; /* * Some commit/rollback triggers require for in_txn fiber @@ -765,7 +773,10 @@ txn_commit(struct txn *txn) if (is_sync) { txn_limbo_assign_lsn(&txn_limbo, limbo_entry, req->rows[req->n_rows - 1]->lsn); - txn_limbo_wait_complete(&txn_limbo, limbo_entry); + if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) { + txn_free(txn); + return -1; + } } if (!txn_has_flag(txn, TXN_IS_DONE)) { txn->signature = req->res; diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index ac57fd1bd..680e81d3d 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry) rlist_del_entry(entry, in_queue); } +static inline void +txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + assert(!rlist_empty(&entry->in_queue)); + assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry, + in_queue) == entry); + (void) limbo; + rlist_del_entry(entry, in_queue); +} + void txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { @@ -118,7 +128,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) return entry->is_commit; } -void +static int +txn_limbo_write_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry); + +int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { struct txn *txn = entry->txn; @@ -127,33 +141,64 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) assert(txn_has_flag(txn, TXN_WAIT_ACK)); if (txn_limbo_check_complete(limbo, entry)) { txn_limbo_remove(limbo, entry); - return; + return 0; } bool cancellable = fiber_set_cancellable(false); bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo)); fiber_set_cancellable(cancellable); if (timed_out) { - // TODO: implement rollback. 
- entry->is_rollback = true; + txn_limbo_write_rollback(limbo, entry); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe_reverse(e, &limbo->queue, + in_queue, tmp) { + e->is_rollback = true; + e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; + txn_limbo_pop(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_complete(e->txn); + if (e == entry) + break; + fiber_wakeup(e->txn->fiber); + } + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); + return -1; } assert(txn_limbo_entry_is_complete(entry)); + /* + * The first tx to be rolled back already performed all + * the necessary cleanups for us. + */ + if (entry->is_rollback) { + diag_set(ClientError, ER_SYNC_ROLLBACK); + return -1; + } txn_limbo_remove(limbo, entry); txn_clear_flag(txn, TXN_WAIT_ACK); + return 0; } -/** - * Write a confirmation entry to WAL. After it's written all the - * transactions waiting for confirmation may be finished. - */ static int -txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry, + bool is_confirm) { struct xrow_header row; struct request request = { .header = &row, }; - if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0) + int res = 0; + if (is_confirm) { + res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); + } else { + /* + * This entry is the first to be rolled back, so + * the last "safe" lsn is entry->lsn - 1. + */ + res = xrow_encode_rollback(&row, limbo->instance_id, + entry->lsn - 1); + } + if (res == -1) return -1; struct txn *txn = txn_begin(); @@ -171,6 +216,17 @@ rollback: return -1; } +/** + * Write a confirmation entry to WAL. After it's written all the + * transactions waiting for confirmation may be finished. 
+ */ +static int +txn_limbo_write_confirm(struct txn_limbo *limbo, + struct txn_limbo_entry *entry) +{ + return txn_limbo_write_confirm_rollback(limbo, entry, true); +} + void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) { @@ -194,6 +250,49 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) } } +/** + * Write a rollback message to WAL. After it's written + * all the transactions following the current one and waiting + * for confirmation must be rolled back. + */ +static int +txn_limbo_write_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry) +{ + return txn_limbo_write_confirm_rollback(limbo, entry, false); +} + +void +txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL && + limbo->instance_id != instance_id); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) { + if (e->lsn <= lsn) + break; + e->is_rollback = true; + txn_limbo_pop(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + if (e->txn->signature >= 0) { + /* Rollback the transaction. */ + e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK; + txn_complete(e->txn); + } else { + /* + * Rollback the transaction, but don't + * free it yet. txn_complete_async() will + * free it. + */ + e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK; + struct fiber *fiber = e->txn->fiber; + e->txn->fiber = fiber(); + txn_complete(e->txn); + e->txn->fiber = fiber; + } + } +} + void txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) { @@ -217,7 +316,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) } if (last_quorum != NULL) { if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { - // TODO: rollback. + // TODO: what to do here? + // We already failed writing the CONFIRM + // message. What are the chances we'll be + // able to write ROLLBACK? 
return; } /* diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 94f224131..138093c7c 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); /** * Block the current fiber until the transaction in the limbo * entry is either committed or rolled back. + * If timeout is reached before acks are collected, the tx is + * rolled back as well as all the txs in the limbo following it. + * Returns -1 when rollback was performed and tx has to be freed. + * 0 when tx processing can go on. */ -void +int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); /** @@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); +/** + * Rollback all the entries starting with given master's LSN. + */ +void +txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn); + /** * Return TRUE if limbo is empty. */ diff --git a/test/box/error.result b/test/box/error.result index 34ded3930..8241ec1a8 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -434,6 +434,8 @@ t; | 213: box.error.NO_SUCH_SESSION_SETTING | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS | 215: box.error.SYNC_MASTER_MISMATCH + | 216: box.error.SYNC_QUORUM_TIMEOUT + | 217: box.error.SYNC_ROLLBACK | ... test_run:cmd("setopt delimiter ''"); -- 2.21.1 (Apple Git-122.3)
next prev parent reply other threads:[~2020-06-29 23:15 UTC|newest] Thread overview: 68+ messages / expand[flat|nested] mbox.gz Atom feed top [not found] <cover.1593723973.git.sergeyb@tarantool.org> 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 00/19] Sync replication Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 01/19] replication: introduce space.is_sync option Vladislav Shpilevoy 2020-06-30 23:00 ` Vladislav Shpilevoy 2020-07-01 15:55 ` Sergey Ostanevich 2020-07-01 23:46 ` Vladislav Shpilevoy 2020-07-02 8:25 ` Serge Petrenko 2020-06-29 23:15 ` Vladislav Shpilevoy [this message] 2020-07-05 15:29 ` [Tarantool-patches] [PATCH v2 10/19] txn_limbo: add ROLLBACK processing Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 11/19] box: rework local_recovery to use async txn_commit Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 12/19] replication: support ROLLBACK and CONFIRM during recovery Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 13/19] replication: add test for synchro CONFIRM/ROLLBACK Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 14/19] applier: remove writer_cond Vladislav Shpilevoy 2020-07-02 9:13 ` Serge Petrenko 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 15/19] applier: send heartbeat not only on commit, but on any write Vladislav Shpilevoy 2020-07-01 23:55 ` Vladislav Shpilevoy 2020-07-03 12:23 ` Serge Petrenko 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 16/19] txn_limbo: add diag_set in txn_limbo_wait_confirm Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 17/19] replication: delay initial join until confirmation Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 18/19] replication: only send confirmed data during final join Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 19/19] replication: block async transactions when not empty limbo Vladislav Shpilevoy 2020-07-01 17:12 ` 
Sergey Ostanevich 2020-07-01 23:47 ` Vladislav Shpilevoy 2020-07-03 12:28 ` Serge Petrenko 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 02/19] replication: introduce replication_synchro_* cfg options Vladislav Shpilevoy 2020-07-01 16:05 ` Sergey Ostanevich 2020-07-01 23:46 ` Vladislav Shpilevoy 2020-07-02 8:29 ` Serge Petrenko 2020-07-02 23:36 ` Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 03/19] txn: add TXN_WAIT_ACK flag Vladislav Shpilevoy 2020-07-01 17:14 ` Sergey Ostanevich 2020-07-01 23:46 ` Vladislav Shpilevoy 2020-07-02 8:30 ` Serge Petrenko 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 04/19] replication: make sync transactions wait quorum Vladislav Shpilevoy 2020-06-30 23:00 ` Vladislav Shpilevoy 2020-07-02 8:48 ` Serge Petrenko 2020-07-03 21:16 ` Vladislav Shpilevoy 2020-07-05 16:05 ` Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 05/19] xrow: introduce CONFIRM and ROLLBACK entries Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 06/19] txn: introduce various reasons for txn rollback Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 07/19] replication: write and read CONFIRM entries Vladislav Shpilevoy 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 08/19] replication: add support of qsync to the snapshot machinery Vladislav Shpilevoy 2020-07-02 8:52 ` Serge Petrenko 2020-07-08 11:43 ` Leonid Vasiliev 2020-06-29 23:15 ` [Tarantool-patches] [PATCH v2 09/19] txn_limbo: add timeout when waiting for acks Vladislav Shpilevoy 2020-06-29 23:22 ` [Tarantool-patches] [PATCH v2 00/19] Sync replication Vladislav Shpilevoy 2020-06-30 23:00 ` [Tarantool-patches] [PATCH v2 20/19] replication: add test for quorum 1 Vladislav Shpilevoy 2020-07-03 12:32 ` Serge Petrenko 2020-07-02 21:13 ` [Tarantool-patches] [PATCH 1/4] replication: regression test on gh-5119 [not fixed] sergeyb 2020-07-02 21:13 ` [Tarantool-patches] [PATCH 2/4] replication: add advanced tests for sync 
replication sergeyb 2020-07-02 22:46 ` Sergey Bronnikov 2020-07-02 23:20 ` Vladislav Shpilevoy 2020-07-06 12:30 ` Sergey Bronnikov 2020-07-06 23:31 ` Vladislav Shpilevoy 2020-07-07 12:12 ` Sergey Bronnikov 2020-07-07 20:57 ` Vladislav Shpilevoy 2020-07-08 12:07 ` Sergey Bronnikov 2020-07-08 22:13 ` Vladislav Shpilevoy 2020-07-09 9:39 ` Sergey Bronnikov 2020-07-02 21:13 ` [Tarantool-patches] [PATCH 3/4] replication: add tests for sync replication with anon replica sergeyb 2020-07-06 23:31 ` Vladislav Shpilevoy 2020-07-02 21:13 ` [Tarantool-patches] [PATCH 4/4] replication: add tests for sync replication with snapshots sergeyb 2020-07-02 22:46 ` Sergey Bronnikov 2020-07-02 23:20 ` Vladislav Shpilevoy 2020-07-06 23:31 ` Vladislav Shpilevoy 2020-07-07 16:00 ` Sergey Bronnikov 2020-07-06 23:31 ` [Tarantool-patches] [PATCH] Add new error injection constant ERRINJ_SYNC_TIMEOUT Vladislav Shpilevoy 2020-07-10 0:50 ` [Tarantool-patches] [PATCH v2 00/19] Sync replication Vladislav Shpilevoy 2020-07-10 7:40 ` Kirill Yukhin
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=d6cf4d6c5382b0248fb86e470d05dece2e292360.1593472477.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=sergepetrenko@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v2 10/19] txn_limbo: add ROLLBACK processing' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.