From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp41.i.mail.ru (smtp41.i.mail.ru [94.100.177.101]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id E107641C5DB for ; Sat, 20 Jun 2020 18:06:30 +0300 (MSK) References: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org> From: Leonid Vasiliev Message-ID: <5a4a21b5-b422-5516-ad26-b1637fd235b0@tarantool.org> Date: Sat, 20 Jun 2020 18:06:28 +0300 MIME-Version: 1.0 In-Reply-To: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Serge Petrenko , v.shpilevoy@tarantool.org, sergos@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Hi! Thank you for the patch. LGTM. All the following comments can be skipped silently. On 09.06.2020 15:20, Serge Petrenko wrote: > Make txn_limbo write a CONFIRM entry as soon as a batch of entries > receive their acks. CONFIRM entry is written to WAL and later replicated > to all the replicas. > > Now replicas put synchronous transactions into txn_limbo and wait for > corresponding confirmation entries to arrive and end up in their WAL > before committing the transactions. 
> > Part-of #4847 > --- > src/box/applier.cc | 81 ++++++++++++++++++++++++++++++++++++++++++- > src/box/box.cc | 3 ++ > src/box/errcode.h | 1 + > src/box/relay.cc | 13 ++++--- > src/box/txn.c | 75 ++++++++++++++++++++++++++++++--------- > src/box/txn.h | 23 ++++++++++++ > src/box/txn_limbo.c | 79 ++++++++++++++++++++++++++++++++++++----- > src/box/txn_limbo.h | 6 ++++ > test/box/error.result | 1 + > 9 files changed, 252 insertions(+), 30 deletions(-) > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index df48b4796..1dc977424 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -51,6 +51,7 @@ > #include "txn.h" > #include "box.h" > #include "scoped_guard.h" > +#include "txn_limbo.h" > > STRS(applier_state, applier_STATE); > > @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row) > struct txn *txn = txn_begin(); > if (txn == NULL) > return -1; > + /* > + * Do not wait for confirmation when fetching a snapshot. > + * Master only sends confirmed rows during join. > + */ > + txn_force_async(txn); > if (txn_begin_stmt(txn, space) != 0) > goto rollback; > /* no access checks here - applier always works with admin privs */ > @@ -249,10 +255,73 @@ process_nop(struct request *request) > return txn_commit_stmt(txn, request); > } > > +/* > + * An on_commit trigger set on a txn containing a CONFIRM entry. > + * Confirms some of the txs waiting in txn_limbo. In txn.h "txns" notation is used (up to you). 
> + */ > +static int > +applier_on_confirm(struct trigger *trig, void *data) > +{ > + (void) trig; > + int64_t lsn = *(int64_t *)data; > + txn_limbo_read_confirm(&txn_limbo, lsn); > + return 0; > +} > + > +static int > +process_confirm(struct request *request) > +{ > + assert(request->header->type = IPROTO_CONFIRM); > + uint32_t replica_id; > + struct txn *txn = in_txn(); > + int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t)); > + if (lsn == NULL) { > + diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn"); > + return -1; > + } > + if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0) > + return -1; > + /* > + * on_commit trigger failure is not allowed, so check for > + * instance id early. > + */ > + if (replica_id != txn_limbo.instance_id) { > + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, > + txn_limbo.instance_id); > + return -1; > + } > + > + /* > + * Set an on_commit trigger which will perform the actual > + * confirmation processing. > + */ > + struct trigger *trig = (struct trigger *)region_alloc(&txn->region, > + sizeof(*trig)); > + if (trig == NULL) { > + diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig"); > + return -1; > + } > + trigger_create(trig, applier_on_confirm, lsn, NULL); > + > + if (txn_begin_stmt(txn, NULL) != 0) > + return -1; > + > + if (txn_commit_stmt(txn, request) == 0) { > + txn_on_commit(txn, trig); > + return 0; > + } else { > + return -1; > + } Did I understand correctly that this is a trick, like in process_nop(), to promote the vclock and ...? Maybe add a comment? 
> +} > + > static int > apply_row(struct xrow_header *row) > { > struct request request; > + if (row->type == IPROTO_CONFIRM) { > + request.header = row; > + return process_confirm(&request); > + } > if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) > return -1; > if (request.type == IPROTO_NOP) > @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row) > struct txn *txn = txn_begin(); > if (txn == NULL) > return -1; > + /* > + * Do not wait for confirmation while processing final > + * join rows. See apply_snapshot_row(). > + */ > + txn_force_async(txn); > if (apply_row(row) != 0) { > txn_rollback(txn); > fiber_gc(); > @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count) > applier->last_row_time = ev_monotonic_now(loop()); > if (iproto_type_is_dml(row.type)) { > vclock_follow_xrow(&replicaset.vclock, &row); > - if (apply_final_join_row(&row) != 0) > + /* > + * Confirms are ignored during join. All the > + * data master sends us is valid. > + */ > + if (row.type != IPROTO_CONFIRM && > + apply_final_join_row(&row) != 0) > diag_raise(); > if (++row_count % 100000 == 0) > say_info("%.1fM rows received", row_count / 1e6); > diff --git a/src/box/box.cc b/src/box/box.cc > index 64ac89975..792c3c394 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -342,6 +342,9 @@ static void > apply_wal_row(struct xstream *stream, struct xrow_header *row) > { > struct request request; > + // TODO: process confirmation during recovery. 
> + if (row->type == IPROTO_CONFIRM) > + return; > xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); > if (request.type != IPROTO_NOP) { > struct space *space = space_cache_find_xc(request.space_id); > diff --git a/src/box/errcode.h b/src/box/errcode.h > index 019c582af..3ba6866e5 100644 > --- a/src/box/errcode.h > +++ b/src/box/errcode.h > @@ -267,6 +267,7 @@ struct errcode_record { > /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \ > /*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \ > /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \ > + /*215 */_(ER_SYNC_MASTER_MISMATCH, "CONFIRM message arrived for an unknown master id %d, expected %d") \ > > /* > * !IMPORTANT! Please follow instructions at start of the file > diff --git a/src/box/relay.cc b/src/box/relay.cc > index 333e91ea9..4df3c2f26 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg) > vclock_copy(&status->relay->tx.vclock, &status->vclock); > /* > * Let pending synchronous transactions know, which of > - * them were successfully sent to the replica. > + * them were successfully sent to the replica. Acks are > + * collected only on the master. Other instances wait for > + * master's CONFIRM message instead. 
> */ > - txn_limbo_ack(&txn_limbo, status->relay->replica->id, > - vclock_get(&status->vclock, instance_id)); > + if (txn_limbo.instance_id == instance_id) { > + txn_limbo_ack(&txn_limbo, status->relay->replica->id, > + vclock_get(&status->vclock, instance_id)); > + } > static const struct cmsg_hop route[] = { > {relay_status_update, NULL} > }; > @@ -766,7 +770,8 @@ static void > relay_send_row(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > - assert(iproto_type_is_dml(packet->type)); > + assert(iproto_type_is_dml(packet->type) || > + packet->type == IPROTO_CONFIRM); > if (packet->group_id == GROUP_LOCAL) { > /* > * We do not relay replica-local rows to other > diff --git a/src/box/txn.c b/src/box/txn.c > index a65100b31..3b331fecc 100644 > --- a/src/box/txn.c > +++ b/src/box/txn.c > @@ -36,6 +36,7 @@ > #include > #include "xrow.h" > #include "errinj.h" > +#include "iproto_constants.h" > > double too_long_threshold; > > @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request) > */ > struct space *space = stmt->space; > row->group_id = space != NULL ? space_group_id(space) : 0; > - row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); > + /* > + * IPROTO_CONFIRM entries are supplementary and aren't > + * valid dml requests. They're encoded manually. > + */ > + if (likely(row->type != IPROTO_CONFIRM)) > + row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); > if (row->bodycnt < 0) > return -1; > stmt->row = row; > @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) > */ > struct txn_stmt *stmt = txn_current_stmt(txn); > > - /* Create WAL record for the write requests in non-temporary spaces. > - * stmt->space can be NULL for IRPOTO_NOP. > + /* > + * Create WAL record for the write requests in > + * non-temporary spaces. stmt->space can be NULL for > + * IRPOTO_NOP or IPROTO_CONFIRM. 
> */ > if (stmt->space == NULL || !space_is_temporary(stmt->space)) { > if (txn_add_redo(txn, stmt, request) != 0) > @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers) > /** > * Complete transaction processing. > */ > -static void > +void > txn_complete(struct txn *txn) > { > /* > * Note, engine can be NULL if transaction contains > - * IPROTO_NOP statements only. > + * IPROTO_NOP or IPROTO_CONFIRM statements. > */ > if (txn->signature < 0) { > /* Undo the transaction. */ > @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn) > struct xrow_header **remote_row = req->rows; > struct xrow_header **local_row = req->rows + txn->n_applier_rows; > bool is_sync = false; > - /* > - * Only local transactions, originated from the master, > - * can enter 'waiting for acks' state. It means, only > - * author of the transaction can collect acks. Replicas > - * consider it a normal async transaction so far. > - */ > - bool is_local = true; > > stailq_foreach_entry(stmt, &txn->stmts, next) { > if (stmt->has_triggers) { > @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn) > if (stmt->row == NULL) > continue; > > - if (stmt->row->replica_id == 0) { > + if (stmt->row->replica_id == 0) > *local_row++ = stmt->row; > - } else { > + else > *remote_row++ = stmt->row; > - is_local = false; > - } > > req->approx_len += xrow_approx_len(stmt->row); > } > - if (is_sync && is_local) > + > + is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC); > + if (is_sync) { > txn_set_flag(txn, TXN_WAIT_ACK); > + } > > assert(remote_row == req->rows + txn->n_applier_rows); > assert(local_row == remote_row + txn->n_new_rows); > @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn) > return false; > } > > +/* > + * A trigger called on tx rollback due to a failed WAL write, > + * when tx is waiting for confirmation. 
> + */ > +static int > +txn_limbo_on_rollback(struct trigger *trig, void *data) > +{ > + (void) trig; > + struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data; > + txn_limbo_abort(&txn_limbo, entry); > + return 0; > +} > + > int > txn_commit_async(struct txn *txn) > { > @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn) > return -1; > } > > + bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); > + struct txn_limbo_entry *limbo_entry; > + if (is_sync) { > + /* See txn_commit(). */ > + uint32_t origin_id = req->rows[0]->replica_id; > + int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn; > + limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); > + if (limbo_entry == NULL) { > + txn_rollback(txn); > + txn_free(txn); > + return -1; > + } > + assert(lsn > 0); > + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); > + } > + > fiber_set_txn(fiber(), NULL); > if (journal_write_async(req) != 0) { > fiber_set_txn(fiber(), txn); > txn_rollback(txn); > + txn_limbo_abort(&txn_limbo, limbo_entry); > > diag_set(ClientError, ER_WAL_IO); > diag_log(); > return -1; > } > > + /* > + * Set a trigger to abort waiting for confirm on WAL write > + * failure. > + */ > + if (is_sync) { > + struct trigger trig; > + trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL); > + txn_on_rollback(txn, &trig); > + } > return 0; > } > > diff --git a/src/box/txn.h b/src/box/txn.h > index 232cc07a8..e7705bb48 100644 > --- a/src/box/txn.h > +++ b/src/box/txn.h > @@ -73,6 +73,13 @@ enum txn_flag { > * then finishes commit and returns success to a user. > */ > TXN_WAIT_ACK, > + /** > + * A transaction mustn't wait for confirmation, even if it > + * touches synchronous spaces. Needed for join stage on > + * replica, when all the data coming from the master is > + * already confirmed by design. 
> + */ > + TXN_FORCE_ASYNC, > }; > > enum { > @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag) > txn->flags &= ~(1 << flag); > } > > +/** > + * Force async mode for transaction. It won't wait for acks > + * or confirmation. > + */ > +static inline void > +txn_force_async(struct txn *txn) > +{ > + txn_set_flag(txn, TXN_FORCE_ASYNC); > +} > + > /* Pointer to the current transaction (if any) */ > static inline struct txn * > in_txn(void) > @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn) > struct txn * > txn_begin(void); > > +/** > + * Complete transaction processing. > + */ > +void > +txn_complete(struct txn *txn); > + > /** > * Commit a transaction. > * @pre txn == in_txn() > diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c > index efb97a591..daec98317 100644 > --- a/src/box/txn_limbo.c > +++ b/src/box/txn_limbo.c > @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > fiber_yield(); > fiber_set_cancellable(cancellable); > // TODO: implement rollback. > - // TODO: implement confirm. > assert(!entry->is_rollback); > + assert(entry->is_commit); > txn_limbo_remove(limbo, entry); > txn_clear_flag(txn, TXN_WAIT_ACK); > } > > +/** > + * Write a confirmation entry to WAL. After it's written all the > + * transactions waiting for confirmation may be finished. > + */ > +static int > +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > +{ > + /* Prepare a confirm entry. 
*/ > + struct xrow_header row = {0}; > + struct request request = {0}; > + request.header = &row; > + > + row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); > + if (row.bodycnt < 0) > + return -1; > + > + struct txn *txn = txn_begin(); > + if (txn == NULL) > + return -1; > + > + if (txn_begin_stmt(txn, NULL) != 0) > + goto rollback; > + if (txn_commit_stmt(txn, &request) != 0) > + goto rollback; > + > + return txn_commit(txn); > +rollback: > + txn_rollback(txn); > + return -1; > +} > + > +void > +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) > +{ > + assert(limbo->instance_id != REPLICA_ID_NIL && > + limbo->instance_id != instance_id); > + struct txn_limbo_entry *e, *tmp; > + rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) { > + if (e->lsn > lsn) > + break; > + assert(e->txn->fiber == NULL); > + e->is_commit = true; > + txn_limbo_remove(limbo, e); > + txn_clear_flag(e->txn, TXN_WAIT_ACK); > + /* > + * txn_complete_async must've been called already, > + * since CONFIRM always follows the tx in question. > + * So, finish this tx processing right away. > + */ > + txn_complete(e->txn); > + } > +} > + > void > txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) > { > @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) > int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); > vclock_follow(&limbo->vclock, replica_id, lsn); > struct txn_limbo_entry *e; > + struct txn_limbo_entry *last_quorum = NULL; > rlist_foreach_entry(e, &limbo->queue, in_queue) { > if (e->lsn <= prev_lsn) > continue; > if (e->lsn > lsn) > break; > if (++e->ack_count >= replication_sync_quorum) { > - // TODO: better call complete() right > - // here. Appliers use async transactions, > - // and their txns don't have fibers to > - // wake up. That becomes actual, when > - // appliers will be supposed to wait for > - // 'confirm' message. 
> e->is_commit = true; > - fiber_wakeup(e->txn->fiber); > + last_quorum = e; > } > assert(e->ack_count <= VCLOCK_MAX); > } > + if (last_quorum != NULL) { > + if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { > + // TODO: rollback. > + return; > + } > + /* > + * Wakeup all the entries in direct order as soon > + * as confirmation message is written to WAL. > + */ > + rlist_foreach_entry(e, &limbo->queue, in_queue) { > + fiber_wakeup(e->txn->fiber); > + if (e == last_quorum) > + break; > + } > + } > } > > void > diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h > index 1ad1c567a..de415cd97 100644 > --- a/src/box/txn_limbo.h > +++ b/src/box/txn_limbo.h > @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); > void > txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); > > +/** > + * Confirm all the entries up to the given master's LSN. > + */ > +void > +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); > + > void > txn_limbo_init(); > > diff --git a/test/box/error.result b/test/box/error.result > index 69c471085..34ded3930 100644 > --- a/test/box/error.result > +++ b/test/box/error.result > @@ -433,6 +433,7 @@ t; > | 212: box.error.SEQUENCE_NOT_STARTED > | 213: box.error.NO_SUCH_SESSION_SETTING > | 214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS > + | 215: box.error.SYNC_MASTER_MISMATCH > | ... > > test_run:cmd("setopt delimiter ''"); >