[Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum
Serge Petrenko
sergepetrenko at tarantool.org
Tue Jun 9 15:20:16 MSK 2020
From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
A synchronous transaction (one which changes anything in a
synchronous space) now waits before commit until it is replicated
onto a quorum of replicas.
So far all the 'synchronousness' is basically the same as the
well-known 'wait_lsn' technique, with the exception that the
transaction really is not committed until it is replicated.
The problem of wait_lsn is still present though, in case the
master restarts, because there is no 'confirm' record in the WAL
telling which transactions are replicated and can be applied.
Closes #4844
Closes #4845
---
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 2 +
src/box/errcode.h | 1 +
src/box/relay.cc | 7 ++
src/box/txn.c | 40 +++++++++-
src/box/txn_limbo.c | 167 ++++++++++++++++++++++++++++++++++++++++
src/box/txn_limbo.h | 168 +++++++++++++++++++++++++++++++++++++++++
test/box/error.result | 1 +
8 files changed, 385 insertions(+), 2 deletions(-)
create mode 100644 src/box/txn_limbo.c
create mode 100644 src/box/txn_limbo.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 230e7427d..c0833e50a 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -124,6 +124,7 @@ add_library(box STATIC
session.cc
port.c
txn.c
+ txn_limbo.c
box.cc
gc.c
checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index 9b67aeb1f..64ac89975 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -59,6 +59,7 @@
#include "index.h"
#include "port.h"
#include "txn.h"
+#include "txn_limbo.h"
#include "user.h"
#include "cfg.h"
#include "coio.h"
@@ -2389,6 +2390,7 @@ box_init(void)
if (tuple_init(lua_hash) != 0)
diag_raise();
+ txn_limbo_init();
sequence_init();
}
diff --git a/src/box/errcode.h b/src/box/errcode.h
index d1e4d02a9..019c582af 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -266,6 +266,7 @@ struct errcode_record {
/*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \
/*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \
/*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
+ /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..333e91ea9 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
#include "xrow_io.h"
#include "xstream.h"
#include "wal.h"
+#include "txn_limbo.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -399,6 +400,12 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ /*
+ * Let pending synchronous transactions know, which of
+ * them were successfully sent to the replica.
+ */
+ txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+ vclock_get(&status->vclock, instance_id));
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
diff --git a/src/box/txn.c b/src/box/txn.c
index 60870f536..1d6518e29 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "txn.h"
+#include "txn_limbo.h"
#include "engine.h"
#include "tuple.h"
#include "journal.h"
@@ -433,7 +434,7 @@ txn_complete(struct txn *txn)
engine_rollback(txn->engine, txn);
if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
txn_run_rollback_triggers(txn, &txn->on_rollback);
- } else {
+ } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
/* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
@@ -448,6 +449,19 @@ txn_complete(struct txn *txn)
txn->signature - n_rows + 1,
stop_tm - txn->start_tm);
}
+ } else {
+ /*
+ * Complete is called on every WAL operation
+ * authored by this transaction. And it not always
+ * is one. And not always is enough for commit.
+ * In case the transaction is waiting for acks, it
+ * can't be committed right away. Give control
+ * back to the fiber, owning the transaction so as
+ * it could decide what to do next.
+ */
+ if (txn->fiber != fiber())
+ fiber_wakeup(txn->fiber);
+ return;
}
/*
* If there is no fiber waiting for the transaction then
@@ -634,6 +648,7 @@ int
txn_commit(struct txn *txn)
{
struct journal_entry *req;
+ struct txn_limbo_entry *limbo_entry;
txn->fiber = fiber();
@@ -655,8 +670,25 @@ txn_commit(struct txn *txn)
return -1;
}
+ bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+ if (is_sync) {
+ /*
+ * Append now. Before even WAL write is done.
+ * After WAL write nothing should fail, even OOM
+ * wouldn't be acceptable.
+ */
+ limbo_entry = txn_limbo_append(&txn_limbo, txn);
+ if (limbo_entry == NULL) {
+ txn_rollback(txn);
+ txn_free(txn);
+ return -1;
+ }
+ }
+
fiber_set_txn(fiber(), NULL);
if (journal_write(req) != 0) {
+ if (is_sync)
+ txn_limbo_abort(&txn_limbo, limbo_entry);
fiber_set_txn(fiber(), txn);
txn_rollback(txn);
txn_free(txn);
@@ -665,7 +697,11 @@ txn_commit(struct txn *txn)
diag_log();
return -1;
}
-
+ if (is_sync) {
+ txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
+ req->rows[req->n_rows - 1]->lsn);
+ txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+ }
if (!txn_has_flag(txn, TXN_IS_DONE)) {
txn->signature = req->res;
txn_complete(txn);
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
new file mode 100644
index 000000000..d28b2a28b
--- /dev/null
+++ b/src/box/txn_limbo.c
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "txn.h"
+#include "txn_limbo.h"
+#include "replication.h"
+
+struct txn_limbo txn_limbo;
+
+/**
+ * Bring the limbo into its initial state: no owner instance, a
+ * newly created vclock, and an empty queue of entries.
+ */
+static inline void
+txn_limbo_create(struct txn_limbo *limbo)
+{
+	limbo->instance_id = REPLICA_ID_NIL;
+	vclock_create(&limbo->vclock);
+	rlist_create(&limbo->queue);
+}
+
+/**
+ * Create a limbo entry for a synchronous transaction and link it to
+ * the tail of the limbo queue.
+ *
+ * The limbo is taken over by this instance when it has no owner yet
+ * or when its queue is empty. If it still holds entries of another
+ * instance, the call fails with ER_UNCOMMITTED_FOREIGN_SYNC_TXNS.
+ * The entry is allocated on the transaction's region.
+ *
+ * @param limbo Limbo to append to.
+ * @param txn Transaction having the TXN_WAIT_ACK flag.
+ * @retval NULL Error, the diag is set.
+ * @retval not NULL The new entry: no LSN (-1), no acks, no result.
+ */
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, struct txn *txn)
+{
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (limbo->instance_id != instance_id) {
+		bool can_take_over = limbo->instance_id == REPLICA_ID_NIL ||
+				     rlist_empty(&limbo->queue);
+		if (!can_take_over) {
+			diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
+				 limbo->instance_id);
+			return NULL;
+		}
+		limbo->instance_id = instance_id;
+	}
+	size_t size = sizeof(struct txn_limbo_entry);
+	struct txn_limbo_entry *e =
+		(struct txn_limbo_entry *)region_alloc(&txn->region, size);
+	if (e == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc", "e");
+		return NULL;
+	}
+	e->is_rollback = false;
+	e->is_commit = false;
+	e->ack_count = 0;
+	e->lsn = -1;
+	e->txn = txn;
+	rlist_add_tail_entry(&limbo->queue, e, in_queue);
+	return e;
+}
+
+/**
+ * Unlink the entry from the limbo queue. The asserts require the
+ * entry to be linked and to be the first one in the queue.
+ */
+static inline void
+txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+ assert(!rlist_empty(&entry->in_queue));
+ assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry,
+ in_queue) == entry);
+ rlist_del_entry(entry, in_queue);
+}
+
+/**
+ * Mark the entry as rolled back and unlink it from the limbo queue.
+ * NOTE(review): txn_limbo_remove() asserts that the entry is the
+ * first one in the queue - confirm an aborted entry can never have
+ * older entries in front of it.
+ */
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+ entry->is_rollback = true;
+ txn_limbo_remove(limbo, entry);
+}
+
+/**
+ * Store the LSN in the entry and advance the limbo owner's own
+ * component of the limbo vclock to it. The limbo must already have
+ * an owner.
+ */
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+ int64_t lsn)
+{
+ assert(limbo->instance_id != REPLICA_ID_NIL);
+ entry->lsn = lsn;
+ /* Presumably the local WAL write counts as the first ack. */
+ ++entry->ack_count;
+ vclock_follow(&limbo->vclock, limbo->instance_id, lsn);
+}
+
+/**
+ * Recount the acks of the entry from the limbo vclock and decide
+ * whether the quorum is already collected.
+ *
+ * Each vclock component whose LSN is not less than the entry's LSN
+ * counts as one ack. The recount is needed because txn_limbo_ack()
+ * skips entries whose LSN is not greater than the previously acked
+ * one, which includes entries still having LSN -1. Acks received
+ * before the LSN was assigned are thus visible only in the vclock.
+ *
+ * @param limbo Limbo owning the entry.
+ * @param entry Entry with an already assigned LSN.
+ * @retval true The quorum is collected, entry->is_commit is set.
+ * @retval false More acks are needed.
+ */
+static bool
+txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct vclock_iterator iter;
+	vclock_iterator_init(&iter, &limbo->vclock);
+	int ack_count = 0;
+	int64_t lsn = entry->lsn;
+	vclock_foreach(&iter, vc)
+		ack_count += vc.lsn >= lsn;
+	assert(ack_count >= entry->ack_count);
+	entry->ack_count = ack_count;
+	/*
+	 * The threshold is inclusive, the same as in
+	 * txn_limbo_ack(): exactly replication_sync_quorum acks
+	 * are enough. A strict comparison here would demand one
+	 * ack more than the slow path does.
+	 */
+	entry->is_commit = ack_count >= replication_sync_quorum;
+	return entry->is_commit;
+}
+
+/**
+ * Block the current fiber until the entry is either committed or
+ * rolled back, then unlink it from the limbo and clear the
+ * transaction's TXN_WAIT_ACK flag.
+ *
+ * If the quorum is already collected by the time of the call, the
+ * fiber does not yield, but the entry is still unlinked and the flag
+ * is still cleared. Otherwise the entry, allocated on the
+ * transaction's region, would stay in the queue, and txn_complete()
+ * would keep treating the transaction as waiting for acks.
+ *
+ * @param limbo Limbo owning the entry.
+ * @param entry Entry with an assigned LSN, whose transaction is not
+ *        done and has TXN_WAIT_ACK set.
+ */
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct txn *txn = entry->txn;
+	assert(entry->lsn > 0);
+	assert(!txn_has_flag(txn, TXN_IS_DONE));
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (!txn_limbo_check_complete(limbo, entry)) {
+		/*
+		 * Cancellation is turned off for the time of the
+		 * wait, the previous setting is restored after.
+		 */
+		bool cancellable = fiber_set_cancellable(false);
+		while (!txn_limbo_entry_is_complete(entry))
+			fiber_yield();
+		fiber_set_cancellable(cancellable);
+	}
+	// TODO: implement rollback.
+	// TODO: implement confirm.
+	assert(!entry->is_rollback);
+	txn_limbo_remove(limbo, entry);
+	txn_clear_flag(txn, TXN_WAIT_ACK);
+}
+
+/**
+ * Account an ack from a replica: it has received everything up to
+ * the given LSN of the limbo owner.
+ *
+ * The replica's component of the limbo vclock is advanced, and each
+ * queued entry with an LSN in (previously acked LSN, lsn] gets one
+ * more ack. Entries which reach replication_sync_quorum acks are
+ * marked as committed and the fibers of their transactions are
+ * woken up. Nothing is done when the queue is empty.
+ *
+ * @param limbo Limbo to account the ack in.
+ * @param replica_id ID of the replica which sent the ack.
+ * @param lsn LSN of the limbo owner as seen by the replica.
+ */
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
+{
+	if (rlist_empty(&limbo->queue))
+		return;
+	assert(limbo->instance_id != REPLICA_ID_NIL);
+	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
+	/*
+	 * A repeated or stale ack can't change any counter: all
+	 * entries up to prev_lsn are already accounted for this
+	 * replica. Leave before vclock_follow(), which is
+	 * presumably allowed only to move a component forward.
+	 */
+	if (lsn <= prev_lsn)
+		return;
+	vclock_follow(&limbo->vclock, replica_id, lsn);
+	struct txn_limbo_entry *e;
+	rlist_foreach_entry(e, &limbo->queue, in_queue) {
+		/* Already acked by this replica, or has no LSN yet. */
+		if (e->lsn <= prev_lsn)
+			continue;
+		/* The queue is ordered by LSN, the rest is newer. */
+		if (e->lsn > lsn)
+			break;
+		if (++e->ack_count >= replication_sync_quorum) {
+			// TODO: better call complete() right
+			// here. Appliers use async transactions,
+			// and their txns don't have fibers to
+			// wake up. That becomes actual, when
+			// appliers will be supposed to wait for
+			// 'confirm' message.
+			e->is_commit = true;
+			fiber_wakeup(e->txn->fiber);
+		}
+		assert(e->ack_count <= VCLOCK_MAX);
+	}
+}
+
+/** Initialize the global limbo object 'txn_limbo'. */
+void
+txn_limbo_init(void)
+{
+ txn_limbo_create(&txn_limbo);
+}
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
new file mode 100644
index 000000000..112fa8902
--- /dev/null
+++ b/src/box/txn_limbo.h
@@ -0,0 +1,168 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "small/rlist.h"
+#include "vclock.h"
+
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct txn;
+
+/**
+ * Transaction and its quorum metadata, to be stored in limbo.
+ */
+struct txn_limbo_entry {
+ /** Link for limbo's queue. */
+ struct rlist in_queue;
+ /** Transaction, waiting for a quorum. */
+ struct txn *txn;
+ /**
+ * LSN of the transaction by the originator's vclock
+ * component. May be -1 in case the transaction is not
+ * written to WAL yet.
+ */
+ int64_t lsn;
+ /**
+ * Number of ACKs. Or in other words - how many replicas
+ * confirmed receipt of the transaction. It is also
+ * incremented once when the entry gets its local LSN.
+ */
+ int ack_count;
+ /**
+ * Result flag: the transaction has collected its quorum
+ * of ACKs. At most one of is_commit and is_rollback can
+ * be true. Both are false while the transaction is still
+ * waiting for its resolution.
+ */
+ bool is_commit;
+ /**
+ * Result flag: the transaction was aborted and removed
+ * from the limbo.
+ */
+ bool is_rollback;
+};
+
+/**
+ * Check whether the fate of the entry is already decided, that is,
+ * whether either of its result flags is set.
+ */
+static inline bool
+txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
+{
+	if (e->is_commit)
+		return true;
+	return e->is_rollback;
+}
+
+/**
+ * Limbo is a place where transactions are stored, which are
+ * finished, but neither committed nor rolled back. These are
+ * synchronous transactions in progress of collecting ACKs from
+ * replicas.
+ * Limbo's main purposes:
+ * - maintain the transactions ordered by LSN of their emitter;
+ * - be a link between transaction and replication modules, so
+ * that they don't depend on each other directly.
+ */
+struct txn_limbo {
+ /**
+ * Queue of limbo entries. Ordered by LSN. Some of the
+ * entries in the end may not have an LSN yet (their local
+ * WAL write is still in progress), but their order won't
+ * change anyway. Because WAL write completions will give
+ * them LSNs in the same order.
+ */
+ struct rlist queue;
+ /**
+ * Instance ID of the owner of all the transactions in the
+ * queue. Strictly speaking, nothing prevents storing
+ * foreign transactions here, originated from some other
+ * instance. But still the queue may contain only
+ * transactions of the same instance. Otherwise LSN order
+ * won't make sense - different nodes have their own
+ * independent LSNs in their vclock components.
+ */
+ uint32_t instance_id;
+ /**
+ * All components of the vclock are versions of the limbo
+ * owner's LSN, as it is visible on other nodes. For
+ * example, assume instance ID of the limbo is 1. Then
+ * vclock[1] here is local LSN of the instance 1.
+ * vclock[2] is how replica with ID 2 sees LSN of
+ * instance 1.
+ * vclock[3] is how replica with ID 3 sees LSN of
+ * instance 1, and so on.
+ * That way, by looking at this vclock, it can always be
+ * said up to which LSN there is a sync quorum for
+ * transactions, created on the limbo's owner node.
+ */
+ struct vclock vclock;
+};
+
+/**
+ * Global limbo entry. So far an instance can have only one limbo,
+ * where master's transactions are stored. Eventually there may
+ * appear more than one limbo for master-master support.
+ */
+extern struct txn_limbo txn_limbo;
+
+/**
+ * Allocate, create, and append a new transaction to the limbo.
+ * The limbo entry is allocated on the transaction's region.
+ */
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, struct txn *txn);
+
+/** Remove the entry from the limbo, mark as rolled back. */
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+/**
+ * Assign local LSN to the limbo entry. That happens when the
+ * transaction is added to the limbo, writes to WAL, and gets an
+ * LSN.
+ */
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+ int64_t lsn);
+
+/**
+ * Ack all transactions up to the given LSN on behalf of the
+ * replica with the specified ID.
+ */
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Block the current fiber until the transaction in the limbo
+ * entry is either committed or rolled back.
+ */
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+/**
+ * Initialize the global limbo. Called from box_init().
+ */
+void
+txn_limbo_init(void);
+
+#if defined(__cplusplus)
+}
+#endif /* defined(__cplusplus) */
diff --git a/test/box/error.result b/test/box/error.result
index 2196fa541..69c471085 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -432,6 +432,7 @@ t;
| 211: box.error.WRONG_QUERY_ID
| 212: box.error.SEQUENCE_NOT_STARTED
| 213: box.error.NO_SUCH_SESSION_SETTING
+ | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
| ...
test_run:cmd("setopt delimiter ''");
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list