[Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()
Serge Petrenko
sergepetrenko at tarantool.org
Wed Mar 24 15:24:13 MSK 2021
The new routine, apply_plain_tx(), may be used not only by
applier_apply_tx(), but also by final join and by recovery, once each
of them is made transactional.
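For reference, here is the new helper's contract together with a
hypothetical future call site, sketched in plain C. Only the
apply_plain_tx() signature and flag semantics come from this patch;
the final join caller and the flag values it passes are assumptions
until the follow-up patches land.

    /*
     * Apply a list of rows as a single transaction.
     * @rows           list of struct applier_tx_row entries;
     * @skip_conflict  on ER_TUPLE_FOUND, replace the conflicting
     *                 row with IPROTO_NOP when the
     *                 replication_skip_conflict option is set;
     * @use_triggers   attach the applier's on_rollback /
     *                 on_wal_write triggers before committing.
     */
    static inline int
    apply_plain_tx(struct stailq *rows, bool skip_conflict,
                   bool use_triggers);

    /*
     * Hypothetical transactional final join caller. Conflict
     * skipping and the applier triggers only make sense during
     * subscribe, so such a caller would likely pass false for
     * both flags.
     */
    static int
    apply_final_join_tx(struct stailq *rows)
    {
            return apply_plain_tx(rows, false, false);
    }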
Also, while we're at it, remove the excess fiber_gc() call from the
applier_subscribe() loop. Instead, make sure fiber_gc() is called on
any return from applier_apply_tx().
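With the extraction in place, applier_apply_tx() has a single exit
path, reproduced below from the hunk further down: rc stays 0 when a
duplicate transaction is skipped and otherwise carries the
apply_plain_tx() result, so both the per-replica latch release and
the fiber->gc collection happen exactly once on every return.

    no_write:
            latch_unlock(latch);
            fiber_gc();
            return rc;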
Prerequisite #5874
Part of #5566
---
src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
1 file changed, 93 insertions(+), 95 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 65afa5e98..07e557a51 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -905,6 +905,89 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
return box_raft_process(&req, applier->instance_id);
}
+static inline int
+apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+{
+ /**
+ * Explicitly begin the transaction so that we can
+ * control fiber->gc life cycle and, in case of an apply
+ * conflict, safely access the failed xrow object and
+ * allocate IPROTO_NOP on gc.
+ */
+ struct txn *txn = txn_begin();
+ struct applier_tx_row *item;
+ if (txn == NULL)
+ return -1;
+
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ int res = apply_row(row);
+ if (res != 0 && skip_conflict) {
+ struct error *e = diag_last_error(diag_get());
+ /*
+ * In case of ER_TUPLE_FOUND error and enabled
+ * replication_skip_conflict configuration
+ * option, skip applying the foreign row and
+ * replace it with NOP in the local write ahead
+ * log.
+ */
+ if (e->type == &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = apply_row(row);
+ }
+ }
+ if (res != 0)
+ goto fail;
+ }
+
+ /*
+ * We are going to commit, so it's high time to check
+ * whether the current transaction has non-local effects.
+ */
+ if (txn_is_distributed(txn)) {
+ /*
+ * A transaction mixes remote and local rows.
+ * Local rows must be replicated back, which
+ * doesn't make sense since the master likely has
+ * new changes which local rows may overwrite.
+ * Raise an error.
+ */
+ diag_set(ClientError, ER_UNSUPPORTED, "Replication",
+ "distributed transactions");
+ goto fail;
+ }
+
+ if (use_triggers) {
+ /* We are ready to submit txn to wal. */
+ struct trigger *on_rollback, *on_wal_write;
+ size_t size;
+ on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
+ &size);
+ on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
+ &size);
+ if (on_rollback == NULL || on_wal_write == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc_object",
+ "on_rollback/on_wal_write");
+ goto fail;
+ }
+
+ trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+ txn_on_wal_write(txn, on_wal_write);
+ }
+
+ return txn_commit_try_async(txn);
+fail:
+ txn_rollback(txn);
+ return -1;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -931,6 +1015,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
struct xrow_header *last_row;
last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
+ int rc = 0;
/*
* In a full mesh topology, the same set of changes
* may arrive via two concurrently running appliers.
@@ -942,8 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
latch_lock(latch);
if (vclock_get(&replicaset.applier.vclock,
last_row->replica_id) >= last_row->lsn) {
- latch_unlock(latch);
- return 0;
+ goto no_write;
} else if (vclock_get(&replicaset.applier.vclock,
first_row->replica_id) >= first_row->lsn) {
/*
@@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
assert(first_row == last_row);
if (apply_synchro_row(first_row) != 0)
diag_raise();
- goto success;
- }
-
- /**
- * Explicitly begin the transaction so that we can
- * control fiber->gc life cycle and, in case of apply
- * conflict safely access failed xrow object and allocate
- * IPROTO_NOP on gc.
- */
- struct txn *txn;
- txn = txn_begin();
- struct applier_tx_row *item;
- if (txn == NULL) {
- latch_unlock(latch);
- return -1;
- }
- stailq_foreach_entry(item, rows, next) {
- struct xrow_header *row = &item->row;
- int res = apply_row(row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- row->type = IPROTO_NOP;
- row->bodycnt = 0;
- res = apply_row(row);
- }
- }
- if (res != 0)
- goto rollback;
- }
- /*
- * We are going to commit so it's a high time to check if
- * the current transaction has non-local effects.
- */
- if (txn_is_distributed(txn)) {
- /*
- * A transaction mixes remote and local rows.
- * Local rows must be replicated back, which
- * doesn't make sense since the master likely has
- * new changes which local rows may overwrite.
- * Raise an error.
- */
- diag_set(ClientError, ER_UNSUPPORTED,
- "Replication", "distributed transactions");
- goto rollback;
+ goto written;
}
- /* We are ready to submit txn to wal. */
- struct trigger *on_rollback, *on_wal_write;
- size_t size;
- on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
- &size);
- on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
- &size);
- if (on_rollback == NULL || on_wal_write == NULL) {
- diag_set(OutOfMemory, size, "region_alloc_object",
- "on_rollback/on_wal_write");
- goto rollback;
+ if ((rc = apply_plain_tx(rows, true, true)) == 0) {
+written:
+ vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+ last_row->lsn);
}
-
- trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
- txn_on_rollback(txn, on_rollback);
-
- trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
- txn_on_wal_write(txn, on_wal_write);
-
- if (txn_commit_try_async(txn) < 0)
- goto fail;
-
-success:
- /*
- * The transaction was sent to journal so promote vclock.
- *
- * Use the lsn of the last row to guard from 1.10
- * instances, which send every single tx row as a separate
- * transaction.
- */
- vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
- last_row->lsn);
- latch_unlock(latch);
- return 0;
-rollback:
- txn_rollback(txn);
-fail:
+no_write:
latch_unlock(latch);
fiber_gc();
- return -1;
+ return rc;
}
/**
@@ -1280,7 +1279,6 @@ applier_subscribe(struct applier *applier)
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
- fiber_gc();
}
}
--
2.24.3 (Apple Git-128)