From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: Cyrill Gorcunov <gorcunov@gmail.com>, tml <tarantool-patches@dev.tarantool.org> Subject: Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Date: Sat, 15 Aug 2020 17:06:05 +0200 [thread overview] Message-ID: <af0743ac-3493-0c29-3380-21d41c6a8f1b@tarantool.org> (raw) In-Reply-To: <20200814211442.667099-8-gorcunov@gmail.com> Thanks for the patch! See 10 comments below. On 14.08.2020 23:14, Cyrill Gorcunov wrote: > Transaction processing code is very heavy simply because > trasactions are carrying various data and involves a number > of other mechanisms to procceed. 1. trasactions -> transactions. procceed -> proceed. > In turn, when we receive confirm or rollback packed from > another node in a cluster we just need to inspect limbo > queue and write this packed into a WAL journal. So calling > a bunch of txn engine helpers is simply waste of cycles. > > Thus lets rather handle them in a special light way: > > - allocate synchro_entry structure which would carry > the journal entry itself and encoded message > - process limbo queue to mark confirmed/rollback'ed > messages > - finally write this synchro_entry into a journal > > Which is a way more simplier. 2. 'more simplier' -> simpler. Otherwise looks like 'более лучше'. > Part-of #5129 > > Suggedsted-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> > Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com> > --- > src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 172 insertions(+), 7 deletions(-) > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index a71516282..a1ce7a23f 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -841,6 +843,151 @@ applier_unlock(struct latch *latch) > latch_unlock(latch); > } > > +struct synchro_entry { > + /** An applier initiated the syncho request. */ > + struct applier *applier; 3. Actually 'applier' is not needed. I looked around and realized it is never used. I even dropped it and nothing changed. > + > + /** Encoded form of a synchro record. */ > + struct synchro_body_bin body_bin; > + > + /** xrow to write, used by the journal engine. */ > + struct xrow_header row; > + > + /** > + * The journal entry itself. Note since > + * it has unsized array it must be the > + * last entry in the structure. > + */ > + struct journal_entry journal_entry; > +}; > + > +static void > +synchro_entry_delete(struct synchro_entry *entry) > +{ > + free(entry); > +} > + > +/** > + * Async write journal completion. > + */ > +static void > +apply_synchro_row_cb(struct journal_entry *entry) > +{ > + assert(entry->complete_data != NULL); > + struct synchro_entry *synchro_entry = > + (struct synchro_entry *)entry->complete_data; > + struct applier *applier = synchro_entry->applier; > + > + /* > + * We can reuse triggers, they are allocated when > + * applier get subscribed and since packets handling > + * is processed after the subscribtion phase the triggers > + * will be alive. 4. subscribtion -> subscription. Also I don't think I understood the comment. > + */ > + if (entry->res < 0) { > + trigger_run(&replicaset.applier.on_rollback, applier); > + /* > + * Restore the last written vlock value. > + */ > + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); > + diag_set(ClientError, ER_WAL_IO); > + diag_log(); 5. Error should be set before running on_rollback, and it should be installed into replicaset.applier.diag. > + } else { > + trigger_run(&replicaset.applier.on_wal_write, applier); > + } > + > + synchro_entry_delete(synchro_entry); > +} > + > +/** > + * Allocate a new synchro_entry to be passed to > + * the journal engine in async write way. > + */ > +static struct synchro_entry * > +synchro_entry_new(struct applier *applier, > + struct xrow_header *applier_row, > + struct synchro_request *req) > +{ > + struct synchro_entry *entry; > + size_t size = sizeof(*entry) + sizeof(struct xrow_header *); 6. Why don't you just add 'struct xrow_header*[1]' to the end of struct synchro_entry? There is no a case, when the entry is needed without the xrow_header pointer in the end. > + > + /* > + * For simplicity we use malloc here but > + * probably should provide some cache similar > + * to txn cache. > + */ > + entry = (struct synchro_entry *)malloc(size); > + if (entry == NULL) { > + diag_set(OutOfMemory, size, "malloc", "synchro_entry"); > + return NULL; > + } > + > + struct journal_entry *journal_entry = &entry->journal_entry; > + struct synchro_body_bin *body_bin = &entry->body_bin; > + struct xrow_header *row = &entry->row; > + > + entry->applier = applier; > + journal_entry->rows[0] = row; > + > + xrow_encode_synchro(row, body_bin, req); > + > + row->lsn = applier_row->lsn; > + row->replica_id = applier_row->replica_id; > + > + journal_entry_create(journal_entry, 1, xrow_approx_len(row), > + apply_synchro_row_cb, entry); > + return entry; > +} > + > +/* > + * Process a synchro request from incoming applier packet > + * without using txn engine, for a speed sake. 7. It is not about speed. Txn module is fast, it is one of the hottest and most optimized places in the whole code base. And this is exactly why synchro requests *should not* be there - they slow down and complicate txn, not vice versa. > + */ > +static int > +apply_synchro_row(struct applier *applier, struct xrow_header *row) > +{ > + assert(iproto_type_is_synchro_request(row->type)); > + > + struct latch *latch = applier_lock(row->replica_id); > + if (vclock_get(&replicaset.applier.vclock, > + row->replica_id) >= row->lsn) { > + applier_unlock(latch); > + return 0; > + } > + > + struct synchro_request req; > + if (xrow_decode_synchro(row, &req) != 0) > + goto out; > + > + if (txn_limbo_process(&txn_limbo, &req)) > + goto out; > + > + struct synchro_entry *entry; > + entry = synchro_entry_new(applier, row, &req); > + if (entry == NULL) > + goto out; > + > + if (journal_write_async(&entry->journal_entry) != 0) { > + diag_set(ClientError, ER_WAL_IO); > + goto out; > + } > + > + /* > + * In case if something get wrong the journal completion > + * handler will set the applier's vclock back to last > + * successfully WAL written value. > + */ > + vclock_follow(&replicaset.applier.vclock, > + row->replica_id, row->lsn); > + applier_unlock(latch); 8. Code duplication is too big. And I wouldn't mind if it was just applier locks, but vclock propagation and rollback is not that simple. I think we should do all that inside {applier_apply_tx()}. Because technically you apply tx - the synchro row is stored inside {struct stailq *rows} which is the tx. I moved it into {applier_apply_tx()}, and the code became smaller, simpler, with less duplication, and even less diff. It also allows to drop the commits 5/8 and 6/8. Take a look and lets discuss. > + return 0; > + > +out: > + diag_log(); > + applier_unlock(latch); > + return -1; > +} > + > /** > * Apply all rows in the rows queue as a single transaction. > * > @@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier) > applier_read_tx(applier, &rows); > > applier->last_row_time = ev_monotonic_now(loop()); > + struct xrow_header *row = applier_first_row(&rows); > > - /* > - * In case of an heartbeat message wake a writer up > - * and check applier state. > - */ > - if (applier_first_row(&rows)->lsn == 0) > + if (row->lsn == 0) { > + /* > + * In case of an heartbeat message > + * wake a writer up and check > + * the applier state. > + */ > applier_signal_ack(applier); > - else if (applier_apply_tx(&rows) != 0) > + } else if (iproto_type_is_synchro_request(row->type)) { > + /* > + * Make sure synchro messages are never reached > + * in a batch (this is by design for simplicity > + * sake). 9. It is not about simplicity. It is about being not necessary. Transactions exist for DML and DDL (which is also DML on system spaces) only. For other WAL writes transactions in their common sense don't exist. So each row is a 'transaction'. In future we may want to change that and, for example, incorporate several synchro requests into a 'tx' (don't know why would we need that, but it is technically possible). > + */ > + assert(stailq_first(&rows) == stailq_last(&rows)); > + if (apply_synchro_row(applier, row) != 0) > + diag_raise(); > + } else if (applier_apply_tx(&rows) != 0) { > diag_raise(); > + } > > if (ibuf_used(ibuf) == 0) > ibuf_reset(ibuf); > 10. Consider my changes on top of this commit on your branch. Below I paste my diff squashed into your commit (on the branch they are not squashed). ==================== diff --git a/src/box/applier.cc b/src/box/applier.cc index a71516282..dfa62b72a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -51,8 +51,10 @@ #include "schema.h" #include "txn.h" #include "box.h" +#include "xrow.h" #include "scoped_guard.h" #include "txn_limbo.h" +#include "journal.h" STRS(applier_state, applier_STATE); @@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows) } while (!applier_last_row(rows)->is_commit); } -static int -applier_txn_rollback_cb(struct trigger *trigger, void *event) +static void +applier_rollback_by_wal_io(void) { - (void) trigger; - struct txn *txn = (struct txn *) event; - /* - * Synchronous transaction rollback due to receiving a - * ROLLBACK entry is a normal event and requires no - * special handling. - */ - if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK) - return 0; - /* * Setup shared applier diagnostic area. * @@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event) * diag use per-applier diag instead all the time * (which actually already present in the structure). * - * But remember that transactions are asynchronous - * and rollback may happen a way latter after it - * passed to the journal engine. + * But remember that WAL writes are asynchronous and + * rollback may happen a way later after it was passed to + * the journal engine. */ diag_set(ClientError, ER_WAL_IO); diag_set_error(&replicaset.applier.diag, diag_last_error(diag_get())); - /* Broadcast the rollback event across all appliers. */ - trigger_run(&replicaset.applier.on_rollback, event); - + /* Broadcast the rollback across all appliers. */ + trigger_run(&replicaset.applier.on_rollback, NULL); /* Rollback applier vclock to the committed one. */ vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); +} + +static int +applier_txn_rollback_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + struct txn *txn = (struct txn *) event; + /* + * Synchronous transaction rollback due to receiving a + * ROLLBACK entry is a normal event and requires no + * special handling. + */ + if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK) + applier_rollback_by_wal_io(); return 0; } @@ -841,6 +846,110 @@ applier_unlock(struct latch *latch) latch_unlock(latch); } +struct synchro_entry { + /** Encoded form of a synchro record. */ + struct synchro_body_bin body_bin; + + /** xrow to write, used by the journal engine. */ + struct xrow_header row; + + /** + * The journal entry itself. Note since + * it has unsized array it must be the + * last entry in the structure. + */ + struct journal_entry journal_entry; +}; + +static void +synchro_entry_delete(struct synchro_entry *entry) +{ + free(entry); +} + +/** + * Async write journal completion. + */ +static void +apply_synchro_row_cb(struct journal_entry *entry) +{ + assert(entry->complete_data != NULL); + struct synchro_entry *synchro_entry = + (struct synchro_entry *)entry->complete_data; + if (entry->res < 0) + applier_rollback_by_wal_io(); + else + trigger_run(&replicaset.applier.on_wal_write, NULL); + + synchro_entry_delete(synchro_entry); +} + +/** + * Allocate a new synchro_entry to be passed to + * the journal engine in async write way. + */ +static struct synchro_entry * +synchro_entry_new(struct xrow_header *applier_row, + struct synchro_request *req) +{ + struct synchro_entry *entry; + size_t size = sizeof(*entry) + sizeof(struct xrow_header *); + + /* + * For simplicity we use malloc here but + * probably should provide some cache similar + * to txn cache. + */ + entry = (struct synchro_entry *)malloc(size); + if (entry == NULL) { + diag_set(OutOfMemory, size, "malloc", "synchro_entry"); + return NULL; + } + + struct journal_entry *journal_entry = &entry->journal_entry; + struct synchro_body_bin *body_bin = &entry->body_bin; + struct xrow_header *row = &entry->row; + + journal_entry->rows[0] = row; + + xrow_encode_synchro(row, body_bin, req); + + row->lsn = applier_row->lsn; + row->replica_id = applier_row->replica_id; + + journal_entry_create(journal_entry, 1, xrow_approx_len(row), + apply_synchro_row_cb, entry); + return entry; +} + +/** Process a synchro request. */ +static int +apply_synchro_row(struct xrow_header *row) +{ + assert(iproto_type_is_synchro_request(row->type)); + + struct synchro_request req; + if (xrow_decode_synchro(row, &req) != 0) + goto err; + + if (txn_limbo_process(&txn_limbo, &req)) + goto err; + + struct synchro_entry *entry; + entry = synchro_entry_new(row, &req); + if (entry == NULL) + goto err; + + if (journal_write_async(&entry->journal_entry) != 0) { + diag_set(ClientError, ER_WAL_IO); + goto err; + } + return 0; +err: + diag_log(); + return -1; +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -876,13 +985,26 @@ applier_apply_tx(struct stailq *rows) } } + if (unlikely(iproto_type_is_synchro_request(first_row->type))) { + /* + * Synchro messages are not transactions, in terms + * of DML. Always sent and written isolated from + * each other. + */ + assert(first_row == last_row); + if (apply_synchro_row(first_row) != 0) + diag_raise(); + goto success; + } + /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply * conflict safely access failed xrow object and allocate * IPROTO_NOP on gc. */ - struct txn *txn = txn_begin(); + struct txn *txn; + txn = txn_begin(); struct applier_tx_row *item; if (txn == NULL) { applier_unlock(latch); @@ -951,6 +1073,7 @@ applier_apply_tx(struct stailq *rows) if (txn_commit_async(txn) < 0) goto fail; +success: /* * The transaction was sent to journal so promote vclock. * @@ -1118,7 +1241,13 @@ applier_subscribe(struct applier *applier) applier->lag = TIMEOUT_INFINITY; - /* Register triggers to handle WAL writes and rollbacks. */ + /* + * Register triggers to handle WAL writes and rollbacks. + * + * Note we use them for syncronous packets handling as well + * thus when changing make sure that synchro handling won't + * be broken. + */ struct trigger on_wal_write; trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL); trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
next prev parent reply other threads:[~2020-08-15 15:06 UTC|newest] Thread overview: 20+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK " Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov 2020-08-15 15:04 ` Vladislav Shpilevoy 2020-08-15 16:26 ` Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov 2020-08-15 15:04 ` Vladislav Shpilevoy 2020-08-15 16:27 ` Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Cyrill Gorcunov 2020-08-15 15:06 ` Vladislav Shpilevoy [this message] 2020-08-17 12:42 ` Cyrill Gorcunov 2020-08-17 20:49 ` Vladislav Shpilevoy 2020-08-18 8:08 ` Cyrill Gorcunov 2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row Cyrill Gorcunov 2020-08-15 8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov 2020-08-15 15:06 ` Vladislav Shpilevoy 2020-08-17 8:03 ` Cyrill Gorcunov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=af0743ac-3493-0c29-3380-21d41c6a8f1b@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=gorcunov@gmail.com \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox