From: Cyrill Gorcunov <gorcunov@gmail.com>
To: tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
Date: Sat, 15 Aug 2020 00:14:41 +0300 [thread overview]
Message-ID: <20200814211442.667099-8-gorcunov@gmail.com> (raw)
In-Reply-To: <20200814211442.667099-1-gorcunov@gmail.com>
Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.
In turn, when we receive a confirm or rollback packet from
another node in a cluster we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.
Thus let's rather handle them in a special lightweight way:
- allocate a synchro_entry structure which carries
the journal entry itself and the encoded message
- process the limbo queue to mark confirmed/rolled back
messages
- finally write this synchro_entry into a journal
Which is much simpler.
Part-of #5129
Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 172 insertions(+), 7 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..a1ce7a23f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "xrow.h"
#include "scoped_guard.h"
#include "txn_limbo.h"
+#include "journal.h"
STRS(applier_state, applier_STATE);
@@ -841,6 +843,151 @@ applier_unlock(struct latch *latch)
latch_unlock(latch);
}
+struct synchro_entry {
+ /** The applier which initiated the synchro request. */
+ struct applier *applier;
+
+ /** Encoded form of a synchro record. */
+ struct synchro_body_bin body_bin;
+
+ /** The xrow to write; journal_entry.rows[0] points to it. */
+ struct xrow_header row;
+
+ /**
+ * The journal entry itself. Note that since
+ * it ends with an unsized array it must be
+ * the last member of the structure.
+ */
+ struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+ free(entry); /* Pairs with malloc() in synchro_entry_new(). */
+}
+
+/**
+ * Completion of an async journal write of a synchro entry.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ struct synchro_entry *synchro_entry =
+ (struct synchro_entry *)entry->complete_data;
+ struct applier *applier = synchro_entry->applier;
+
+ /*
+ * We can reuse the triggers: they are allocated when
+ * the applier gets subscribed, and since packets are
+ * handled only after the subscription phase the
+ * triggers will be alive.
+ */
+ if (entry->res < 0) {
+ trigger_run(&replicaset.applier.on_rollback, applier);
+ /*
+ * Restore the last written vclock value.
+ */
+ vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ } else {
+ trigger_run(&replicaset.applier.on_wal_write, applier);
+ }
+
+ synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry for an async journal write.
+ * Returns NULL with the diag set if the allocation fails.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct applier *applier,
+ struct xrow_header *applier_row,
+ struct synchro_request *req)
+{
+ struct synchro_entry *entry;
+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+ /*
+ * The size reserves room for the single rows[0]
+ * pointer. For simplicity we use malloc here but
+ * probably should provide some cache similar to
+ * the txn cache.
+ */
+ entry = (struct synchro_entry *)malloc(size);
+ if (entry == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+ return NULL;
+ }
+
+ struct journal_entry *journal_entry = &entry->journal_entry;
+ struct synchro_body_bin *body_bin = &entry->body_bin;
+ struct xrow_header *row = &entry->row;
+
+ entry->applier = applier;
+ journal_entry->rows[0] = row;
+
+ xrow_encode_synchro(row, body_bin, req);
+
+ row->lsn = applier_row->lsn;
+ row->replica_id = applier_row->replica_id;
+
+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+ apply_synchro_row_cb, entry);
+ return entry;
+}
+
+/*
+ * Process a synchro request from an incoming applier packet
+ * without using the txn engine, for the sake of speed.
+ */
+static int
+apply_synchro_row(struct applier *applier, struct xrow_header *row)
+{
+ assert(iproto_type_is_synchro_request(row->type));
+
+ struct latch *latch = applier_lock(row->replica_id);
+ if (vclock_get(&replicaset.applier.vclock,
+ row->replica_id) >= row->lsn) {
+ applier_unlock(latch);
+ return 0; /* Not newer than the applier vclock: skip. */
+ }
+
+ struct synchro_request req;
+ if (xrow_decode_synchro(row, &req) != 0)
+ goto out;
+
+ if (txn_limbo_process(&txn_limbo, &req))
+ goto out;
+
+ struct synchro_entry *entry;
+ entry = synchro_entry_new(applier, row, &req);
+ if (entry == NULL)
+ goto out;
+
+ if (journal_write_async(&entry->journal_entry) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ goto out;
+ }
+
+ /*
+ * In case something goes wrong the journal completion
+ * handler will set the applier's vclock back to the last
+ * value successfully written to the WAL.
+ */
+ vclock_follow(&replicaset.applier.vclock,
+ row->replica_id, row->lsn);
+ applier_unlock(latch);
+ return 0;
+
+out:
+ diag_log();
+ applier_unlock(latch);
+ return -1;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -1118,7 +1265,13 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
- /* Register triggers to handle WAL writes and rollbacks. */
+ /*
+ * Register triggers to handle WAL writes and rollbacks.
+ *
+ * Note we use them for synchronous packets handling as well
+ * thus when changing make sure that synchro handling won't
+ * be broken.
+ */
struct trigger on_wal_write;
trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
@@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
applier_read_tx(applier, &rows);
applier->last_row_time = ev_monotonic_now(loop());
+ struct xrow_header *row = applier_first_row(&rows);
- /*
- * In case of an heartbeat message wake a writer up
- * and check applier state.
- */
- if (applier_first_row(&rows)->lsn == 0)
+ if (row->lsn == 0) {
+ /*
+ * In case of a heartbeat message
+ * wake a writer up and check
+ * the applier state.
+ */
applier_signal_ack(applier);
- else if (applier_apply_tx(&rows) != 0)
+ } else if (iproto_type_is_synchro_request(row->type)) {
+ /*
+ * Make sure synchro messages never arrive
+ * in a batch (this is by design, for the sake
+ * of simplicity).
+ */
+ assert(stailq_first(&rows) == stailq_last(&rows));
+ if (apply_synchro_row(applier, row) != 0)
+ diag_raise();
+ } else if (applier_apply_tx(&rows) != 0) {
diag_raise();
+ }
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
--
2.26.2
next prev parent reply other threads:[~2020-08-14 21:16 UTC|newest]
Thread overview: 20+ messages / expand[flat|nested] mbox.gz Atom feed top
2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK " Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-15 15:04 ` Vladislav Shpilevoy
2020-08-15 16:26 ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov
2020-08-15 15:04 ` Vladislav Shpilevoy
2020-08-15 16:27 ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access Cyrill Gorcunov
2020-08-14 21:14 ` Cyrill Gorcunov [this message]
2020-08-15 15:06 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Vladislav Shpilevoy
2020-08-17 12:42 ` Cyrill Gorcunov
2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-18 8:08 ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row Cyrill Gorcunov
2020-08-15 8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
2020-08-15 15:06 ` Vladislav Shpilevoy
2020-08-17 8:03 ` Cyrill Gorcunov
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20200814211442.667099-8-gorcunov@gmail.com \
--to=gorcunov@gmail.com \
--cc=tarantool-patches@dev.tarantool.org \
--cc=v.shpilevoy@tarantool.org \
--subject='Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox