From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D92D842EF66 for ; Tue, 30 Jun 2020 02:15:50 +0300 (MSK) From: Vladislav Shpilevoy Date: Tue, 30 Jun 2020 01:15:23 +0200 Message-Id: <38933b569988f89ad124e71e49b460698db5e4a0.1593472477.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 04/19] replication: make sync transactions wait quorum List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org A synchronous transaction (one which changes anything in a synchronous space) waits before commit until it is replicated onto a quorum of replicas. So far all the 'synchronousness' is basically the same as the well-known 'wait_lsn' technique, with the exception that the transaction really is not committed until it is replicated. The problem of wait_lsn is still present though, in case the master restarts, because there is no 'confirm' record in the WAL telling which transactions are replicated and can be applied. 
Closes #4844 Closes #4845 --- src/box/CMakeLists.txt | 1 + src/box/box.cc | 2 + src/box/errcode.h | 1 + src/box/relay.cc | 11 +++ src/box/txn.c | 51 +++++++++++- src/box/txn_limbo.c | 176 +++++++++++++++++++++++++++++++++++++++++ src/box/txn_limbo.h | 168 +++++++++++++++++++++++++++++++++++++++ test/box/error.result | 1 + 8 files changed, 409 insertions(+), 2 deletions(-) create mode 100644 src/box/txn_limbo.c create mode 100644 src/box/txn_limbo.h diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 63f98f6c8..b8b2689d2 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -169,6 +169,7 @@ add_library(box STATIC session.cc port.c txn.c + txn_limbo.c box.cc gc.c checkpoint_schedule.c diff --git a/src/box/box.cc b/src/box/box.cc index 0821ea0a3..02088ba01 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -59,6 +59,7 @@ #include "index.h" #include "port.h" #include "txn.h" +#include "txn_limbo.h" #include "user.h" #include "cfg.h" #include "coio.h" @@ -2413,6 +2414,7 @@ box_init(void) if (tuple_init(lua_hash) != 0) diag_raise(); + txn_limbo_init(); sequence_init(); } diff --git a/src/box/errcode.h b/src/box/errcode.h index d1e4d02a9..019c582af 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -266,6 +266,7 @@ struct errcode_record { /*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \ /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \ /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ + /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..36fc14b8c 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xrow_io.h" #include "xstream.h" #include "wal.h" +#include "txn_limbo.h" /** * Cbus message to send status updates from relay to tx thread. 
@@ -399,6 +400,16 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + /* + * Let pending synchronous transactions know, which of + * them were successfully sent to the replica. Acks are + * collected only by the transactions originator (which is + * the single master in 100% so far). + */ + if (txn_limbo.instance_id == instance_id) { + txn_limbo_ack(&txn_limbo, status->relay->replica->id, + vclock_get(&status->vclock, instance_id)); + } static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/txn.c b/src/box/txn.c index edc1f5180..6cfa98212 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "txn.h" +#include "txn_limbo.h" #include "engine.h" #include "tuple.h" #include "journal.h" @@ -433,7 +434,7 @@ txn_complete(struct txn *txn) engine_rollback(txn->engine, txn); if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) txn_run_rollback_triggers(txn, &txn->on_rollback); - } else { + } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) { /* Commit the transaction. */ if (txn->engine != NULL) engine_commit(txn->engine, txn); @@ -448,6 +449,19 @@ txn_complete(struct txn *txn) txn->signature - n_rows + 1, stop_tm - txn->start_tm); } + } else { + /* + * Complete is called on every WAL operation + * authored by this transaction. And it not always + * is one. And not always is enough for commit. + * In case the transaction is waiting for acks, it + * can't be committed right away. Give control + * back to the fiber, owning the transaction so as + * it could decide what to do next. 
+ */ + if (txn->fiber != NULL && txn->fiber != fiber()) + fiber_wakeup(txn->fiber); + return; } /* * If there is no fiber waiting for the transaction then @@ -517,6 +531,11 @@ txn_journal_entry_new(struct txn *txn) req->approx_len += xrow_approx_len(stmt->row); } + /* + * There is no a check for all-local rows, because a local + * space can't be synchronous. So if there is at least one + * synchronous space, the transaction is not local. + */ if (is_sync) txn_set_flag(txn, TXN_WAIT_ACK); @@ -627,6 +646,7 @@ int txn_commit(struct txn *txn) { struct journal_entry *req; + struct txn_limbo_entry *limbo_entry; txn->fiber = fiber(); @@ -648,8 +668,31 @@ txn_commit(struct txn *txn) return -1; } + bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); + if (is_sync) { + /* + * Remote rows, if any, come before local rows, so + * check for originating instance id here. + */ + uint32_t origin_id = req->rows[0]->replica_id; + + /* + * Append now. Before even WAL write is done. + * After WAL write nothing should fail, even OOM + * wouldn't be acceptable. + */ + limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); + if (limbo_entry == NULL) { + txn_rollback(txn); + txn_free(txn); + return -1; + } + } + fiber_set_txn(fiber(), NULL); if (journal_write(req) != 0) { + if (is_sync) + txn_limbo_abort(&txn_limbo, limbo_entry); fiber_set_txn(fiber(), txn); txn_rollback(txn); txn_free(txn); @@ -658,7 +701,11 @@ txn_commit(struct txn *txn) diag_log(); return -1; } - + if (is_sync) { + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, + req->rows[req->n_rows - 1]->lsn); + txn_limbo_wait_complete(&txn_limbo, limbo_entry); + } if (!txn_has_flag(txn, TXN_IS_DONE)) { txn->signature = req->res; txn_complete(txn); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c new file mode 100644 index 000000000..9de91db93 --- /dev/null +++ b/src/box/txn_limbo.c @@ -0,0 +1,176 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. 
+ * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ +#include "txn.h" +#include "txn_limbo.h" +#include "replication.h" + +struct txn_limbo txn_limbo; + +static inline void +txn_limbo_create(struct txn_limbo *limbo) +{ + rlist_create(&limbo->queue); + limbo->instance_id = REPLICA_ID_NIL; + vclock_create(&limbo->vclock); +} + +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) +{ + assert(txn_has_flag(txn, TXN_WAIT_ACK)); + if (id == 0) + id = instance_id; + if (limbo->instance_id != id) { + if (limbo->instance_id == REPLICA_ID_NIL || + rlist_empty(&limbo->queue)) { + limbo->instance_id = id; + } else { + diag_set(ClientError, ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, + limbo->instance_id); + return NULL; + } + } + size_t size; + struct txn_limbo_entry *e = region_alloc_object(&txn->region, + typeof(*e), &size); + if (e == NULL) { + diag_set(OutOfMemory, size, "region_alloc_object", "e"); + return NULL; + } + e->txn = txn; + e->lsn = -1; + e->ack_count = 0; + e->is_commit = false; + e->is_rollback = false; + rlist_add_tail_entry(&limbo->queue, e, in_queue); + return e; +} + +static inline void +txn_limbo_remove(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + assert(!rlist_empty(&entry->in_queue)); + assert(rlist_first_entry(&limbo->queue, struct txn_limbo_entry, + in_queue) == entry); + (void) limbo; + rlist_del_entry(entry, in_queue); +} + +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + entry->is_rollback = true; + txn_limbo_remove(limbo, entry); +} + +void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL); + entry->lsn = lsn; + ++entry->ack_count; + vclock_follow(&limbo->vclock, limbo->instance_id, lsn); +} + +static bool +txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + if (txn_limbo_entry_is_complete(entry)) + return true; + struct vclock_iterator iter; + vclock_iterator_init(&iter, 
&limbo->vclock); + int ack_count = 0; + int64_t lsn = entry->lsn; + vclock_foreach(&iter, vc) + ack_count += vc.lsn >= lsn; + assert(ack_count >= entry->ack_count); + entry->ack_count = ack_count; + entry->is_commit = ack_count >= replication_synchro_quorum; + return entry->is_commit; +} + +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + struct txn *txn = entry->txn; + assert(entry->lsn > 0); + assert(!txn_has_flag(txn, TXN_IS_DONE)); + assert(txn_has_flag(txn, TXN_WAIT_ACK)); + if (txn_limbo_check_complete(limbo, entry)) { + txn_limbo_remove(limbo, entry); + return; + } + bool cancellable = fiber_set_cancellable(false); + while (!txn_limbo_entry_is_complete(entry)) + fiber_yield(); + fiber_set_cancellable(cancellable); + // TODO: implement rollback. + // TODO: implement confirm. + assert(!entry->is_rollback); + txn_limbo_remove(limbo, entry); + txn_clear_flag(txn, TXN_WAIT_ACK); +} + +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) +{ + if (rlist_empty(&limbo->queue)) + return; + assert(limbo->instance_id != REPLICA_ID_NIL); + int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); + vclock_follow(&limbo->vclock, replica_id, lsn); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) { + if (e->lsn <= prev_lsn) + continue; + if (e->lsn > lsn) + break; + if (++e->ack_count >= replication_synchro_quorum) { + // TODO: better call complete() right + // here. Appliers use async transactions, + // and their txns don't have fibers to + // wake up. That becomes actual, when + // appliers will be supposed to wait for + // 'confirm' message. 
+ e->is_commit = true; + rlist_del_entry(e, in_queue); + fiber_wakeup(e->txn->fiber); + } + assert(e->ack_count <= VCLOCK_MAX); + } +} + +void +txn_limbo_init(void) +{ + txn_limbo_create(&txn_limbo); +} diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h new file mode 100644 index 000000000..1ad1c567a --- /dev/null +++ b/src/box/txn_limbo.h @@ -0,0 +1,168 @@ +#pragma once +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "small/rlist.h" +#include "vclock.h" + +#include + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +struct txn; + +/** + * Transaction and its quorum metadata, to be stored in limbo. 
+ */ +struct txn_limbo_entry { + /** Link for limbo's queue. */ + struct rlist in_queue; + /** Transaction, waiting for a quorum. */ + struct txn *txn; + /** + * LSN of the transaction by the originator's vclock + * component. May be -1 in case the transaction is not + * written to WAL yet. + */ + int64_t lsn; + /** + * Number of ACKs. Or in other words - how many replicas + * confirmed receipt of the transaction. + */ + int ack_count; + /** + * Result flags. Only one of them can be true. But both + * can be false if the transaction is still waiting for + * its resolution. + */ + bool is_commit; + bool is_rollback; +}; + +static inline bool +txn_limbo_entry_is_complete(const struct txn_limbo_entry *e) +{ + return e->is_commit || e->is_rollback; +} + +/** + * Limbo is a place where transactions are stored, which are + * finished, but not committed nor rolled back. These are + * synchronous transactions in progress of collecting ACKs from + * replicas. + * Limbo's main purposes + * - maintain the transactions ordered by LSN of their emitter; + * - be a link between transaction and replication modules, so + * as they wouldn't depend on each other directly. + */ +struct txn_limbo { + /** + * Queue of limbo entries. Ordered by LSN. Some of the + * entries in the end may not have an LSN yet (their local + * WAL write is still in progress), but their order won't + * change anyway. Because WAL write completions will give + * them LSNs in the same order. + */ + struct rlist queue; + /** + * Instance ID of the owner of all the transactions in the + * queue. Strictly speaking, nothing prevents to store not + * own transactions here, originated from some other + * instance. But still the queue may contain only + * transactions of the same instance. Otherwise LSN order + * won't make sense - different nodes have own independent + * LSNs in their vclock components. 
+ */ + uint32_t instance_id; + /** + * All components of the vclock are versions of the limbo + * owner's LSN, how it is visible on other nodes. For + * example, assume instance ID of the limbo is 1. Then + * vclock[1] here is local LSN of the instance 1. + * vclock[2] is how replica with ID 2 sees LSN of + * instance 1. + * vclock[3] is how replica with ID 3 sees LSN of + * instance 1, and so on. + * In that way by looking at this vclock it is always can + * be said up to which LSN there is a sync quorum for + * transactions, created on the limbo's owner node. + */ + struct vclock vclock; +}; + +/** + * Global limbo entry. So far an instance can have only one limbo, + * where master's transactions are stored. Eventually there may + * appear more than one limbo for master-master support. + */ +extern struct txn_limbo txn_limbo; + +/** + * Allocate, create, and append a new transaction to the limbo. + * The limbo entry is allocated on the transaction's region. + */ +struct txn_limbo_entry * +txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn); + +/** Remove the entry from the limbo, mark as rolled back. */ +void +txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +/** + * Assign local LSN to the limbo entry. That happens when the + * transaction is added to the limbo, writes to WAL, and gets an + * LSN. + */ +void +txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, + int64_t lsn); + +/** + * Ack all transactions up to the given LSN on behalf of the + * replica with the specified ID. + */ +void +txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); + +/** + * Block the current fiber until the transaction in the limbo + * entry is either committed or rolled back. 
+ */ +void +txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); + +void +txn_limbo_init(); + +#if defined(__cplusplus) +} +#endif /* defined(__cplusplus) */ diff --git a/test/box/error.result b/test/box/error.result index 2196fa541..69c471085 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -432,6 +432,7 @@ t; | 211: box.error.WRONG_QUERY_ID | 212: box.error.SEQUENCE_NOT_STARTED | 213: box.error.NO_SUCH_SESSION_SETTING + | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS | ... test_run:cmd("setopt delimiter ''"); -- 2.21.1 (Apple Git-122.3)