[Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries
Serge Petrenko
sergepetrenko at tarantool.org
Thu Jun 11 11:56:06 MSK 2020
On 11.06.2020 02:51, Vladislav Shpilevoy wrote:
> Thanks for the patch!
Hi! Thanks for the fixes!
Looks good.
See my comments inline.
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index df48b4796..1dc977424 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>> return txn_commit_stmt(txn, request);
>> }
>>
>> +/*
>> + * An on_commit trigger set on a txn containing a CONFIRM entry.
>> + * Confirms some of the txs waiting in txn_limbo.
>> + */
>> +static int
>> +applier_on_confirm(struct trigger *trig, void *data)
>> +{
>> + (void) trig;
>> + int64_t lsn = *(int64_t *)data;
>> + txn_limbo_read_confirm(&txn_limbo, lsn);
>> + return 0;
>> +}
>> +
>> +static int
>> +process_confirm(struct request *request)
>> +{
>> + assert(request->header->type = IPROTO_CONFIRM);
>> + uint32_t replica_id;
>> + struct txn *txn = in_txn();
>> + int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
>> + if (lsn == NULL) {
> I changed that to region_alloc_object(). To keep alignment correct. Generally,
> we should keep in mind, that raw region_alloc() now is close to being forbidden.
> It can be used only for byte buffers like strings, MessagePack.
Ok, I see.
>
>> + diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
>> + return -1;
>> + }
>> + if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
>> + return -1;
>> + /*
>> + * on_commit trigger failure is not allowed, so check for
>> + * instance id early.
>> + */
>> + if (replica_id != txn_limbo.instance_id) {
>> + diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>> + txn_limbo.instance_id);
>> + return -1;
>> + }
>> +
>> + /*
>> + * Set an on_commit trigger which will perform the actual
>> + * confirmation processing.
>> + */
>> + struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
>> + sizeof(*trig));
> Changed to region_alloc_object().
>
>> + if (trig == NULL) {
>> + diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
>> + return -1;
>> + }
>> + trigger_create(trig, applier_on_confirm, lsn, NULL);
>> +
>> + if (txn_begin_stmt(txn, NULL) != 0)
>> + return -1;
>> +
>> + if (txn_commit_stmt(txn, request) == 0) {
>> + txn_on_commit(txn, trig);
>> + return 0;
>> + } else {
>> + return -1;
>> + }
>> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>> applier->last_row_time = ev_monotonic_now(loop());
>> if (iproto_type_is_dml(row.type)) {
>> vclock_follow_xrow(&replicaset.vclock, &row);
>> - if (apply_final_join_row(&row) != 0)
>> + /*
>> + * Confirms are ignored during join. All the
>> + * data master sends us is valid.
>> + */
>> + if (row.type != IPROTO_CONFIRM &&
> I moved the check into apply_final_join_row(). To be consistent with apply_row() and
> apply_wal_row().
>
>> + apply_final_join_row(&row) != 0)
>> diag_raise();
>> if (++row_count % 100000 == 0)
>> say_info("%.1fM rows received", row_count / 1e6);
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 333e91ea9..4df3c2f26 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>> vclock_copy(&status->relay->tx.vclock, &status->vclock);
>> /*
>> * Let pending synchronous transactions know, which of
>> - * them were successfully sent to the replica.
>> + * them were successfully sent to the replica. Acks are
>> + * collected only on the master. Other instances wait for
>> + * master's CONFIRM message instead.
>> */
>> - txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> - vclock_get(&status->vclock, instance_id));
>> + if (txn_limbo.instance_id == instance_id) {
>> + txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> + vclock_get(&status->vclock, instance_id));
>> + }
> Nice, I moved that to the patch introducing the limbo.
>
>> static const struct cmsg_hop route[] = {
>> {relay_status_update, NULL}
>> };
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index a65100b31..3b331fecc 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>> struct xrow_header **remote_row = req->rows;
>> struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>> bool is_sync = false;
>> - /*
>> - * Only local transactions, originated from the master,
>> - * can enter 'waiting for acks' state. It means, only
>> - * author of the transaction can collect acks. Replicas
>> - * consider it a normal async transaction so far.
>> - */
>> - bool is_local = true;
> I squashed is_local removal into the first commits. So like it didn't
> exist at all.
>
> Why did you remove it, btw? I applied the removal, because realized,
> that if all the spaces are local, then neither of them can be sync.
> So I banned is_sync + is_local options in the first commit. Did you
> remove it for the same reason?
If I remember correctly, your is_local check was not about local spaces,
but rather about whether the transaction originated on the local instance.
I separated txn_limbo behaviour based on where the limbo entries come from.
If they come from txn_commit_async(), this is a replica, and txn_limbo
waits for CONFIRM messages. If they come from txn_commit(), this is the
master, so txn_limbo gathers acks. I believe it would be better to decide
this based on your is_local flag. We should probably put a FIXME here too.
>
>>
>> stailq_foreach_entry(stmt, &txn->stmts, next) {
>> if (stmt->has_triggers) {
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>>
>> +/**
>> + * Write a confirmation entry to WAL. After it's written all the
>> + * transactions waiting for confirmation may be finished.
>> + */
>> +static int
>> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> +{
>> + /* Prepare a confirm entry. */
>> + struct xrow_header row = {0};
>> + struct request request = {0};
>> + request.header = &row;
>> +
>> + row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
>> + if (row.bodycnt < 0)
>> + return -1;
>> +
>> + struct txn *txn = txn_begin();
>> + if (txn == NULL)
>> + return -1;
>> +
>> + if (txn_begin_stmt(txn, NULL) != 0)
>> + goto rollback;
>> + if (txn_commit_stmt(txn, &request) != 0)
>> + goto rollback;
>> +
>> + return txn_commit(txn);
> We definitely shouldn't use transactions for non DML data. We need
> separate API for that, not to spoil the hotpath, and to keep the
> DML commit code 'simple'. Not now though. We just need to keep these
> kind of follow ups in mind/on track, and file them as a follow-up
> issue after the sync replication is done.
I agree. We'll have to rework the journal callbacks then, because the journal
currently calls txn_complete_async() on each successful write.
>
>> +rollback:
>> + txn_rollback(txn);
>> + return -1;
>> +}
--
Serge Petrenko
More information about the Tarantool-patches
mailing list