[Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine

Cyrill Gorcunov gorcunov at gmail.com
Sat Aug 15 00:14:41 MSK 2020


Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.

In turn, when we receive a confirm or rollback packet from
another node in a cluster we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.

Thus let's rather handle them in a special lightweight way:

 - allocate synchro_entry structure which would carry
   the journal entry itself and encoded message
 - process limbo queue to mark confirmed/rollback'ed
   messages
 - finally write this synchro_entry into a journal

Which is way simpler.

Part-of #5129

Suggested-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 172 insertions(+), 7 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..a1ce7a23f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -841,6 +843,151 @@ applier_unlock(struct latch *latch)
 	latch_unlock(latch);
 }
 
+struct synchro_entry {
+	/** The applier which initiated the synchro request. */
+	struct applier *applier;
+
+	/** Encoded body of the synchro record. */
+	struct synchro_body_bin	body_bin;
+
+	/** The xrow to write; handed to the journal as rows[0]. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Since it ends
+	 * with an unsized array, it must be the
+	 * last member of the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry); /* Allocated with malloc() in synchro_entry_new(). */
+}
+
+/**
+ * Completion callback of an async synchro row journal write.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	struct applier *applier = synchro_entry->applier;
+
+	/*
+	 * We can reuse the applier triggers: they are set up
+	 * when the applier subscribes, and since packets are
+	 * handled only after the subscription phase, the
+	 * triggers are alive here.
+	 */
+	if (entry->res < 0) {
+		trigger_run(&replicaset.applier.on_rollback, applier);
+		/*
+		 * Restore the last written vclock value.
+		 */
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	} else {
+		trigger_run(&replicaset.applier.on_wal_write, applier);
+	}
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a synchro_entry for an async journal write.
+ * Returns NULL with diag set if malloc() fails.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct applier *applier,
+		  struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * The extra pointer is the slot for rows[0]. Plain
+	 * malloc is used for simplicity; a cache similar to
+	 * the txn cache would probably be better.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	entry->applier = applier;
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/*
+ * Process a synchro request from an incoming applier packet
+ * without using the txn engine, for the sake of speed.
+ */
+static int
+apply_synchro_row(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct latch *latch = applier_lock(row->replica_id);
+	if (vclock_get(&replicaset.applier.vclock,
+		       row->replica_id) >= row->lsn) {
+		applier_unlock(latch);
+		return 0;
+	}
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto out;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto out;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(applier, row, &req);
+	if (entry == NULL)
+		goto out;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto out; /* NOTE(review): is entry freed on this path? */
+	}
+
+	/*
+	 * If something goes wrong later, the journal completion
+	 * handler sets the applier's vclock back to the last
+	 * value successfully written to WAL.
+	 */
+	vclock_follow(&replicaset.applier.vclock,
+		      row->replica_id, row->lsn);
+	applier_unlock(latch);
+	return 0;
+
+out:
+	diag_log();
+	applier_unlock(latch);
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1118,7 +1265,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for synchronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
@@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
 		applier_read_tx(applier, &rows);
 
 		applier->last_row_time = ev_monotonic_now(loop());
+		struct xrow_header *row = applier_first_row(&rows);
 
-		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
-		 */
-		if (applier_first_row(&rows)->lsn == 0)
+		if (row->lsn == 0) {
+			/*
+			 * In case of an heartbeat message
+			 * wake a writer up and check
+			 * the applier state.
+			 */
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (iproto_type_is_synchro_request(row->type)) {
+			/*
+			 * Make sure synchro messages are never reached
+			 * in a batch (this is by design for simplicity
+			 * sake).
+			 */
+			assert(stailq_first(&rows) == stailq_last(&rows));
+			if (apply_synchro_row(applier, row) != 0)
+				diag_raise();
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
-- 
2.26.2



More information about the Tarantool-patches mailing list