[Tarantool-patches] [PATCH 4/4] txn_limbo: add ROLLBACK processing
Serge Petrenko
sergepetrenko at tarantool.org
Thu Jun 18 15:14:03 MSK 2020
Now txn_limbo writes a ROLLBACK entry to WAL when one of the limbo
entries fails to gather a quorum within txn_limbo_confirm_timeout.
All the limbo entries, starting with the failed one, are rolled back in
reverse order.
Closes #4848
---
src/box/applier.cc | 40 ++++++++++++----
src/box/relay.cc | 2 +-
src/box/txn.c | 5 +-
src/box/txn_limbo.c | 110 +++++++++++++++++++++++++++++++++++++++-----
src/box/txn_limbo.h | 12 ++++-
5 files changed, 146 insertions(+), 23 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ad2ee18a5..872372f62 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -260,7 +260,7 @@ process_nop(struct request *request)
* Confirms some of the txs waiting in txn_limbo.
*/
static int
-applier_on_confirm(struct trigger *trig, void *data)
+applier_on_confirm_written(struct trigger *trig, void *data)
{
(void) trig;
int64_t lsn = *(int64_t *)data;
@@ -268,10 +268,23 @@ applier_on_confirm(struct trigger *trig, void *data)
return 0;
}
+/*
+ * An on_commit trigger set on a txn containing a ROLLBACK entry.
+ * Rolls back the limbo txs starting from the entry's LSN.
+ */
static int
-process_confirm(struct request *request)
+applier_on_rollback_written(struct trigger *trig, void *data)
{
- assert(request->header->type == IPROTO_CONFIRM);
+ (void) trig;
+ int64_t lsn = *(int64_t *)data;
+ txn_limbo_read_rollback(&txn_limbo, lsn);
+ return 0;
+}
+
+static int
+process_confirm_rollback(struct request *request, bool is_confirm)
+{
+ assert(iproto_type_is_synchro_request(request->header->type));
uint32_t replica_id;
struct txn *txn = in_txn();
size_t size;
@@ -280,8 +293,14 @@ process_confirm(struct request *request)
diag_set(OutOfMemory, size, "region_alloc_object", "lsn");
return -1;
}
- if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
+ int res = 0;
+ if (is_confirm)
+ res = xrow_decode_confirm(request->header, &replica_id, lsn);
+ else
+ res = xrow_decode_rollback(request->header, &replica_id, lsn);
+ if (res == -1)
return -1;
+
/*
* on_commit trigger failure is not allowed, so check for
* instance id early.
@@ -294,7 +313,7 @@ process_confirm(struct request *request)
/*
* Set an on_commit trigger which will perform the actual
- * confirmation processing.
+ * confirmation/rollback processing.
*/
struct trigger *trig = region_alloc_object(&txn->region, typeof(*trig),
&size);
@@ -302,7 +321,9 @@ process_confirm(struct request *request)
diag_set(OutOfMemory, size, "region_alloc_object", "trig");
return -1;
}
- trigger_create(trig, applier_on_confirm, lsn, NULL);
+ trigger_create(trig, is_confirm ? applier_on_confirm_written :
+ applier_on_rollback_written,
+ lsn, NULL);
if (txn_begin_stmt(txn, NULL) != 0)
return -1;
@@ -319,9 +340,10 @@ static int
apply_row(struct xrow_header *row)
{
struct request request;
- if (row->type == IPROTO_CONFIRM) {
+ if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
- return process_confirm(&request);
+ return process_confirm_rollback(&request,
+ row->type == IPROTO_CONFIRM);
}
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
@@ -344,7 +366,7 @@ apply_final_join_row(struct xrow_header *row)
* Confirms are ignored during join. All the data master
* sends us is valid.
*/
- if (row->type == IPROTO_CONFIRM)
+ if (iproto_type_is_synchro_request(row->type))
return 0;
struct txn *txn = txn_begin();
if (txn == NULL)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0adc9fc98..29588b6ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -772,7 +772,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
assert(iproto_type_is_dml(packet->type) ||
- packet->type == IPROTO_CONFIRM);
+ iproto_type_is_synchro_request(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index 4f787db79..484b822db 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -751,7 +751,12 @@ txn_commit(struct txn *txn)
if (is_sync) {
txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
req->rows[req->n_rows - 1]->lsn);
- txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+ if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
+ /* Rolled back on quorum timeout: report it to the caller. */
+ diag_set(ClientError, ER_TIMEOUT);
+ txn_free(txn);
+ return -1;
+ }
}
if (!txn_has_flag(txn, TXN_IS_DONE)) {
txn->signature = req->res;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a715a136e..c55f5bda1 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -84,6 +84,16 @@ txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
rlist_del_entry(entry, in_queue);
}
+static inline void
+txn_limbo_pop(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+ assert(!rlist_empty(&entry->in_queue));
+ assert(rlist_last_entry(&limbo->queue, struct txn_limbo_entry,
+ in_queue) == entry);
+ (void) limbo;
+ rlist_del_entry(entry, in_queue);
+}
+
void
txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
{
@@ -116,7 +126,11 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
return entry->is_commit;
}
-void
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry);
+
+int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
{
struct txn *txn = entry->txn;
@@ -125,33 +139,61 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
assert(txn_has_flag(txn, TXN_WAIT_ACK));
if (txn_limbo_check_complete(limbo, entry)) {
txn_limbo_remove(limbo, entry);
- return;
+ return 0;
}
bool cancellable = fiber_set_cancellable(false);
bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
fiber_set_cancellable(cancellable);
if (timed_out) {
- // TODO: implement rollback.
- entry->is_rollback = true;
+ txn_limbo_write_rollback(limbo, entry);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+ in_queue, tmp) {
+ e->is_rollback = true;
+ e->txn->signature = -1;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+ txn_complete(e->txn);
+ if (e == entry)
+ break;
+ fiber_wakeup(e->txn->fiber);
+ }
+ return -1;
}
assert(txn_limbo_entry_is_complete(entry));
+ /*
+ * The first tx to be rolled back already performed all
+ * the necessary cleanups for us.
+ */
+ if (entry->is_rollback)
+ return -1;
txn_limbo_remove(limbo, entry);
txn_clear_flag(txn, TXN_WAIT_ACK);
+ return 0;
}
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
static int
-txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry,
+ bool is_confirm)
{
struct xrow_header row;
struct request request = {
.header = &row,
};
- if (xrow_encode_confirm(&row, limbo->instance_id, entry->lsn) < 0)
+ int res = 0;
+ if (is_confirm) {
+ res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+ } else {
+ /*
+ * Send the LSN of the first entry to roll back: the
+ * reader rolls back every entry with lsn >= this one.
+ */
+ res = xrow_encode_rollback(&row, limbo->instance_id,
+ entry->lsn);
+ }
+ if (res == -1)
return -1;
struct txn *txn = txn_begin();
@@ -169,6 +211,17 @@ rollback:
return -1;
}
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry)
+{
+ return txn_limbo_write_confirm_rollback(limbo, entry, true);
+}
+
void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
{
@@ -191,6 +244,38 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
}
}
+/**
+ * Write a rollback message to WAL. After it's written the given
+ * transaction and all the transactions following it in the limbo
+ * must be rolled back.
+ */
+static int
+txn_limbo_write_rollback(struct txn_limbo *limbo,
+ struct txn_limbo_entry *entry)
+{
+ return txn_limbo_write_confirm_rollback(limbo, entry, false);
+}
+
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
+{
+ assert(limbo->instance_id != REPLICA_ID_NIL &&
+ limbo->instance_id != instance_id);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
+ if (e->lsn < lsn)
+ break;
+ assert(e->txn->fiber == NULL);
+ e->is_rollback = true;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+
+ /* Rollback the transaction. */
+ e->txn->signature = -1;
+ txn_complete(e->txn);
+ }
+}
+
void
txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
{
@@ -214,7 +299,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
}
if (last_quorum != NULL) {
if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
- // TODO: rollback.
+ // TODO: what to do here?
+ // We already failed writing the CONFIRM
+ // message. What are the chances we'll be
+ // able to write ROLLBACK?
return;
}
/*
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 23019e5d9..987cf9271 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -156,8 +156,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
/**
* Block the current fiber until the transaction in the limbo
* entry is either committed or rolled back.
+ * If the timeout expires before the acks are collected, the tx is
+ * rolled back together with all the txs following it in the limbo.
+ * Return -1 if the tx was rolled back and has to be freed,
+ * 0 if its processing can go on.
*/
-void
+int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
/**
@@ -166,6 +170,12 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
+/**
+ * Roll back all the entries whose LSN is >= the given master's LSN.
+ */
+void
+txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+
/**
* Return TRUE if limbo is empty.
*/
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list