[tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel

Georgy Kirichenko georgy at tarantool.org
Sun Jun 9 23:44:42 MSK 2019


Appliers use asynchronous transactions to batch journal writes. A
sequencer orders transaction execution and checks the write results
while an applier reads from the network.

Closes: #1254
---
 src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
 src/box/applier.h  |   4 +
 src/box/box.cc     |   1 +
 3 files changed, 251 insertions(+), 126 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..9f0efda5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -53,6 +53,231 @@
 
 STRS(applier_state, applier_STATE);
 
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = in_txn();
+	if (txn_begin_stmt(txn, NULL) == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	if (request.type == IPROTO_NOP)
+		return process_nop(&request);
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	if (box_process_rw(&request, space, NULL) != 0) {
+		say_error("error applying row: %s", request_str(&request));
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+struct sequencer { /* Shared state ordering applier txns and their results. */
+	struct stailq txn_queue; /* Txns queued for the collector's txn_wait(). */
+	struct fiber_cond txn_queue_cond; /* Signaled on txn_queue push. */
+	struct vclock net_vclock; /* Per-replica LSN already submitted. */
+	struct fiber_cond tx_vclock_cond; /* Broadcast by the commit callback. */
+	struct diag diag; /* Pending error; the collector clears it. */
+	struct rlist on_fail; /* Triggers run when a failure is recorded. */
+};
+
+static struct sequencer sequencer;
+
+static int
+sequencer_collect_f(va_list ap)
+{
+	(void) ap;
+	while (!fiber_is_cancelled()) {
+		while (stailq_empty(&sequencer.txn_queue)) {
+			if (!diag_is_empty(&sequencer.diag)) {
+				diag_clear(&sequencer.diag);
+				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
+			}
+			fiber_cond_wait(&sequencer.txn_queue_cond);
+			continue;
+		}
+		struct txn *txn =
+			stailq_shift_entry(&sequencer.txn_queue, struct txn,
+					   in_txn_cache);
+		if (txn_wait(txn) == 0) {
+			continue;
+		}
+		if (diag_is_empty(&sequencer.diag)) {
+			diag_move(&fiber()->diag, &sequencer.diag);
+			trigger_run(&sequencer.on_fail, NULL);
+		}
+	}
+	return 0;
+}
+
+void
+applier_init()
+{
+	stailq_create(&sequencer.txn_queue);
+	fiber_cond_create(&sequencer.txn_queue_cond);
+
+	rlist_create(&sequencer.on_fail);
+
+	vclock_create(&sequencer.net_vclock);
+	fiber_cond_create(&sequencer.tx_vclock_cond);
+	diag_create(&sequencer.diag);
+	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
+	if (collector == NULL)
+		panic("Failed to create a sequencer collector fiber");
+	fiber_start(collector, NULL);
+}
+
+static inline void
+sequencer_on_fail(struct trigger *on_fail)
+{
+	trigger_add(&sequencer.on_fail, on_fail);
+}
+
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+
+	diag_set(ClientError, ER_WAL_IO);
+	diag_move(&fiber()->diag, &sequencer.diag);
+	trigger_run(&sequencer.on_fail, &sequencer);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	struct txn *txn = (struct txn *)event;
+	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
+	fiber_cond_signal(&sequencer.txn_queue_cond);
+	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
+}
+
+static inline int
+sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
+{
+	struct replica *replica = replica_by_id(replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			      &replicaset.applier.order_latch);
+
+	latch_lock(latch);
+	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
+		/* Nothing to do. */
+		latch_unlock(latch);
+		return 0;
+	}
+
+	struct trigger *on_rollback;
+	struct trigger *on_commit;
+	/**
+	 * Explicitly begin the transaction so that we can
+	 * control fiber->gc life cycle and, in case of apply
+	 * conflict safely access failed xrow object and allocate
+	 * IPROTO_NOP on gc.
+	 */
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		goto fail;
+	struct applier_tx_row *item;
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		int res = apply_row(row);
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			goto rollback;
+	}
+	/*
+	 * We are going to commit so it's a high time to check if
+	 * the current transaction has non-local effects.
+	 */
+	if (txn_is_distributed(txn)) {
+		/*
+		 * A transaction mixes remote and local rows.
+		 * Local rows must be replicated back, which
+		 * doesn't make sense since the master likely has
+		 * new changes which local rows may overwrite.
+		 * Raise an error.
+		 */
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "distributed transactions");
+		goto rollback;
+	}
+
+	/* We are ready to submit txn to wal. */
+	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+
+	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
+	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+	txn_on_commit(txn, on_commit);
+
+	if (txn_write(txn) != 0)
+		goto fail;
+
+	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
+	latch_unlock(latch);
+	return 0;
+
+rollback:
+	txn_rollback(txn);
+
+fail:
+	latch_unlock(latch);
+	if (diag_is_empty(&sequencer.diag)) {
+		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
+		trigger_run(&sequencer.on_fail, NULL);
+	}
+	return -1;
+}
+
+
 static inline void
 applier_set_state(struct applier *applier, enum applier_state state)
 {
@@ -194,40 +419,6 @@ rollback:
 	return -1;
 }
 
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
-	assert(request->type == IPROTO_NOP);
-	struct txn *txn = in_txn();
-	if (txn_begin_stmt(txn, NULL) == NULL)
-		return -1;
-	return txn_commit_stmt(txn, request);
-}
-
-static int
-apply_row(struct xrow_header *row)
-{
-	struct request request;
-	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
-		return -1;
-	if (request.type == IPROTO_NOP)
-		return process_nop(&request);
-	struct space *space = space_cache_find(request.space_id);
-	if (space == NULL)
-		return -1;
-	if (box_process_rw(&request, space, NULL) != 0) {
-		say_error("error applying row: %s", request_str(&request));
-		return -1;
-	}
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -450,16 +641,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -565,70 +746,14 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 				    next)->row.is_commit);
 }
 
