[tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
Georgy Kirichenko
georgy at tarantool.org
Sun Jun 9 23:44:42 MSK 2019
The applier now uses asynchronous transactions to batch journal writes. A
sequencer orders transaction execution and checks the write result while
the applier keeps reading from the network.
Closes: #1254
---
src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
src/box/applier.h | 4 +
src/box/box.cc | 1 +
3 files changed, 251 insertions(+), 126 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..9f0efda5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -53,6 +53,231 @@
STRS(applier_state, applier_STATE);
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+ assert(request->type == IPROTO_NOP);
+ struct txn *txn = in_txn();
+ if (txn_begin_stmt(txn, NULL) == NULL)
+ return -1;
+ return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+ struct request request;
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ if (request.type == IPROTO_NOP)
+ return process_nop(&request);
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
+ if (box_process_rw(&request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(&request));
+ return -1;
+ }
+ return 0;
+}
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+ /* Next transaction row. */
+ struct stailq_entry next;
+ /* xrow_header struct for the current transaction row. */
+ struct xrow_header row;
+};
+
+struct sequencer {
+ struct stailq txn_queue;
+ struct fiber_cond txn_queue_cond;
+ struct vclock net_vclock;
+ struct fiber_cond tx_vclock_cond;
+ struct diag diag;
+ struct rlist on_fail;
+};
+
+static struct sequencer sequencer;
+
+static int
+sequencer_collect_f(va_list ap)
+{
+ (void) ap;
+ while (!fiber_is_cancelled()) {
+ while (stailq_empty(&sequencer.txn_queue)) {
+ if (!diag_is_empty(&sequencer.diag)) {
+ diag_clear(&sequencer.diag);
+ vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
+ }
+ fiber_cond_wait(&sequencer.txn_queue_cond);
+ continue;
+ }
+ struct txn *txn =
+ stailq_shift_entry(&sequencer.txn_queue, struct txn,
+ in_txn_cache);
+ if (txn_wait(txn) == 0) {
+ continue;
+ }
+ if (diag_is_empty(&sequencer.diag)) {
+ diag_move(&fiber()->diag, &sequencer.diag);
+ trigger_run(&sequencer.on_fail, NULL);
+ }
+ }
+ return 0;
+}
+
+void
+applier_init()
+{
+ stailq_create(&sequencer.txn_queue);
+ fiber_cond_create(&sequencer.txn_queue_cond);
+
+ rlist_create(&sequencer.on_fail);
+
+ vclock_create(&sequencer.net_vclock);
+ fiber_cond_create(&sequencer.tx_vclock_cond);
+ diag_create(&sequencer.diag);
+ struct fiber *collector = fiber_new("collector", sequencer_collect_f);
+ if (collector == NULL)
+ panic("Failed to create a sequencer collector fiber");
+ fiber_start(collector, NULL);
+}
+
+static inline void
+sequencer_on_fail(struct trigger *on_fail)
+{
+ trigger_add(&sequencer.on_fail, on_fail);
+}
+
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ struct txn *txn = (struct txn *)event;
+ stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+ fiber_cond_signal(&sequencer.txn_queue_cond);
+
+ diag_set(ClientError, ER_WAL_IO);
+ diag_move(&fiber()->diag, &sequencer.diag);
+ trigger_run(&sequencer.on_fail, &sequencer);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ struct txn *txn = (struct txn *)event;
+ stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+ fiber_cond_signal(&sequencer.txn_queue_cond);
+ fiber_cond_broadcast(&sequencer.tx_vclock_cond);
+}
+
+static inline int
+sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
+{
+ struct replica *replica = replica_by_id(replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+
+ latch_lock(latch);
+ if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
+ /* Nothing to do. */
+ latch_unlock(latch);
+ return 0;
+ }
+
+ struct trigger *on_rollback;
+ struct trigger *on_commit;
+ /**
+ * Explicitly begin the transaction so that we can
+ * control the fiber->gc life cycle and, in case of an
+ * apply conflict, safely access the failed xrow object
+ * and allocate an IPROTO_NOP on gc.
+ */
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ goto fail;
+ struct applier_tx_row *item;
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ int res = apply_row(row);
+ if (res != 0) {
+ struct error *e = diag_last_error(diag_get());
+ /*
+ * In case of ER_TUPLE_FOUND error and enabled
+ * replication_skip_conflict configuration
+ * option, skip applying the foreign row and
+ * replace it with NOP in the local write ahead
+ * log.
+ */
+ if (e->type == &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = apply_row(row);
+ }
+ }
+ if (res != 0)
+ goto rollback;
+ }
+ /*
+ * We are going to commit, so it's high time to check
+ * whether the current transaction has non-local effects.
+ */
+ if (txn_is_distributed(txn)) {
+ /*
+ * A transaction mixes remote and local rows.
+ * Local rows must be replicated back, which
+ * doesn't make sense since the master likely has
+ * new changes which local rows may overwrite.
+ * Raise an error.
+ */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "Replication", "distributed transactions");
+ goto rollback;
+ }
+
+ /* We are ready to submit txn to wal. */
+ on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+ trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+ trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+ txn_on_commit(txn, on_commit);
+
+ if (txn_write(txn) != 0)
+ goto fail;
+
+ vclock_follow(&sequencer.net_vclock, replica_id, lsn);
+ latch_unlock(latch);
+ return 0;
+
+rollback:
+ txn_rollback(txn);
+
+fail:
+ latch_unlock(latch);
+ if (diag_is_empty(&sequencer.diag)) {
+ diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
+ trigger_run(&sequencer.on_fail, NULL);
+ }
+ return -1;
+}
+
+
static inline void
applier_set_state(struct applier *applier, enum applier_state state)
{
@@ -194,40 +419,6 @@ rollback:
return -1;
}
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
- assert(request->type == IPROTO_NOP);
- struct txn *txn = in_txn();
- if (txn_begin_stmt(txn, NULL) == NULL)
- return -1;
- return txn_commit_stmt(txn, request);
-}
-
-static int
-apply_row(struct xrow_header *row)
-{
- struct request request;
- if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
- return -1;
- if (request.type == IPROTO_NOP)
- return process_nop(&request);
- struct space *space = space_cache_find(request.space_id);
- if (space == NULL)
- return -1;
- if (box_process_rw(&request, space, NULL) != 0) {
- say_error("error applying row: %s", request_str(&request));
- return -1;
- }
- return 0;
-}
-
/**
* Connect to a remote host and authenticate the client.
*/
@@ -450,16 +641,6 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
- /* Next transaction row. */
- struct stailq_entry next;
- /* xrow_header struct for the current transaction row. */
- struct xrow_header row;
-};
-
static struct applier_tx_row *
applier_read_tx_row(struct applier *applier)
{
@@ -565,70 +746,14 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
-/**
- * Apply all rows in the rows queue as a single transaction.
- *
- * Return 0 for success or -1 in case of an error.
- */
-static int
-applier_apply_tx(struct stailq *rows)
+static void
+applier_on_fail(struct trigger *trigger, void *event)
{
- /**
- * Explicitly begin the transaction so that we can
- * control fiber->gc life cycle and, in case of apply
- * conflict safely access failed xrow object and allocate
- * IPROTO_NOP on gc.
- */
- struct txn *txn = txn_begin();
- struct applier_tx_row *item;
- if (txn == NULL)
- diag_raise();
- stailq_foreach_entry(item, rows, next) {
- struct xrow_header *row = &item->row;
- int res = apply_row(row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- row->type = IPROTO_NOP;
- row->bodycnt = 0;
- res = apply_row(row);
- }
- }
- if (res != 0)
- goto rollback;
- }
- /*
- * We are going to commit so it's a high time to check if
- * the current transaction has non-local effects.
- */
- if (txn_is_distributed(txn)) {
- /*
- * A transaction mixes remote and local rows.
- * Local rows must be replicated back, which
- * doesn't make sense since the master likely has
- * new changes which local rows may overwrite.
- * Raise an error.
- */
- diag_set(ClientError, ER_UNSUPPORTED,
- "Replication", "distributed transactions");
- goto rollback;
- }
- return txn_commit(txn);
-
-rollback:
- txn_rollback(txn);
- fiber_gc();
- return -1;
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ if (!diag_is_empty(&sequencer.diag))
+ diag_add_error(&applier->diag, diag_last_error(&sequencer.diag));
+ fiber_cancel(applier->reader);
}
/**
@@ -735,6 +860,10 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
+ struct trigger on_fail;
+ trigger_create(&on_fail, applier_on_fail, applier, NULL);
+ sequencer_on_fail(&on_fail);
+
/*
* Process a stream of rows from the binary log.
*/
@@ -763,31 +892,16 @@ applier_subscribe(struct applier *applier)
struct stailq rows;
applier_read_tx(applier, &rows);
+ fiber_cond_signal(&applier->writer_cond);
struct xrow_header *first_row =
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(first_row->replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set of changes
- * may arrive via two concurrently running appliers.
- * Hence we need a latch to strictly order all changes
- * that belong to the same server id.
- */
- latch_lock(latch);
- if (vclock_get(&replicaset.vclock, first_row->replica_id) <
- first_row->lsn &&
- applier_apply_tx(&rows) != 0) {
- latch_unlock(latch);
+ if (sequencer_submit(first_row->replica_id,
+ first_row->lsn, &rows) != 0) {
+ trigger_clear(&on_fail);
diag_raise();
}
- latch_unlock(latch);
-
- if (applier->state == APPLIER_SYNC ||
- applier->state == APPLIER_FOLLOW)
- fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -872,6 +986,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
+ if (!diag_is_empty(&applier->diag)) {
+ diag_move(&applier->diag, &fiber()->diag);
+ applier_disconnect(applier, APPLIER_STOPPED);
+ break;
+ }
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -960,6 +1079,7 @@ applier_new(const char *uri)
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
fiber_cond_create(&applier->writer_cond);
+ diag_create(&applier->diag);
return applier;
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b0e56add6 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,12 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
+ struct diag diag;
};
+void
+applier_init();
+
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/box.cc b/src/box/box.cc
index 510f3fc99..49f8f24af 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2158,6 +2158,7 @@ box_cfg_xc(void)
port_init();
iproto_init();
sql_init();
+ applier_init();
int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
--
2.21.0
More information about the Tarantool-patches
mailing list