[Tarantool-patches] [PATCH v2 04/19] replication: make sync transactions wait quorum

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Jun 30 02:15:23 MSK 2020


A synchronous transaction (one which changes anything in a
synchronous space) waits before commit until it is replicated to
a quorum of replicas.

So far, all the 'synchronousness' is basically the same as the
well-known 'wait_lsn' technique, with the exception that the
transaction really is not committed until it is replicated.

The problem of wait_lsn is still present, though, in case the
master restarts, because there is no 'confirm' record in WAL
telling which transactions are replicated and can be applied.

Closes #4844
Closes #4845
---
 src/box/CMakeLists.txt |   1 +
 src/box/box.cc         |   2 +
 src/box/errcode.h      |   1 +
 src/box/relay.cc       |  11 +++
 src/box/txn.c          |  51 +++++++++++-
 src/box/txn_limbo.c    | 176 +++++++++++++++++++++++++++++++++++++++++
 src/box/txn_limbo.h    | 168 +++++++++++++++++++++++++++++++++++++++
 test/box/error.result  |   1 +
 8 files changed, 409 insertions(+), 2 deletions(-)
 create mode 100644 src/box/txn_limbo.c
 create mode 100644 src/box/txn_limbo.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 63f98f6c8..b8b2689d2 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -169,6 +169,7 @@ add_library(box STATIC
     session.cc
     port.c
     txn.c
+    txn_limbo.c
     box.cc
     gc.c
     checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index 0821ea0a3..02088ba01 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -59,6 +59,7 @@
 #include "index.h"
 #include "port.h"
 #include "txn.h"
+#include "txn_limbo.h"
 #include "user.h"
 #include "cfg.h"
 #include "coio.h"
@@ -2413,6 +2414,7 @@ box_init(void)
 	if (tuple_init(lua_hash) != 0)
 		diag_raise();
 
+	txn_limbo_init();
 	sequence_init();
 }
 
diff --git a/src/box/errcode.h b/src/box/errcode.h
index d1e4d02a9..019c582af 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -266,6 +266,7 @@ struct errcode_record {
 	/*211 */_(ER_WRONG_QUERY_ID,		"Prepared statement with id %u does not exist") \
 	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
 	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
+	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..36fc14b8c 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xrow_io.h"
 #include "xstream.h"
 #include "wal.h"
+#include "txn_limbo.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -399,6 +400,16 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	/*
+	 * Let pending synchronous transactions know, which of
+	 * them were successfully sent to the replica. Acks are
+	 * collected only by the transactions originator (which is
+	 * the single master in 100% so far).
+	 */
+	if (txn_limbo.instance_id == instance_id) {
+		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
+			      vclock_get(&status->vclock, instance_id));
+	}
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
diff --git a/src/box/txn.c b/src/box/txn.c
index edc1f5180..6cfa98212 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "txn.h"
+#include "txn_limbo.h"
 #include "engine.h"
 #include "tuple.h"
 #include "journal.h"
@@ -433,7 +434,7 @@ txn_complete(struct txn *txn)
 			engine_rollback(txn->engine, txn);
 		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
 			txn_run_rollback_triggers(txn, &txn->on_rollback);
-	} else {
+	} else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
 		/* Commit the transaction. */
 		if (txn->engine != NULL)
 			engine_commit(txn->engine, txn);
@@ -448,6 +449,19 @@ txn_complete(struct txn *txn)
 					     txn->signature - n_rows + 1,
 					     stop_tm - txn->start_tm);
 		}
+	} else {
+		/*
+		 * Complete is called on every WAL operation
+		 * authored by this transaction. And it not always
+		 * is one. And not always is enough for commit.
+		 * In case the transaction is waiting for acks, it
+		 * can't be committed right away. Give control
+		 * back to the fiber, owning the transaction so as
+		 * it could decide what to do next.
+		 */
+		if (txn->fiber != NULL && txn->fiber != fiber())
+			fiber_wakeup(txn->fiber);
+		return;
 	}
 	/*
 	 * If there is no fiber waiting for the transaction then
@@ -517,6 +531,11 @@ txn_journal_entry_new(struct txn *txn)
 
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
+	/*
+	 * There is no a check for all-local rows, because a local
+	 * space can't be synchronous. So if there is at least one
+	 * synchronous space, the transaction is not local.
+	 */
 	if (is_sync)
 		txn_set_flag(txn, TXN_WAIT_ACK);
 
@@ -627,6 +646,7 @@ int
 txn_commit(struct txn *txn)
 {
 	struct journal_entry *req;
+	struct txn_limbo_entry *limbo_entry;
 
 	txn->fiber = fiber();
 
@@ -648,8 +668,31 @@ txn_commit(struct txn *txn)
 		return -1;
 	}
 
