Tarantool development patches archive
 help / color / mirror / Atom feed
From: Cyrill Gorcunov <gorcunov@gmail.com>
To: tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine
Date: Mon, 17 Aug 2020 16:39:16 +0300	[thread overview]
Message-ID: <20200817133918.875558-8-gorcunov@gmail.com> (raw)
In-Reply-To: <20200817133918.875558-1-gorcunov@gmail.com>

Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.

In turn, when we receive a confirm or rollback packet from
another node in a cluster we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.

Thus let's rather handle them in a special lightweight way:

 - allocate synchro_entry structure which would carry
   the journal entry itself and encoded message
 - process limbo queue to mark confirmed/rollback'ed
   messages
 - finally write this synchro_entry into a journal

Which is way simpler.

Part-of #5129

Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 169 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 149 insertions(+), 20 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 860a18681..83f6da461 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	} while (!applier_last_row(rows)->is_commit);
 }
 
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
 {
-	(void) trigger;
-	struct txn *txn = (struct txn *) event;
-	/*
-	 * Synchronous transaction rollback due to receiving a
-	 * ROLLBACK entry is a normal event and requires no
-	 * special handling.
-	 */
-	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
-		return 0;
-
 	/*
 	 * Setup shared applier diagnostic area.
 	 *
@@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	 * diag use per-applier diag instead all the time
 	 * (which actually already present in the structure).
 	 *
-	 * But remember that transactions are asynchronous
-	 * and rollback may happen a way latter after it
-	 * passed to the journal engine.
+	 * But remember that WAL writes are asynchronous and
+	 * rollback may happen a way later after it was passed to
+	 * the journal engine.
 	 */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_set_error(&replicaset.applier.diag,
 		       diag_last_error(diag_get()));
 
-	/* Broadcast the rollback event across all appliers. */
-	trigger_run(&replicaset.applier.on_rollback, event);
-
+	/* Broadcast the rollback across all appliers. */
+	trigger_run(&replicaset.applier.on_rollback, NULL);
 	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *) event;
+	/*
+	 * Synchronous transaction rollback due to receiving a
+	 * ROLLBACK entry is a normal event and requires no
+	 * special handling.
+	 */
+	if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+		applier_rollback_by_wal_io();
 	return 0;
 }
 
@@ -818,6 +823,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+struct synchro_entry {
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin	body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since
+	 * it has unsized array it must be the
+	 * last entry in the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	if (entry->res < 0)
+		applier_rollback_by_wal_io();
+	else
+		trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto err;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto err;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(row, &req);
+	if (entry == NULL)
+		goto err;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto err;
+	}
+	return 0;
+err:
+	diag_log();
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -861,13 +970,26 @@ applier_apply_tx(struct stailq *rows)
 		}
 	}
 
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		/*
+		 * Synchro messages are not transactions, in terms
+		 * of DML. Always sent and written isolated from
+		 * each other.
+		 */
+		assert(first_row == last_row);
+		if (apply_synchro_row(first_row) != 0)
+			diag_raise();
+		goto success;
+	}
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin();
+	struct txn *txn;
+	txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL) {
 		latch_unlock(latch);
@@ -936,6 +1058,7 @@ applier_apply_tx(struct stailq *rows)
 	if (txn_commit_async(txn) < 0)
 		goto fail;
 
+success:
 	/*
 	 * The transaction was sent to journal so promote vclock.
 	 *
@@ -1103,7 +1226,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for synchronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
-- 
2.26.2

  parent reply	other threads:[~2020-08-17 13:40 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK " Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 2/9] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 3/9] journal: add journal_entry_create helper Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 4/9] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-17 20:49   ` Vladislav Shpilevoy
2020-08-17 22:16     ` Cyrill Gorcunov
2020-08-17 22:23     ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access Cyrill Gorcunov
2020-08-17 20:49   ` Vladislav Shpilevoy
2020-08-17 22:14     ` Cyrill Gorcunov
2020-08-18 19:18       ` Vladislav Shpilevoy
2020-08-19 20:37         ` Vladislav Shpilevoy
2020-08-19 20:49           ` Cyrill Gorcunov
2020-08-17 13:39 ` Cyrill Gorcunov [this message]
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 8/9] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 9/9] xrow: drop xrow_header_dup_body Cyrill Gorcunov
2020-08-17 21:24 ` [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
2020-08-17 21:54   ` Cyrill Gorcunov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200817133918.875558-8-gorcunov@gmail.com \
    --to=gorcunov@gmail.com \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox