[Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
Cyrill Gorcunov
gorcunov at gmail.com
Sat Aug 15 00:14:41 MSK 2020
Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.
In turn, when we receive a confirm or rollback packet from
another node in a cluster, we just need to inspect the limbo
queue and write this packet into the WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.
Thus let's handle them in a special lightweight way:
- allocate a synchro_entry structure which carries
the journal entry itself and the encoded message
- process the limbo queue to mark confirmed/rolled back
messages
- finally write this synchro_entry into the journal
This is much simpler.
Part-of #5129
Suggested-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 172 insertions(+), 7 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..a1ce7a23f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "xrow.h"
#include "scoped_guard.h"
#include "txn_limbo.h"
+#include "journal.h"
STRS(applier_state, applier_STATE);
@@ -841,6 +843,182 @@ applier_unlock(struct latch *latch)
latch_unlock(latch);
}
+struct synchro_entry {
+	/** The applier which received the synchro request. */
+	struct applier *applier;
+
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since it ends
+	 * with a flexible array of row pointers it must
+	 * be the last member of the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+/** Free an entry allocated by synchro_entry_new(). */
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ *
+ * Runs the applier WAL write or rollback triggers
+ * depending on the write result and frees the entry.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	struct applier *applier = synchro_entry->applier;
+
+	/*
+	 * We can reuse triggers, they are allocated when
+	 * the applier gets subscribed and since packets handling
+	 * is processed after the subscription phase the triggers
+	 * will be alive.
+	 */
+	if (entry->res < 0) {
+		trigger_run(&replicaset.applier.on_rollback, applier);
+		/*
+		 * Restore the last written vclock value.
+		 */
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	} else {
+		trigger_run(&replicaset.applier.on_wal_write, applier);
+	}
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ *
+ * @param applier The applier which received the row.
+ * @param applier_row The received row, its lsn and
+ *        replica_id are copied into the entry row.
+ * @param req The decoded request to encode.
+ *
+ * @retval NULL Allocation failed, diag is set.
+ * @retval entry The caller owns it until the journal
+ *         accepts it; after that the completion
+ *         callback frees it.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct applier *applier,
+		  struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	/* One trailing row pointer for journal_entry->rows[]. */
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	entry->applier = applier;
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/**
+ * Process a synchro request from incoming applier packet
+ * without using txn engine, for the sake of speed.
+ *
+ * @retval 0 The row is queued for a WAL write or was
+ *         already applied.
+ * @retval -1 Error, diag is set and logged.
+ */
+static int
+apply_synchro_row(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct latch *latch = applier_lock(row->replica_id);
+	if (vclock_get(&replicaset.applier.vclock,
+		       row->replica_id) >= row->lsn) {
+		applier_unlock(latch);
+		return 0;
+	}
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto err;
+
+	/*
+	 * Allocate the entry before touching the limbo so an
+	 * OOM can't leave the limbo processed without a WAL
+	 * record (assumes encoding doesn't modify @a req).
+	 */
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(applier, row, &req);
+	if (entry == NULL)
+		goto err;
+
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		goto err_free;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto err_free;
+	}
+
+	/*
+	 * In case something goes wrong the journal completion
+	 * handler will set the applier's vclock back to the
+	 * last successfully WAL written value.
+	 */
+	vclock_follow(&replicaset.applier.vclock,
+		      row->replica_id, row->lsn);
+	applier_unlock(latch);
+	return 0;
+
+err_free:
+	/*
+	 * The entry was not accepted by the journal, so its
+	 * completion callback won't run to free it.
+	 * NOTE(review): confirm against journal_write_async().
+	 */
+	synchro_entry_delete(entry);
+err:
+	diag_log();
+	applier_unlock(latch);
+	return -1;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -1118,7 +1265,13 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
- /* Register triggers to handle WAL writes and rollbacks. */
+ /*
+ * Register triggers to handle WAL writes and rollbacks.
+ *
+ * Note we use them for syncronous packets handling as well
+ * thus when changing make sure that synchro handling won't
+ * be broken.
+ */
struct trigger on_wal_write;
trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
@@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
applier_read_tx(applier, &rows);
applier->last_row_time = ev_monotonic_now(loop());
+ struct xrow_header *row = applier_first_row(&rows);
- /*
- * In case of an heartbeat message wake a writer up
- * and check applier state.
- */
- if (applier_first_row(&rows)->lsn == 0)
+ if (row->lsn == 0) {
+ /*
+ * In case of an heartbeat message
+ * wake a writer up and check
+ * the applier state.
+ */
applier_signal_ack(applier);
- else if (applier_apply_tx(&rows) != 0)
+ } else if (iproto_type_is_synchro_request(row->type)) {
+ /*
+ * Make sure synchro messages are never reached
+ * in a batch (this is by design for simplicity
+ * sake).
+ */
+ assert(stailq_first(&rows) == stailq_last(&rows));
+ if (apply_synchro_row(applier, row) != 0)
+ diag_raise();
+ } else if (applier_apply_tx(&rows) != 0) {
diag_raise();
+ }
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
--
2.26.2
More information about the Tarantool-patches
mailing list