[tarantool-patches] [PATCH v5 0/7] Parallel applier

Vladimir Davydov vdavydov.dev at gmail.com
Tue Jun 25 19:08:44 MSK 2019


The patch set looks good to me. I pushed it to master with some very
minor changes, primarily regarding coding style (see the diff below).
Note that I didn't push the patch fixing the tests, because:

 - replication/transaction.test.lua works even without it
 - although replication/sync.test.lua fails occasionally, I don't
   believe your fix is quite correct. I'd rather disable the test
   for now. BTW, we have a ticket to fix it properly:

   https://github.com/tarantool/tarantool/issues/4129

diff --git a/src/box/alter.cc b/src/box/alter.cc
index cb5f2b17..e76b9e68 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3576,7 +3576,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
 	(void) trigger;
 	(void) event;
 	/*
-	 * In case of yielding journal will this trigger be processed
+	 * In case of yielding journal this trigger will be processed
 	 * in a context of tx_prio endpoint instead of a context of
 	 * a fiber which has this latch locked. So steal the latch first.
 	 */
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9465b071..cf03ea16 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -109,11 +109,12 @@ applier_log_error(struct applier *applier, struct error *e)
 	applier->last_logged_errcode = errcode;
 }
 
-/*
- * A helper function to track an applier state.
+/**
+ * A helper function which switches the applier to FOLLOW state
+ * if it has synchronized with its master.
  */
 static inline void
-applier_check_state(struct applier *applier)
+applier_check_sync(struct applier *applier)
 {
 	/*
 	 * Stay 'orphan' until appliers catch up with
@@ -156,12 +157,14 @@ applier_writer_f(va_list ap)
 		else
 			fiber_cond_wait_timeout(&applier->writer_cond,
 						replication_timeout);
-		/* A writer fiber is going to be awaken after a commit or
-		 * a heartbeat message. So this is a appropriate place to
+		/*
+		 * A writer fiber is going to be awaken after a commit or
+		 * a heartbeat message. So this is an appropriate place to
 		 * update an applier status because the applier state could
 		 * yield and doesn't fit into a commit trigger.
 		 */
-		applier_check_state(applier);
+		applier_check_sync(applier);
+
 		/* Send ACKs only when in FOLLOW mode ,*/
 		if (applier->state != APPLIER_SYNC &&
 		    applier->state != APPLIER_FOLLOW)
@@ -601,7 +604,6 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 				    next)->row.is_commit);
 }
 
-/* A trigger to control a replicated transaction rollback. */
 static void
 applier_txn_rollback_cb(struct trigger *trigger, void *event)
 {
@@ -609,8 +611,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	/* Setup shared applier diagnostic area. */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_move(&fiber()->diag, &replicaset.applier.diag);
+	/* Broadcast the rollback event across all appliers. */
 	trigger_run(&replicaset.applier.on_rollback, event);
-	/* Rollback applier vclock to the commited one. */
+	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
 }
 
@@ -630,15 +633,20 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
 static int
 applier_apply_tx(struct stailq *rows)
 {
-	struct xrow_header *first_row =
-		&stailq_first_entry(rows, struct applier_tx_row,
-				    next)->row;
+	struct xrow_header *first_row = &stailq_first_entry(rows,
+					struct applier_tx_row, next)->row;
 	struct replica *replica = replica_by_id(first_row->replica_id);
+	/*
+	 * In a full mesh topology, the same set of changes
+	 * may arrive via two concurrently running appliers.
+	 * Hence we need a latch to strictly order all changes
+	 * that belong to the same server id.
+	 */
 	struct latch *latch = (replica ? &replica->order_latch :
 			       &replicaset.applier.order_latch);
 	latch_lock(latch);
-	if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
-	    first_row->lsn) {
+	if (vclock_get(&replicaset.applier.vclock,
+		       first_row->replica_id) >= first_row->lsn) {
 		latch_unlock(latch);
 		return 0;
 	}
@@ -713,11 +721,11 @@ applier_apply_tx(struct stailq *rows)
 
 	if (txn_write(txn) < 0)
 		goto fail;
+
 	/* Transaction was sent to journal so promote vclock. */
-	vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
-		      first_row->lsn);
+	vclock_follow(&replicaset.applier.vclock,
+		      first_row->replica_id, first_row->lsn);
 	latch_unlock(latch);
-
 	return 0;
 rollback:
 	txn_rollback(txn);
@@ -747,8 +755,11 @@ applier_on_rollback(struct trigger *trigger, void *event)
 	(void) event;
 	struct applier *applier = (struct applier *)trigger->data;
 	/* Setup a shared error. */
-	if (!diag_is_empty(&replicaset.applier.diag))
-		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+	if (!diag_is_empty(&replicaset.applier.diag)) {
+		diag_add_error(&applier->diag,
+			       diag_last_error(&replicaset.applier.diag));
+	}
+	/* Stop the applier fiber. */
 	fiber_cancel(applier->reader);
 }
 
@@ -789,9 +800,8 @@ applier_subscribe(struct applier *applier)
 		 * its and master's cluster ids match.
 		 */
 		vclock_create(&applier->remote_vclock_at_subscribe);
-		xrow_decode_subscribe_response_xc(&row,
-						  &cluster_id,
-						  &applier->remote_vclock_at_subscribe);
+		xrow_decode_subscribe_response_xc(&row, &cluster_id,
+					&applier->remote_vclock_at_subscribe);
 		/*
 		 * If master didn't send us its cluster id
 		 * assume that it has done all the checks.
@@ -869,7 +879,6 @@ applier_subscribe(struct applier *applier)
 		trigger_clear(&on_rollback);
 	});
 
-
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -887,14 +896,12 @@ applier_subscribe(struct applier *applier)
 
 		applier->last_row_time = ev_monotonic_now(loop());
 		/*
-		 * In case of an heathbeat message wake a writer up and
-		 * check aplier state.
+		 * In case of an heartbeat message wake a writer up
+		 * and check applier state.
 		 */
 		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0) {
+				       next)->row.lsn == 0)
 			fiber_cond_signal(&applier->writer_cond);
-		//	applier_check_state(applier);
-		}
 		else if (applier_apply_tx(&rows) != 0)
 			diag_raise();
 
@@ -1087,7 +1094,7 @@ applier_delete(struct applier *applier)
 	ibuf_destroy(&applier->ibuf);
 	assert(applier->io.fd == -1);
 	trigger_destroy(&applier->on_state);
-	fiber_cond_destroy(&applier->resume_cond);
+	diag_destroy(&applier->diag);
 	free(applier);
 }
 
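For reviewers reading the applier_apply_tx() hunk above: the point of
the latch/vclock interplay is that rows for one server id are applied
strictly one at a time and a transaction that has already reached the
applier vclock via another applier is silently skipped. A simplified
sketch (not the committed code; headers are omitted and apply_rows()
is a hypothetical placeholder for the real statement processing):

static int
apply_tx_ordered(struct stailq *rows)
{
	struct xrow_header *first_row = &stailq_first_entry(rows,
					struct applier_tx_row, next)->row;
	struct replica *replica = replica_by_id(first_row->replica_id);
	/*
	 * Take the per-replica latch (or the global one if the
	 * replica is unknown) so that two appliers receiving rows
	 * of the same server id cannot interleave.
	 */
	struct latch *latch = (replica ? &replica->order_latch :
			       &replicaset.applier.order_latch);
	latch_lock(latch);
	if (vclock_get(&replicaset.applier.vclock,
		       first_row->replica_id) >= first_row->lsn) {
		/* Already applied via another applier - skip. */
		latch_unlock(latch);
		return 0;
	}
	int rc = apply_rows(rows);	/* hypothetical placeholder */
	if (rc == 0) {
		/* The tx was sent to the journal, promote the vclock. */
		vclock_follow(&replicaset.applier.vclock,
			      first_row->replica_id, first_row->lsn);
	}
	latch_unlock(latch);
	return rc;
}
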
diff --git a/src/box/applier.h b/src/box/applier.h
index b9bb1419..b406e6aa 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -116,13 +116,10 @@ struct applier {
 	struct fiber_cond resume_cond;
 	/* Diag to raise an error. */
 	struct diag diag;
-	/* The masters vclock while subscribe. */
+	/* Master's vclock at the time of SUBSCRIBE. */
 	struct vclock remote_vclock_at_subscribe;
 };
 
-void
-applier_init();
-
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/box.cc b/src/box/box.cc
index f5bd29dd..80249919 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,7 +308,7 @@ recovery_journal_write(struct journal *base,
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
 	journal_entry_complete(entry);
-	return entry->res;
+	return 0;
 }
 
 static inline void
diff --git a/src/box/journal.h b/src/box/journal.h
index cac82c15..236058bb 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -62,7 +62,7 @@ struct journal_entry {
 	int64_t res;
 	/**
 	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was winished in both cases: succes
+	 * after the entry processing was finished in both cases: success
 	 * or fail. Entry->res is set to a result value before the callback
 	 * is fired.
 	 */
@@ -98,7 +98,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		  void *on_done_cb_data);
 
 /**
- * Finalize a signle entry.
+ * Finalize a single entry.
  */
 static inline void
 journal_entry_complete(struct journal_entry *entry)
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 617b9538..28f7aced 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -110,6 +110,7 @@ replication_free(void)
 	replicaset_foreach(replica)
 		relay_cancel(replica->relay);
 
+	diag_destroy(&replicaset.applier.diag);
 	free(replicaset.replica_by_id);
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index f331642f..95076773 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -192,6 +192,7 @@ txn_begin()
 	txn->n_local_rows = 0;
 	txn->n_applier_rows = 0;
 	txn->has_triggers  = false;
+	txn->is_done = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
 	txn->id = ++tsn;
@@ -199,9 +200,7 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
-	txn->entry = NULL;
 	txn->fiber = NULL;
-	txn->done = false;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -344,22 +343,22 @@ fail:
  * A helper function to process on_commit/on_rollback triggers.
  */
 static inline void
-txn_process_trigger(struct rlist *trigger, struct txn *txn)
+txn_run_triggers(struct txn *txn, struct rlist *trigger)
 {
 	/*
-	 * Some of triggers require for in_txn variable is set so
-	 * restore it for time a trigger is in progress.
+	 * Some triggers require for in_txn variable to be set so
+	 * restore it for the time triggers are in progress.
 	 */
 	fiber_set_txn(fiber(), txn);
 	/* Rollback triggers must not throw. */
 	if (trigger_run(trigger, txn) != 0) {
 		/*
 		 * As transaction couldn't handle a trigger error so
-		 * there is no option except than panic.
+		 * there is no option except panic.
 		 */
 		diag_log();
 		unreachable();
-		panic("rollback trigger failed");
+		panic("commit/rollback trigger failed");
 	}
 	fiber_set_txn(fiber(), NULL);
 }
@@ -370,24 +369,24 @@ txn_process_trigger(struct rlist *trigger, struct txn *txn)
 static void
 txn_complete(struct txn *txn)
 {
+	/*
+	 * Note, engine can be NULL if transaction contains
+	 * IPROTO_NOP statements only.
+	 */
 	if (txn->signature < 0) {
 		/* Undo the transaction. */
 		if (txn->engine)
 			engine_rollback(txn->engine, txn);
 		if (txn->has_triggers)
-			txn_process_trigger(&txn->on_rollback, txn);
-
+			txn_run_triggers(txn, &txn->on_rollback);
 	} else {
-		/* Accept the transaction. */
-		/*
-		 * Engine can be NULL if transaction contains IPROTO_NOP
-		 * statements only.
-		 */
+		/* Commit the transaction. */
 		if (txn->engine != NULL)
 			engine_commit(txn->engine, txn);
 		if (txn->has_triggers)
-			txn_process_trigger(&txn->on_commit, txn);
-		ev_tstamp stop_tm = ev_monotonic_now(loop());
+			txn_run_triggers(txn, &txn->on_commit);
+
+		double stop_tm = ev_monotonic_now(loop());
 		if (stop_tm - txn->start_tm > too_long_threshold) {
 			int n_rows = txn->n_new_rows + txn->n_applier_rows;
 			say_warn_ratelimited("too long WAL write: %d rows at "
@@ -404,7 +403,7 @@ txn_complete(struct txn *txn)
 	if (txn->fiber == NULL)
 		txn_free(txn);
 	else {
-		txn->done = true;
+		txn->is_done = true;
 		if (txn->fiber != fiber())
 			/* Wake a waiting fiber up. */
 			fiber_wakeup(txn->fiber);
@@ -414,8 +413,7 @@ txn_complete(struct txn *txn)
 static void
 txn_entry_done_cb(struct journal_entry *entry, void *data)
 {
-	struct txn *txn = (struct txn *)data;
-	assert(txn->entry == entry);
+	struct txn *txn = data;
 	txn->signature = entry->res;
 	txn_complete(txn);
 }
@@ -423,22 +421,22 @@ txn_entry_done_cb(struct journal_entry *entry, void *data)
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
-	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	/* Prepare a journal entry. */
-	txn->entry = journal_entry_new(txn->n_new_rows +
-				       txn->n_applier_rows,
-				       &txn->region,
-				       txn_entry_done_cb, txn);
-	if (txn->entry == NULL) {
+	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
+						      txn->n_applier_rows,
+						      &txn->region,
+						      txn_entry_done_cb,
+						      txn);
+	if (req == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
 
 	struct txn_stmt *stmt;
-	struct xrow_header **remote_row = txn->entry->rows;
-	struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
@@ -446,13 +444,13 @@ txn_write_to_wal(struct txn *txn)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
-		txn->entry->approx_len += xrow_approx_len(stmt->row);
+		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(remote_row == txn->entry->rows + txn->n_applier_rows);
+	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	/* Send entry to a journal. */
-	if (journal_write(txn->entry) < 0) {
+	/* Send the entry to the journal. */
+	if (journal_write(req) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 		return -1;
@@ -483,17 +481,13 @@ txn_prepare(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0) {
+		if (engine_prepare(txn->engine, txn) != 0)
 			return -1;
-		}
 	}
 	trigger_clear(&txn->fiber_on_stop);
 	return 0;
 }
 
-/*
- * Send a transaction to a journal.
- */
 int
 txn_write(struct txn *txn)
 {
@@ -503,9 +497,9 @@ txn_write(struct txn *txn)
 	}
 
 	/*
-	 * After this transaction could not be used more
-	 * so reset corresponding key in a fiber storage.
-	*/
+	 * After this point the transaction must not be used
+	 * so reset the corresponding key in the fiber storage.
+	 */
 	fiber_set_txn(fiber(), NULL);
 	txn->start_tm = ev_monotonic_now(loop());
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
@@ -514,16 +508,7 @@ txn_write(struct txn *txn)
 		txn_complete(txn);
 		return 0;
 	}
-
-	if (txn_write_to_wal(txn) != 0) {
-		/*
-		 * After journal write the transaction would be finalized
-		 * with its journal entry finalization callback,
-		 * just return an error.
-		 */
-		return -1;
-	}
-	return 0;
+	return txn_write_to_wal(txn);
 }
 
 int
@@ -537,15 +522,16 @@ txn_commit(struct txn *txn)
 	 * In case of non-yielding journal the transaction could already
 	 * be done and there is nothing to wait in such cases.
 	 */
-	if (!txn->done) {
+	if (!txn->is_done) {
 		bool cancellable = fiber_set_cancellable(false);
 		fiber_yield();
 		fiber_set_cancellable(cancellable);
 	}
-	int res = txn->signature >= 0? 0: -1;
+	int res = txn->signature >= 0 ? 0 : -1;
 	if (res != 0)
 		diag_set(ClientError, ER_WAL_IO);
-	/* As the current fiber is waiting for the transaction so free it. */
+
+	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
 	return res;
 }
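
The synchronous path still exists: txn_commit() parks the calling
fiber until the journal entry completes and txn_complete() wakes it
up. Condensed from the two hunks above into one sketch; note that the
txn->fiber assignment is not visible in the hunk, I assume it happens
before the write, and engine/trigger handling is elided.

/* Journal completion side. */
static void
complete_sketch(struct txn *txn)
{
	/* ... commit or roll back engines, run triggers ... */
	txn->is_done = true;
	if (txn->fiber != NULL && txn->fiber != fiber())
		fiber_wakeup(txn->fiber);
}

/* Caller side. */
static int
commit_sketch(struct txn *txn)
{
	txn->fiber = fiber();
	if (txn_write(txn) != 0)
		return -1;
	/* With a non-yielding journal the txn may already be done. */
	if (!txn->is_done) {
		bool cancellable = fiber_set_cancellable(false);
		fiber_yield();
		fiber_set_cancellable(cancellable);
	}
	int res = txn->signature >= 0 ? 0 : -1;
	txn_free(txn);
	return res;
}
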
diff --git a/src/box/txn.h b/src/box/txn.h
index ddcac3bb..a19becce 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,6 +162,8 @@ struct txn {
 	 * already assigned LSN.
 	 */
 	int n_applier_rows;
+	/* True when transaction is processed. */
+	bool is_done;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
@@ -182,6 +184,10 @@ struct txn {
 	struct engine *engine;
 	/** Engine-specific transaction data */
 	void *engine_tx;
+	/* A fiber to wake up when transaction is finished. */
+	struct fiber *fiber;
+	/** Timestampt of entry write start. */
+	double start_tm;
 	/**
 	 * Triggers on fiber yield to abort transaction for
 	 * for in-memory engine.
@@ -195,16 +201,6 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
-	/** Journal entry to control txn write. */
-	struct journal_entry *entry;
-	/** Transaction completion trigger. */
-	struct trigger entry_done;
-	/** Timestampt of entry write start. */
-	ev_tstamp start_tm;
-	/* A fiber to wake up when transaction is finished. */
-	struct fiber *fiber;
-	/* True when transaction is processed. */
-	bool done;
 };
 
 /* Pointer to the current transaction (if any) */
@@ -238,12 +234,21 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+/**
+ * Submit a transaction to the journal.
+ * @pre txn == in_txn()
+ *
+ * On success 0 is returned, and the transaction will be freed upon
+ * journal write completion. Note, the journal write may still fail.
+ * To track transaction status, one is supposed to use on_commit and
+ * on_rollback triggers.
+ *
+ * On failure -1 is returned and the transaction is rolled back and
+ * freed.
+ */
 int
 txn_write(struct txn *txn);
 
-int
-txn_wait(struct txn *txn);
-
 /**
  * Roll back the transaction but keep the object around.
  * A special case for memtx transaction abort on yield. In this
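
As a usage note for the new txn_write() comment above: the caller
hands the transaction off and learns the final outcome only from the
triggers, because the journal write may complete later. A hedged
sketch follows; txn_on_commit()/txn_on_rollback() are the
trigger-attachment helpers I assume txn.h provides (they are not part
of this diff), and the statement-filling step is elided.

static void
example_commit_cb(struct trigger *trigger, void *event)
{
	(void) trigger;
	(void) event;
	say_info("transaction reached the journal");
}

static void
example_rollback_cb(struct trigger *trigger, void *event)
{
	(void) trigger;
	(void) event;
	say_warn("journal write failed, transaction was rolled back");
}

/* Triggers must outlive the call: the write completes later. */
static struct trigger example_on_commit;
static struct trigger example_on_rollback;

static int
example_async_commit(struct txn *txn)
{
	/* ... statements have already been applied to txn ... */
	trigger_create(&example_on_commit, example_commit_cb, NULL, NULL);
	trigger_create(&example_on_rollback, example_rollback_cb, NULL, NULL);
	txn_on_commit(txn, &example_on_commit);
	txn_on_rollback(txn, &example_on_rollback);
	/*
	 * On success the txn is freed once the journal entry
	 * completes, so it must not be touched after this call.
	 * On failure it has already been rolled back and freed.
	 */
	return txn_write(txn);
}
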
diff --git a/src/box/wal.c b/src/box/wal.c
index dce5fee6..6f5d0a58 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,17 +245,19 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	return xlog_tx_commit(l);
 }
 
+/**
+ * Invoke completion callbacks of journal entries to be
+ * completed. Callbacks are invoked in strict fifo order:
+ * this ensures that, in case of rollback, requests are
+ * rolled back in strict reverse order, producing
+ * a consistent database state.
+ */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	/*
-	 * fiber_wakeup() is faster than fiber_call() when there
-	 * are many ready fibers.
-	 */
 	struct journal_entry *req, *tmp;
-	stailq_foreach_entry_safe(req, tmp, queue, fifo) {
+	stailq_foreach_entry_safe(req, tmp, queue, fifo)
 		journal_entry_complete(req);
-	}
 }
 
 /**
@@ -1189,7 +1191,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
 	journal_entry_complete(entry);
-	return entry->res;
+	return 0;
 }
 
 void
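
Finally, for anyone updating a journal backend after this series: the
pattern visible in recovery_journal_write() and
wal_write_in_wal_mode_none() above boils down to the sketch below
(simplified, not taken from the tree): put the result into entry->res,
fire journal_entry_complete(), and use the return value only to report
whether the entry was accepted.

static int64_t
example_journal_write(struct journal *journal, struct journal_entry *entry)
{
	(void) journal;
	/*
	 * Pretend the write succeeded; any non-negative res is
	 * treated as success by the completion callbacks (the txn
	 * code checks txn->signature >= 0).
	 */
	entry->res = 0;
	journal_entry_complete(entry);
	/* The return value no longer carries the entry result. */
	return 0;
}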


