[Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries

Serge Petrenko sergepetrenko at tarantool.org
Thu Jun 11 11:56:06 MSK 2020


11.06.2020 02:51, Vladislav Shpilevoy пишет:
> Thanks for the patch!

Hi! Thanks for the fixes!

Looks good.

See my comments inline.

>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index df48b4796..1dc977424 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>>   	return txn_commit_stmt(txn, request);
>>   }
>>   
>> +/*
>> + * An on_commit trigger set on a txn containing a CONFIRM entry.
>> + * Confirms some of the txs waiting in txn_limbo.
>> + */
>> +static int
>> +applier_on_confirm(struct trigger *trig, void *data)
>> +{
>> +	(void) trig;
>> +	int64_t lsn = *(int64_t *)data;
>> +	txn_limbo_read_confirm(&txn_limbo, lsn);
>> +	return 0;
>> +}
>> +
>> +static int
>> +process_confirm(struct request *request)
>> +{
>> +	assert(request->header->type = IPROTO_CONFIRM);
>> +	uint32_t replica_id;
>> +	struct txn *txn = in_txn();
>> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
>> +	if (lsn == NULL) {
> I changed that to region_alloc_object(). To keep alignment correct. Generally,
> we should keep in mind, that raw region_alloc() now is close to being forbidden.
> It can be used only for byte buffers like strings, MessagePack.
Ok, I see.
>
>> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
>> +		return -1;
>> +	}
>> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
>> +		return -1;
>> +	/*
>> +	 * on_commit trigger failure is not allowed, so check for
>> +	 * instance id early.
>> +	 */
>> +	if (replica_id != txn_limbo.instance_id) {
>> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>> +			 txn_limbo.instance_id);
>> +		return -1;
>> +	}
>> +
>> +	/*
>> +	 * Set an on_commit trigger which will perform the actual
>> +	 * confirmation processing.
>> +	 */
>> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
>> +							      sizeof(*trig));
> Changed to region_alloc_object().
>
>> +	if (trig == NULL) {
>> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
>> +		return -1;
>> +	}
>> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
>> +
>> +	if (txn_begin_stmt(txn, NULL) != 0)
>> +		return -1;
>> +
>> +	if (txn_commit_stmt(txn, request) == 0) {
>> +		txn_on_commit(txn, trig);
>> +		return 0;
>> +	} else {
>> +		return -1;
>> +	}
>> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>   		applier->last_row_time = ev_monotonic_now(loop());
>>   		if (iproto_type_is_dml(row.type)) {
>>   			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			if (apply_final_join_row(&row) != 0)
>> +			/*
>> +			 * Confirms are ignored during join. All the
>> +			 * data master sends us is valid.
>> +			 */
>> +			if (row.type != IPROTO_CONFIRM &&
> I moved the check into apply_final_join_row(). To be consistent with apply_row() and
> apply_wal_row().
>
>> +			    apply_final_join_row(&row) != 0)
>>   				diag_raise();
>>   			if (++row_count % 100000 == 0)
>>   				say_info("%.1fM rows received", row_count / 1e6);
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 333e91ea9..4df3c2f26 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>>   	vclock_copy(&status->relay->tx.vclock, &status->vclock);
>>   	/*
>>   	 * Let pending synchronous transactions know, which of
>> -	 * them were successfully sent to the replica.
>> +	 * them were successfully sent to the replica. Acks are
>> +	 * collected only on the master. Other instances wait for
>> +	 * master's CONFIRM message instead.
>>   	 */
>> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> -		     vclock_get(&status->vclock, instance_id));
>> +	if (txn_limbo.instance_id == instance_id) {
>> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> +			      vclock_get(&status->vclock, instance_id));
>> +	}
> Nice, I moved that to the patch introducing the limbo.
>
>>   	static const struct cmsg_hop route[] = {
>>   		{relay_status_update, NULL}
>>   	};
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index a65100b31..3b331fecc 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>>   	struct xrow_header **remote_row = req->rows;
>>   	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>>   	bool is_sync = false;
>> -	/*
>> -	 * Only local transactions, originated from the master,
>> -	 * can enter 'waiting for acks' state. It means, only
>> -	 * author of the transaction can collect acks. Replicas
>> -	 * consider it a normal async transaction so far.
>> -	 */
>> -	bool is_local = true;
> I squashed is_local removal into the first commits. So like it didn't
> exist at all.
>
> Why did you remove it, btw? I applied the removal, because realized,
> that if all the spaces are local, then neither of them can be sync.
> So I banned is_sync + is_local options in the first commit. Did you
> remove it for the same reason?

If I remember correctly, your is_local check was not about local spaces,
but rather about whether the transaction originated from the local
instance.

I separated the txn_limbo behaviour based on where the limbo entries come
from. If they come from txn_commit_async(), this is a replica, and
txn_limbo waits for confirm messages. If they come from txn_commit(), this
is the master, so txn_limbo gathers acks. It would be better to judge by
your is_local flag, I believe.

We probably should put a fixme here also.

>
>>   
>>   	stailq_foreach_entry(stmt, &txn->stmts, next) {
>>   		if (stmt->has_triggers) {
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>>   
>> +/**
>> + * Write a confirmation entry to WAL. After it's written all the
>> + * transactions waiting for confirmation may be finished.
>> + */
>> +static int
>> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> +{
>> +	/* Prepare a confirm entry. */
>> +	struct xrow_header row = {0};
>> +	struct request request = {0};
>> +	request.header = &row;
>> +
>> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
>> +	if (row.bodycnt < 0)
>> +		return -1;
>> +
>> +	struct txn *txn = txn_begin();
>> +	if (txn == NULL)
>> +		return -1;
>> +
>> +	if (txn_begin_stmt(txn, NULL) != 0)
>> +		goto rollback;
>> +	if (txn_commit_stmt(txn, &request) != 0)
>> +		goto rollback;
>> +
>> +	return txn_commit(txn);
> We definitely shouldn't use transactions for non DML data. We need
> separate API for that, not to spoil the hotpath, and to keep the
> DML commit code 'simple'. Not now though. We just need to keep these
> kind of follow ups in mind/on track, and file them as a follow-up
> issue after the sync replication is done.

I agree. We'll have to rework the journal callbacks then, because the
journal currently calls txn_complete_async() on each successful write.

>
>> +rollback:
>> +	txn_rollback(txn);
>> +	return -1;
>> +}

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list