[Tarantool-patches] [PATCH v2 10/19] txn_limbo: add ROLLBACK processing
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Jun 30 02:15:11 MSK 2020
From: Serge Petrenko <sergepetrenko at tarantool.org>
Now txn_limbo writes a ROLLBACK entry to WAL when one of the limbo
entries fails to gather a quorum within txn_limbo_confirm_timeout.
All the limbo entries, starting with the failed one, are rolled back in
reverse order.
Closes #4848
---
src/box/applier.cc | 38 +++++++++----
src/box/box.cc | 2 +-
src/box/errcode.h | 2 +
src/box/relay.cc | 2 +-
src/box/txn.c | 19 +++++--
src/box/txn_limbo.c | 124 ++++++++++++++++++++++++++++++++++++++----
src/box/txn_limbo.h | 12 +++-
test/box/error.result | 2 +
8 files changed, 173 insertions(+), 28 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1b9ea2f71..fbb452dc0 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -256,19 +256,25 @@ process_nop(struct request *request)
}
/*
- * CONFIRM rows aren't dml requests and require special
+ * CONFIRM/ROLLBACK rows aren't dml requests and require special
* handling: instead of performing some operations on spaces,
- * processing these requests required txn_limbo to confirm some
- * of its entries.
+ * processing these requests requires txn_limbo to either confirm
+ * or rollback some of its entries.
*/
static int
-process_confirm(struct request *request)
+process_confirm_rollback(struct request *request, bool is_confirm)
{
- assert(request->header->type == IPROTO_CONFIRM);
+ assert(iproto_type_is_synchro_request(request->header->type));
uint32_t replica_id;
struct txn *txn = in_txn();
int64_t lsn = 0;
- if (xrow_decode_confirm(request->header, &replica_id, &lsn) != 0)
+
+ int res = 0;
+ if (is_confirm)
+ res = xrow_decode_confirm(request->header, &replica_id, &lsn);
+ else
+ res = xrow_decode_rollback(request->header, &replica_id, &lsn);
+ if (res == -1)
return -1;
if (replica_id != txn_limbo.instance_id) {
@@ -281,7 +287,10 @@ process_confirm(struct request *request)
return -1;
if (txn_commit_stmt(txn, request) == 0) {
- txn_limbo_read_confirm(&txn_limbo, lsn);
+ if (is_confirm)
+ txn_limbo_read_confirm(&txn_limbo, lsn);
+ else
+ txn_limbo_read_rollback(&txn_limbo, lsn);
return 0;
} else {
return -1;
@@ -292,9 +301,10 @@ static int
apply_row(struct xrow_header *row)
{
struct request request;
- if (row->type == IPROTO_CONFIRM) {
+ if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
- return process_confirm(&request);
+ return process_confirm_rollback(&request,
+ row->type == IPROTO_CONFIRM);
}
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
@@ -317,7 +327,7 @@ apply_final_join_row(struct xrow_header *row)
* Confirms are ignored during join. All the data master
* sends us is valid.
*/
- if (row->type == IPROTO_CONFIRM)
+ if (iproto_type_is_synchro_request(row->type))
return 0;
struct txn *txn = txn_begin();
if (txn == NULL)
@@ -746,6 +756,14 @@ static int
applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
(void) trigger;
+ struct txn *txn = (struct txn *) event;
+ /*
+ * Synchronous transaction rollback due to receiving a
+ * ROLLBACK entry is a normal event and requires no
+ * special handling.
+ */
+ if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
+ return 0;
/*
* Setup shared applier diagnostic area.
diff --git a/src/box/box.cc b/src/box/box.cc
index ba7347367..d6ef6351b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -343,7 +343,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
// TODO: process confirmation during recovery.
- if (row->type == IPROTO_CONFIRM)
+ if (iproto_type_is_synchro_request(row->type))
return;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3ba6866e5..ea521aa07 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -268,6 +268,8 @@ struct errcode_record {
/*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
/*215 */_(ER_SYNC_MASTER_MISMATCH, "CONFIRM message arrived for an unknown master id %d, expected %d") \
+ /*216 */_(ER_SYNC_QUORUM_TIMEOUT, "Quorum collection for a synchronous transaction is timed out") \
+ /*217 */_(ER_SYNC_ROLLBACK, "A rollback for a synchronous transaction is received") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0adc9fc98..29588b6ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
assert(iproto_type_is_dml(packet->type) ||
- packet->type == IPROTO_CONFIRM);
+ iproto_type_is_synchro_request(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index 612cd19bc..37955752a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -83,10 +83,10 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
struct space *space = stmt->space;
row->group_id = space != NULL ? space_group_id(space) : 0;
/*
- * IPROTO_CONFIRM entries are supplementary and aren't
- * valid dml requests. They're encoded manually.
+ * Synchronous replication entries are supplementary and
+ * aren't valid dml requests. They're encoded manually.
*/
- if (likely(row->type != IPROTO_CONFIRM))
+ if (likely(!iproto_type_is_synchro_request(row->type)))
row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
if (row->bodycnt < 0)
return -1;
@@ -490,6 +490,14 @@ void
txn_complete_async(struct journal_entry *entry)
{
struct txn *txn = entry->complete_data;
+ /*
+ * txn_limbo has already rolled the tx back, so we just
+ * have to free it.
+ */
+ if (txn->signature < TXN_SIGNATURE_ROLLBACK) {
+ txn_free(txn);
+ return;
+ }
txn->signature = entry->res;
/*
* Some commit/rollback triggers require for in_txn fiber
@@ -765,7 +773,10 @@ txn_commit(struct txn *txn)
if (is_sync) {
txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
req->rows[req->n_rows - 1]->lsn);
- txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+ if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
+ txn_free(txn);
+ return -1;
+ }
}
if (!txn_has_flag(txn, TXN_IS_DONE)) {
txn->signature = req->res;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index ac57fd1bd..680e81d3d 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
rlist_del_entry(entry, in_queue);
}
+static inline void
+txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+ assert(!rlist_empty(&entry->in_queue));
+ assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry,
+ in_queue) == entry);
+ (void) limbo;
+ rlist_del_entry(entry, in_queue);
+}
+
void
txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
{
@@ -118,7 +128,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
return entry->is_commit;
}
-void
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry);
+
+int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
{
struct txn *txn = entry->txn;
@@ -127,33 +141,64 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
assert(txn_has_flag(txn, TXN_WAIT_ACK));
if (txn_limbo_check_complete(limbo, entry)) {
txn_limbo_remove(limbo, entry);
- return;
+ return 0;
}
bool cancellable = fiber_set_cancellable(false);
bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
fiber_set_cancellable(cancellable);
if (timed_out) {
- // TODO: implement rollback.
- entry->is_rollback = true;
+ txn_limbo_write_rollback(limbo, entry);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+ in_queue, tmp) {
+ e->is_rollback = true;
+ e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+ txn_complete(e->txn);
+ if (e == entry)
+ break;
+ fiber_wakeup(e->txn->fiber);
+ }
+ diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+ return -1;
}
assert(txn_limbo_entry_is_complete(entry));
+ /*
+ * The first tx to be rolled back already performed all
+ * the necessary cleanups for us.
+ */
+ if (entry->is_rollback) {
+ diag_set(ClientError, ER_SYNC_ROLLBACK);
+ return -1;
+ }
txn_limbo_remove(limbo, entry);
txn_clear_flag(txn, TXN_WAIT_ACK);
+ return 0;
}
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
static int
-txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry,
+ bool is_confirm)
{
struct xrow_header row;
struct request request = {
.header = &row,
};
- if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0)
+ int res = 0;
+ if (is_confirm) {
+ res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+ } else {
+ /*
+ * This entry is the first to be rolled back, so
+ * the last "safe" lsn is entry->lsn - 1.
+ */
+ res = xrow_encode_rollback(&row, limbo->instance_id,
+ entry->lsn - 1);
+ }
+ if (res == -1)
return -1;
struct txn *txn = txn_begin();
@@ -171,6 +216,17 @@ rollback:
return -1;
}
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry)
+{
+ return txn_limbo_write_confirm_rollback(limbo, entry, true);
+}
+
void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
{
@@ -194,6 +250,49 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
}
}
+/**
+ * Write a rollback message to WAL. After it's written, the
+ * current transaction and all the transactions following it
+ * in the limbo and waiting for confirmation must be rolled back.
+ */
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry)
+{
+ return txn_limbo_write_confirm_rollback(limbo, entry, false);
+}
+
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
+{
+ assert(limbo->instance_id != REPLICA_ID_NIL &&
+ limbo->instance_id != instance_id);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
+ if (e->lsn <= lsn)
+ break;
+ e->is_rollback = true;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+ if (e->txn->signature >= 0) {
+ /* Rollback the transaction. */
+ e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
+ txn_complete(e->txn);
+ } else {
+ /*
+ * Rollback the transaction, but don't
+ * free it yet. txn_complete_async() will
+ * free it.
+ */
+ e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
+ struct fiber *fiber = e->txn->fiber;
+ e->txn->fiber = fiber();
+ txn_complete(e->txn);
+ e->txn->fiber = fiber;
+ }
+ }
+}
+
void
txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
{
@@ -217,7 +316,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
}
if (last_quorum != NULL) {
if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
- // TODO: rollback.
+ // TODO: what to do here?
+ // We already failed writing the CONFIRM
+ // message. What are the chances we'll be
+ // able to write ROLLBACK?
return;
}
/*
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 94f224131..138093c7c 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
/**
* Block the current fiber until the transaction in the limbo
* entry is either committed or rolled back.
+ * If the timeout is reached before acks are collected, the tx
+ * is rolled back along with all the txs following it in the limbo.
+ * Returns -1 when the tx was rolled back and has to be freed,
+ * 0 when tx processing can go on.
*/
-void
+int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
/**
@@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
+/**
+ * Roll back all the entries with LSN greater than the given one.
+ */
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+
/**
* Return TRUE if limbo is empty.
*/
diff --git a/test/box/error.result b/test/box/error.result
index 34ded3930..8241ec1a8 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -434,6 +434,8 @@ t;
| 213: box.error.NO_SUCH_SESSION_SETTING
| 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
| 215: box.error.SYNC_MASTER_MISMATCH
+ | 216: box.error.SYNC_QUORUM_TIMEOUT
+ | 217: box.error.SYNC_ROLLBACK
| ...
test_run:cmd("setopt delimiter ''");
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list