Hi! Thanks for the patch.
LGTM with one comment, which is up to you.
 
 
Четверг, 20 августа 2020, 0:36 +03:00 от Cyrill Gorcunov <gorcunov@gmail.com>:
 
Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.

In turn, when we receive a confirm or rollback packet from
another node in a cluster we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.

Thus let's rather handle them in a special lightweight way:

 - allocate synchro_entry structure which would carry
   the journal entry itself and encoded message
 - process limbo queue to mark confirmed/rollback'ed
   messages
 - finally write this synchro_entry into a journal

Which is way simpler.

Part-of #5129

Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------
 1 file changed, 148 insertions(+), 52 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1387d518c..c1d07ca54 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -268,45 +270,11 @@ process_nop(struct request *request)
  return txn_commit_stmt(txn, request);
 }
 
-/*
- * CONFIRM/ROLLBACK rows aren't dml requests and require special
- * handling: instead of performing some operations on spaces,
- * processing these requests requires txn_limbo to either confirm
- * or rollback some of its entries.
- */
-static int
-process_synchro_row(struct request *request)
-{
- assert(iproto_type_is_synchro_request(request->header->type));
- struct txn *txn = in_txn();
-
- struct synchro_request syn_req;
- if (xrow_decode_synchro(request->header, &syn_req) != 0)
- return -1;
- assert(txn->n_applier_rows == 0);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- return -1;
- if (txn_commit_stmt(txn, request) != 0)
- return -1;
- return txn_limbo_process(&txn_limbo, &syn_req);
-}
-
 static int
 apply_row(struct xrow_header *row)
 {
  struct request request;
- if (iproto_type_is_synchro_request(row->type)) {
- request.header = row;
- return process_synchro_row(&request);
- }
+ assert(!iproto_type_is_synchro_request(row->type));
  if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
  return -1;
  if (request.type == IPROTO_NOP)
@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
  next)->row.is_commit);
 }
 
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
 {
- (void) trigger;
- struct txn *txn = (struct txn *) event;
- /*
- * Synchronous transaction rollback due to receiving a
- * ROLLBACK entry is a normal event and requires no
- * special handling.
- */
- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
- return 0;
-
  /*
  * Setup shared applier diagnostic area.
  *
@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
  * diag use per-applier diag instead all the time
  * (which actually already present in the structure).
  *
- * But remember that transactions are asynchronous
- * and rollback may happen a way latter after it
- * passed to the journal engine.
+ * But remember that WAL writes are asynchronous and
+ * rollback may happen a way later after it was passed to
+ * the journal engine.
  */
  diag_set(ClientError, ER_WAL_IO);
  diag_set_error(&replicaset.applier.diag,
@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 
  /* Rollback applier vclock to the committed one. */
  vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ struct txn *txn = (struct txn *) event;
+ /*
+ * Synchronous transaction rollback due to receiving a
+ * ROLLBACK entry is a normal event and requires no
+ * special handling.
+ */
+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+ applier_rollback_by_wal_io();
  return 0;
 }
 
@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
  return 0;
 }
 
+struct synchro_entry {
+ /** Encoded form of a synchro record. */
+ struct synchro_body_bin body_bin;
+
+ /** xrow to write, used by the journal engine. */
+ struct xrow_header row;
+
+ /**
+ * The journal entry itself. Note since
+ * it has unsized array it must be the
+ * last entry in the structure.
+ */
+ struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+ free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ struct synchro_entry *synchro_entry =
+ (struct synchro_entry *)entry->complete_data;
+ if (entry->res < 0)
+ applier_rollback_by_wal_io();
+ else
+ trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+ synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+ struct synchro_request *req)
+{
+ struct synchro_entry *entry;
+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+ /*
+ * For simplicity we use malloc here but
+ * probably should provide some cache similar
+ * to txn cache.
+ */
+ entry = (struct synchro_entry *)malloc(size);
+ if (entry == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+ return NULL;
+ }
+
+ struct journal_entry *journal_entry = &entry->journal_entry;
+ struct synchro_body_bin *body_bin = &entry->body_bin;
+ struct xrow_header *row = &entry->row;
+
+ journal_entry->rows[0] = row;
+
+ xrow_encode_synchro(row, body_bin, req);
+
+ row->lsn = applier_row->lsn;
+ row->replica_id = applier_row->replica_id;
+
+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+ apply_synchro_row_cb, entry);
+ return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+ assert(iproto_type_is_synchro_request(row->type));
+
+ struct synchro_request req;
+ if (xrow_decode_synchro(row, &req) != 0)
+ goto err;
+
+ if (txn_limbo_process(&txn_limbo, &req))
+ goto err;
+
+ struct synchro_entry *entry;
+ entry = synchro_entry_new(row, &req);
+ if (entry == NULL)
+ goto err;
+
+ if (journal_write_async(&entry->journal_entry) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ goto err;
+ }
+ return 0;
+err:
+ diag_log();
+ return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
  }
  }
 
+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+ /*
+ * Synchro messages are not transactions, in terms
+ * of DML. Always sent and written isolated from
+ * each other.
+ */
+ assert(first_row == last_row);
+ if (apply_synchro_row(first_row) != 0)
+ diag_raise();
+ goto success;
+ }
+
  /**
  * Explicitly begin the transaction so that we can
  * control fiber->gc life cycle and, in case of apply
  * conflict safely access failed xrow object and allocate
  * IPROTO_NOP on gc.
  */
- struct txn *txn = txn_begin();
+ struct txn *txn;
+ txn = txn_begin();
 
Why this change (splitting the declaration of txn from its initialization)?
 
  struct applier_tx_row *item;
  if (txn == NULL) {
  latch_unlock(latch);
@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)
  if (txn_commit_async(txn) < 0)
  goto fail;
 
+success:
  /*
  * The transaction was sent to journal so promote vclock.
  *
@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)
 
  applier->lag = TIMEOUT_INFINITY;
 
- /* Register triggers to handle WAL writes and rollbacks. */
+ /*
+ * Register triggers to handle WAL writes and rollbacks.
+ *
+ * Note we use them for synchronous packets handling as well
+ * thus when changing make sure that synchro handling won't
+ * be broken.
+ */
  struct trigger on_wal_write;
  trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
  trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
--
2.26.2
 
 
--
Serge Petrenko