From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp53.i.mail.ru (smtp53.i.mail.ru [94.100.177.113]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 5DDC542EF5F for ; Thu, 18 Jun 2020 15:14:24 +0300 (MSK) From: Serge Petrenko Date: Thu, 18 Jun 2020 15:14:03 +0300 Message-Id: <16c9d1ffb9d09bb2b2f206a23973e2734616c345.1592482315.git.sergepetrenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 4/4] txn_limbo: add ROLLBACK processing List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Now txn_limbo writes a ROLLBACK entry to WAL when one of the limbo entries fails to gather quorum during a txn_limbo_confirm_timeout. All the limbo entries, starting with the failed one, are rolled back in reverse order. Closes #4848 --- src/box/applier.cc | 40 ++++++++++++---- src/box/relay.cc | 2 +- src/box/txn.c | 5 +- src/box/txn_limbo.c | 110 +++++++++++++++++++++++++++++++++++++++----- src/box/txn_limbo.h | 12 ++++- 5 files changed, 146 insertions(+), 23 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index ad2ee18a5..872372f62 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -260,7 +260,7 @@ process_nop(struct request *request) * Confirms some of the txs waiting in txn_limbo. */ static int -applier_on_confirm(struct trigger *trig, void *data) +applier_on_confirm_written(struct trigger *trig, void *data) { (void) trig; int64_t lsn = *(int64_t *)data; @@ -268,10 +268,23 @@ applier_on_confirm(struct trigger *trig, void *data) return 0; } +/* + * An on_commit trigger set on a txn containing a ROLLBACK entry. + * Rolls back part of the txs waiting in limbo. 
+ */ static int -process_confirm(struct request *request) +applier_on_rollback_written(struct trigger *trig, void *data) { - assert(request->header->type == IPROTO_CONFIRM); + (void) trig; + int64_t lsn = *(int64_t *)data; + txn_limbo_read_rollback(&txn_limbo, lsn); + return 0; +} + +static int +process_confirm_rollback(struct request *request, bool is_confirm) +{ + assert(iproto_type_is_synchro_request(request->header->type)); uint32_t replica_id; struct txn *txn = in_txn(); size_t size; @@ -280,8 +293,14 @@ process_confirm(struct request *request) diag_set(OutOfMemory, size, "region_alloc_object", "lsn"); return -1; } - if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0) + int res = 0; + if (is_confirm) + res = xrow_decode_confirm(request->header, &replica_id, lsn); + else + res = xrow_decode_rollback(request->header, &replica_id, lsn); + if (res == -1) return -1; + /* * on_commit trigger failure is not allowed, so check for * instance id early. @@ -294,7 +313,7 @@ process_confirm(struct request *request) /* * Set an on_commit trigger which will perform the actual - * confirmation processing. + * confirmation/rollback processing. */ struct trigger *trig = region_alloc_object(&txn->region, typeof(*trig), &size); @@ -302,7 +321,9 @@ process_confirm(struct request *request) diag_set(OutOfMemory, size, "region_alloc_object", "trig"); return -1; } - trigger_create(trig, applier_on_confirm, lsn, NULL); + trigger_create(trig, is_confirm ? 
applier_on_confirm_written : + applier_on_rollback_written, + lsn, NULL); if (txn_begin_stmt(txn, NULL) != 0) return -1; @@ -319,9 +340,10 @@ static int apply_row(struct xrow_header *row) { struct request request; - if (row->type == IPROTO_CONFIRM) { + if (iproto_type_is_synchro_request(row->type)) { request.header = row; - return process_confirm(&request); + return process_confirm_rollback(&request, + row->type == IPROTO_CONFIRM); } if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; @@ -344,7 +366,7 @@ apply_final_join_row(struct xrow_header *row) * Confirms are ignored during join. All the data master * sends us is valid. */ - if (row->type == IPROTO_CONFIRM) + if (iproto_type_is_synchro_request(row->type)) return 0; struct txn *txn = txn_begin(); if (txn == NULL) diff --git a/src/box/relay.cc b/src/box/relay.cc index 0adc9fc98..29588b6ca 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); assert(iproto_type_is_dml(packet->type) || - packet->type == IPROTO_CONFIRM); + iproto_type_is_synchro_request(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other diff --git a/src/box/txn.c b/src/box/txn.c index 4f787db79..484b822db 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -751,7 +751,10 @@ txn_commit(struct txn *txn) if (is_sync) { txn_limbo_assign_lsn(&txn_limbo, limbo_entry, req->rows[req->n_rows - 1]->lsn); - txn_limbo_wait_complete(&txn_limbo, limbo_entry); + if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) { + txn_free(txn); + return -1; + } } if (!txn_has_flag(txn, TXN_IS_DONE)) { txn->signature = req->res; diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index a715a136e..c55f5bda1 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct 
txn_limbo_entry *entry) rlist_del_entry(entry, in_queue); } +static inline void +txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + assert(!rlist_empty(&entry->in_queue)); + assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry, + in_queue) == entry); + (void) limbo; + rlist_del_entry(entry, in_queue); +} + void txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { @@ -116,7 +126,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) return entry->is_commit; } -void +static int +txn_limbo_write_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry); + +int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { struct txn *txn = entry->txn; @@ -125,33 +139,61 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) assert(txn_has_flag(txn, TXN_WAIT_ACK)); if (txn_limbo_check_complete(limbo, entry)) { txn_limbo_remove(limbo, entry); - return; + return 0; } bool cancellable = fiber_set_cancellable(false); bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo)); fiber_set_cancellable(cancellable); if (timed_out) { - // TODO: implement rollback. - entry->is_rollback = true; + txn_limbo_write_rollback(limbo, entry); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe_reverse(e, &limbo->queue, + in_queue, tmp) { + e->is_rollback = true; + e->txn->signature = -1; + txn_limbo_pop(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_complete(e->txn); + if (e == entry) + break; + fiber_wakeup(e->txn->fiber); + } + return -1; } assert(txn_limbo_entry_is_complete(entry)); + /* + * The first tx to be rolled back already performed all + * the necessary cleanups for us. + */ + if (entry->is_rollback) + return -1; txn_limbo_remove(limbo, entry); txn_clear_flag(txn, TXN_WAIT_ACK); + return 0; } -/** - * Write a confirmation entry to WAL. 
After it's written all the - * transactions waiting for confirmation may be finished. - */ static int -txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry, + bool is_confirm) { struct xrow_header row; struct request request = { .header = &row, }; - if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0) + int res = 0; + if (is_confirm) { + res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); + } else { + /* + * This entry is the first to be rolled back, so + * the last "safe" lsn is entry->lsn - 1. + */ + res = xrow_encode_rollback(&row, limbo->instance_id, + entry->lsn - 1); + } + if (res == -1) return -1; struct txn *txn = txn_begin(); @@ -169,6 +211,17 @@ rollback: return -1; } +/** + * Write a confirmation entry to WAL. After it's written all the + * transactions waiting for confirmation may be finished. + */ +static int +txn_limbo_write_confirm(struct txn_limbo *limbo, + struct txn_limbo_entry *entry) +{ + return txn_limbo_write_confirm_rollback(limbo, entry, true); +} + void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) { @@ -191,6 +244,38 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) } } +/** + * Write a rollback message to WAL. After it's written + * all the transactions following the current one and waiting + * for confirmation must be rolled back. 
+ */ +static int +txn_limbo_write_rollback(struct txn_limbo *limbo, + struct txn_limbo_entry *entry) +{ + return txn_limbo_write_confirm_rollback(limbo, entry, false); +} + +void +txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL && + limbo->instance_id != instance_id); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) { + if (e->lsn < lsn) + break; + assert(e->txn->fiber == NULL); + e->is_rollback = true; + txn_limbo_pop(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + + /* Rollback the transaction. */ + e->txn->signature = -1; + txn_complete(e->txn); + } +} + void txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) { @@ -214,7 +299,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) } if (last_quorum != NULL) { if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { - // TODO: what to do here? - // We already failed writing the CONFIRM - // message. What are the chances we'll be - // able to write ROLLBACK? return; } /* diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 23019e5d9..987cf9271 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); /** * Block the current fiber until the transaction in the limbo * entry is either committed or rolled back. + * If timeout is reached before acks are collected, the tx is + * rolled back as well as all the txs in the limbo following it. + * Returns -1 when rollback was performed and tx has to be freed. + * 0 when tx processing can go on. 
*/ -void +int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); /** @@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); +/** + * Rollback all the entries starting with given master's LSN. + */ +void +txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn); + /** * Return TRUE if limbo is empty. */ -- 2.24.3 (Apple Git-128)