<HTML><BODY><div>Hi! Thanks for the patch.</div><div>LGTM with one comment which’s up to you.</div><div> </div><div> </div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;">Четверг, 20 августа 2020, 0:36 +03:00 от Cyrill Gorcunov <gorcunov@gmail.com>:<br> <div id=""><div class="js-helper js-readmsg-msg"><style type="text/css"></style><div><div id="style_15978729611856633093_BODY">Transaction processing code is very heavy simply because<br>transactions are carrying various data and involves a number<br>of other mechanisms to proceed.<br><br>In turn, when we receive confirm or rollback packed from<br>another node in a cluster we just need to inspect limbo<br>queue and write this packed into a WAL journal. So calling<br>a bunch of txn engine helpers is simply waste of cycles.<br><br>Thus lets rather handle them in a special light way:<br><br> - allocate synchro_entry structure which would carry<br>   the journal entry itself and encoded message<br> - process limbo queue to mark confirmed/rollback'ed<br>   messages<br> - finally write this synchro_entry into a journal<br><br>Which is a way simplier.<br><br>Part-of #5129<br><br>Suggedsted-by: Vladislav Shpilevoy <<a href="/compose?To=v.shpilevoy@tarantool.org">v.shpilevoy@tarantool.org</a>><br>Co-developed-by: Vladislav Shpilevoy <<a href="/compose?To=v.shpilevoy@tarantool.org">v.shpilevoy@tarantool.org</a>><br>Signed-off-by: Cyrill Gorcunov <<a href="/compose?To=gorcunov@gmail.com">gorcunov@gmail.com</a>><br>---<br> src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------<br> 1 file changed, 148 insertions(+), 52 deletions(-)<br><br>diff --git a/src/box/applier.cc b/src/box/applier.cc<br>index 1387d518c..c1d07ca54 100644<br>--- a/src/box/applier.cc<br>+++ b/src/box/applier.cc<br>@@ -51,8 +51,10 @@<br> #include "schema.h"<br> #include "txn.h"<br> #include "box.h"<br>+#include "xrow.h"<br> #include "scoped_guard.h"<br> #include "txn_limbo.h"<br>+#include "journal.h"<br> <br> STRS(applier_state, applier_STATE);<br> <br>@@ -268,45 +270,11 @@ process_nop(struct request *request)<br>  return txn_commit_stmt(txn, request);<br> }<br> <br>-/*<br>- * CONFIRM/ROLLBACK rows aren't dml requests and require special<br>- * handling: instead of performing some operations on spaces,<br>- * processing these requests requires txn_limbo to either confirm<br>- * or rollback some of its entries.<br>- */<br>-static int<br>-process_synchro_row(struct request *request)<br>-{<br>- assert(iproto_type_is_synchro_request(request->header->type));<br>- struct txn *txn = in_txn();<br>-<br>- struct synchro_request syn_req;<br>- if (xrow_decode_synchro(request->header, &syn_req) != 0)<br>- return -1;<br>- assert(txn->n_applier_rows == 0);<br>- /*<br>- * This is not really a transaction. It just uses txn API<br>- * to put the data into WAL. And obviously it should not<br>- * go to the limbo and block on the very same sync<br>- * transaction which it tries to confirm now.<br>- */<br>- txn_set_flag(txn, TXN_FORCE_ASYNC);<br>-<br>- if (txn_begin_stmt(txn, NULL) != 0)<br>- return -1;<br>- if (txn_commit_stmt(txn, request) != 0)<br>- return -1;<br>- return txn_limbo_process(&txn_limbo, &syn_req);<br>-}<br>-<br> static int<br> apply_row(struct xrow_header *row)<br> {<br>  struct request request;<br>- if (iproto_type_is_synchro_request(row->type)) {<br>- request.header = row;<br>- return process_synchro_row(&request);<br>- }<br>+ assert(!iproto_type_is_synchro_request(row->type));<br>  if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)<br>  return -1;<br>  if (request.type == IPROTO_NOP)<br>@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)<br>  next)->row.is_commit);<br> }<br> <br>-static int<br>-applier_txn_rollback_cb(struct trigger *trigger, void *event)<br>+static void<br>+applier_rollback_by_wal_io(void)<br> {<br>- (void) trigger;<br>- struct txn *txn = (struct txn *) event;<br>- /*<br>- * Synchronous transaction rollback due to receiving a<br>- * ROLLBACK entry is a normal event and requires no<br>- * special handling.<br>- */<br>- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)<br>- return 0;<br>-<br>  /*<br>  * Setup shared applier diagnostic area.<br>  *<br>@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)<br>  * diag use per-applier diag instead all the time<br>  * (which actually already present in the structure).<br>  *<br>- * But remember that transactions are asynchronous<br>- * and rollback may happen a way latter after it<br>- * passed to the journal engine.<br>+ * But remember that WAL writes are asynchronous and<br>+ * rollback may happen a way later after it was passed to<br>+ * the journal engine.<br>  */<br>  diag_set(ClientError, ER_WAL_IO);<br>  diag_set_error(&replicaset.applier.diag,<br>@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)<br> <br>  /* Rollback applier vclock to the committed one. */<br>  vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);<br>+}<br>+<br>+static int<br>+applier_txn_rollback_cb(struct trigger *trigger, void *event)<br>+{<br>+ (void) trigger;<br>+ struct txn *txn = (struct txn *) event;<br>+ /*<br>+ * Synchronous transaction rollback due to receiving a<br>+ * ROLLBACK entry is a normal event and requires no<br>+ * special handling.<br>+ */<br>+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)<br>+ applier_rollback_by_wal_io();<br>  return 0;<br> }<br> <br>@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)<br>  return 0;<br> }<br> <br>+struct synchro_entry {<br>+ /** Encoded form of a synchro record. */<br>+ struct synchro_body_bin body_bin;<br>+<br>+ /** xrow to write, used by the journal engine. */<br>+ struct xrow_header row;<br>+<br>+ /**<br>+ * The journal entry itself. Note since<br>+ * it has unsized array it must be the<br>+ * last entry in the structure.<br>+ */<br>+ struct journal_entry journal_entry;<br>+};<br>+<br>+static void<br>+synchro_entry_delete(struct synchro_entry *entry)<br>+{<br>+ free(entry);<br>+}<br>+<br>+/**<br>+ * Async write journal completion.<br>+ */<br>+static void<br>+apply_synchro_row_cb(struct journal_entry *entry)<br>+{<br>+ assert(entry->complete_data != NULL);<br>+ struct synchro_entry *synchro_entry =<br>+ (struct synchro_entry *)entry->complete_data;<br>+ if (entry->res < 0)<br>+ applier_rollback_by_wal_io();<br>+ else<br>+ trigger_run(&replicaset.applier.on_wal_write, NULL);<br>+<br>+ synchro_entry_delete(synchro_entry);<br>+}<br>+<br>+/**<br>+ * Allocate a new synchro_entry to be passed to<br>+ * the journal engine in async write way.<br>+ */<br>+static struct synchro_entry *<br>+synchro_entry_new(struct xrow_header *applier_row,<br>+ struct synchro_request *req)<br>+{<br>+ struct synchro_entry *entry;<br>+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);<br>+<br>+ /*<br>+ * For simplicity we use malloc here but<br>+ * probably should provide some cache similar<br>+ * to txn cache.<br>+ */<br>+ entry = (struct synchro_entry *)malloc(size);<br>+ if (entry == NULL) {<br>+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");<br>+ return NULL;<br>+ }<br>+<br>+ struct journal_entry *journal_entry = &entry->journal_entry;<br>+ struct synchro_body_bin *body_bin = &entry->body_bin;<br>+ struct xrow_header *row = &entry->row;<br>+<br>+ journal_entry->rows[0] = row;<br>+<br>+ xrow_encode_synchro(row, body_bin, req);<br>+<br>+ row->lsn = applier_row->lsn;<br>+ row->replica_id = applier_row->replica_id;<br>+<br>+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),<br>+ apply_synchro_row_cb, entry);<br>+ return entry;<br>+}<br>+<br>+/** Process a synchro request. */<br>+static int<br>+apply_synchro_row(struct xrow_header *row)<br>+{<br>+ assert(iproto_type_is_synchro_request(row->type));<br>+<br>+ struct synchro_request req;<br>+ if (xrow_decode_synchro(row, &req) != 0)<br>+ goto err;<br>+<br>+ if (txn_limbo_process(&txn_limbo, &req))<br>+ goto err;<br>+<br>+ struct synchro_entry *entry;<br>+ entry = synchro_entry_new(row, &req);<br>+ if (entry == NULL)<br>+ goto err;<br>+<br>+ if (journal_write_async(&entry->journal_entry) != 0) {<br>+ diag_set(ClientError, ER_WAL_IO);<br>+ goto err;<br>+ }<br>+ return 0;<br>+err:<br>+ diag_log();<br>+ return -1;<br>+}<br>+<br> /**<br>  * Apply all rows in the rows queue as a single transaction.<br>  *<br>@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)<br>  }<br>  }<br> <br>+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {<br>+ /*<br>+ * Synchro messages are not transactions, in terms<br>+ * of DML. Always sent and written isolated from<br>+ * each other.<br>+ */<br>+ assert(first_row == last_row);<br>+ if (apply_synchro_row(first_row) != 0)<br>+ diag_raise();<br>+ goto success;<br>+ }<br>+<br>  /**<br>  * Explicitly begin the transaction so that we can<br>  * control fiber->gc life cycle and, in case of apply<br>  * conflict safely access failed xrow object and allocate<br>  * IPROTO_NOP on gc.<br>  */<br>- struct txn *txn = txn_begin();<br>+ struct txn *txn;<br>+ txn = txn_begin();</div></div></div></div></blockquote><div> </div><div>Why this change?</div><div> </div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;"><div><div class="js-helper js-readmsg-msg"><div><div>  struct applier_tx_row *item;<br>  if (txn == NULL) {<br>  latch_unlock(latch);<br>@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)<br>  if (txn_commit_async(txn) < 0)<br>  goto fail;<br> <br>+success:<br>  /*<br>  * The transaction was sent to journal so promote vclock.<br>  *<br>@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)<br> <br>  applier->lag = TIMEOUT_INFINITY;<br> <br>- /* Register triggers to handle WAL writes and rollbacks. */<br>+ /*<br>+ * Register triggers to handle WAL writes and rollbacks.<br>+ *<br>+ * Note we use them for syncronous packets handling as well<br>+ * thus when changing make sure that synchro handling won't<br>+ * be broken.<br>+ */<br>  struct trigger on_wal_write;<br>  trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);<br>  trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);<br>--<br>2.26.2<br> </div></div></div></div></blockquote><div> </div><div>--<br>Serge Petrenko</div></BODY></HTML>