-/**
- * Apply all rows in the rows queue as a single transaction.
- *
- * Return 0 for success or -1 in case of an error.
- */
-static int
-applier_apply_tx(struct stailq *rows)
+static void
+applier_on_fail(struct trigger *trigger, void *event)
 {
-	/**
-	 * Explicitly begin the transaction so that we can
-	 * control fiber->gc life cycle and, in case of apply
-	 * conflict safely access failed xrow object and allocate
-	 * IPROTO_NOP on gc.
-	 */
-	struct txn *txn = txn_begin();
-	struct applier_tx_row *item;
-	if (txn == NULL)
-		diag_raise();
-	stailq_foreach_entry(item, rows, next) {
-		struct xrow_header *row = &item->row;
-		int res = apply_row(row);
-		if (res != 0) {
-			struct error *e = diag_last_error(diag_get());
-			/*
-			 * In case of ER_TUPLE_FOUND error and enabled
-			 * replication_skip_conflict configuration
-			 * option, skip applying the foreign row and
-			 * replace it with NOP in the local write ahead
-			 * log.
-			 */
-			if (e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				diag_clear(diag_get());
-				row->type = IPROTO_NOP;
-				row->bodycnt = 0;
-				res = apply_row(row);
-			}
-		}
-		if (res != 0)
-			goto rollback;
-	}
-	/*
-	 * We are going to commit so it's a high time to check if
-	 * the current transaction has non-local effects.
-	 */
-	if (txn_is_distributed(txn)) {
-		/*
-		 * A transaction mixes remote and local rows.
-		 * Local rows must be replicated back, which
-		 * doesn't make sense since the master likely has
-		 * new changes which local rows may overwrite.
-		 * Raise an error.
-		 */
-		diag_set(ClientError, ER_UNSUPPORTED,
-			 "Replication", "distributed transactions");
-		goto rollback;
-	}
-	return txn_commit(txn);
-
-rollback:
-	txn_rollback(txn);
-	fiber_gc();
-	return -1;
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	if (!diag_is_empty(&sequencer.diag))
+		diag_add_error(&applier->diag, diag_last_error(&sequencer.diag));
+	fiber_cancel(applier->reader);
 }
 
 /**
@@ -735,6 +860,10 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
+	struct trigger on_fail;
+	trigger_create(&on_fail, applier_on_fail, applier, NULL);
+	sequencer_on_fail(&on_fail);
+
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -763,31 +892,16 @@ applier_subscribe(struct applier *applier)
 		struct stailq rows;
 		applier_read_tx(applier, &rows);
 
+		fiber_cond_signal(&applier->writer_cond);
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(first_row->replica_id);
-		struct latch *latch = (replica ? &replica->order_latch :
-				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set of changes
-		 * may arrive via two concurrently running appliers.
-		 * Hence we need a latch to strictly order all changes
-		 * that belong to the same server id.
-		 */
-		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
-		    first_row->lsn &&
-		    applier_apply_tx(&rows) != 0) {
-			latch_unlock(latch);
+		if (sequencer_submit(first_row->replica_id,
+				     first_row->lsn, &rows) != 0) {
+			trigger_clear(&on_fail);
 			diag_raise();
 		}
-		latch_unlock(latch);
-
-		if (applier->state == APPLIER_SYNC ||
-		    applier->state == APPLIER_FOLLOW)
-			fiber_cond_signal(&applier->writer_cond);
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
 		fiber_gc();
@@ -872,6 +986,11 @@ applier_f(va_list ap)
 				return -1;
 			}
 		} catch (FiberIsCancelled *e) {
+			if (!diag_is_empty(&applier->diag)) {
+				diag_move(&applier->diag, &fiber()->diag);
+				applier_disconnect(applier, APPLIER_STOPPED);
+				break;
+			}
 			applier_disconnect(applier, APPLIER_OFF);
 			break;
 		} catch (SocketError *e) {
@@ -960,6 +1079,7 @@ applier_new(const char *uri)
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
 	fiber_cond_create(&applier->writer_cond);
+	diag_create(&applier->diag);
 
 	return applier;
 }
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b0e56add6 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,12 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
+	struct diag diag;
 };
 
+void
+applier_init();
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/box.cc b/src/box/box.cc
index 510f3fc99..49f8f24af 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2158,6 +2158,7 @@ box_cfg_xc(void)
 	port_init();
 	iproto_init();
 	sql_init();
+	applier_init();
 
 	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
-- 
2.21.0





More information about the Tarantool-patches mailing list