[Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()

Serge Petrenko sergepetrenko at tarantool.org
Sat Mar 27 20:34:36 MSK 2021



26.03.2021 23:47, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> See 4 comments below.

Thanks for the review!

>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> The new routine, called apply_plain_tx(), may be used not only by
>> applier_apply_tx(), but also by final join and recovery, once each of
>> them is made transactional.
>>
>> Also, while we're at it, remove the excess fiber_gc() call from the
>> applier_subscribe loop. Instead, make sure fiber_gc() is called on
>> any return from applier_apply_tx().
>>
>> Prerequisite #5874
>> Part of #5566
>> ---
>>   src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
>>   1 file changed, 93 insertions(+), 95 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 65afa5e98..07e557a51 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -905,6 +905,90 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>>   	return box_raft_process(&req, applier->instance_id);
>>   }
>>   
>> +static inline int
>> +apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>> +{
>> +	/**
> 1. Inside functions, a comment's first line should use /*, not /**.

Sure, fixed.

>
>> +	 * Explicitly begin the transaction so that we can
>> +	 * control fiber->gc life cycle and, in case of apply
>> +	 * conflict safely access failed xrow object and allocate
>> +	 * IPROTO_NOP on gc.
>> +	 */
>> +	struct txn *txn = txn_begin();
>> +	struct applier_tx_row *item;
>> +	if (txn == NULL)
>> +		 return -1;
>> +
>> +	stailq_foreach_entry(item, rows, next) {
>> +		struct xrow_header *row = &item->row;
>> +		int res = apply_row(row);
>> +		if (res != 0 && skip_conflict) {
>> +			struct error *e = diag_last_error(diag_get());
>> +			/*
>> +			 * In case of ER_TUPLE_FOUND error and enabled
>> +			 * replication_skip_conflict configuration
>> +			 * option, skip applying the foreign row and
>> +			 * replace it with NOP in the local write ahead
>> +			 * log.
>> +			 */
>> +			if (e->type == &type_ClientError &&
>> +			    box_error_code(e) == ER_TUPLE_FOUND &&
>> +			    replication_skip_conflict) {
> 2. That looks kind of confusing: you pass the skip_conflict option but
> also use replication_skip_conflict directly. You could calculate
> skip_conflict based on replication_skip_conflict in your patch.

Yes, indeed. Thanks for noticing!
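
By the way, with this change apply_plain_tx() only consults its
skip_conflict argument, and the caller reads the configuration option
once. A rough sketch of the resulting shape (the exact hunks are in the
incremental diff below):

	/* Inside apply_plain_tx(): the option is not read here anymore. */
	if (res != 0 && skip_conflict) {
		struct error *e = diag_last_error(diag_get());
		if (e->type == &type_ClientError &&
		    box_error_code(e) == ER_TUPLE_FOUND) {
			/* Replace the conflicting foreign row with a NOP. */
			diag_clear(diag_get());
			row->type = IPROTO_NOP;
			row->bodycnt = 0;
			res = apply_row(row);
		}
	}

	/* In applier_apply_tx(): decide skip_conflict at the call site. */
	rc = apply_plain_tx(rows, replication_skip_conflict, true);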

>
>> +				diag_clear(diag_get());
>> +				row->type = IPROTO_NOP;
>> +				row->bodycnt = 0;
>> +				res = apply_row(row);
>> +			}
>> +		}
>> +		if (res != 0)
>> +			goto fail;
>> +	}
>> +
>> +	/*
>> +	 * We are going to commit, so it's high time to check if
>> +	 * the current transaction has non-local effects.
>> +	 */
>> +	if (txn_is_distributed(txn)) {
>> +		/*
>> +		 * A transaction mixes remote and local rows.
>> +		 * Local rows must be replicated back, which
>> +		 * doesn't make sense since the master likely has
>> +		 * new changes which local rows may overwrite.
>> +		 * Raise an error.
>> +		 */
>> +		diag_set(ClientError, ER_UNSUPPORTED, "Replication",
>> +			 "distributed transactions");
>> +		goto fail;
>> +	}
>> +
>> +	if (use_triggers) {
>> +		/* We are ready to submit txn to wal. */
>> +		struct trigger *on_rollback, *on_wal_write;
>> +		size_t size;
>> +		on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> +						  &size);
>> +		on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> +						   &size);
>> +		if (on_rollback == NULL || on_wal_write == NULL) {
>> +			diag_set(OutOfMemory, size, "region_alloc_object",
>> +				 "on_rollback/on_wal_write");
>> +			goto fail;
>> +		}
>> +
>> +		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> +		txn_on_rollback(txn, on_rollback);
>> +
>> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> +		txn_on_wal_write(txn, on_wal_write);
>> +	}
>> +
>> +	return txn_commit_try_async(txn);
>> +fail:
>> +	txn_rollback(txn);
>> +	return -1;
>> +}
>> @@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		assert(first_row == last_row);
>>   		if (apply_synchro_row(first_row) != 0)
>>   			diag_raise();
> 3. Hm. Isn't it a bug that we raise an error here but don't unlock the
> latch and don't call fiber_gc()? Looks like a separate bug. Could you
> please fix it, ideally with a test? Can it be related to the
> hang you fix in the previous commit?

It is a bug, yes. I will fix it in a commit on top. It's not related to
the hang we spoke of in the previous letter, though.
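
For reference, a sketch of the shape the fix on top could take: route
the apply_synchro_row() error through the common cleanup path instead
of raising under the latch (this is only an illustration, not the
actual follow-up patch):

	assert(first_row == last_row);
	if ((rc = apply_synchro_row(first_row)) != 0)
		goto finish;
	/* ... */
finish:
	latch_unlock(latch);
	fiber_gc();
	return rc;

This way the latch is always released and fiber_gc() is always called,
and the caller can diag_raise() on a non-zero rc.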

>
>> -		goto success;
>> -	}
>> -
>> -	/**
>> -	 * Explicitly begin the transaction so that we can
>> -	 * control fiber->gc life cycle and, in case of apply
>> -	 * conflict safely access failed xrow object and allocate
>> -	 * IPROTO_NOP on gc.
>> -	 */
>> -	struct txn *txn;
>> -	txn = txn_begin();
>> -	struct applier_tx_row *item;
>> -	if (txn == NULL) {
>> -		latch_unlock(latch);
>> -		return -1;
>> -	}
>> -	stailq_foreach_entry(item, rows, next) {
>> -		struct xrow_header *row = &item->row;
>> -		int res = apply_row(row);
>> -		if (res != 0) {
>> -			struct error *e = diag_last_error(diag_get());
>> -			/*
>> -			 * In case of ER_TUPLE_FOUND error and enabled
>> -			 * replication_skip_conflict configuration
>> -			 * option, skip applying the foreign row and
>> -			 * replace it with NOP in the local write ahead
>> -			 * log.
>> -			 */
>> -			if (e->type == &type_ClientError &&
>> -			    box_error_code(e) == ER_TUPLE_FOUND &&
>> -			    replication_skip_conflict) {
>> -				diag_clear(diag_get());
>> -				row->type = IPROTO_NOP;
>> -				row->bodycnt = 0;
>> -				res = apply_row(row);
>> -			}
>> -		}
>> -		if (res != 0)
>> -			goto rollback;
>> -	}
>> -	/*
>> -	 * We are going to commit, so it's high time to check if
>> -	 * the current transaction has non-local effects.
>> -	 */
>> -	if (txn_is_distributed(txn)) {
>> -		/*
>> -		 * A transaction mixes remote and local rows.
>> -		 * Local rows must be replicated back, which
>> -		 * doesn't make sense since the master likely has
>> -		 * new changes which local rows may overwrite.
>> -		 * Raise an error.
>> -		 */
>> -		diag_set(ClientError, ER_UNSUPPORTED,
>> -			 "Replication", "distributed transactions");
>> -		goto rollback;
>> +		goto written;
>>   	}
>>   
>> -	/* We are ready to submit txn to wal. */
>> -	struct trigger *on_rollback, *on_wal_write;
>> -	size_t size;
>> -	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> -					  &size);
>> -	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> -					   &size);
>> -	if (on_rollback == NULL || on_wal_write == NULL) {
>> -		diag_set(OutOfMemory, size, "region_alloc_object",
>> -			 "on_rollback/on_wal_write");
>> -		goto rollback;
>> +	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
>> +written:
>> +		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> +			      last_row->lsn);
>>   	}
>> -
>> -	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> -	txn_on_rollback(txn, on_rollback);
>> -
>> -	trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> -	txn_on_wal_write(txn, on_wal_write);
>> -
>> -	if (txn_commit_try_async(txn) < 0)
>> -		goto fail;
>> -
>> -success:
>> -	/*
>> -	 * The transaction was sent to journal so promote vclock.
>> -	 *
>> -	 * Use the lsn of the last row to guard from 1.10
>> -	 * instances, which send every single tx row as a separate
>> -	 * transaction.
>> -	 */
>> -	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> -		      last_row->lsn);
>> -	latch_unlock(latch);
>> -	return 0;
>> -rollback:
>> -	txn_rollback(txn);
>> -fail:
>> +no_write:
> 4. You go to this label even when the write was done. Maybe rename it
> to 'end' or 'finish'?
>
> Consider this diff:
>
> ====================
> @@ -1027,7 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   	latch_lock(latch);
>   	if (vclock_get(&replicaset.applier.vclock,
>   		       last_row->replica_id) >= last_row->lsn) {
> -		goto no_write;
> +		goto finish;
>   	} else if (vclock_get(&replicaset.applier.vclock,
>   			      first_row->replica_id) >= first_row->lsn) {
>   		/*
> @@ -1058,15 +1058,12 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   		assert(first_row == last_row);
>   		if (apply_synchro_row(first_row) != 0)
>   			diag_raise();
> -		goto written;
> +	} else if ((rc = apply_plain_tx(rows, true, true)) != 0) {
> +		goto finish;
>   	}
> -
> -	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
> -written:
> -		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> -			      last_row->lsn);
> -	}
> -no_write:
> +	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> +		      last_row->lsn);
> +finish:
>   	latch_unlock(latch);
>   	fiber_gc();
>   	return rc;
> ====================

Looks good, applied. Incremental diff below.

========================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index f396e43a8..e6d9673dd 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -908,7 +908,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  static inline int
  apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
  {
-       /**
+       /*
          * Explicitly begin the transaction so that we can
          * control fiber->gc life cycle and, in case of apply
          * conflict safely access failed xrow object and allocate
@@ -932,8 +932,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
                          * log.
                          */
                         if (e->type == &type_ClientError &&
-                           box_error_code(e) == ER_TUPLE_FOUND &&
-                           replication_skip_conflict) {
+                           box_error_code(e) == ER_TUPLE_FOUND) {
                                 diag_clear(diag_get());
                                 row->type = IPROTO_NOP;
                                 row->bodycnt = 0;
@@ -1027,7 +1026,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
         latch_lock(latch);
         if (vclock_get(&replicaset.applier.vclock,
                        last_row->replica_id) >= last_row->lsn) {
-               goto no_write;
+               goto finish;
         } else if (vclock_get(&replicaset.applier.vclock,
                               first_row->replica_id) >= first_row->lsn) {
                 /*
@@ -1058,15 +1057,13 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
                 assert(first_row == last_row);
                 if (apply_synchro_row(first_row) != 0)
                         diag_raise();
-               goto written;
-       }
-
-       if ((rc = apply_plain_tx(rows, true, true)) == 0) {
-written:
-               vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
-                             last_row->lsn);
+       } else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
+                                       true)) != 0) {
+               goto finish;
         }
-no_write:
+       vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+                     last_row->lsn);
+finish:
         latch_unlock(latch);
         fiber_gc();
         return rc;

========================================
>
>>   	latch_unlock(latch);
>>   	fiber_gc();
>> -	return -1;
>> +	return rc;
>>   }
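
To illustrate the reuse the commit message mentions, here is a rough
sketch of a hypothetical future caller, e.g. a transactional final join
or recovery (the flag values are my assumption and not part of this
patch):

	struct stailq rows;
	stailq_create(&rows);
	/* ... collect one transaction's rows from the stream ... */

	/* Presumably no conflict skipping and no triggers needed here. */
	if (apply_plain_tx(&rows, false, false) != 0)
		diag_raise();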

-- 
Serge Petrenko