+	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
+	if (is_sync) {
+		/*
+		 * Remote rows, if any, come before local rows, so
+		 * check for originating instance id here.
+		 */
+		uint32_t origin_id = req->rows[0]->replica_id;
+
+		/*
+		 * Append now. Before even WAL write is done.
+		 * After WAL write nothing should fail, even OOM
+		 * wouldn't be acceptable.
+		 */
+		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+		if (limbo_entry == NULL) {
+			txn_rollback(txn);
+			txn_free(txn);
+			return -1;
+		}
+	}
+
 	fiber_set_txn(fiber(), NULL);
 	if (journal_write(req) != 0) {
+		if (is_sync)
+			txn_limbo_abort(&txn_limbo, limbo_entry);
 		fiber_set_txn(fiber(), txn);
 		txn_rollback(txn);
 		txn_free(txn);
@@ -658,7 +701,11 @@ txn_commit(struct txn *txn)
 		diag_log();
 		return -1;
 	}
-
+	if (is_sync) {
+		txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
+				     req->rows[req->n_rows - 1]->lsn);
+		txn_limbo_wait_complete(&txn_limbo, limbo_entry);
+	}
 	if (!txn_has_flag(txn, TXN_IS_DONE)) {
 		txn->signature = req->res;
 		txn_complete(txn);
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
new file mode 100644
index 000000000..9de91db93
--- /dev/null
+++ b/src/box/txn_limbo.c
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "txn.h"
+#include "txn_limbo.h"
+#include "replication.h"
+
+/** The single limbo of this instance. */
+struct txn_limbo txn_limbo;
+
+/** Initialize @a limbo as empty and not owned by any instance. */
+static inline void
+txn_limbo_create(struct txn_limbo *limbo)
+{
+	limbo->instance_id = REPLICA_ID_NIL;
+	vclock_create(&limbo->vclock);
+	rlist_create(&limbo->queue);
+}
+
+/**
+ * Append a new entry for @a txn to the tail of the limbo queue.
+ * @a id is the originator's instance ID; REPLICA_ID_NIL stands
+ * for this instance. The limbo holds transactions of only one
+ * instance at a time, so a transaction of another instance is
+ * rejected while the queue is not empty. The entry is allocated
+ * on the transaction's region and has no LSN yet.
+ * @retval NULL Error, diag is set.
+ * @retval not NULL The new entry.
+ */
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
+{
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (id == REPLICA_ID_NIL)
+		id = instance_id;
+	if (limbo->instance_id != id) {
+		if (limbo->instance_id != REPLICA_ID_NIL &&
+		    !rlist_empty(&limbo->queue)) {
+			diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
+				 limbo->instance_id);
+			return NULL;
+		}
+		limbo->instance_id = id;
+		/*
+		 * The vclock stores how the nodes see the previous
+		 * owner's LSN. Those values mean nothing for the new
+		 * owner: txn_limbo_check_complete() would count them
+		 * as ACKs, and they may exceed the new owner's LSNs,
+		 * which vclock_follow() expects only to grow.
+		 */
+		vclock_create(&limbo->vclock);
+	}
+	size_t size;
+	struct txn_limbo_entry *e = region_alloc_object(&txn->region,
+							typeof(*e), &size);
+	if (e == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc_object", "e");
+		return NULL;
+	}
+	e->txn = txn;
+	e->lsn = -1;
+	e->ack_count = 0;
+	e->is_commit = false;
+	e->is_rollback = false;
+	rlist_add_tail_entry(&limbo->queue, e, in_queue);
+	return e;
+}
+
+/**
+ * Unlink @a entry from the limbo queue. The entry must be linked
+ * and must be the queue head.
+ */
+static inline void
+txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	(void) limbo;
+	assert(!rlist_empty(&entry->in_queue));
+	assert(entry == rlist_first_entry(&limbo->queue,
+					  struct txn_limbo_entry, in_queue));
+	rlist_del_entry(entry, in_queue);
+}
+
+/**
+ * Unlink @a entry from the limbo and mark it rolled back. Used
+ * when the transaction's WAL write fails.
+ */
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	(void) limbo;
+	/*
+	 * The entry is not necessarily the queue head: older sync
+	 * transactions may still be waiting for ACKs when this one
+	 * fails its WAL write. So txn_limbo_remove(), which
+	 * requires the head, can't be used here.
+	 */
+	assert(!rlist_empty(&entry->in_queue));
+	entry->is_rollback = true;
+	rlist_del_entry(entry, in_queue);
+}
+
+/**
+ * Bind the WAL-assigned @a lsn to @a entry. The local WAL write
+ * counts as the first ACK, and the owner's own component of the
+ * limbo vclock is advanced to @a lsn.
+ */
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+		     int64_t lsn)
+{
+	uint32_t owner_id = limbo->instance_id;
+	assert(owner_id != REPLICA_ID_NIL);
+	entry->lsn = lsn;
+	entry->ack_count += 1;
+	vclock_follow(&limbo->vclock, owner_id, lsn);
+}
+
+/**
+ * Recount ACKs of @a entry from the limbo vclock. Every vclock
+ * component which has reached the entry's LSN is one ACK. The
+ * entry is marked committed once the synchro quorum is reached.
+ * @retval true The entry is resolved (committed or rolled back).
+ */
+static bool
+txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	if (txn_limbo_entry_is_complete(entry))
+		return true;
+	const int64_t target_lsn = entry->lsn;
+	int acks = 0;
+	struct vclock_iterator it;
+	vclock_iterator_init(&it, &limbo->vclock);
+	vclock_foreach(&it, component) {
+		if (component.lsn >= target_lsn)
+			acks++;
+	}
+	assert(acks >= entry->ack_count);
+	entry->ack_count = acks;
+	entry->is_commit = (acks >= replication_synchro_quorum);
+	return entry->is_commit;
+}
+
+/**
+ * Wait until @a entry is resolved. The entry must already have an
+ * LSN. The fiber is not cancellable while waiting. On return the
+ * entry is out of the queue, and the transaction no longer has the
+ * TXN_WAIT_ACK flag, so txn_complete() is able to commit it.
+ */
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
+{
+	struct txn *txn = entry->txn;
+	assert(entry->lsn > 0);
+	assert(!txn_has_flag(txn, TXN_IS_DONE));
+	assert(txn_has_flag(txn, TXN_WAIT_ACK));
+	if (!txn_limbo_check_complete(limbo, entry)) {
+		bool cancellable = fiber_set_cancellable(false);
+		while (!txn_limbo_entry_is_complete(entry))
+			fiber_yield();
+		fiber_set_cancellable(cancellable);
+	}
+	/* TODO: implement rollback. */
+	/* TODO: implement confirm. */
+	assert(!entry->is_rollback);
+	/*
+	 * txn_limbo_ack() unlinks the entries it commits. Only an
+	 * entry which got its quorum right here, in
+	 * txn_limbo_check_complete(), is still in the queue.
+	 * Removing an already unlinked entry would fail the
+	 * assertions in txn_limbo_remove().
+	 */
+	if (!rlist_empty(&entry->in_queue))
+		txn_limbo_remove(limbo, entry);
+	/*
+	 * Must be cleared on both paths, otherwise txn_complete()
+	 * treats the transaction as still waiting for ACKs and
+	 * never commits it.
+	 */
+	txn_clear_flag(txn, TXN_WAIT_ACK);
+}
+
+/**
+ * Register an ACK from @a replica_id: it has received the owner's
+ * rows up to @a lsn. Every entry which collects a quorum of ACKs
+ * this way is marked committed, unlinked from the queue, and its
+ * fiber is woken up.
+ */
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
+{
+	if (rlist_empty(&limbo->queue))
+		return;
+	assert(limbo->instance_id != REPLICA_ID_NIL);
+	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
+	/*
+	 * Status updates carry the replica's whole vclock, so the
+	 * owner's component may be the same as in the previous ACK
+	 * (or possibly even older, e.g. after a reconnect). Such an
+	 * ACK brings nothing new. Following it would violate the
+	 * vclock monotonicity, and moving the component backwards
+	 * would make the next ACK count some entries twice.
+	 */
+	if (lsn <= prev_lsn)
+		return;
+	vclock_follow(&limbo->vclock, replica_id, lsn);
+	struct txn_limbo_entry *e, *tmp;
+	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
+		/* Not written to WAL yet (-1) or already acked. */
+		if (e->lsn <= prev_lsn)
+			continue;
+		if (e->lsn > lsn)
+			break;
+		if (++e->ack_count >= replication_synchro_quorum) {
+			/*
+			 * TODO: better call complete() right here.
+			 * Appliers use async transactions, and their
+			 * txns don't have fibers to wake up. That
+			 * becomes relevant when appliers are supposed
+			 * to wait for the 'confirm' message.
+			 */
+			e->is_commit = true;
+			rlist_del_entry(e, in_queue);
+			fiber_wakeup(e->txn->fiber);
+		}
+		assert(e->ack_count <= VCLOCK_MAX);
+	}
+}
+
+/** Create the global limbo. Called from box_init(). */
+void
+txn_limbo_init(void)
+{
+	struct txn_limbo *global_limbo = &txn_limbo;
+	txn_limbo_create(global_limbo);
+}
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
new file mode 100644
index 000000000..1ad1c567a
--- /dev/null
+++ b/src/box/txn_limbo.h
@@ -0,0 +1,168 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "small/rlist.h"
+#include "vclock.h"
+
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct txn;
+
+/**
+ * A synchronous transaction and its quorum metadata, stored in
+ * the limbo queue. The entry is allocated on the transaction's
+ * region by txn_limbo_append().
+ */
+struct txn_limbo_entry {
+	/** Link in the limbo's queue. */
+	struct rlist in_queue;
+	/** The transaction waiting for a quorum. */
+	struct txn *txn;
+	/**
+	 * LSN of the transaction in the originator's vclock
+	 * component (the LSN of its last row). It is -1 while the
+	 * transaction is not written to WAL yet.
+	 */
+	int64_t lsn;
+	/**
+	 * Number of ACKs, i.e. how many instances have confirmed
+	 * receipt of the transaction. The originator's own WAL
+	 * write counts as the first ACK.
+	 */
+	int ack_count;
+	/**
+	 * Result flags. At most one of them can be true. Both are
+	 * false while the transaction is still waiting for its
+	 * resolution.
+	 */
+	bool is_commit;
+	bool is_rollback;
+};
+
+/** Check whether the entry is resolved: committed or rolled back. */
+static inline bool
+txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
+{
+	if (e->is_commit)
+		return true;
+	return e->is_rollback;
+}
+
+/**
+ * Limbo is a place where transactions are stored which are
+ * finished but neither committed nor rolled back yet. These are
+ * synchronous transactions in progress of collecting ACKs from
+ * replicas.
+ * Limbo's main purposes:
+ *   - maintain the transactions ordered by LSN of their emitter;
+ *   - be a link between the transaction and replication modules,
+ *     so that they don't depend on each other directly.
+ */
+struct txn_limbo {
+	/**
+	 * Queue of limbo entries, ordered by LSN. Some of the
+	 * entries at the end may not have an LSN yet (their local
+	 * WAL write is still in progress), but their order won't
+	 * change anyway, because WAL write completions give them
+	 * LSNs in the same order.
+	 */
+	struct rlist queue;
+	/**
+	 * Instance ID of the owner of all the transactions in the
+	 * queue. Strictly speaking, nothing prevents storing
+	 * transactions originated from some other instance here.
+	 * But the queue may contain transactions of only one
+	 * instance at a time. Otherwise the LSN order wouldn't make
+	 * sense - different nodes have their own independent LSNs
+	 * in their vclock components.
+	 */
+	uint32_t instance_id;
+	/**
+	 * All components of the vclock are versions of the limbo
+	 * owner's LSN, as they are visible on other nodes. For
+	 * example, assume the instance ID of the limbo owner is 1.
+	 * Then vclock[1] here is the local LSN of instance 1,
+	 * vclock[2] is how replica with ID 2 sees the LSN of
+	 * instance 1, vclock[3] is how replica with ID 3 sees the
+	 * LSN of instance 1, and so on.
+	 * That way, by looking at this vclock, it can always be
+	 * said up to which LSN there is a sync quorum for
+	 * transactions created on the limbo owner's node.
+	 */
+	struct vclock vclock;
+};
+
+/**
+ * The global limbo. So far an instance can have only one limbo,
+ * where the master's transactions are stored. Eventually there
+ * may be more than one limbo, for master-master support.
+ */
+extern struct txn_limbo txn_limbo;
+
+/**
+ * Allocate, create, and append a new transaction to the limbo.
+ * The limbo entry is allocated on the transaction's region.
+ * @param id Instance ID of the transaction's originator; 0 means
+ *        this instance.
+ * @retval NULL Error, diag is set.
+ * @retval not NULL The new entry, without an LSN yet.
+ */
+struct txn_limbo_entry *
+txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn);
+
+/** Remove the entry from the limbo, mark it as rolled back. */
+void
+txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+/**
+ * Assign local LSN to the limbo entry. That happens when the
+ * transaction is added to the limbo, written to WAL, and gets an
+ * LSN.
+ */
+void
+txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
+		     int64_t lsn);
+
+/**
+ * Ack all transactions up to the given LSN on behalf of the
+ * replica with the specified ID.
+ */
+void
+txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Block the current fiber until the transaction in the limbo
+ * entry is either committed or rolled back. The fiber is not
+ * cancellable while waiting.
+ */
+void
+txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
+
+/**
+ * Create the global limbo. Called from box_init().
+ * Declared with (void): in C an empty parameter list means
+ * "unspecified arguments" and disables argument checking.
+ */
+void
+txn_limbo_init(void);
+
+#if defined(__cplusplus)
+}
+#endif /* defined(__cplusplus) */
diff --git a/test/box/error.result b/test/box/error.result
index 2196fa541..69c471085 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -432,6 +432,7 @@ t;
  |   211: box.error.WRONG_QUERY_ID
  |   212: box.error.SEQUENCE_NOT_STARTED
  |   213: box.error.NO_SUCH_SESSION_SETTING
+ |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list