[Tarantool-patches] [PATCH v2 04/19] replication: make sync transactions wait quorum
Serge Petrenko
sergepetrenko at tarantool.org
Thu Jul 2 11:48:18 MSK 2020
30.06.2020 02:15, Vladislav Shpilevoy пишет:
> Synchronous transaction (which changes anything in a synchronous
> space) before commit waits until it is replicated onto a quorum
> of replicas.
>
> So far all the 'synchronousness' is basically the same as the well
> known 'wait_lsn' technique. With the exception, that the
> transaction really is not committed until replicated.
>
> Problem of wait_lsn is still present though, in case master
> restarts. Because there is no a 'confirm' record in WAL telling
> which transactions are replicated and can be applied.
>
> Closes #4844
> Closes #4845
Looks good, thanks!
I have a concern though, not related to the patch itself. Please see below.
> ---
> src/box/CMakeLists.txt | 1 +
> src/box/box.cc | 2 +
> src/box/errcode.h | 1 +
> src/box/relay.cc | 11 +++
> src/box/txn.c | 51 +++++++++++-
> src/box/txn_limbo.c | 176 +++++++++++++++++++++++++++++++++++++++++
> src/box/txn_limbo.h | 168 +++++++++++++++++++++++++++++++++++++++
> test/box/error.result | 1 +
> 8 files changed, 409 insertions(+), 2 deletions(-)
> create mode 100644 src/box/txn_limbo.c
> create mode 100644 src/box/txn_limbo.h
>
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index 63f98f6c8..b8b2689d2 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -169,6 +169,7 @@ add_library(box STATIC
> session.cc
> port.c
> txn.c
> + txn_limbo.c
> box.cc
> gc.c
> checkpoint_schedule.c
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 0821ea0a3..02088ba01 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -59,6 +59,7 @@
> #include "index.h"
> #include "port.h"
> #include "txn.h"
> +#include "txn_limbo.h"
> #include "user.h"
> #include "cfg.h"
> #include "coio.h"
> @@ -2413,6 +2414,7 @@ box_init(void)
> if (tuple_init(lua_hash) != 0)
> diag_raise();
>
> + txn_limbo_init();
> sequence_init();
> }
>
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index d1e4d02a9..019c582af 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -266,6 +266,7 @@ struct errcode_record {
> /*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \
> /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \
> /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
> + /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
>
> /*
> * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 2ad02cb8a..36fc14b8c 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -53,6 +53,7 @@
> #include "xrow_io.h"
> #include "xstream.h"
> #include "wal.h"
> +#include "txn_limbo.h"
>
> /**
> * Cbus message to send status updates from relay to tx thread.
> @@ -399,6 +400,16 @@ tx_status_update(struct cmsg *msg)
> {
> struct relay_status_msg *status = (struct relay_status_msg *)msg;
> vclock_copy(&status->relay->tx.vclock, &status->vclock);
> + /*
> + * Let pending synchronous transactions know, which of
> + * them were successfully sent to the replica. Acks are
> + * collected only by the transactions originator (which is
> + * the single master in 100% so far).
> + */
> + if (txn_limbo.instance_id == instance_id) {
> + txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> + vclock_get(&status->vclock, instance_id));
> + }
> static const struct cmsg_hop route[] = {
> {relay_status_update, NULL}
> };
> diff --git a/src/box/txn.c b/src/box/txn.c
> index edc1f5180..6cfa98212 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -29,6 +29,7 @@
> * SUCH DAMAGE.
> */
> #include "txn.h"
> +#include "txn_limbo.h"
> #include "engine.h"
> #include "tuple.h"
> #include "journal.h"
> @@ -433,7 +434,7 @@ txn_complete(struct txn *txn)
> engine_rollback(txn->engine, txn);
> if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
> txn_run_rollback_triggers(txn, &txn->on_rollback);
> - } else {
> + } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
> /* Commit the transaction. */
> if (txn->engine != NULL)
> engine_commit(txn->engine, txn);
> @@ -448,6 +449,19 @@ txn_complete(struct txn *txn)
> txn->signature - n_rows + 1,
> stop_tm - txn->start_tm);
> }
> + } else {
> + /*
> + * Complete is called on every WAL operation
> + * authored by this transaction. And it not always
> + * is one. And not always is enough for commit.
> + * In case the transaction is waiting for acks, it
> + * can't be committed right away. Give control
> + * back to the fiber, owning the transaction so as
> + * it could decide what to do next.
> + */
> + if (txn->fiber != NULL && txn->fiber != fiber())
> + fiber_wakeup(txn->fiber);
> + return;
> }
> /*
> * If there is no fiber waiting for the transaction then
> @@ -517,6 +531,11 @@ txn_journal_entry_new(struct txn *txn)
>
> req->approx_len += xrow_approx_len(stmt->row);
> }
> + /*
> + * There is no a check for all-local rows, because a local
> + * space can't be synchronous. So if there is at least one
> + * synchronous space, the transaction is not local.
> + */
> if (is_sync)
> txn_set_flag(txn, TXN_WAIT_ACK);
>
> @@ -627,6 +646,7 @@ int
> txn_commit(struct txn *txn)
> {
> struct journal_entry *req;
> + struct txn_limbo_entry *limbo_entry;
>
> txn->fiber = fiber();
>
> @@ -648,8 +668,31 @@ txn_commit(struct txn *txn)
> return -1;
> }
>
> + bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> + if (is_sync) {
> + /*
> + * Remote rows, if any, come before local rows, so
> + * check for originating instance id here.
> + */
> + uint32_t origin_id = req->rows[0]->replica_id;
> +
> + /*
> + * Append now. Before even WAL write is done.
> + * After WAL write nothing should fail, even OOM
> + * wouldn't be acceptable.
> + */
> + limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
> + if (limbo_entry == NULL) {
> + txn_rollback(txn);
> + txn_free(txn);
> + return -1;
> + }
> + }
> +
> fiber_set_txn(fiber(), NULL);
> if (journal_write(req) != 0) {
> + if (is_sync)
> + txn_limbo_abort(&txn_limbo, limbo_entry);
> fiber_set_txn(fiber(), txn);
> txn_rollback(txn);
> txn_free(txn);
> @@ -658,7 +701,11 @@ txn_commit(struct txn *txn)
> diag_log();
> return -1;
> }
> -
> + if (is_sync) {
> + txn_limbo_assign_lsn(&txn_limbo, limbo_entry,
> + req->rows[req->n_rows - 1]->lsn);
This assumes that the last tx row is a global one. This'll be true once
#4928 [1] is fixed. However, the fix is a crutch, either appending a
dummy NOP
statement at the end of tx, or reordering the rows so that the last one is
global. If we remove the crutch someday, we'll also break LSN assignment
here.
Maybe there's a better way to assign LSN to a synchronous tx?
Another point. Once async transactions are also added to limbo, we'll
have fully local transactions in limbo. Local transactions have a separate
LSN counter, so we probably have to assign them the same LSN as the last
synchronous transaction waiting in limbo. Looks like this'll work.
[1] https://github.com/tarantool/tarantool/issues/4928
> + txn_limbo_wait_complete(&txn_limbo, limbo_entry);
> + }
> if (!txn_has_flag(txn, TXN_IS_DONE)) {
> txn->signature = req->res;
> txn_complete(txn);
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> new file mode 100644
> index 000000000..9de91db93
> --- /dev/null
> +++ b/src/box/txn_limbo.c
> @@ -0,0 +1,176 @@
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "txn.h"
> +#include "txn_limbo.h"
> +#include "replication.h"
> +
> +struct txn_limbo txn_limbo;
> +
> +static inline void
> +txn_limbo_create(struct txn_limbo *limbo)
> +{
> + rlist_create(&limbo->queue);
> + limbo->instance_id = REPLICA_ID_NIL;
> + vclock_create(&limbo->vclock);
> +}
> +
> +struct txn_limbo_entry *
> +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
> +{
> + assert(txn_has_flag(txn, TXN_WAIT_ACK));
> + if (id == 0)
> + id = instance_id;
> + if (limbo->instance_id != id) {
> + if (limbo->instance_id == REPLICA_ID_NIL ||
> + rlist_empty(&limbo->queue)) {
> + limbo->instance_id = id;
> + } else {
> + diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS,
> + limbo->instance_id);
> + return NULL;
> + }
> + }
> + size_t size;
> + struct txn_limbo_entry *e = region_alloc_object(&txn->region,
> + typeof(*e), &size);
> + if (e == NULL) {
> + diag_set(OutOfMemory, size, "region_alloc_object", "e");
> + return NULL;
> + }
> + e->txn = txn;
> + e->lsn = -1;
> + e->ack_count = 0;
> + e->is_commit = false;
> + e->is_rollback = false;
> + rlist_add_tail_entry(&limbo->queue, e, in_queue);
> + return e;
> +}
> +
> +static inline void
> +txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> + assert(!rlist_empty(&entry->in_queue));
> + assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry,
> + in_queue) == entry);
> + (void) limbo;
> + rlist_del_entry(entry, in_queue);
> +}
> +
> +void
> +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> + entry->is_rollback = true;
> + txn_limbo_remove(limbo, entry);
> +}
> +
> +void
> +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
> + int64_t lsn)
> +{
> + assert(limbo->instance_id != REPLICA_ID_NIL);
> + entry->lsn = lsn;
> + ++entry->ack_count;
> + vclock_follow(&limbo->vclock, limbo->instance_id, lsn);
> +}
> +
> +static bool
> +txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> + if (txn_limbo_entry_is_complete(entry))
> + return true;
> + struct vclock_iterator iter;
> + vclock_iterator_init(&iter, &limbo->vclock);
> + int ack_count = 0;
> + int64_t lsn = entry->lsn;
> + vclock_foreach(&iter, vc)
> + ack_count += vc.lsn >= lsn;
> + assert(ack_count >= entry->ack_count);
> + entry->ack_count = ack_count;
> + entry->is_commit = ack_count >= replication_synchro_quorum;
> + return entry->is_commit;
> +}
> +
> +void
> +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> + struct txn *txn = entry->txn;
> + assert(entry->lsn > 0);
> + assert(!txn_has_flag(txn, TXN_IS_DONE));
> + assert(txn_has_flag(txn, TXN_WAIT_ACK));
> + if (txn_limbo_check_complete(limbo, entry)) {
> + txn_limbo_remove(limbo, entry);
> + return;
> + }
> + bool cancellable = fiber_set_cancellable(false);
> + while (!txn_limbo_entry_is_complete(entry))
> + fiber_yield();
> + fiber_set_cancellable(cancellable);
> + // TODO: implement rollback.
> + // TODO: implement confirm.
> + assert(!entry->is_rollback);
> + txn_limbo_remove(limbo, entry);
> + txn_clear_flag(txn, TXN_WAIT_ACK);
> +}
> +
> +void
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> +{
> + if (rlist_empty(&limbo->queue))
> + return;
> + assert(limbo->instance_id != REPLICA_ID_NIL);
> + int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
> + vclock_follow(&limbo->vclock, replica_id, lsn);
> + struct txn_limbo_entry *e, *tmp;
> + rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> + if (e->lsn <= prev_lsn)
> + continue;
> + if (e->lsn > lsn)
> + break;
> + if (++e->ack_count >= replication_synchro_quorum) {
> + // TODO: better call complete() right
> + // here. Appliers use async transactions,
> + // and their txns don't have fibers to
> + // wake up. That becomes actual, when
> + // appliers will be supposed to wait for
> + // 'confirm' message.
> + e->is_commit = true;
> + rlist_del_entry(e, in_queue);
> + fiber_wakeup(e->txn->fiber);
> + }
> + assert(e->ack_count <= VCLOCK_MAX);
> + }
> +}
> +
> +void
> +txn_limbo_init(void)
> +{
> + txn_limbo_create(&txn_limbo);
> +}
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> new file mode 100644
> index 000000000..1ad1c567a
> --- /dev/null
> +++ b/src/box/txn_limbo.h
> @@ -0,0 +1,168 @@
> +#pragma once
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "small/rlist.h"
> +#include "vclock.h"
> +
> +#include <stdint.h>
> +
> +#if defined(__cplusplus)
> +extern "C" {
> +#endif /* defined(__cplusplus) */
> +
> +struct txn;
> +
> +/**
> + * Transaction and its quorum metadata, to be stored in limbo.
> + */
> +struct txn_limbo_entry {
> + /** Link for limbo's queue. */
> + struct rlist in_queue;
> + /** Transaction, waiting for a quorum. */
> + struct txn *txn;
> + /**
> + * LSN of the transaction by the originator's vclock
> + * component. May be -1 in case the transaction is not
> + * written to WAL yet.
> + */
> + int64_t lsn;
> + /**
> + * Number of ACKs. Or in other words - how many replicas
> + * confirmed receipt of the transaction.
> + */
> + int ack_count;
> + /**
> + * Result flags. Only one of them can be true. But both
> + * can be false if the transaction is still waiting for
> + * its resolution.
> + */
> + bool is_commit;
> + bool is_rollback;
> +};
> +
> +static inline bool
> +txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
> +{
> + return e->is_commit || e->is_rollback;
> +}
> +
> +/**
> + * Limbo is a place where transactions are stored, which are
> + * finished, but not committed nor rolled back. These are
> + * synchronous transactions in progress of collecting ACKs from
> + * replicas.
> + * Limbo's main purposes
> + * - maintain the transactions ordered by LSN of their emitter;
> + * - be a link between transaction and replication modules, so
> + * as they wouldn't depend on each other directly.
> + */
> +struct txn_limbo {
> + /**
> + * Queue of limbo entries. Ordered by LSN. Some of the
> + * entries in the end may not have an LSN yet (their local
> + * WAL write is still in progress), but their order won't
> + * change anyway. Because WAL write completions will give
> + * them LSNs in the same order.
> + */
> + struct rlist queue;
> + /**
> + * Instance ID of the owner of all the transactions in the
> + * queue. Strictly speaking, nothing prevents to store not
> + * own transactions here, originated from some other
> + * instance. But still the queue may contain only
> + * transactions of the same instance. Otherwise LSN order
> + * won't make sense - different nodes have own independent
> + * LSNs in their vclock components.
> + */
> + uint32_t instance_id;
> + /**
> + * All components of the vclock are versions of the limbo
> + * owner's LSN, how it is visible on other nodes. For
> + * example, assume instance ID of the limbo is 1. Then
> + * vclock[1] here is local LSN of the instance 1.
> + * vclock[2] is how replica with ID 2 sees LSN of
> + * instance 1.
> + * vclock[3] is how replica with ID 3 sees LSN of
> + * instance 1, and so on.
> + * In that way by looking at this vclock it is always can
> + * be said up to which LSN there is a sync quorum for
> + * transactions, created on the limbo's owner node.
> + */
> + struct vclock vclock;
> +};
> +
> +/**
> + * Global limbo entry. So far an instance can have only one limbo,
> + * where master's transactions are stored. Eventually there may
> + * appear more than one limbo for master-master support.
> + */
> +extern struct txn_limbo txn_limbo;
> +
> +/**
> + * Allocate, create, and append a new transaction to the limbo.
> + * The limbo entry is allocated on the transaction's region.
> + */
> +struct txn_limbo_entry *
> +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn);
> +
> +/** Remove the entry from the limbo, mark as rolled back. */
> +void
> +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
> +
> +/**
> + * Assign local LSN to the limbo entry. That happens when the
> + * transaction is added to the limbo, writes to WAL, and gets an
> + * LSN.
> + */
> +void
> +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
> + int64_t lsn);
> +
> +/**
> + * Ack all transactions up to the given LSN on behalf of the
> + * replica with the specified ID.
> + */
> +void
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> +
> +/**
> + * Block the current fiber until the transaction in the limbo
> + * entry is either committed or rolled back.
> + */
> +void
> +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
> +
> +void
> +txn_limbo_init();
> +
> +#if defined(__cplusplus)
> +}
> +#endif /* defined(__cplusplus) */
> diff --git a/test/box/error.result b/test/box/error.result
> index 2196fa541..69c471085 100644
> --- a/test/box/error.result
> +++ b/test/box/error.result
> @@ -432,6 +432,7 @@ t;
> | 211: box.error.WRONG_QUERY_ID
> | 212: box.error.SEQUENCE_NOT_STARTED
> | 213: box.error.NO_SUCH_SESSION_SETTING
> + | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
> | ...
>
> test_run:cmd("setopt delimiter ''");
--
Serge Petrenko
More information about the Tarantool-patches
mailing list