[Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()
Serge Petrenko
sergepetrenko at tarantool.org
Sat Mar 27 20:34:36 MSK 2021
26.03.2021 23:47, Vladislav Shpilevoy пишет:
> Thanks for the patch!
>
> See 4 comments below.
Thanks for the review!
>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> The new routine, called apply_plain_tx(), may be used not only by
>> applier_apply_tx(), but also by final join, once we make it
>> transactional, and recovery, once it's also turned transactional.
>>
>> Also, while we're at it, remove the excess fiber_gc() call from the
>> applier_subscribe loop. Instead, make sure fiber_gc() is called on
>> any return from applier_apply_tx().
>>
>> Prerequisite #5874
>> Part of #5566
>> ---
>> src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
>> 1 file changed, 93 insertions(+), 95 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 65afa5e98..07e557a51 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -905,6 +905,90 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>> return box_raft_process(&req, applier->instance_id);
>> }
>>
>> +static inline int
>> +apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>> +{
>> + /**
> 1. Inside of functions for comment first line we use /*, not /**.
Sure, fixed.
>
>> + * Explicitly begin the transaction so that we can
>> + * control fiber->gc life cycle and, in case of apply
>> + * conflict safely access failed xrow object and allocate
>> + * IPROTO_NOP on gc.
>> + */
>> + struct txn *txn = txn_begin();
>> + struct applier_tx_row *item;
>> + if (txn == NULL)
>> + return -1;
>> +
>> + stailq_foreach_entry(item, rows, next) {
>> + struct xrow_header *row = &item->row;
>> + int res = apply_row(row);
>> + if (res != 0 && skip_conflict) {
>> + struct error *e = diag_last_error(diag_get());
>> + /*
>> + * In case of ER_TUPLE_FOUND error and enabled
>> + * replication_skip_conflict configuration
>> + * option, skip applying the foreign row and
>> + * replace it with NOP in the local write ahead
>> + * log.
>> + */
>> + if (e->type == &type_ClientError &&
>> + box_error_code(e) == ER_TUPLE_FOUND &&
>> + replication_skip_conflict) {
> 2. That looks kind of confusing - you pass skip_conflict option but
> also use replication_skip_conflict. You could calculate skip_conflict
> based on replication_skip_conflict in your patch.
Yes, indeed. Thanks for noticing!
>
>> + diag_clear(diag_get());
>> + row->type = IPROTO_NOP;
>> + row->bodycnt = 0;
>> + res = apply_row(row);
>> + }
>> + }
>> + if (res != 0)
>> + goto fail;
>> + }
>> +
>> + /*
>> + * We are going to commit so it's high time to check if
>> + * the current transaction has non-local effects.
>> + */
>> + if (txn_is_distributed(txn)) {
>> + /*
>> + * A transaction mixes remote and local rows.
>> + * Local rows must be replicated back, which
>> + * doesn't make sense since the master likely has
>> + * new changes which local rows may overwrite.
>> + * Raise an error.
>> + */
>> + diag_set(ClientError, ER_UNSUPPORTED, "Replication",
>> + "distributed transactions");
>> + goto fail;
>> + }
>> +
>> + if (use_triggers) {
>> + /* We are ready to submit txn to wal. */
>> + struct trigger *on_rollback, *on_wal_write;
>> + size_t size;
>> + on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> + &size);
>> + on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> + &size);
>> + if (on_rollback == NULL || on_wal_write == NULL) {
>> + diag_set(OutOfMemory, size, "region_alloc_object",
>> + "on_rollback/on_wal_write");
>> + goto fail;
>> + }
>> +
>> + trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> + txn_on_rollback(txn, on_rollback);
>> +
>> + trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> + txn_on_wal_write(txn, on_wal_write);
>> + }
>> +
>> + return txn_commit_try_async(txn);
>> +fail:
>> + txn_rollback(txn);
>> + return -1;
>> +}
>> @@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>> assert(first_row == last_row);
>> if (apply_synchro_row(first_row) != 0)
>> diag_raise();
> 3. Hm. Isn't it a bug that we raise an error here, but don't unlock the
> latch and don't call fiber_gc()? Looks like a separate bug. Could you
> fix it please, and probably with a test? Can it be related to the
> hang you fix in the previous commit?
It is a bug, yes. I will fix it in a separate commit on top. It's not
related to the hang we discussed
in the previous letter, though.
>
>> - goto success;
>> - }
>> -
>> - /**
>> - * Explicitly begin the transaction so that we can
>> - * control fiber->gc life cycle and, in case of apply
>> - * conflict safely access failed xrow object and allocate
>> - * IPROTO_NOP on gc.
>> - */
>> - struct txn *txn;
>> - txn = txn_begin();
>> - struct applier_tx_row *item;
>> - if (txn == NULL) {
>> - latch_unlock(latch);
>> - return -1;
>> - }
>> - stailq_foreach_entry(item, rows, next) {
>> - struct xrow_header *row = &item->row;
>> - int res = apply_row(row);
>> - if (res != 0) {
>> - struct error *e = diag_last_error(diag_get());
>> - /*
>> - * In case of ER_TUPLE_FOUND error and enabled
>> - * replication_skip_conflict configuration
>> - * option, skip applying the foreign row and
>> - * replace it with NOP in the local write ahead
>> - * log.
>> - */
>> - if (e->type == &type_ClientError &&
>> - box_error_code(e) == ER_TUPLE_FOUND &&
>> - replication_skip_conflict) {
>> - diag_clear(diag_get());
>> - row->type = IPROTO_NOP;
>> - row->bodycnt = 0;
>> - res = apply_row(row);
>> - }
>> - }
>> - if (res != 0)
>> - goto rollback;
>> - }
>> - /*
>> - * We are going to commit so it's a high time to check if
>> - * the current transaction has non-local effects.
>> - */
>> - if (txn_is_distributed(txn)) {
>> - /*
>> - * A transaction mixes remote and local rows.
>> - * Local rows must be replicated back, which
>> - * doesn't make sense since the master likely has
>> - * new changes which local rows may overwrite.
>> - * Raise an error.
>> - */
>> - diag_set(ClientError, ER_UNSUPPORTED,
>> - "Replication", "distributed transactions");
>> - goto rollback;
>> + goto written;
>> }
>>
>> - /* We are ready to submit txn to wal. */
>> - struct trigger *on_rollback, *on_wal_write;
>> - size_t size;
>> - on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> - &size);
>> - on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> - &size);
>> - if (on_rollback == NULL || on_wal_write == NULL) {
>> - diag_set(OutOfMemory, size, "region_alloc_object",
>> - "on_rollback/on_wal_write");
>> - goto rollback;
>> + if ((rc = apply_plain_tx(rows, true, true)) == 0) {
>> +written:
>> + vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> + last_row->lsn);
>> }
>> -
>> - trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> - txn_on_rollback(txn, on_rollback);
>> -
>> - trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> - txn_on_wal_write(txn, on_wal_write);
>> -
>> - if (txn_commit_try_async(txn) < 0)
>> - goto fail;
>> -
>> -success:
>> - /*
>> - * The transaction was sent to journal so promote vclock.
>> - *
>> - * Use the lsn of the last row to guard from 1.10
>> - * instances, which send every single tx row as a separate
>> - * transaction.
>> - */
>> - vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> - last_row->lsn);
>> - latch_unlock(latch);
>> - return 0;
>> -rollback:
>> - txn_rollback(txn);
>> -fail:
>> +no_write:
> 4. You go to this label even when write was done. Maybe rename to
> 'end' or 'finish'?
>
> Consider this diff:
>
> ====================
> @@ -1027,7 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
> latch_lock(latch);
> if (vclock_get(&replicaset.applier.vclock,
> last_row->replica_id) >= last_row->lsn) {
> - goto no_write;
> + goto finish;
> } else if (vclock_get(&replicaset.applier.vclock,
> first_row->replica_id) >= first_row->lsn) {
> /*
> @@ -1058,15 +1058,12 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
> assert(first_row == last_row);
> if (apply_synchro_row(first_row) != 0)
> diag_raise();
> - goto written;
> + } else if ((rc = apply_plain_tx(rows, true, true)) != 0) {
> + goto finish;
> }
> -
> - if ((rc = apply_plain_tx(rows, true, true)) == 0) {
> -written:
> - vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> - last_row->lsn);
> - }
> -no_write:
> + vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> + last_row->lsn);
> +finish:
> latch_unlock(latch);
> fiber_gc();
> return rc;
> ====================
Looks good, applied. Incremental diff below.
========================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index f396e43a8..e6d9673dd 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -908,7 +908,7 @@ applier_handle_raft(struct applier *applier, struct
xrow_header *row)
static inline int
apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
{
- /**
+ /*
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
* conflict safely access failed xrow object and allocate
@@ -932,8 +932,7 @@ apply_plain_tx(struct stailq *rows, bool
skip_conflict, bool use_triggers)
* log.
*/
if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
+ box_error_code(e) == ER_TUPLE_FOUND) {
diag_clear(diag_get());
row->type = IPROTO_NOP;
row->bodycnt = 0;
@@ -1027,7 +1026,7 @@ applier_apply_tx(struct applier *applier, struct
stailq *rows)
latch_lock(latch);
if (vclock_get(&replicaset.applier.vclock,
last_row->replica_id) >= last_row->lsn) {
- goto no_write;
+ goto finish;
} else if (vclock_get(&replicaset.applier.vclock,
first_row->replica_id) >= first_row->lsn) {
/*
@@ -1058,15 +1057,13 @@ applier_apply_tx(struct applier *applier, struct
stailq *rows)
assert(first_row == last_row);
if (apply_synchro_row(first_row) != 0)
diag_raise();
- goto written;
- }
-
- if ((rc = apply_plain_tx(rows, true, true)) == 0) {
-written:
- vclock_follow(&replicaset.applier.vclock,
last_row->replica_id,
- last_row->lsn);
+ } else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
+ true)) != 0) {
+ goto finish;
}
-no_write:
+ vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+ last_row->lsn);
+finish:
latch_unlock(latch);
fiber_gc();
return rc;
========================================
>
>> latch_unlock(latch);
>> fiber_gc();
>> - return -1;
>> + return rc;
>> }
--
Serge Petrenko
More information about the Tarantool-patches
mailing list