From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp50.i.mail.ru (smtp50.i.mail.ru [94.100.177.110]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 9EDC945C307 for ; Tue, 9 Jun 2020 15:20:53 +0300 (MSK) From: Serge Petrenko Date: Tue, 9 Jun 2020 15:20:20 +0300 Message-Id: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, sergos@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Make txn_limbo write a CONFIRM entry as soon as a batch of entries receive their acks. CONFIRM entry is written to WAL and later replicated to all the replicas. Now replicas put synchronous transactions into txn_limbo and wait for corresponding confirmation entries to arrive and end up in their WAL before committing the transactions. 
Part-of #4847 --- src/box/applier.cc | 81 ++++++++++++++++++++++++++++++++++++++++++- src/box/box.cc | 3 ++ src/box/errcode.h | 1 + src/box/relay.cc | 13 ++++--- src/box/txn.c | 75 ++++++++++++++++++++++++++++++--------- src/box/txn.h | 23 ++++++++++++ src/box/txn_limbo.c | 79 ++++++++++++++++++++++++++++++++++++----- src/box/txn_limbo.h | 6 ++++ test/box/error.result | 1 + 9 files changed, 252 insertions(+), 30 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index df48b4796..1dc977424 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -51,6 +51,7 @@ #include "txn.h" #include "box.h" #include "scoped_guard.h" +#include "txn_limbo.h" STRS(applier_state, applier_STATE); @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row) struct txn *txn = txn_begin(); if (txn == NULL) return -1; + /* + * Do not wait for confirmation when fetching a snapshot. + * Master only sends confirmed rows during join. + */ + txn_force_async(txn); if (txn_begin_stmt(txn, space) != 0) goto rollback; /* no access checks here - applier always works with admin privs */ @@ -249,10 +255,73 @@ process_nop(struct request *request) return txn_commit_stmt(txn, request); } +/* + * An on_commit trigger set on a txn containing a CONFIRM entry. + * Confirms some of the txs waiting in txn_limbo. + */ +static int +applier_on_confirm(struct trigger *trig, void *data) +{ + (void) trig; + int64_t lsn = *(int64_t *)data; + txn_limbo_read_confirm(&txn_limbo, lsn); + return 0; +} + +static int +process_confirm(struct request *request) +{ + assert(request->header->type == IPROTO_CONFIRM); + uint32_t replica_id; + struct txn *txn = in_txn(); + int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t)); + if (lsn == NULL) { + diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn"); + return -1; + } + if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0) + return -1; + /* + * on_commit trigger failure is not allowed, so check for + * instance id early. 
+ */ + if (replica_id != txn_limbo.instance_id) { + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, + txn_limbo.instance_id); + return -1; + } + + /* + * Set an on_commit trigger which will perform the actual + * confirmation processing. + */ + struct trigger *trig = (struct trigger *)region_alloc(&txn->region, + sizeof(*trig)); + if (trig == NULL) { + diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig"); + return -1; + } + trigger_create(trig, applier_on_confirm, lsn, NULL); + + if (txn_begin_stmt(txn, NULL) != 0) + return -1; + + if (txn_commit_stmt(txn, request) == 0) { + txn_on_commit(txn, trig); + return 0; + } else { + return -1; + } +} + static int apply_row(struct xrow_header *row) { struct request request; + if (row->type == IPROTO_CONFIRM) { + request.header = row; + return process_confirm(&request); + } if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; if (request.type == IPROTO_NOP) @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row) struct txn *txn = txn_begin(); if (txn == NULL) return -1; + /* + * Do not wait for confirmation while processing final + * join rows. See apply_snapshot_row(). + */ + txn_force_async(txn); if (apply_row(row) != 0) { txn_rollback(txn); fiber_gc(); @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count) applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); - if (apply_final_join_row(&row) != 0) + /* + * Confirms are ignored during join. All the + * data master sends us is valid. 
+ */ + if (row.type != IPROTO_CONFIRM && + apply_final_join_row(&row) != 0) diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); diff --git a/src/box/box.cc b/src/box/box.cc index 64ac89975..792c3c394 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -342,6 +342,9 @@ static void apply_wal_row(struct xstream *stream, struct xrow_header *row) { struct request request; + // TODO: process confirmation during recovery. + if (row->type == IPROTO_CONFIRM) + return; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type != IPROTO_NOP) { struct space *space = space_cache_find_xc(request.space_id); diff --git a/src/box/errcode.h b/src/box/errcode.h index 019c582af..3ba6866e5 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -267,6 +267,7 @@ struct errcode_record { /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \ /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ + /*215 */_(ER_SYNC_MASTER_MISMATCH, "CONFIRM message arrived for an unknown master id %d, expected %d") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index 333e91ea9..4df3c2f26 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg) vclock_copy(&status->relay->tx.vclock, &status->vclock); /* * Let pending synchronous transactions know, which of - * them were successfully sent to the replica. + * them were successfully sent to the replica. Acks are + * collected only on the master. Other instances wait for + * master's CONFIRM message instead. 
*/ - txn_limbo_ack(&txn_limbo, status->relay->replica->id, - vclock_get(&status->vclock, instance_id)); + if (txn_limbo.instance_id == instance_id) { + txn_limbo_ack(&txn_limbo, status->relay->replica->id, + vclock_get(&status->vclock, instance_id)); + } static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; @@ -766,7 +770,8 @@ static void relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); - assert(iproto_type_is_dml(packet->type)); + assert(iproto_type_is_dml(packet->type) || + packet->type == IPROTO_CONFIRM); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other diff --git a/src/box/txn.c b/src/box/txn.c index a65100b31..3b331fecc 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -36,6 +36,7 @@ #include #include "xrow.h" #include "errinj.h" +#include "iproto_constants.h" double too_long_threshold; @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request) */ struct space *space = stmt->space; row->group_id = space != NULL ? space_group_id(space) : 0; - row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); + /* + * IPROTO_CONFIRM entries are supplementary and aren't + * valid dml requests. They're encoded manually. + */ + if (likely(row->type != IPROTO_CONFIRM)) + row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); if (row->bodycnt < 0) return -1; stmt->row = row; @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) */ struct txn_stmt *stmt = txn_current_stmt(txn); - /* Create WAL record for the write requests in non-temporary spaces. - * stmt->space can be NULL for IRPOTO_NOP. + /* + * Create WAL record for the write requests in + * non-temporary spaces. stmt->space can be NULL for + * IRPOTO_NOP or IPROTO_CONFIRM. 
*/ if (stmt->space == NULL || !space_is_temporary(stmt->space)) { if (txn_add_redo(txn, stmt, request) != 0) @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers) /** * Complete transaction processing. */ -static void +void txn_complete(struct txn *txn) { /* * Note, engine can be NULL if transaction contains - * IPROTO_NOP statements only. + * IPROTO_NOP or IPROTO_CONFIRM statements. */ if (txn->signature < 0) { /* Undo the transaction. */ @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn) struct xrow_header **remote_row = req->rows; struct xrow_header **local_row = req->rows + txn->n_applier_rows; bool is_sync = false; - /* - * Only local transactions, originated from the master, - * can enter 'waiting for acks' state. It means, only - * author of the transaction can collect acks. Replicas - * consider it a normal async transaction so far. - */ - bool is_local = true; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->has_triggers) { @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn) if (stmt->row == NULL) continue; - if (stmt->row->replica_id == 0) { + if (stmt->row->replica_id == 0) *local_row++ = stmt->row; - } else { + else *remote_row++ = stmt->row; - is_local = false; - } req->approx_len += xrow_approx_len(stmt->row); } - if (is_sync && is_local) + + is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC); + if (is_sync) { txn_set_flag(txn, TXN_WAIT_ACK); + } assert(remote_row == req->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn) return false; } +/* + * A trigger called on tx rollback due to a failed WAL write, + * when tx is waiting for confirmation. 
+ */ +static int +txn_limbo_on_rollback(struct trigger *trig, void *data) +{ + (void) trig; + struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data; + txn_limbo_abort(&txn_limbo, entry); + return 0; +} + int txn_commit_async(struct txn *txn) { @@ -632,16 +647,57 @@ txn_commit_async(struct txn *txn) return -1; } + bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); + struct txn_limbo_entry *limbo_entry = NULL; + struct trigger *trig = NULL; + if (is_sync) { + /* + * The rollback trigger outlives this call, so it must + * not live on the stack. Allocate it on the txn region + * now: failing after the journal write is not an option. + */ + trig = (struct trigger *)region_alloc(&txn->region, + sizeof(*trig)); + if (trig == NULL) { + txn_rollback(txn); + txn_free(txn); + diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig"); + return -1; + } + /* See txn_commit(). */ + uint32_t origin_id = req->rows[0]->replica_id; + int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn; + limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); + if (limbo_entry == NULL) { + txn_rollback(txn); + txn_free(txn); + return -1; + } + assert(lsn > 0); + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); + } + fiber_set_txn(fiber(), NULL); if (journal_write_async(req) != 0) { fiber_set_txn(fiber(), txn); txn_rollback(txn); + /* There is no limbo entry to abort for an async txn. */ + if (is_sync) + txn_limbo_abort(&txn_limbo, limbo_entry); diag_set(ClientError, ER_WAL_IO); diag_log(); return -1; } + /* + * Set a trigger to abort waiting for confirm on WAL write + * failure. + */ + if (is_sync) { + trigger_create(trig, txn_limbo_on_rollback, limbo_entry, NULL); + txn_on_rollback(txn, trig); + } return 0; } diff --git a/src/box/txn.h b/src/box/txn.h index 232cc07a8..e7705bb48 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -73,6 +73,13 @@ enum txn_flag { * then finishes commit and returns success to a user. */ TXN_WAIT_ACK, + /** + * A transaction mustn't wait for confirmation, even if it + * touches synchronous spaces. Needed for join stage on + * replica, when all the data coming from the master is + * already confirmed by design. + */ + TXN_FORCE_ASYNC, }; enum { @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag) txn->flags &= ~(1 << flag); } +/** + * Force async mode for transaction. It won't wait for acks + * or confirmation. 
+ */ +static inline void +txn_force_async(struct txn *txn) +{ + txn_set_flag(txn, TXN_FORCE_ASYNC); +} + /* Pointer to the current transaction (if any) */ static inline struct txn * in_txn(void) @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn) struct txn * txn_begin(void); +/** + * Complete transaction processing. + */ +void +txn_complete(struct txn *txn); + /** * Commit a transaction. * @pre txn == in_txn() diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index efb97a591..daec98317 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) fiber_yield(); fiber_set_cancellable(cancellable); // TODO: implement rollback. - // TODO: implement confirm. assert(!entry->is_rollback); + assert(entry->is_commit); txn_limbo_remove(limbo, entry); txn_clear_flag(txn, TXN_WAIT_ACK); } +/** + * Write a confirmation entry to WAL. After it's written all the + * transactions waiting for confirmation may be finished. + */ +static int +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry) +{ + /* Prepare a confirm entry. 
*/ + struct xrow_header row = {0}; + struct request request = {0}; + request.header = &row; + + row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); + if (row.bodycnt < 0) + return -1; + + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; + + if (txn_begin_stmt(txn, NULL) != 0) + goto rollback; + if (txn_commit_stmt(txn, &request) != 0) + goto rollback; + + return txn_commit(txn); +rollback: + txn_rollback(txn); + return -1; +} + +void +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) +{ + assert(limbo->instance_id != REPLICA_ID_NIL && + limbo->instance_id != instance_id); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) { + if (e->lsn > lsn) + break; + assert(e->txn->fiber == NULL); + e->is_commit = true; + txn_limbo_remove(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + /* + * txn_complete_async must've been called already, + * since CONFIRM always follows the tx in question. + * So, finish this tx processing right away. + */ + txn_complete(e->txn); + } +} + void txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) { @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); vclock_follow(&limbo->vclock, replica_id, lsn); struct txn_limbo_entry *e; + struct txn_limbo_entry *last_quorum = NULL; rlist_foreach_entry(e, &limbo->queue, in_queue) { if (e->lsn <= prev_lsn) continue; if (e->lsn > lsn) break; if (++e->ack_count >= replication_sync_quorum) { - // TODO: better call complete() right - // here. Appliers use async transactions, - // and their txns don't have fibers to - // wake up. That becomes actual, when - // appliers will be supposed to wait for - // 'confirm' message. 
e->is_commit = true; - fiber_wakeup(e->txn->fiber); + last_quorum = e; } assert(e->ack_count <= VCLOCK_MAX); } + if (last_quorum != NULL) { + if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { + // TODO: rollback. + return; + } + /* + * Wakeup all the entries in direct order as soon + * as confirmation message is written to WAL. + */ + rlist_foreach_entry(e, &limbo->queue, in_queue) { + fiber_wakeup(e->txn->fiber); + if (e == last_quorum) + break; + } + } } void diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 1ad1c567a..de415cd97 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); void txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); +/** + * Confirm all the entries up to the given master's LSN. + */ +void +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); + void txn_limbo_init(); diff --git a/test/box/error.result b/test/box/error.result index 69c471085..34ded3930 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -433,6 +433,7 @@ t; | 212: box.error.SEQUENCE_NOT_STARTED | 213: box.error.NO_SUCH_SESSION_SETTING | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS + | 215: box.error.SYNC_MASTER_MISMATCH | ... test_run:cmd("setopt delimiter ''"); -- 2.24.3 (Apple Git-128)