From: Vladislav Shpilevoy
Date: Thu, 11 Jun 2020 01:51:40 +0200
Subject: Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
To: Serge Petrenko, sergos@tarantool.org, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Message-ID: <7228506b-2eff-befb-43b7-f933a8844867@tarantool.org>
In-Reply-To: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org>

Thanks for the patch!

> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index df48b4796..1dc977424 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -249,10 +255,73 @@ process_nop(struct request *request)
> 	return txn_commit_stmt(txn, request);
> }
>
> +/*
> + * An on_commit trigger set on a txn containing a CONFIRM entry.
> + * Confirms some of the txs waiting in txn_limbo.
> + */
> +static int
> +applier_on_confirm(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	int64_t lsn = *(int64_t *)data;
> +	txn_limbo_read_confirm(&txn_limbo, lsn);
> +	return 0;
> +}
> +
> +static int
> +process_confirm(struct request *request)
> +{
> +	assert(request->header->type = IPROTO_CONFIRM);
> +	uint32_t replica_id;
> +	struct txn *txn = in_txn();
> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
> +	if (lsn == NULL) {

I changed that to region_alloc_object().
To keep alignment correct. Generally, we should keep in mind that raw
region_alloc() is now close to being forbidden. It can be used only for
byte buffers such as strings and MessagePack.

> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
> +		return -1;
> +	}
> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
> +		return -1;
> +	/*
> +	 * on_commit trigger failure is not allowed, so check for
> +	 * instance id early.
> +	 */
> +	if (replica_id != txn_limbo.instance_id) {
> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
> +			 txn_limbo.instance_id);
> +		return -1;
> +	}
> +
> +	/*
> +	 * Set an on_commit trigger which will perform the actual
> +	 * confirmation processing.
> +	 */
> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
> +							      sizeof(*trig));

Changed to region_alloc_object().

> +	if (trig == NULL) {
> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
> +		return -1;
> +	}
> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		return -1;
> +
> +	if (txn_commit_stmt(txn, request) == 0) {
> +		txn_on_commit(txn, trig);
> +		return 0;
> +	} else {
> +		return -1;
> +	}
> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
> 		applier->last_row_time = ev_monotonic_now(loop());
> 		if (iproto_type_is_dml(row.type)) {
> 			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> +			/*
> +			 * Confirms are ignored during join. All the
> +			 * data master sends us is valid.
> +			 */
> +			if (row.type != IPROTO_CONFIRM &&

I moved the check into apply_final_join_row(), to be consistent with
apply_row() and apply_wal_row().
> +			    apply_final_join_row(&row) != 0)
> 				diag_raise();
> 			if (++row_count % 100000 == 0)
> 				say_info("%.1fM rows received", row_count / 1e6);
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 333e91ea9..4df3c2f26 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
> 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
> 	/*
> 	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica.
> +	 * them were successfully sent to the replica. Acks are
> +	 * collected only on the master. Other instances wait for
> +	 * master's CONFIRM message instead.
> 	 */
> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> -		      vclock_get(&status->vclock, instance_id));
> +	if (txn_limbo.instance_id == instance_id) {
> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> +			      vclock_get(&status->vclock, instance_id));
> +	}

Nice, I moved that to the patch introducing the limbo.

> 	static const struct cmsg_hop route[] = {
> 		{relay_status_update, NULL}
> 	};
> diff --git a/src/box/txn.c b/src/box/txn.c
> index a65100b31..3b331fecc 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
> 	struct xrow_header **remote_row = req->rows;
> 	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
> 	bool is_sync = false;
> -	/*
> -	 * Only local transactions, originated from the master,
> -	 * can enter 'waiting for acks' state. It means, only
> -	 * author of the transaction can collect acks. Replicas
> -	 * consider it a normal async transaction so far.
> -	 */
> -	bool is_local = true;

I squashed the is_local removal into the first commits, as if it had
never existed. Why did you remove it, btw? I applied the removal because
I realized that if all the spaces are local, then none of them can be
sync. So I banned the is_sync + is_local combination in the first commit.
Did you remove it for the same reason?
>
> 	stailq_foreach_entry(stmt, &txn->stmts, next) {
> 		if (stmt->has_triggers) {
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index efb97a591..daec98317 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +static int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> +	/* Prepare a confirm entry. */
> +	struct xrow_header row = {0};
> +	struct request request = {0};
> +	request.header = &row;
> +
> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
> +	if (row.bodycnt < 0)
> +		return -1;
> +
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		goto rollback;
> +	if (txn_commit_stmt(txn, &request) != 0)
> +		goto rollback;
> +
> +	return txn_commit(txn);

We definitely shouldn't use transactions for non-DML data. We need a
separate API for that, so as not to spoil the hot path and to keep the
DML commit code 'simple'. Not now though. We just need to keep these
kinds of follow-ups on track and file them as follow-up issues once the
sync replication is done.

> +rollback:
> +	txn_rollback(txn);
> +	return -1;
> +}
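
P.S. On the region_alloc_object() remarks above: here is a minimal sketch
of why a raw bump allocation is unsafe for typed objects. It is a toy
allocator written only for illustration. The names toy_region, toy_alloc,
toy_alloc_aligned and toy_alloc_object are hypothetical and are not the
real region API; the only assumption is that a raw allocation hands out
the next free byte without looking at its address.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical toy bump allocator, NOT the real region implementation. */
struct toy_region {
	char buf[256];
	size_t used;
};

/* Raw allocation: returns the next free byte, whatever its address is. */
static void *
toy_alloc(struct toy_region *r, size_t size)
{
	if (size > sizeof(r->buf) - r->used)
		return NULL;
	void *ptr = r->buf + r->used;
	r->used += size;
	return ptr;
}

/*
 * Aligned allocation: skips padding bytes so that the returned
 * address is a multiple of align, which must be a power of two.
 */
static void *
toy_alloc_aligned(struct toy_region *r, size_t size, size_t align)
{
	assert(align != 0 && (align & (align - 1)) == 0);
	uintptr_t start = (uintptr_t)(r->buf + r->used);
	/* Bytes needed to round start up to the next multiple of align. */
	size_t pad = (size_t)(-start & (align - 1));
	size_t left = sizeof(r->buf) - r->used;
	if (pad > left || size > left - pad)
		return NULL;
	r->used += pad;
	return toy_alloc(r, size);
}

/* Typed helper: size and alignment are both derived from the type. */
#define toy_alloc_object(r, T) \
	((T *)toy_alloc_aligned((r), sizeof(T), _Alignof(T)))
```

After a 3-byte string allocation, toy_alloc(&r, sizeof(int64_t)) would
return buf + 3, which is not a valid address for an int64_t on most
targets, while toy_alloc_object(&r, int64_t) pads first. That is the
reason to leave the raw call to byte buffers only.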