From: Cyrill Gorcunov <gorcunov@gmail.com>
To: tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine
Date: Mon, 17 Aug 2020 16:39:16 +0300
Message-ID: <20200817133918.875558-8-gorcunov@gmail.com>
In-Reply-To: <20200817133918.875558-1-gorcunov@gmail.com>

Transaction processing code is very heavy simply because
transactions carry various data and involve a number of
other mechanisms to proceed.

In turn, when we receive a confirm or rollback packet from
another node in a cluster we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.

Thus let's rather handle them in a special lightweight way:

 - allocate a synchro_entry structure which carries the
   journal entry itself and the encoded message
 - process the limbo queue to mark confirmed/rolled back
   messages
 - finally write this synchro_entry into a journal

This is much simpler.

Part-of #5129

Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 169 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 149 insertions(+), 20 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 860a18681..83f6da461 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	} while (!applier_last_row(rows)->is_commit);
 }
 
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
 {
-	(void) trigger;
-	struct txn *txn = (struct txn *) event;
-	/*
-	 * Synchronous transaction rollback due to receiving a
-	 * ROLLBACK entry is a normal event and requires no
-	 * special handling.
-	 */
-	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
-		return 0;
-
 	/*
 	 * Setup shared applier diagnostic area.
 	 *
@@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	 * diag use per-applier diag instead all the time
 	 * (which actually already present in the structure).
 	 *
-	 * But remember that transactions are asynchronous
-	 * and rollback may happen a way latter after it
-	 * passed to the journal engine.
+	 * But remember that WAL writes are asynchronous and
+	 * rollback may happen a way later after it was passed to
+	 * the journal engine.
 	 */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_set_error(&replicaset.applier.diag,
 		       diag_last_error(diag_get()));
 
-	/* Broadcast the rollback event across all appliers. */
-	trigger_run(&replicaset.applier.on_rollback, event);
-
+	/* Broadcast the rollback across all appliers. */
+	trigger_run(&replicaset.applier.on_rollback, NULL);
 	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *) event;
+	/*
+	 * Synchronous transaction rollback due to receiving a
+	 * ROLLBACK entry is a normal event and requires no
+	 * special handling.
+	 */
+	if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+		applier_rollback_by_wal_io();
 	return 0;
 }
 
@@ -818,6 +823,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+struct synchro_entry {
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since
+	 * it has unsized array it must be the
+	 * last entry in the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	if (entry->res < 0)
+		applier_rollback_by_wal_io();
+	else
+		trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto err;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto err;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(row, &req);
+	if (entry == NULL)
+		goto err;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto err;
+	}
+	return 0;
+err:
+	diag_log();
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -861,13 +970,26 @@ applier_apply_tx(struct stailq *rows)
 		}
 	}
 
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		/*
+		 * Synchro messages are not transactions, in terms
+		 * of DML. Always sent and written isolated from
+		 * each other.
+		 */
+		assert(first_row == last_row);
+		if (apply_synchro_row(first_row) != 0)
+			diag_raise();
+		goto success;
+	}
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin();
+	struct txn *txn;
+	txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL) {
 		latch_unlock(latch);
@@ -936,6 +1058,7 @@ applier_apply_tx(struct stailq *rows)
 	if (txn_commit_async(txn) < 0)
 		goto fail;
 
+success:
 	/*
 	 * The transaction was sent to journal so promote vclock.
 	 *
@@ -1103,7 +1226,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for synchronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
-- 
2.26.2
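
For readers following the allocation trick in synchro_entry_new(), here is a minimal standalone sketch of the same pattern: a structure that embeds, as its last member, an entry ending in a flexible array of row pointers, allocated with room for exactly one pointer and released from the completion callback. All demo_* names are hypothetical stand-ins and not Tarantool's real definitions; the immediate "journal" completion is likewise an assumption made only so the sketch can run in isolation. Nesting a struct with a flexible array member inside another struct relies on a GNU C extension accepted by GCC and Clang.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins; NOT Tarantool's real definitions. */
struct demo_row {
	long lsn;
	int replica_id;
};

struct demo_journal_entry {
	int res; /* < 0 means the write failed */
	void (*on_complete)(struct demo_journal_entry *);
	void *complete_data;
	int n_rows;
	struct demo_row *rows[]; /* flexible array member */
};

struct demo_synchro_entry {
	struct demo_row row;
	/* Must stay last: its rows[] slots live past the struct end. */
	struct demo_journal_entry journal_entry;
};

/* Counters standing in for the rollback / wal-write triggers. */
static int demo_rollbacks;
static int demo_wal_writes;

/* Completion callback: account for the result, then free the entry. */
static void
demo_complete_cb(struct demo_journal_entry *je)
{
	assert(je->complete_data != NULL);
	struct demo_synchro_entry *e =
		(struct demo_synchro_entry *)je->complete_data;
	if (je->res < 0)
		demo_rollbacks++;
	else
		demo_wal_writes++;
	free(e);
}

/* Allocate the entry plus room for exactly one row pointer. */
static struct demo_synchro_entry *
demo_synchro_entry_new(long lsn, int replica_id)
{
	struct demo_synchro_entry *e;
	size_t size = sizeof(*e) + sizeof(struct demo_row *);

	e = (struct demo_synchro_entry *)malloc(size);
	if (e == NULL)
		return NULL;
	e->row.lsn = lsn;
	e->row.replica_id = replica_id;
	e->journal_entry.res = 0;
	e->journal_entry.on_complete = demo_complete_cb;
	e->journal_entry.complete_data = e;
	e->journal_entry.n_rows = 1;
	/* The single row slot points back into the same allocation. */
	e->journal_entry.rows[0] = &e->row;
	return e;
}

/* Pretend journal: completes the write immediately with 'res'. */
static void
demo_journal_complete(struct demo_journal_entry *je, int res)
{
	je->res = res;
	je->on_complete(je);
}
```

A caller would create an entry with demo_synchro_entry_new(lsn, replica_id) and hand &entry->journal_entry to the journal; the entry must not be touched after completion because the callback frees it. In the patch itself the completion is asynchronous, which is why ownership passes to the callback rather than staying with the caller.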

Thread overview: 20+ messages
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 2/9] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 3/9] journal: add journal_entry_create helper Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 4/9] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-17 20:49   ` Vladislav Shpilevoy
2020-08-17 22:16   ` Cyrill Gorcunov
2020-08-17 22:23   ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access Cyrill Gorcunov
2020-08-17 20:49   ` Vladislav Shpilevoy
2020-08-17 22:14   ` Cyrill Gorcunov
2020-08-18 19:18   ` Vladislav Shpilevoy
2020-08-19 20:37   ` Vladislav Shpilevoy
2020-08-19 20:49   ` Cyrill Gorcunov
2020-08-17 13:39 ` Cyrill Gorcunov [this message]
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 8/9] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 9/9] xrow: drop xrow_header_dup_body Cyrill Gorcunov
2020-08-17 21:24 ` [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
2020-08-17 21:54 ` Cyrill Gorcunov