From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp50.i.mail.ru (smtp50.i.mail.ru [94.100.177.110]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D294D4696C2 for ; Tue, 9 Jun 2020 15:20:46 +0300 (MSK) From: Serge Petrenko Date: Tue, 9 Jun 2020 15:20:16 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 4/8] replication: make sync transactions wait quorum List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, sergos@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org From: Vladislav Shpilevoy A synchronous transaction (one which changes anything in a synchronous space) waits before commit until it is replicated onto a quorum of replicas. So far all the 'synchronousness' is basically the same as the well-known 'wait_lsn' technique, with the exception that the transaction really is not committed until it is replicated. The problem of wait_lsn is still present though, in case the master restarts, because there is no 'confirm' record in the WAL telling which transactions are replicated and can be applied. 
Closes #4844 Closes #4845 --- src/box/CMakeLists.txt | 1 + src/box/box.cc | 2 + src/box/errcode.h | 1 + src/box/relay.cc | 7 ++ src/box/txn.c | 40 +++++++++- src/box/txn_limbo.c | 167 ++++++++++++++++++++++++++++++++++++++++ src/box/txn_limbo.h | 168 +++++++++++++++++++++++++++++++++++++++++ test/box/error.result | 1 + 8 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 src/box/txn_limbo.c create mode 100644 src/box/txn_limbo.h diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 230e7427d..c0833e50a 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -124,6 +124,7 @@ add_library(box STATIC session.cc port.c txn.c + txn_limbo.c box.cc gc.c checkpoint_schedule.c diff --git a/src/box/box.cc b/src/box/box.cc index 9b67aeb1f..64ac89975 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -59,6 +59,7 @@ #include "index.h" #include "port.h" #include "txn.h" +#include "txn_limbo.h" #include "user.h" #include "cfg.h" #include "coio.h" @@ -2389,6 +2390,7 @@ box_init(void) if (tuple_init(lua_hash) != 0) diag_raise(); + txn_limbo_init(); sequence_init(); } diff --git a/src/box/errcode.h b/src/box/errcode.h index d1e4d02a9..019c582af 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -266,6 +266,7 @@ struct errcode_record { /*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \ /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \ /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ + /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..333e91ea9 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xrow_io.h" #include "xstream.h" #include "wal.h" +#include "txn_limbo.h" /** * Cbus message to send status updates from relay to tx thread. 
@@ -399,6 +400,12 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + /* + * Let pending synchronous transactions know, which of + * them were successfully sent to the replica. + */ + txn_limbo_ack(&txn_limbo, status->relay->replica->id, + vclock_get(&status->vclock, instance_id)); static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/txn.c b/src/box/txn.c index 60870f536..1d6518e29 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "txn.h" +#include "txn_limbo.h" #include "engine.h" #include "tuple.h" #include "journal.h" @@ -433,7 +434,7 @@ txn_complete(struct txn *txn) engine_rollback(txn->engine, txn); if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) txn_run_rollback_triggers(txn, &txn->on_rollback); - } else { + } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) { /* Commit the transaction. */ if (txn->engine != NULL) engine_commit(txn->engine, txn); @@ -448,6 +449,19 @@ txn_complete(struct txn *txn) txn->signature - n_rows + 1, stop_tm - txn->start_tm); } + } else { + /* + * Complete is called on every WAL operation + * authored by this transaction. There is not + * always just one, and one is not always enough + * for commit. In case the transaction is waiting + * for acks, it can't be committed right away. Give + * control back to the fiber owning the transaction, + * so that it can decide what to do next. + */ + if (txn->fiber != fiber()) + fiber_wakeup(txn->fiber); + return; } /* * If there is no fiber waiting for the transaction then @@ -634,6 +648,7 @@ int txn_commit(struct txn *txn) { struct journal_entry *req; + struct txn_limbo_entry *limbo_entry; txn->fiber = fiber(); @@ -655,8 +670,25 @@ txn_commit(struct txn *txn) return -1; } + bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); + if (is_sync) { + /* + * Append now. Before even WAL write is done. 
+ * After WAL write nothing should fail, even OOM + * wouldn't be acceptable. + */ + limbo_entry = txn_limbo_append(&txn_limbo, txn); + if (limbo_entry == NULL) { + txn_rollback(txn); + txn_free(txn); + return -1; + } + } + fiber_set_txn(fiber(), NULL); if (journal_write(req) != 0) { + if (is_sync) + txn_limbo_abort(&txn_limbo, limbo_entry); fiber_set_txn(fiber(), txn); txn_rollback(txn); txn_free(txn); @@ -665,7 +697,11 @@ txn_commit(struct txn *txn) diag_log(); return -1; } - + if (is_sync) { + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, + req->rows[req->n_rows - 1]->lsn); + txn_limbo_wait_complete(&txn_limbo, limbo_entry); + } if (!txn_has_flag(txn, TXN_IS_DONE)) { txn->signature = req->res; txn_complete(txn); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c new file mode 100644 index 000000000..d28b2a28b --- /dev/null +++ b/src/box/txn_limbo.c @@ -0,0 +1,167 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "txn.h" +#include "txn_limbo.h" +#include "replication.h" + +struct txn_limbo txn_limbo; + +static inline void +txn_limbo_create(struct txn_limbo *limbo) +{ + rlist_create(&limbo->queue); + limbo->instance_id = REPLICA_ID_NIL; + vclock_create(&limbo->vclock); +} + +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, struct txn *txn) +{ + assert(txn_has_flag(txn, TXN_WAIT_ACK)); + if (limbo->instance_id != instance_id) { + if (limbo->instance_id == REPLICA_ID_NIL || + rlist_empty(&limbo->queue)) { + limbo->instance_id = instance_id; + } else { + diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, + limbo->instance_id); + return NULL; + } + } + struct txn_limbo_entry *e = (struct txn_limbo_entry *) + region_alloc(&txn->region, sizeof(*e)); + if (e == NULL) { + diag_set(OutOfMemory, sizeof(*e), "region_alloc", "e"); + return NULL; + } + e->txn = txn; + e->lsn = -1; + e->ack_count = 0; + e->is_commit = false; + e->is_rollback = false; + rlist_add_tail_entry(&limbo->queue, e, in_queue); + return e; +} + +static inline void +txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + assert(!rlist_empty(&entry->in_queue)); + assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry, + in_queue) == entry); + rlist_del_entry(entry, in_queue); +} + +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + entry->is_rollback = true; + txn_limbo_remove(limbo, entry); +} + 
+void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL); + entry->lsn = lsn; + ++entry->ack_count; + vclock_follow(&limbo->vclock, limbo->instance_id, lsn); +} + +static bool +txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + struct vclock_iterator iter; + vclock_iterator_init(&iter, &limbo->vclock); + int ack_count = 0; + int64_t lsn = entry->lsn; + vclock_foreach(&iter, vc) + ack_count += vc.lsn >= lsn; + assert(ack_count >= entry->ack_count); + entry->ack_count = ack_count; + entry->is_commit = ack_count > replication_sync_quorum; + return entry->is_commit; +} + +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + struct txn *txn = entry->txn; + assert(entry->lsn > 0); + assert(!txn_has_flag(txn, TXN_IS_DONE)); + assert(txn_has_flag(txn, TXN_WAIT_ACK)); + if (txn_limbo_check_complete(limbo, entry)) + return; + bool cancellable = fiber_set_cancellable(false); + while (!txn_limbo_entry_is_complete(entry)) + fiber_yield(); + fiber_set_cancellable(cancellable); + // TODO: implement rollback. + // TODO: implement confirm. + assert(!entry->is_rollback); + txn_limbo_remove(limbo, entry); + txn_clear_flag(txn, TXN_WAIT_ACK); +} + +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) +{ + if (rlist_empty(&limbo->queue)) + return; + assert(limbo->instance_id != REPLICA_ID_NIL); + int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); + vclock_follow(&limbo->vclock, replica_id, lsn); + struct txn_limbo_entry *e; + rlist_foreach_entry(e, &limbo->queue, in_queue) { + if (e->lsn <= prev_lsn) + continue; + if (e->lsn > lsn) + break; + if (++e->ack_count >= replication_sync_quorum) { + // TODO: better call complete() right + // here. Appliers use async transactions, + // and their txns don't have fibers to + // wake up. 
That becomes relevant when + // appliers are supposed to wait for the + // 'confirm' message. + e->is_commit = true; + fiber_wakeup(e->txn->fiber); + } + assert(e->ack_count <= VCLOCK_MAX); + } +} + +void +txn_limbo_init(void) +{ + txn_limbo_create(&txn_limbo); +} diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h new file mode 100644 index 000000000..112fa8902 --- /dev/null +++ b/src/box/txn_limbo.h @@ -0,0 +1,168 @@ +#pragma once +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ +#include "small/rlist.h" +#include "vclock.h" + +#include <stdint.h> + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +struct txn; + +/** + * Transaction and its quorum metadata, to be stored in limbo. + */ +struct txn_limbo_entry { + /** Link for limbo's queue. */ + struct rlist in_queue; + /** Transaction, waiting for a quorum. */ + struct txn *txn; + /** + * LSN of the transaction by the originator's vclock + * component. May be -1 in case the transaction is not + * written to WAL yet. + */ + int64_t lsn; + /** + * Number of ACKs. Or in other words - how many replicas + * confirmed receipt of the transaction. + */ + int ack_count; + /** + * Result flags. Only one of them can be true. But both + * can be false if the transaction is still waiting for + * its resolution. + */ + bool is_commit; + bool is_rollback; +}; + +static inline bool +txn_limbo_entry_is_complete(const struct txn_limbo_entry *e) +{ + return e->is_commit || e->is_rollback; +} + +/** + * Limbo is a place where transactions are stored, which are + * finished, but neither committed nor rolled back. These are + * synchronous transactions in progress of collecting ACKs from + * replicas. + * Limbo's main purposes: + * - maintain the transactions ordered by LSN of their emitter; + * - be a link between transaction and replication modules, so + * as they wouldn't depend on each other directly. + */ +struct txn_limbo { + /** + * Queue of limbo entries. Ordered by LSN. Some of the + * entries in the end may not have an LSN yet (their local + * WAL write is still in progress), but their order won't + * change anyway. Because WAL write completions will give + * them LSNs in the same order. + */ + struct rlist queue; + /** + * Instance ID of the owner of all the transactions in the + * queue. Strictly speaking, nothing prevents to store not + * own transactions here, originated from some other + * instance. But still the queue may contain only + * transactions of the same instance. 
Otherwise LSN order + * won't make sense - different nodes have their own independent + * LSNs in their vclock components. + */ + uint32_t instance_id; + /** + * All components of the vclock are versions of the limbo + * owner's LSN, how it is visible on other nodes. For + * example, assume instance ID of the limbo is 1. Then + * vclock[1] here is local LSN of the instance 1. + * vclock[2] is how replica with ID 2 sees LSN of + * instance 1. + * vclock[3] is how replica with ID 3 sees LSN of + * instance 1, and so on. + * That way, by looking at this vclock it can always + * be said up to which LSN there is a sync quorum for + * transactions created on the limbo's owner node. + */ + struct vclock vclock; +}; + +/** + * Global limbo instance. So far an instance can have only one limbo, + * where master's transactions are stored. Eventually there may + * appear more than one limbo for master-master support. + */ +extern struct txn_limbo txn_limbo; + +/** + * Allocate, create, and append a new transaction to the limbo. + * The limbo entry is allocated on the transaction's region. + */ +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, struct txn *txn); + +/** Remove the entry from the limbo, mark as rolled back. */ +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +/** + * Assign local LSN to the limbo entry. That happens when the + * transaction is added to the limbo, writes to WAL, and gets an + * LSN. + */ +void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn); + +/** + * Ack all transactions up to the given LSN on behalf of the + * replica with the specified ID. + */ +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); + +/** + * Block the current fiber until the transaction in the limbo + * entry is either committed or rolled back. 
+ */ +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +void +txn_limbo_init(); + +#if defined(__cplusplus) +} +#endif /* defined(__cplusplus) */ diff --git a/test/box/error.result b/test/box/error.result index 2196fa541..69c471085 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -432,6 +432,7 @@ t; | 211: box.error.WRONG_QUERY_ID | 212: box.error.SEQUENCE_NOT_STARTED | 213: box.error.NO_SUCH_SESSION_SETTING + | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS | ... test_run:cmd("setopt delimiter ''"); -- 2.24.3 (Apple Git-128)