[Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine

Serge Petrenko sergepetrenko at tarantool.org
Fri Aug 21 11:51:13 MSK 2020


Hi! Thanks for the patch.
LGTM with one comment which’s up to you.
 
  
>Четверг, 20 августа 2020, 0:36 +03:00 от Cyrill Gorcunov <gorcunov at gmail.com>:
> 
>Transaction processing code is very heavy simply because
>transactions are carrying various data and involves a number
>of other mechanisms to proceed.
>
>In turn, when we receive confirm or rollback packed from
>another node in a cluster we just need to inspect limbo
>queue and write this packed into a WAL journal. So calling
>a bunch of txn engine helpers is simply waste of cycles.
>
>Thus lets rather handle them in a special light way:
>
> - allocate synchro_entry structure which would carry
>   the journal entry itself and encoded message
> - process limbo queue to mark confirmed/rollback'ed
>   messages
> - finally write this synchro_entry into a journal
>
>Which is a way simplier.
>
>Part-of #5129
>
>Suggedsted-by: Vladislav Shpilevoy < v.shpilevoy at tarantool.org >
>Co-developed-by: Vladislav Shpilevoy < v.shpilevoy at tarantool.org >
>Signed-off-by: Cyrill Gorcunov < gorcunov at gmail.com >
>---
> src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------
> 1 file changed, 148 insertions(+), 52 deletions(-)
>
>diff --git a/src/box/applier.cc b/src/box/applier.cc
>index 1387d518c..c1d07ca54 100644
>--- a/src/box/applier.cc
>+++ b/src/box/applier.cc
>@@ -51,8 +51,10 @@
> #include "schema.h"
> #include "txn.h"
> #include "box.h"
>+#include "xrow.h"
> #include "scoped_guard.h"
> #include "txn_limbo.h"
>+#include "journal.h"
> 
> STRS(applier_state, applier_STATE);
> 
>@@ -268,45 +270,11 @@ process_nop(struct request *request)
>  return txn_commit_stmt(txn, request);
> }
> 
>-/*
>- * CONFIRM/ROLLBACK rows aren't dml requests and require special
>- * handling: instead of performing some operations on spaces,
>- * processing these requests requires txn_limbo to either confirm
>- * or rollback some of its entries.
>- */
>-static int
>-process_synchro_row(struct request *request)
>-{
>- assert(iproto_type_is_synchro_request(request->header->type));
>- struct txn *txn = in_txn();
>-
>- struct synchro_request syn_req;
>- if (xrow_decode_synchro(request->header, &syn_req) != 0)
>- return -1;
>- assert(txn->n_applier_rows == 0);
>- /*
>- * This is not really a transaction. It just uses txn API
>- * to put the data into WAL. And obviously it should not
>- * go to the limbo and block on the very same sync
>- * transaction which it tries to confirm now.
>- */
>- txn_set_flag(txn, TXN_FORCE_ASYNC);
>-
>- if (txn_begin_stmt(txn, NULL) != 0)
>- return -1;
>- if (txn_commit_stmt(txn, request) != 0)
>- return -1;
>- return txn_limbo_process(&txn_limbo, &syn_req);
>-}
>-
> static int
> apply_row(struct xrow_header *row)
> {
>  struct request request;
>- if (iproto_type_is_synchro_request(row->type)) {
>- request.header = row;
>- return process_synchro_row(&request);
>- }
>+ assert(!iproto_type_is_synchro_request(row->type));
>  if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
>  return -1;
>  if (request.type == IPROTO_NOP)
>@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>  next)->row.is_commit);
> }
> 
>-static int
>-applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+static void
>+applier_rollback_by_wal_io(void)
> {
>- (void) trigger;
>- struct txn *txn = (struct txn *) event;
>- /*
>- * Synchronous transaction rollback due to receiving a
>- * ROLLBACK entry is a normal event and requires no
>- * special handling.
>- */
>- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
>- return 0;
>-
>  /*
>  * Setup shared applier diagnostic area.
>  *
>@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>  * diag use per-applier diag instead all the time
>  * (which actually already present in the structure).
>  *
>- * But remember that transactions are asynchronous
>- * and rollback may happen a way latter after it
>- * passed to the journal engine.
>+ * But remember that WAL writes are asynchronous and
>+ * rollback may happen a way later after it was passed to
>+ * the journal engine.
>  */
>  diag_set(ClientError, ER_WAL_IO);
>  diag_set_error(&replicaset.applier.diag,
>@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
> 
>  /* Rollback applier vclock to the committed one. */
>  vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
>+}
>+
>+static int
>+applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+{
>+ (void) trigger;
>+ struct txn *txn = (struct txn *) event;
>+ /*
>+ * Synchronous transaction rollback due to receiving a
>+ * ROLLBACK entry is a normal event and requires no
>+ * special handling.
>+ */
>+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
>+ applier_rollback_by_wal_io();
>  return 0;
> }
> 
>@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>  return 0;
> }
> 
>+struct synchro_entry {
>+ /** Encoded form of a synchro record. */
>+ struct synchro_body_bin body_bin;
>+
>+ /** xrow to write, used by the journal engine. */
>+ struct xrow_header row;
>+
>+ /**
>+ * The journal entry itself. Note since
>+ * it has unsized array it must be the
>+ * last entry in the structure.
>+ */
>+ struct journal_entry journal_entry;
>+};
>+
>+static void
>+synchro_entry_delete(struct synchro_entry *entry)
>+{
>+ free(entry);
>+}
>+
>+/**
>+ * Async write journal completion.
>+ */
>+static void
>+apply_synchro_row_cb(struct journal_entry *entry)
>+{
>+ assert(entry->complete_data != NULL);
>+ struct synchro_entry *synchro_entry =
>+ (struct synchro_entry *)entry->complete_data;
>+ if (entry->res < 0)
>+ applier_rollback_by_wal_io();
>+ else
>+ trigger_run(&replicaset.applier.on_wal_write, NULL);
>+
>+ synchro_entry_delete(synchro_entry);
>+}
>+
>+/**
>+ * Allocate a new synchro_entry to be passed to
>+ * the journal engine in async write way.
>+ */
>+static struct synchro_entry *
>+synchro_entry_new(struct xrow_header *applier_row,
>+ struct synchro_request *req)
>+{
>+ struct synchro_entry *entry;
>+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
>+
>+ /*
>+ * For simplicity we use malloc here but
>+ * probably should provide some cache similar
>+ * to txn cache.
>+ */
>+ entry = (struct synchro_entry *)malloc(size);
>+ if (entry == NULL) {
>+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
>+ return NULL;
>+ }
>+
>+ struct journal_entry *journal_entry = &entry->journal_entry;
>+ struct synchro_body_bin *body_bin = &entry->body_bin;
>+ struct xrow_header *row = &entry->row;
>+
>+ journal_entry->rows[0] = row;
>+
>+ xrow_encode_synchro(row, body_bin, req);
>+
>+ row->lsn = applier_row->lsn;
>+ row->replica_id = applier_row->replica_id;
>+
>+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
>+ apply_synchro_row_cb, entry);
>+ return entry;
>+}
>+
>+/** Process a synchro request. */
>+static int
>+apply_synchro_row(struct xrow_header *row)
>+{
>+ assert(iproto_type_is_synchro_request(row->type));
>+
>+ struct synchro_request req;
>+ if (xrow_decode_synchro(row, &req) != 0)
>+ goto err;
>+
>+ if (txn_limbo_process(&txn_limbo, &req))
>+ goto err;
>+
>+ struct synchro_entry *entry;
>+ entry = synchro_entry_new(row, &req);
>+ if (entry == NULL)
>+ goto err;
>+
>+ if (journal_write_async(&entry->journal_entry) != 0) {
>+ diag_set(ClientError, ER_WAL_IO);
>+ goto err;
>+ }
>+ return 0;
>+err:
>+ diag_log();
>+ return -1;
>+}
>+
> /**
>  * Apply all rows in the rows queue as a single transaction.
>  *
>@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
>  }
>  }
> 
>+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>+ /*
>+ * Synchro messages are not transactions, in terms
>+ * of DML. Always sent and written isolated from
>+ * each other.
>+ */
>+ assert(first_row == last_row);
>+ if (apply_synchro_row(first_row) != 0)
>+ diag_raise();
>+ goto success;
>+ }
>+
>  /**
>  * Explicitly begin the transaction so that we can
>  * control fiber->gc life cycle and, in case of apply
>  * conflict safely access failed xrow object and allocate
>  * IPROTO_NOP on gc.
>  */
>- struct txn *txn = txn_begin();
>+ struct txn *txn;
>+ txn = txn_begin();
 
Why this change?
 
>  struct applier_tx_row *item;
>  if (txn == NULL) {
>  latch_unlock(latch);
>@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)
>  if (txn_commit_async(txn) < 0)
>  goto fail;
> 
>+success:
>  /*
>  * The transaction was sent to journal so promote vclock.
>  *
>@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)
> 
>  applier->lag = TIMEOUT_INFINITY;
> 
>- /* Register triggers to handle WAL writes and rollbacks. */
>+ /*
>+ * Register triggers to handle WAL writes and rollbacks.
>+ *
>+ * Note we use them for syncronous packets handling as well
>+ * thus when changing make sure that synchro handling won't
>+ * be broken.
>+ */
>  struct trigger on_wal_write;
>  trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
>  trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
>--
>2.26.2
> 
 
--
Serge Petrenko
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200821/9be9779e/attachment.html>


More information about the Tarantool-patches mailing list