From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: Georgy Kirichenko <georgy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: Re: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
Date: Thu, 13 Jun 2019 18:17:52 +0300
Message-ID: <20190613151752.4jxj34zywbvtnayn@esperanza>
In-Reply-To: <58cc2b3d0c6ee62b3b45d204165f6e107976c35c.1560112747.git.georgy@tarantool.org>

On Sun, Jun 09, 2019 at 11:44:42PM +0300, Georgy Kirichenko wrote:
> Applier use asynchronous transaction to batch journal writes. A
> sequencer orders transaction execution and check write result while
> an applier reads network.

In general, I have no objections to the way you handle this. It does
seem that we don't need to start a fiber per transaction - batching WAL
writes should be enough in most cases, because memtx never yields while
vinyl usually doesn't yield on write (well, there are UPDATE/INSERT, but
we can replace them with REPLACE on the replica; before/on replace
triggers are slow by design anyway, so we don't need to optimize them).
However, I'd expect to see this reasoning in the commit message.

Also, I find the implementation a little too complex and have a
suggestion for how we could simplify it - see comments inline.

>
> Closes: #1254
> ---
>  src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
>  src/box/applier.h  |   4 +
>  src/box/box.cc     |   1 +
>  3 files changed, 251 insertions(+), 126 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..9f0efda5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -53,6 +53,231 @@
>
>  STRS(applier_state, applier_STATE);
>
> +/**
> + * Process a no-op request.
> + *
> + * A no-op request does not affect any space, but it
> + * promotes vclock and is written to WAL.
> + */
> +static int
> +process_nop(struct request *request)
> +{
> +	assert(request->type == IPROTO_NOP);
> +	struct txn *txn = in_txn();
> +	if (txn_begin_stmt(txn, NULL) == NULL)
> +		return -1;
> +	return txn_commit_stmt(txn, request);
> +}
> +
> +static int
> +apply_row(struct xrow_header *row)
> +{
> +	struct request request;
> +	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> +		return -1;
> +	if (request.type == IPROTO_NOP)
> +		return process_nop(&request);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		return -1;
> +	if (box_process_rw(&request, space, NULL) != 0) {
> +		say_error("error applying row: %s", request_str(&request));
> +		return -1;
> +	}
> +	return 0;
> +}

Please avoid moving and adding/patching code in the same patch. Split
it if necessary. In this particular case I see no reason for you to
move the code.

> +
> +/**
> + * A helper struct to link xrow objects in a list.
> + */
> +struct applier_tx_row {
> +	/* Next transaction row. */
> +	struct stailq_entry next;
> +	/* xrow_header struct for the current transaction row. */
> +	struct xrow_header row;
> +};
> +
> +struct sequencer {
> +	struct stailq txn_queue;
> +	struct fiber_cond txn_queue_cond;
> +	struct vclock net_vclock;
> +	struct fiber_cond tx_vclock_cond;

This cond is never used.
> +	struct diag diag;
> +	struct rlist on_fail;
> +};
> +
> +static struct sequencer sequencer;
> +
> +static int
> +sequencer_collect_f(va_list ap)
> +{
> +	(void) ap;
> +	while (!fiber_is_cancelled()) {
> +		while (stailq_empty(&sequencer.txn_queue)) {
> +			if (!diag_is_empty(&sequencer.diag)) {
> +				diag_clear(&sequencer.diag);
> +				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
> +			}
> +			fiber_cond_wait(&sequencer.txn_queue_cond);
> +			continue;
> +		}
> +		struct txn *txn =
> +			stailq_shift_entry(&sequencer.txn_queue, struct txn,
> +					   in_txn_cache);
> +		if (txn_wait(txn) == 0) {
> +			continue;
> +		}
> +		if (diag_is_empty(&sequencer.diag)) {
> +			diag_move(&fiber()->diag, &sequencer.diag);
> +			trigger_run(&sequencer.on_fail, NULL);
> +		}
> +	}
> +	return 0;
> +}
> +
> +void
> +applier_init()
> +{
> +	stailq_create(&sequencer.txn_queue);
> +	fiber_cond_create(&sequencer.txn_queue_cond);
> +
> +	rlist_create(&sequencer.on_fail);
> +
> +	vclock_create(&sequencer.net_vclock);
> +	fiber_cond_create(&sequencer.tx_vclock_cond);
> +	diag_create(&sequencer.diag);
> +	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
> +	if (collector == NULL)
> +		panic("Failed to create a sequencer collector fiber");
> +	fiber_start(collector, NULL);
> +}
> +
> +static inline void
> +sequencer_on_fail(struct trigger *on_fail)
> +{
> +	trigger_add(&sequencer.on_fail, on_fail);
> +}
> +
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &sequencer.diag);
> +	trigger_run(&sequencer.on_fail, &sequencer);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
> +}
> +
> +static inline int
> +sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			       &replicaset.applier.order_latch);
> +
> +	latch_lock(latch);
> +	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
> +		/* Nothing to do. */
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct trigger *on_rollback;
> +	struct trigger *on_commit;
> +	/**
> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		goto fail;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto rollback;
> +	}
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "distributed transactions");
> +		goto rollback;
> +	}
> +
> +	/* We are ready to submit txn to wal. */
> +	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) != 0)
> +		goto fail;
> +
> +	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
> +	latch_unlock(latch);
> +	return 0;
> +
> +rollback:
> +	txn_rollback(txn);
> +
> +fail:
> +	latch_unlock(latch);
> +	if (diag_is_empty(&sequencer.diag)) {
> +		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
> +		trigger_run(&sequencer.on_fail, NULL);
> +	}
> +	return -1;
> +}

The code is quite complex, and without comments it's easy to get lost.
Next time, please write proper comments explaining what all the actors
do.

Just so I understand what's going on here:

 1. The applier performs the requests comprising a transaction.
 2. The applier submits the WAL write asynchronously.
 3. WAL appends the journal entry to its queue.
 4. The applier sets on_commit/on_rollback triggers that add the
    completed txn to the sequencer queue.
 5. The applier continues to the next transaction (goes to step 1).
 6. WAL writes the request and invokes the txn_complete callback. It
    also wakes up the fiber waiting for the transaction (hell, there's
    no such fiber in this case!).
 7. The txn_complete callback runs the on_commit trigger.
 8. The on_commit trigger set by the applier adds the transaction to
    the sequencer queue.
 9. The sequencer calls txn_wait. Since the transaction is already
    complete, this just frees the transaction.

This is a good example of over-engineering IMHO :)

In particular, I find the sequencer entity redundant: WAL already acts
as a request sequencer, so we should reuse its queue rather than
introduce a new one. Let's try to simplify things a bit:

 - Instead of having the journal_async_write/journal_async_wait pair,
   let's introduce a single method, journal_submit, that takes a
   callback to execute upon write completion. The method doesn't need
   to wake up the initiating fiber - the callback itself can do that,
   if necessary. No other methods are defined for the journal.
 - Then add txn_commit and txn_commit_async that would do something
   like this:

	txn_commit:
		txn->is_async = false
		txn_do_commit()
		while (!txn_is_complete())
			fiber_yield_timeout()
		txn_free()

	txn_commit_async:
		txn->is_async = true
		txn_do_commit()

	txn_do_commit:
		do preparatory engine stuff
		wal_submit(txn_complete)

	txn_complete:
		do completing engine stuff
		run on_commit/rollback triggers
		if (txn->is_async)
			txn_free()
		else
			wake up the awaiting fiber

 - Now the applier just sets an on_rollback trigger to stop in case of
   a WAL write error, then fires txn_commit_async and forgets about the
   txn - it'll get freed by the WAL callback automatically upon
   completion (a rough sketch follows below).

No sequencer with a separate queue, no new triggers, no new journal
methods. Overall, it should look much simpler. Have you considered
this? Are there any problems? Please check it out.
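For illustration, the applier side could then boil down to something
like the sketch below. This is only a sketch of the proposed scheme:
txn_commit_async() is the method suggested above, not an existing
function, and applier->diag / applier->txn_cond are fields made up for
the example; conflict handling is omitted.

	/*
	 * Hypothetical rollback trigger: it only records the failure
	 * and wakes the applier up; the txn itself is freed by the
	 * WAL completion callback.
	 */
	static void
	applier_on_wal_error(struct trigger *trigger, void *event)
	{
		(void) event;
		struct applier *applier = (struct applier *)trigger->data;
		if (diag_is_empty(&applier->diag)) {
			diag_set(ClientError, ER_WAL_IO);
			diag_move(diag_get(), &applier->diag);
		}
		fiber_cond_broadcast(&applier->txn_cond);
	}

	static int
	applier_apply_tx(struct applier *applier, struct stailq *rows)
	{
		struct txn *txn = txn_begin();
		if (txn == NULL)
			return -1;
		struct applier_tx_row *item;
		stailq_foreach_entry(item, rows, next) {
			if (apply_row(&item->row) != 0) {
				txn_rollback(txn);
				return -1;
			}
		}
		struct trigger *on_rollback = (struct trigger *)
			txn_alloc(txn, sizeof(*on_rollback));
		trigger_create(on_rollback, applier_on_wal_error,
			       applier, NULL);
		txn_on_rollback(txn, on_rollback);
		/*
		 * Fire and forget: no txn_wait(), no sequencer queue.
		 * The WAL completion callback frees the txn while the
		 * applier goes on reading the network.
		 */
		return txn_commit_async(txn);
	}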