[Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
Serge Petrenko
sergepetrenko at tarantool.org
Tue Jun 9 15:20:20 MSK 2020
Make txn_limbo write a CONFIRM entry as soon as a batch of entries
receives its acks. The CONFIRM entry is written to WAL and later
replicated to all the replicas.
Now replicas put synchronous transactions into txn_limbo and wait for
corresponding confirmation entries to arrive and end up in their WAL
before committing the transactions.
Part-of #4847
---
src/box/applier.cc | 81 ++++++++++++++++++++++++++++++++++++++++++-
src/box/box.cc | 3 ++
src/box/errcode.h | 1 +
src/box/relay.cc | 13 ++++---
src/box/txn.c | 75 ++++++++++++++++++++++++++++++---------
src/box/txn.h | 23 ++++++++++++
src/box/txn_limbo.c | 79 ++++++++++++++++++++++++++++++++++++-----
src/box/txn_limbo.h | 6 ++++
test/box/error.result | 1 +
9 files changed, 252 insertions(+), 30 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index df48b4796..1dc977424 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,6 +51,7 @@
#include "txn.h"
#include "box.h"
#include "scoped_guard.h"
+#include "txn_limbo.h"
STRS(applier_state, applier_STATE);
@@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
struct txn *txn = txn_begin();
if (txn == NULL)
return -1;
+ /*
+ * Do not wait for confirmation when fetching a snapshot.
+ * Master only sends confirmed rows during join.
+ */
+ txn_force_async(txn);
if (txn_begin_stmt(txn, space) != 0)
goto rollback;
/* no access checks here - applier always works with admin privs */
@@ -249,10 +255,73 @@ process_nop(struct request *request)
return txn_commit_stmt(txn, request);
}
+/*
+ * An on_commit trigger set on a txn containing a CONFIRM entry.
+ * Confirms the txs waiting in txn_limbo whose LSN does not exceed
+ * the confirmed one.
+ *
+ * @param trig Trigger whose data field points to the confirmed
+ * int64_t LSN, as passed to trigger_create().
+ * @param data Trigger event. Unused: the pointer given to
+ * trigger_create() is stored in trig->data, it is
+ * not delivered through this argument.
+ * @retval 0 Always, on_commit trigger failure is not allowed.
+ */
+static int
+applier_on_confirm(struct trigger *trig, void *data)
+{
+ (void) data;
+ int64_t lsn = *(int64_t *)trig->data;
+ txn_limbo_read_confirm(&txn_limbo, lsn);
+ return 0;
+}
+
+/**
+ * Apply a CONFIRM row inside the current transaction.
+ *
+ * Decodes the row, checks that it comes from the instance owning
+ * txn_limbo, adds the row to the transaction as a statement with no
+ * space, and installs an on_commit trigger which confirms the
+ * waiting entries once the transaction is committed.
+ *
+ * @param request Request whose header is the CONFIRM row.
+ * @retval 0 Success.
+ * @retval -1 Error, diag is set by the failed step.
+ */
+static int
+process_confirm(struct request *request)
+{
+ /* Compare, don't assign: with '=' the check could never fire. */
+ assert(request->header->type == IPROTO_CONFIRM);
+ uint32_t replica_id;
+ struct txn *txn = in_txn();
+ /*
+ * The LSN is read by the on_commit trigger after this function
+ * returns, so it is kept on the txn region, not on the stack.
+ */
+ int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
+ if (lsn == NULL) {
+ diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
+ return -1;
+ }
+ if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
+ return -1;
+ /*
+ * on_commit trigger failure is not allowed, so check for
+ * instance id early.
+ */
+ if (replica_id != txn_limbo.instance_id) {
+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
+ txn_limbo.instance_id);
+ return -1;
+ }
+
+ /*
+ * Set an on_commit trigger which will perform the actual
+ * confirmation processing.
+ */
+ struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
+ sizeof(*trig));
+ if (trig == NULL) {
+ diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
+ return -1;
+ }
+ trigger_create(trig, applier_on_confirm, lsn, NULL);
+
+ if (txn_begin_stmt(txn, NULL) != 0)
+ return -1;
+
+ /* Install the trigger only once the statement is in the txn. */
+ if (txn_commit_stmt(txn, request) == 0) {
+ txn_on_commit(txn, trig);
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
static int
apply_row(struct xrow_header *row)
{
struct request request;
+ if (row->type == IPROTO_CONFIRM) {
+ request.header = row;
+ return process_confirm(&request);
+ }
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
if (request.type == IPROTO_NOP)
@@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
struct txn *txn = txn_begin();
if (txn == NULL)
return -1;
+ /*
+ * Do not wait for confirmation while processing final
+ * join rows. See apply_snapshot_row().
+ */
+ txn_force_async(txn);
if (apply_row(row) != 0) {
txn_rollback(txn);
fiber_gc();
@@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_final_join_row(&row) != 0)
+ /*
+ * Confirms are ignored during join. All the
+ * data master sends us is valid.
+ */
+ if (row.type != IPROTO_CONFIRM &&
+ apply_final_join_row(&row) != 0)
diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
diff --git a/src/box/box.cc b/src/box/box.cc
index 64ac89975..792c3c394 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -342,6 +342,9 @@ static void
apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
+ // TODO: process confirmation during recovery.
+ if (row->type == IPROTO_CONFIRM)
+ return;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 019c582af..3ba6866e5 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -267,6 +267,7 @@ struct errcode_record {
/*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \
/*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
+ /*215 */_(ER_SYNC_MASTER_MISMATCH, "CONFIRM message arrived for an unknown master id %d, expected %d") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 333e91ea9..4df3c2f26 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
vclock_copy(&status->relay->tx.vclock, &status->vclock);
/*
* Let pending synchronous transactions know, which of
- * them were successfully sent to the replica.
+ * them were successfully sent to the replica. Acks are
+ * collected only on the master. Other instances wait for
+ * master's CONFIRM message instead.
*/
- txn_limbo_ack(&txn_limbo, status->relay->replica->id,
- vclock_get(&status->vclock, instance_id));
+ if (txn_limbo.instance_id == instance_id) {
+ txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+ vclock_get(&status->vclock, instance_id));
+ }
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
@@ -766,7 +770,8 @@ static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type));
+ assert(iproto_type_is_dml(packet->type) ||
+ packet->type == IPROTO_CONFIRM);
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
diff --git a/src/box/txn.c b/src/box/txn.c
index a65100b31..3b331fecc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -36,6 +36,7 @@
#include <fiber.h>
#include "xrow.h"
#include "errinj.h"
+#include "iproto_constants.h"
double too_long_threshold;
@@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
*/
struct space *space = stmt->space;
row->group_id = space != NULL ? space_group_id(space) : 0;
- row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
+ /*
+ * IPROTO_CONFIRM entries are supplementary and aren't
+ * valid dml requests. They're encoded manually.
+ */
+ if (likely(row->type != IPROTO_CONFIRM))
+ row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
if (row->bodycnt < 0)
return -1;
stmt->row = row;
@@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
*/
struct txn_stmt *stmt = txn_current_stmt(txn);
- /* Create WAL record for the write requests in non-temporary spaces.
- * stmt->space can be NULL for IRPOTO_NOP.
+ /*
+ * Create WAL record for the write requests in
+ * non-temporary spaces. stmt->space can be NULL for
+ * IPROTO_NOP or IPROTO_CONFIRM.
*/
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
if (txn_add_redo(txn, stmt, request) != 0)
@@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers)
/**
* Complete transaction processing.
*/
-static void
+void
txn_complete(struct txn *txn)
{
/*
* Note, engine can be NULL if transaction contains
- * IPROTO_NOP statements only.
+ * IPROTO_NOP or IPROTO_CONFIRM statements.
*/
if (txn->signature < 0) {
/* Undo the transaction. */
@@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_applier_rows;
bool is_sync = false;
- /*
- * Only local transactions, originated from the master,
- * can enter 'waiting for acks' state. It means, only
- * author of the transaction can collect acks. Replicas
- * consider it a normal async transaction so far.
- */
- bool is_local = true;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->has_triggers) {
@@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
if (stmt->row == NULL)
continue;
- if (stmt->row->replica_id == 0) {
+ if (stmt->row->replica_id == 0)
*local_row++ = stmt->row;
- } else {
+ else
*remote_row++ = stmt->row;
- is_local = false;
- }
req->approx_len += xrow_approx_len(stmt->row);
}
- if (is_sync && is_local)
+
+ is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
+ if (is_sync) {
txn_set_flag(txn, TXN_WAIT_ACK);
+ }
assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
@@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
return false;
}
+/*
+ * A trigger called on tx rollback due to a failed WAL write,
+ * when tx is waiting for confirmation. Aborts the limbo entry
+ * of the tx via txn_limbo_abort().
+ *
+ * @param trig Trigger whose data field points to the
+ * txn_limbo_entry passed to trigger_create().
+ * @param data Trigger event. Unused: the pointer given to
+ * trigger_create() is stored in trig->data, it is
+ * not delivered through this argument.
+ * @retval 0 Always.
+ */
+static int
+txn_limbo_on_rollback(struct trigger *trig, void *data)
+{
+ (void) data;
+ struct txn_limbo_entry *entry = (struct txn_limbo_entry *) trig->data;
+ txn_limbo_abort(&txn_limbo, entry);
+ return 0;
+}
+
int
txn_commit_async(struct txn *txn)
{
@@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
return -1;
}
+ bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+ /* Both stay NULL for transactions which don't wait for confirm. */
+ struct txn_limbo_entry *limbo_entry = NULL;
+ struct trigger *on_rollback = NULL;
+ if (is_sync) {
+ /*
+ * The rollback trigger is linked into the txn and may
+ * run after this function returns, so it can't live
+ * on the stack. Allocate it on the txn region, and do
+ * it before the txn is put into the limbo, so that
+ * the error path has nothing to undo there.
+ */
+ on_rollback = region_alloc(&txn->region, sizeof(*on_rollback));
+ if (on_rollback == NULL) {
+ diag_set(OutOfMemory, sizeof(*on_rollback),
+ "region_alloc", "on_rollback");
+ /* Same cleanup as on journal write failure below. */
+ txn_rollback(txn);
+ return -1;
+ }
+ /* See txn_commit(). */
+ /* NOTE(review): assumes n_applier_rows > 0 - confirm. */
+ uint32_t origin_id = req->rows[0]->replica_id;
+ int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
+ limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+ if (limbo_entry == NULL) {
+ /*
+ * NOTE(review): the journal failure path below
+ * doesn't call txn_free() after txn_rollback().
+ * Confirm which of the two is right.
+ */
+ txn_rollback(txn);
+ txn_free(txn);
+ return -1;
+ }
+ assert(lsn > 0);
+ txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+ }
+
fiber_set_txn(fiber(), NULL);
if (journal_write_async(req) != 0) {
fiber_set_txn(fiber(), txn);
txn_rollback(txn);
+ /*
+ * Only a sync txn has a limbo entry to abort.
+ * NOTE(review): the entry is touched after the txn is
+ * rolled back - confirm it is still valid here.
+ */
+ if (is_sync) {
+ txn_limbo_abort(&txn_limbo, limbo_entry);
+ }
diag_set(ClientError, ER_WAL_IO);
diag_log();
return -1;
}
+ /*
+ * Set a trigger to abort waiting for confirm on WAL write
+ * failure. It is installed only after the write is submitted,
+ * so the failure branch above aborts the entry exactly once.
+ */
+ if (is_sync) {
+ trigger_create(on_rollback, txn_limbo_on_rollback, limbo_entry,
+ NULL);
+ txn_on_rollback(txn, on_rollback);
+ }
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index 232cc07a8..e7705bb48 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -73,6 +73,13 @@ enum txn_flag {
* then finishes commit and returns success to a user.
*/
TXN_WAIT_ACK,
+ /**
+ * A transaction mustn't wait for confirmation, even if it
+ * touches synchronous spaces. Needed for join stage on
+ * replica, when all the data coming from the master is
+ * already confirmed by design.
+ */
+ TXN_FORCE_ASYNC,
};
enum {
@@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
txn->flags &= ~(1 << flag);
}
+/**
+ * Force async mode for transaction by setting the TXN_FORCE_ASYNC
+ * flag on it. It won't wait for acks or confirmation.
+ *
+ * @param txn Transaction to mark.
+ */
+static inline void
+txn_force_async(struct txn *txn)
+{
+ txn_set_flag(txn, TXN_FORCE_ASYNC);
+}
+
/* Pointer to the current transaction (if any) */
static inline struct txn *
in_txn(void)
@@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
struct txn *
txn_begin(void);
+/**
+ * Complete transaction processing.
+ */
+void
+txn_complete(struct txn *txn);
+
/**
* Commit a transaction.
* @pre txn == in_txn()
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index efb97a591..daec98317 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
fiber_yield();
fiber_set_cancellable(cancellable);
// TODO: implement rollback.
- // TODO: implement confirm.
assert(!entry->is_rollback);
+ assert(entry->is_commit);
txn_limbo_remove(limbo, entry);
txn_clear_flag(txn, TXN_WAIT_ACK);
}
+/**
+ * Write a CONFIRM entry for the given limbo entry's LSN to WAL.
+ * Once it is written, the transactions waiting for confirmation
+ * may be finished.
+ *
+ * @param limbo Limbo whose instance_id is encoded into the row.
+ * @param entry Limbo entry whose LSN is being confirmed.
+ * @retval -1 Encoding, txn_begin() or a statement step failed.
+ * @return Otherwise the result of txn_commit().
+ */
+static int
+txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+ /* Build the CONFIRM row and a request wrapping it. */
+ struct xrow_header confirm_row = {0};
+ struct request confirm_req = {0};
+ confirm_req.header = &confirm_row;
+
+ confirm_row.bodycnt = xrow_encode_confirm(&confirm_row,
+ limbo->instance_id,
+ entry->lsn);
+ if (confirm_row.bodycnt < 0)
+ return -1;
+
+ /* The row is committed as a transaction of its own. */
+ struct txn *confirm_txn = txn_begin();
+ if (confirm_txn == NULL)
+ return -1;
+
+ /* A statement with no space carries the row into the txn. */
+ if (txn_begin_stmt(confirm_txn, NULL) != 0 ||
+ txn_commit_stmt(confirm_txn, &confirm_req) != 0) {
+ txn_rollback(confirm_txn);
+ return -1;
+ }
+ return txn_commit(confirm_txn);
+}
+
+/**
+ * Process a CONFIRM received from the limbo owner: complete the
+ * queued entries, in queue order, up to and including the given
+ * LSN. Stops at the first entry with a greater LSN.
+ *
+ * @param limbo Limbo owned by another instance.
+ * @param lsn Confirmed LSN.
+ */
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
+{
+ assert(limbo->instance_id != REPLICA_ID_NIL &&
+ limbo->instance_id != instance_id);
+ struct txn_limbo_entry *entry, *next;
+ rlist_foreach_entry_safe(entry, &limbo->queue, in_queue, next) {
+ /* Everything from here on is not confirmed yet. */
+ if (entry->lsn > lsn)
+ break;
+ /* There must be no fiber attached to the tx to wake up. */
+ assert(entry->txn->fiber == NULL);
+ entry->is_commit = true;
+ txn_limbo_remove(limbo, entry);
+ txn_clear_flag(entry->txn, TXN_WAIT_ACK);
+ /*
+ * CONFIRM always follows the tx it confirms, so
+ * txn_complete_async must have been called for this
+ * tx already. Finish its processing right here.
+ */
+ txn_complete(entry->txn);
+ }
+}
+
void
txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
{
@@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
vclock_follow(&limbo->vclock, replica_id, lsn);
struct txn_limbo_entry *e;
+ struct txn_limbo_entry *last_quorum = NULL;
rlist_foreach_entry(e, &limbo->queue, in_queue) {
if (e->lsn <= prev_lsn)
continue;
if (e->lsn > lsn)
break;
if (++e->ack_count >= replication_sync_quorum) {
- // TODO: better call complete() right
- // here. Appliers use async transactions,
- // and their txns don't have fibers to
- // wake up. That becomes actual, when
- // appliers will be supposed to wait for
- // 'confirm' message.
e->is_commit = true;
- fiber_wakeup(e->txn->fiber);
+ last_quorum = e;
}
assert(e->ack_count <= VCLOCK_MAX);
}
+ if (last_quorum != NULL) {
+ if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
+ // TODO: rollback.
+ return;
+ }
+ /*
+ * Wakeup all the entries in direct order as soon
+ * as confirmation message is written to WAL.
+ */
+ rlist_foreach_entry(e, &limbo->queue, in_queue) {
+ fiber_wakeup(e->txn->fiber);
+ if (e == last_quorum)
+ break;
+ }
+ }
}
void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 1ad1c567a..de415cd97 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
void
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+/**
+ * Confirm all the entries up to the given master's LSN.
+ */
+void
+txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
+
void
txn_limbo_init();
diff --git a/test/box/error.result b/test/box/error.result
index 69c471085..34ded3930 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -433,6 +433,7 @@ t;
| 212: box.error.SEQUENCE_NOT_STARTED
| 213: box.error.NO_SUCH_SESSION_SETTING
| 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
+ | 215: box.error.SYNC_MASTER_MISMATCH
| ...
test_run:cmd("setopt delimiter ''");
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list