From: "Serge Petrenko" <sergepetrenko@tarantool.org>
To: "Cyrill Gorcunov" <gorcunov@gmail.com>
Cc: tml <tarantool-patches@dev.tarantool.org>, "Vladislav Shpilevoy" <v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine
Date: Fri, 21 Aug 2020 11:51:13 +0300
Message-ID: <1597999873.222856578@f425.i.mail.ru>
In-Reply-To: <20200819213442.1099018-6-gorcunov@gmail.com>

Hi! Thanks for the patch.

LGTM with one comment, which is up to you.

>Thursday, 20 August 2020, 0:36 +03:00, from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>Transaction processing code is very heavy simply because
>transactions carry various data and involve a number
>of other mechanisms to proceed.
>
>In turn, when we receive a confirm or rollback packet from
>another node in a cluster, we just need to inspect the limbo
>queue and write this packet into a WAL journal. So calling
>a bunch of txn engine helpers is simply a waste of cycles.
>
>Thus let's rather handle them in a special light way:
>
> - allocate a synchro_entry structure which would carry
>   the journal entry itself and the encoded message
> - process the limbo queue to mark confirmed/rolled back
>   messages
> - finally write this synchro_entry into a journal
>
>Which is much simpler.
>
>Part-of #5129
>
>Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
>Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------
> 1 file changed, 148 insertions(+), 52 deletions(-)
>
>diff --git a/src/box/applier.cc b/src/box/applier.cc
>index 1387d518c..c1d07ca54 100644
>--- a/src/box/applier.cc
>+++ b/src/box/applier.cc
>@@ -51,8 +51,10 @@
> #include "schema.h"
> #include "txn.h"
> #include "box.h"
>+#include "xrow.h"
> #include "scoped_guard.h"
> #include "txn_limbo.h"
>+#include "journal.h"
>
> STRS(applier_state, applier_STATE);
>
>@@ -268,45 +270,11 @@ process_nop(struct request *request)
> 	return txn_commit_stmt(txn, request);
> }
>
>-/*
>- * CONFIRM/ROLLBACK rows aren't dml requests and require special
>- * handling: instead of performing some operations on spaces,
>- * processing these requests requires txn_limbo to either confirm
>- * or rollback some of its entries.
>- */
>-static int
>-process_synchro_row(struct request *request)
>-{
>-	assert(iproto_type_is_synchro_request(request->header->type));
>-	struct txn *txn = in_txn();
>-
>-	struct synchro_request syn_req;
>-	if (xrow_decode_synchro(request->header, &syn_req) != 0)
>-		return -1;
>-	assert(txn->n_applier_rows == 0);
>-	/*
>-	 * This is not really a transaction. It just uses txn API
>-	 * to put the data into WAL. And obviously it should not
>-	 * go to the limbo and block on the very same sync
>-	 * transaction which it tries to confirm now.
>-	 */
>-	txn_set_flag(txn, TXN_FORCE_ASYNC);
>-
>-	if (txn_begin_stmt(txn, NULL) != 0)
>-		return -1;
>-	if (txn_commit_stmt(txn, request) != 0)
>-		return -1;
>-	return txn_limbo_process(&txn_limbo, &syn_req);
>-}
>-
> static int
> apply_row(struct xrow_header *row)
> {
> 	struct request request;
>-	if (iproto_type_is_synchro_request(row->type)) {
>-		request.header = row;
>-		return process_synchro_row(&request);
>-	}
>+	assert(!iproto_type_is_synchro_request(row->type));
> 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> 		return -1;
> 	if (request.type == IPROTO_NOP)
>@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> 				    next)->row.is_commit);
> }
>
>-static int
>-applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+static void
>+applier_rollback_by_wal_io(void)
> {
>-	(void) trigger;
>-	struct txn *txn = (struct txn *) event;
>-	/*
>-	 * Synchronous transaction rollback due to receiving a
>-	 * ROLLBACK entry is a normal event and requires no
>-	 * special handling.
>-	 */
>-	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
>-		return 0;
>-
> 	/*
> 	 * Setup shared applier diagnostic area.
> 	 *
>@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
> 	 * diag use per-applier diag instead all the time
> 	 * (which actually already present in the structure).
> 	 *
>-	 * But remember that transactions are asynchronous
>-	 * and rollback may happen a way latter after it
>-	 * passed to the journal engine.
>+	 * But remember that WAL writes are asynchronous and
>+	 * rollback may happen much later after it was passed to
>+	 * the journal engine.
> 	 */
> 	diag_set(ClientError, ER_WAL_IO);
> 	diag_set_error(&replicaset.applier.diag,
>@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>
> 	/* Rollback applier vclock to the committed one. */
> 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
>+}
>+
>+static int
>+applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+{
>+	(void) trigger;
>+	struct txn *txn = (struct txn *) event;
>+	/*
>+	 * Synchronous transaction rollback due to receiving a
>+	 * ROLLBACK entry is a normal event and requires no
>+	 * special handling.
>+	 */
>+	if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
>+		applier_rollback_by_wal_io();
> 	return 0;
> }
>
>@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
> 	return 0;
> }
>
>+struct synchro_entry {
>+	/** Encoded form of a synchro record. */
>+	struct synchro_body_bin body_bin;
>+
>+	/** xrow to write, used by the journal engine. */
>+	struct xrow_header row;
>+
>+	/**
>+	 * The journal entry itself. Note that since
>+	 * it has an unsized array it must be the
>+	 * last member of the structure.
>+	 */
>+	struct journal_entry journal_entry;
>+};
>+
>+static void
>+synchro_entry_delete(struct synchro_entry *entry)
>+{
>+	free(entry);
>+}
>+
>+/**
>+ * Async write journal completion.
>+ */
>+static void
>+apply_synchro_row_cb(struct journal_entry *entry)
>+{
>+	assert(entry->complete_data != NULL);
>+	struct synchro_entry *synchro_entry =
>+		(struct synchro_entry *)entry->complete_data;
>+	if (entry->res < 0)
>+		applier_rollback_by_wal_io();
>+	else
>+		trigger_run(&replicaset.applier.on_wal_write, NULL);
>+
>+	synchro_entry_delete(synchro_entry);
>+}
>+
>+/**
>+ * Allocate a new synchro_entry to be passed to
>+ * the journal engine for an async write.
>+ */
>+static struct synchro_entry *
>+synchro_entry_new(struct xrow_header *applier_row,
>+		  struct synchro_request *req)
>+{
>+	struct synchro_entry *entry;
>+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
>+
>+	/*
>+	 * For simplicity we use malloc here but
>+	 * probably should provide some cache similar
>+	 * to txn cache.
>+	 */
>+	entry = (struct synchro_entry *)malloc(size);
>+	if (entry == NULL) {
>+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
>+		return NULL;
>+	}
>+
>+	struct journal_entry *journal_entry = &entry->journal_entry;
>+	struct synchro_body_bin *body_bin = &entry->body_bin;
>+	struct xrow_header *row = &entry->row;
>+
>+	journal_entry->rows[0] = row;
>+
>+	xrow_encode_synchro(row, body_bin, req);
>+
>+	row->lsn = applier_row->lsn;
>+	row->replica_id = applier_row->replica_id;
>+
>+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
>+			     apply_synchro_row_cb, entry);
>+	return entry;
>+}
>+
>+/** Process a synchro request. */
>+static int
>+apply_synchro_row(struct xrow_header *row)
>+{
>+	assert(iproto_type_is_synchro_request(row->type));
>+
>+	struct synchro_request req;
>+	if (xrow_decode_synchro(row, &req) != 0)
>+		goto err;
>+
>+	if (txn_limbo_process(&txn_limbo, &req))
>+		goto err;
>+
>+	struct synchro_entry *entry;
>+	entry = synchro_entry_new(row, &req);
>+	if (entry == NULL)
>+		goto err;
>+
>+	if (journal_write_async(&entry->journal_entry) != 0) {
>+		diag_set(ClientError, ER_WAL_IO);
>+		goto err;
>+	}
>+	return 0;
>+err:
>+	diag_log();
>+	return -1;
>+}
>+
> /**
>  * Apply all rows in the rows queue as a single transaction.
>  *
>@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
> 		}
> 	}
>
>+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>+		/*
>+		 * Synchro messages are not transactions, in terms
>+		 * of DML. Always sent and written isolated from
>+		 * each other.
>+		 */
>+		assert(first_row == last_row);
>+		if (apply_synchro_row(first_row) != 0)
>+			diag_raise();
>+		goto success;
>+	}
>+
> 	/**
> 	 * Explicitly begin the transaction so that we can
> 	 * control fiber->gc life cycle and, in case of apply
> 	 * conflict safely access failed xrow object and allocate
> 	 * IPROTO_NOP on gc.
> 	 */
>-	struct txn *txn = txn_begin();
>+	struct txn *txn;
>+	txn = txn_begin();

Why this change?

> 	struct applier_tx_row *item;
> 	if (txn == NULL) {
> 		latch_unlock(latch);
>@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)
> 	if (txn_commit_async(txn) < 0)
> 		goto fail;
>
>+success:
> 	/*
> 	 * The transaction was sent to journal so promote vclock.
> 	 *
>@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)
>
> 	applier->lag = TIMEOUT_INFINITY;
>
>-	/* Register triggers to handle WAL writes and rollbacks. */
>+	/*
>+	 * Register triggers to handle WAL writes and rollbacks.
>+	 *
>+	 * Note that we use them for synchronous packet handling as
>+	 * well, so when changing them make sure that synchro
>+	 * handling won't be broken.
>+	 */
> 	struct trigger on_wal_write;
> 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
> 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
>-- 
>2.26.2
>

-- 
Serge Petrenko
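The allocation scheme used by synchro_entry_new() and apply_synchro_row_cb() above (a single self-owned block holding the row plus a trailing journal_entry whose rows[] array gets exactly one extra pointer slot, submitted asynchronously and freed by its own completion callback) can be sketched in plain C. This is a minimal, hypothetical sketch: `struct row`, the single-slot toy journal (`pending`, `journal_complete()`), the counters `n_ok`/`n_failed`/`n_freed` and `synchro_entry_complete_cb()` are simplified stand-ins invented for illustration, not Tarantool's real `journal.h`/`xrow.h` API. Nesting a struct that ends in a flexible array member as the last member of another struct is a GNU C extension, not ISO C; the patch relies on the same layout trick.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical, heavily simplified stand-ins; NOT Tarantool's real types. */
struct row {
	int64_t lsn;
	uint32_t replica_id;
};

struct journal_entry;
typedef void (*journal_write_cb)(struct journal_entry *entry);

struct journal_entry {
	journal_write_cb write_async_cb; /* invoked when the write completes */
	void *complete_data;             /* handed back to the callback */
	int res;                         /* < 0 if the write failed */
	int n_rows;
	struct row *rows[];              /* flexible array member, must be last */
};

/*
 * One allocation holds the row and the journal entry. journal_entry is
 * the LAST member, so rows[] spills into the extra bytes allocated after
 * the struct (GNU C extension: nested flexible array member).
 */
struct synchro_entry {
	struct row row;
	struct journal_entry journal_entry;
};

/* Toy "journal": at most one in-flight write, completed by hand. */
static struct journal_entry *pending;

static int
journal_write_async(struct journal_entry *entry)
{
	if (pending != NULL)
		return -1; /* busy */
	pending = entry;
	return 0;
}

/* Simulates the WAL finishing the pending write with result res. */
static void
journal_complete(int res)
{
	struct journal_entry *entry = pending;
	assert(entry != NULL);
	pending = NULL;
	entry->res = res;
	entry->write_async_cb(entry);
}

static int n_ok, n_failed, n_freed;

static void
synchro_entry_complete_cb(struct journal_entry *entry)
{
	assert(entry->complete_data != NULL);
	struct synchro_entry *e = entry->complete_data;
	if (entry->res < 0)
		n_failed++; /* the patch calls applier_rollback_by_wal_io() */
	else
		n_ok++;     /* the patch runs the on_wal_write triggers */
	free(e);            /* the entry owns itself: freed on completion */
	n_freed++;
}

static struct synchro_entry *
synchro_entry_new(int64_t lsn, uint32_t replica_id)
{
	/* The struct itself plus one slot for journal_entry.rows[0]. */
	size_t size = sizeof(struct synchro_entry) + sizeof(struct row *);
	struct synchro_entry *e = malloc(size);
	if (e == NULL)
		return NULL;
	e->row.lsn = lsn;
	e->row.replica_id = replica_id;
	e->journal_entry.write_async_cb = synchro_entry_complete_cb;
	e->journal_entry.complete_data = e;
	e->journal_entry.res = 0;
	e->journal_entry.n_rows = 1;
	e->journal_entry.rows[0] = &e->row;
	return e;
}
```

Because the entry frees itself in the completion callback, the caller never waits for the write; in the patch, a failed write is reported via applier_rollback_by_wal_io() and a successful one via the replicaset.applier.on_wal_write triggers, which is why the new comment in applier_subscribe() warns that those triggers now serve synchro packets too.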