[tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel

Georgy Kirichenko georgy at tarantool.org
Sat Jun 22 00:48:20 MSK 2019


Appliers now use asynchronous transactions to batch journal writes. All
appliers share replicaset.applier.vclock, which reflects the vclock of
rows applied but not necessarily written to the journal yet. Appliers
use triggers to coordinate in case of failure, i.e. when a transaction
is going to be rolled back.

Closes: #1254
---
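
For review convenience, here is a minimal standalone model of the
deduplication rule the patch enforces around applier_apply_tx(). The
types and function names below are simplified stand-ins invented for
illustration, not Tarantool's own; latching and error handling are
elided:

#include <stdint.h>
#include <stdio.h>

#define VCLOCK_MAX 32

/* Stand-in for struct vclock: one LSN per replica id. */
struct vclock_model { int64_t lsn[VCLOCK_MAX]; };

/* Stand-in for replicaset.applier.vclock: rows submitted to the
 * journal, but not necessarily written to it yet. */
static struct vclock_model applier_vclock;

/* Checked under the per-replica order latch: skip a transaction
 * which a concurrently running applier has already submitted. */
static int
tx_is_duplicate(uint32_t replica_id, int64_t first_lsn)
{
	return applier_vclock.lsn[replica_id] >= first_lsn;
}

/* Called right after a successful txn_write(): promote the shared
 * vclock so the other appliers skip the same rows. */
static void
tx_submitted(uint32_t replica_id, int64_t lsn)
{
	applier_vclock.lsn[replica_id] = lsn;
}

int
main(void)
{
	tx_submitted(1, 100);
	printf("lsn 100 duplicate: %d\n", tx_is_duplicate(1, 100)); /* 1 */
	printf("lsn 101 duplicate: %d\n", tx_is_duplicate(1, 101)); /* 0 */
	return 0;
}

The point is that the shared vclock is promoted once the write is
submitted, not once it completes, so in a full mesh two appliers never
submit the same rows twice.
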
 src/box/applier.cc     | 188 ++++++++++++++++++++++++++++++++---------
 src/box/applier.h      |   7 ++
 src/box/replication.cc |   7 ++
 src/box/replication.h  |  11 +++
 4 files changed, 172 insertions(+), 41 deletions(-)
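
The failure path can be modeled the same way: a single journal write
error is turned into one shared error and every applier is stopped, so
all of them restart from a consistent state. Another standalone sketch
with invented stand-in types; the patch itself wires this up through
replicaset.applier.diag, the on_rollback trigger list and
fiber_cancel() of each reader fiber:

#include <stdio.h>

#define MAX_APPLIERS 3

/* Stand-ins for the applier fields touched on rollback. */
struct applier_model {
	const char *last_error;	/* models applier->diag */
	int reader_cancelled;	/* models fiber_cancel(applier->reader) */
};

static struct applier_model appliers[MAX_APPLIERS];
static const char *shared_diag;	/* models replicaset.applier.diag */

/* Models applier_txn_rollback_cb() running the on_rollback trigger
 * list: executed once, on the fiber which saw the journal failure. */
static void
rollback_broadcast(const char *error)
{
	shared_diag = error;
	for (int i = 0; i < MAX_APPLIERS; i++) {
		/* Models applier_on_rollback() of each applier. */
		appliers[i].last_error = shared_diag;
		appliers[i].reader_cancelled = 1;
	}
}

int
main(void)
{
	rollback_broadcast("ER_WAL_IO");
	for (int i = 0; i < MAX_APPLIERS; i++)
		printf("applier %d: cancelled=%d, error=%s\n", i,
		       appliers[i].reader_cancelled, appliers[i].last_error);
	return 0;
}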

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6f93759a8..9465b071a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "scoped_guard.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -108,6 +109,26 @@ applier_log_error(struct applier *applier, struct error *e)
 	applier->last_logged_errcode = errcode;
 }
 
+/*
+ * A helper function to track the applier state: switch from 'sync'
+ * to 'follow' once the applier has caught up with the master.
+ */
+static inline void
+applier_check_state(struct applier *applier)
+{
+	/*
+	 * Stay 'orphan' until appliers catch up with
+	 * the remote vclock at the time of SUBSCRIBE
+	 * and the lag is less than configured.
+	 */
+	if (applier->state == APPLIER_SYNC &&
+	    applier->lag <= replication_sync_lag &&
+	    vclock_compare(&applier->remote_vclock_at_subscribe,
+			   &replicaset.vclock) <= 0) {
+		/* Applier is synced, switch to "follow". */
+		applier_set_state(applier, APPLIER_FOLLOW);
+	}
+}
+
 /*
  * Fiber function to write vclock to replication master.
  * To track connection status, replica answers master
@@ -135,6 +156,12 @@ applier_writer_f(va_list ap)
 		else
 			fiber_cond_wait_timeout(&applier->writer_cond,
 						replication_timeout);
+		/*
+		 * The writer fiber is woken up after a commit or a
+		 * heartbeat message, so this is an appropriate place
+		 * to update the applier state: the check may yield
+		 * and therefore does not fit into a commit trigger.
+		 */
+		applier_check_state(applier);
 		/* Send ACKs only when in FOLLOW mode ,*/
 		if (applier->state != APPLIER_SYNC &&
 		    applier->state != APPLIER_FOLLOW)
@@ -574,6 +601,27 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 				    next)->row.is_commit);
 }
 
+/* A trigger to control a replicated transaction rollback. */
+static void
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	/* Set up the shared applier diagnostic area. */
+	diag_set(ClientError, ER_WAL_IO);
+	diag_move(&fiber()->diag, &replicaset.applier.diag);
+	trigger_run(&replicaset.applier.on_rollback, event);
+	/* Roll back the applier vclock to the committed one. */
+	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static void
+applier_txn_commit_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	/* Broadcast the commit event across all appliers. */
+	trigger_run(&replicaset.applier.on_commit, event);
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -582,6 +630,19 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 static int
 applier_apply_tx(struct stailq *rows)
 {
+	struct xrow_header *first_row =
+		&stailq_first_entry(rows, struct applier_tx_row,
+				    next)->row;
+	struct replica *replica = replica_by_id(first_row->replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			       &replicaset.applier.order_latch);
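+	/*
+	 * In a full mesh topology, the same set of changes may
+	 * arrive via two concurrently running appliers. Hence we
+	 * need a latch to strictly order all changes which belong
+	 * to the same server id.
+	 */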
+	latch_lock(latch);
+	if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
+	    first_row->lsn) {
+		latch_unlock(latch);
+		return 0;
+	}
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
@@ -590,8 +651,10 @@ applier_apply_tx(struct stailq *rows)
 	 */
 	struct txn *txn = txn_begin();
 	struct applier_tx_row *item;
-	if (txn == NULL)
-		diag_raise();
+	if (txn == NULL) {
+		latch_unlock(latch);
+		return -1;
+	}
 	stailq_foreach_entry(item, rows, next) {
 		struct xrow_header *row = &item->row;
 		int res = apply_row(row);
@@ -632,14 +695,63 @@ applier_apply_tx(struct stailq *rows)
 			 "Replication", "distributed transactions");
 		goto rollback;
 	}
-	return txn_commit(txn);
 
+	/* We are ready to submit the txn to the WAL. */
+	struct trigger *on_rollback, *on_commit;
+	on_rollback = (struct trigger *)region_alloc(&txn->region,
+						     sizeof(struct trigger));
+	on_commit = (struct trigger *)region_alloc(&txn->region,
+						   sizeof(struct trigger));
+	if (on_rollback == NULL || on_commit == NULL) {
+		diag_set(OutOfMemory, sizeof(struct trigger),
+			 "region", "struct trigger");
+		goto rollback;
+	}
+
+	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+
+	trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
+	txn_on_commit(txn, on_commit);
+
+	if (txn_write(txn) < 0)
+		goto fail;
+	/* The tx was submitted to the journal, so promote the vclock. */
+	vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
+		      first_row->lsn);
+	latch_unlock(latch);
+
+	return 0;
 rollback:
 	txn_rollback(txn);
+fail:
+	latch_unlock(latch);
 	fiber_gc();
 	return -1;
 }
 
+/*
+ * A trigger to update the applier state after a replication commit.
+ */
+static void
+applier_on_commit(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	fiber_cond_signal(&applier->writer_cond);
+}
+
+/*
+ * A trigger to stop the applier in case of a replication rollback.
+ */
+static void
+applier_on_rollback(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	/* Pick up the shared error to re-raise it in the applier. */
+	if (!diag_is_empty(&replicaset.applier.diag))
+		diag_add_error(&applier->diag,
+			       diag_last_error(&replicaset.applier.diag));
+	fiber_cancel(applier->reader);
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -650,7 +762,6 @@ applier_subscribe(struct applier *applier)
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
-	struct vclock remote_vclock_at_subscribe;
 	struct tt_uuid cluster_id = uuid_nil;
 
 	struct vclock vclock;
@@ -677,10 +788,10 @@ applier_subscribe(struct applier *applier)
 		 * the replica, and replica has to check whether
 		 * its and master's cluster ids match.
 		 */
-		vclock_create(&remote_vclock_at_subscribe);
+		vclock_create(&applier->remote_vclock_at_subscribe);
 		xrow_decode_subscribe_response_xc(&row,
 						  &cluster_id,
-						  &remote_vclock_at_subscribe);
+						  &applier->remote_vclock_at_subscribe);
 		/*
 		 * If master didn't send us its cluster id
 		 * assume that it has done all the checks.
@@ -695,7 +806,7 @@ applier_subscribe(struct applier *applier)
 
 		say_info("subscribed");
 		say_info("remote vclock %s local vclock %s",
-			 vclock_to_string(&remote_vclock_at_subscribe),
+			 vclock_to_string(&applier->remote_vclock_at_subscribe),
 			 vclock_to_string(&vclock));
 	}
 	/*
@@ -744,6 +855,21 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
+	/* Register triggers to handle replication commits and rollbacks. */
+	struct trigger on_commit;
+	trigger_create(&on_commit, applier_on_commit, applier, NULL);
+	trigger_add(&replicaset.applier.on_commit, &on_commit);
+
+	struct trigger on_rollback;
+	trigger_create(&on_rollback, applier_on_rollback, applier, NULL);
+	trigger_add(&replicaset.applier.on_rollback, &on_rollback);
+
+	auto trigger_guard = make_scoped_guard([&] {
+		trigger_clear(&on_commit);
+		trigger_clear(&on_rollback);
+	});
+
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -756,47 +882,22 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Stay 'orphan' until appliers catch up with
-		 * the remote vclock at the time of SUBSCRIBE
-		 * and the lag is less than configured.
-		 */
-		if (applier->state == APPLIER_SYNC &&
-		    applier->lag <= replication_sync_lag &&
-		    vclock_compare(&remote_vclock_at_subscribe,
-				   &replicaset.vclock) <= 0) {
-			/* Applier is synced, switch to "follow". */
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
 		struct stailq rows;
 		applier_read_tx(applier, &rows);
 
-		struct xrow_header *first_row =
-			&stailq_first_entry(&rows, struct applier_tx_row,
-					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(first_row->replica_id);
-		struct latch *latch = (replica ? &replica->order_latch :
-				       &replicaset.applier.order_latch);
 		/*
-		 * In a full mesh topology, the same set of changes
-		 * may arrive via two concurrently running appliers.
-		 * Hence we need a latch to strictly order all changes
-		 * that belong to the same server id.
+		 * In case of a heartbeat message wake up the writer,
+		 * which sends an ACK and updates the applier state.
 		 */
-		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
-		    first_row->lsn &&
-		    applier_apply_tx(&rows) != 0) {
-			latch_unlock(latch);
-			diag_raise();
+		if (stailq_first_entry(&rows, struct applier_tx_row,
+				       next)->row.lsn == 0) {
+			fiber_cond_signal(&applier->writer_cond);
 		}
-		latch_unlock(latch);
+		else if (applier_apply_tx(&rows) != 0)
+			diag_raise();
 
-		if (applier->state == APPLIER_SYNC ||
-		    applier->state == APPLIER_FOLLOW)
-			fiber_cond_signal(&applier->writer_cond);
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
 		fiber_gc();
@@ -881,6 +982,11 @@ applier_f(va_list ap)
 				return -1;
 			}
 		} catch (FiberIsCancelled *e) {
+			if (!diag_is_empty(&applier->diag)) {
+				diag_move(&applier->diag, &fiber()->diag);
+				applier_disconnect(applier, APPLIER_STOPPED);
+				break;
+			}
 			applier_disconnect(applier, APPLIER_OFF);
 			break;
 		} catch (SocketError *e) {
@@ -969,6 +1075,7 @@ applier_new(const char *uri)
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
 	fiber_cond_create(&applier->writer_cond);
+	diag_create(&applier->diag);
 
 	return applier;
 }
@@ -981,7 +1088,6 @@ applier_delete(struct applier *applier)
 	assert(applier->io.fd == -1);
 	trigger_destroy(&applier->on_state);
 	fiber_cond_destroy(&applier->resume_cond);
-	fiber_cond_destroy(&applier->writer_cond);
 	free(applier);
 }
 
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..b9bb14198 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -114,8 +114,15 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
+	/** Diagnostics area for an error to be raised by the applier. */
+	struct diag diag;
+	/** The master's vclock at the time of SUBSCRIBE. */
+	struct vclock remote_vclock_at_subscribe;
 };
 
+/**
+ * Initialize the applier subsystem.
+ */
+void
+applier_init();
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..617b9538f 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
 	fiber_cond_create(&replicaset.applier.cond);
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
+
+	vclock_create(&replicaset.applier.vclock);
+	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+	rlist_create(&replicaset.applier.on_rollback);
+	rlist_create(&replicaset.applier.on_commit);
+
+	diag_create(&replicaset.applier.diag);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..19f283c7d 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,17 @@ struct replicaset {
 		 * struct replica object).
 		 */
 		struct latch order_latch;
+		/*
+		 * Vclock of the last transaction which was read by
+		 * an applier and processed by tx.
+		 */
+		struct vclock vclock;
+		/*
+		 * Triggers fired when a replication request failed
+		 * to apply.
+		 */
+		struct rlist on_rollback;
+		/*
+		 * Triggers fired when a replication request was
+		 * committed to the WAL.
+		 */
+		struct rlist on_commit;
+		/* Shared applier diagnostic area. */
+		struct diag diag;
 	} applier;
 	/** Map of all known replica_id's to correspponding replica's. */
 	struct replica **replica_by_id;
-- 
2.22.0