From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp61.i.mail.ru (smtp61.i.mail.ru [217.69.128.41]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 6959440F3AE for ; Thu, 11 Jun 2020 11:56:07 +0300 (MSK) References: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org> <7228506b-2eff-befb-43b7-f933a8844867@tarantool.org> From: Serge Petrenko Message-ID: <1a8d2474-b84e-c5c9-6c4b-5c07f1fb7055@tarantool.org> Date: Thu, 11 Jun 2020 11:56:06 +0300 MIME-Version: 1.0 In-Reply-To: <7228506b-2eff-befb-43b7-f933a8844867@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , sergos@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org 11.06.2020 02:51, Vladislav Shpilevoy пишет: > Thanks for the patch! Hi! Thanks for the fixes! Looks good. See my comments inline. >> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index df48b4796..1dc977424 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -249,10 +255,73 @@ process_nop(struct request *request) >> return txn_commit_stmt(txn, request); >> } >> >> +/* >> + * An on_commit trigger set on a txn containing a CONFIRM entry. >> + * Confirms some of the txs waiting in txn_limbo. 
>> + */ >> +static int >> +applier_on_confirm(struct trigger *trig, void *data) >> +{ >> + (void) trig; >> + int64_t lsn = *(int64_t *)data; >> + txn_limbo_read_confirm(&txn_limbo, lsn); >> + return 0; >> +} >> + >> +static int >> +process_confirm(struct request *request) >> +{ >> + assert(request->header->type = IPROTO_CONFIRM); >> + uint32_t replica_id; >> + struct txn *txn = in_txn(); >> + int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t)); >> + if (lsn == NULL) { > I changed that to region_alloc_object(). To keep alignment correct. Generally, > we should keep in mind, that raw region_alloc() now is close to being forbidden. > It can be used only for byte buffers like strings, MessagePack. Ok, I see. > >> + diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn"); >> + return -1; >> + } >> + if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0) >> + return -1; >> + /* >> + * on_commit trigger failure is not allowed, so check for >> + * instance id early. >> + */ >> + if (replica_id != txn_limbo.instance_id) { >> + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, >> + txn_limbo.instance_id); >> + return -1; >> + } >> + >> + /* >> + * Set an on_commit trigger which will perform the actual >> + * confirmation processing. >> + */ >> + struct trigger *trig = (struct trigger *)region_alloc(&txn->region, >> + sizeof(*trig)); > Changed to region_alloc_object(). 
> >> + if (trig == NULL) { >> + diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig"); >> + return -1; >> + } >> + trigger_create(trig, applier_on_confirm, lsn, NULL); >> + >> + if (txn_begin_stmt(txn, NULL) != 0) >> + return -1; >> + >> + if (txn_commit_stmt(txn, request) == 0) { >> + txn_on_commit(txn, trig); >> + return 0; >> + } else { >> + return -1; >> + } >> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count) >> applier->last_row_time = ev_monotonic_now(loop()); >> if (iproto_type_is_dml(row.type)) { >> vclock_follow_xrow(&replicaset.vclock, &row); >> - if (apply_final_join_row(&row) != 0) >> + /* >> + * Confirms are ignored during join. All the >> + * data master sends us is valid. >> + */ >> + if (row.type != IPROTO_CONFIRM && > I moved the check into apply_final_join_row(). To be consistent with apply_row() and > apply_wal_row(). > >> + apply_final_join_row(&row) != 0) >> diag_raise(); >> if (++row_count % 100000 == 0) >> say_info("%.1fM rows received", row_count / 1e6); >> diff --git a/src/box/relay.cc b/src/box/relay.cc >> index 333e91ea9..4df3c2f26 100644 >> --- a/src/box/relay.cc >> +++ b/src/box/relay.cc >> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg) >> vclock_copy(&status->relay->tx.vclock, &status->vclock); >> /* >> * Let pending synchronous transactions know, which of >> - * them were successfully sent to the replica. >> + * them were successfully sent to the replica. Acks are >> + * collected only on the master. Other instances wait for >> + * master's CONFIRM message instead. >> */ >> - txn_limbo_ack(&txn_limbo, status->relay->replica->id, >> - vclock_get(&status->vclock, instance_id)); >> + if (txn_limbo.instance_id == instance_id) { >> + txn_limbo_ack(&txn_limbo, status->relay->replica->id, >> + vclock_get(&status->vclock, instance_id)); >> + } > Nice, I moved that to the patch introducing the limbo. 
> >> static const struct cmsg_hop route[] = { >> {relay_status_update, NULL} >> }; >> diff --git a/src/box/txn.c b/src/box/txn.c >> index a65100b31..3b331fecc 100644 >> --- a/src/box/txn.c >> +++ b/src/box/txn.c> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn) >> struct xrow_header **remote_row = req->rows; >> struct xrow_header **local_row = req->rows + txn->n_applier_rows; >> bool is_sync = false; >> - /* >> - * Only local transactions, originated from the master, >> - * can enter 'waiting for acks' state. It means, only >> - * author of the transaction can collect acks. Replicas >> - * consider it a normal async transaction so far. >> - */ >> - bool is_local = true; > I squashed is_local removal into the first commits. So like it didn't > exist at all. > > Why did you remove it, btw? I applied the removal, because realized, > that if all the spaces are local, then neither of them can be sync. > So I banned is_sync + is_local options in the first commit. Did you > remove it for the same reason? If I remember correctly, your is_local check was not about local spaces, but rather about whether the transaction originated on the local instance. I separated txn_limbo behaviour based on where the limbo entries come from. If they come from txn_commit_async(), this instance is a replica, and txn_limbo waits for confirm messages. If they come from txn_commit(), this instance is the master, so txn_limbo gathers acks. It'd be better to judge by your is_local flag, I believe. We should probably also put a FIXME here. > >> >> stailq_foreach_entry(stmt, &txn->stmts, next) { >> if (stmt->has_triggers) { >> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c >> index efb97a591..daec98317 100644 >> --- a/src/box/txn_limbo.c >> +++ b/src/box/txn_limbo.c >> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) >> >> +/** >> + * Write a confirmation entry to WAL.
After it's written all the >> + * transactions waiting for confirmation may be finished. >> + */ >> +static int >> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry) >> +{ >> + /* Prepare a confirm entry. */ >> + struct xrow_header row = {0}; >> + struct request request = {0}; >> + request.header = &row; >> + >> + row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); >> + if (row.bodycnt < 0) >> + return -1; >> + >> + struct txn *txn = txn_begin(); >> + if (txn == NULL) >> + return -1; >> + >> + if (txn_begin_stmt(txn, NULL) != 0) >> + goto rollback; >> + if (txn_commit_stmt(txn, &request) != 0) >> + goto rollback; >> + >> + return txn_commit(txn); > We definitely shouldn't use transactions for non DML data. We need > separate API for that, not to spoil the hotpath, and to keep the > DML commit code 'simple'. Not now though. We just need to keep these > kind of follow ups in mind/on track, and file them as a follow-up > issue after the sync replication is done. I agree. We'll have to rework the journal callbacks then, because the journal currently calls txn_complete_async() on each successful write. > >> +rollback: >> + txn_rollback(txn); >> + return -1; >> +} -- Serge Petrenko