From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
Date: Thu, 13 Jun 2019 18:17:52 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
Message-ID: <20190613151752.4jxj34zywbvtnayn@esperanza>
References: <58cc2b3d0c6ee62b3b45d204165f6e107976c35c.1560112747.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <58cc2b3d0c6ee62b3b45d204165f6e107976c35c.1560112747.git.georgy@tarantool.org>
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org
List-ID: 

On Sun, Jun 09, 2019 at 11:44:42PM +0300, Georgy Kirichenko wrote:
> Applier use asynchronous transaction to batch journal writes. A
> sequencer orders transaction execution and check write result while
> an applier reads network.

In general, I have no objections against the way you handle this. It
does seem that we don't need to start a fiber for each transaction -
batching WAL writes should be enough in most cases, because memtx
never yields while vinyl usually doesn't yield on write (well, there
are UPDATE/INSERT, but we can replace them with REPLACE on the
replica; before/on replace triggers are slow by design anyway, so we
don't need to optimize them). However, I'd expect to see this
reasoning in the commit message.

Also, I find the implementation a little bit too complex and have a
suggestion for how we could possibly simplify it - see the comments
inline.

> 
> Closes: #1254
> ---
>  src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
>  src/box/applier.h  |   4 +
>  src/box/box.cc     |   1 +
>  3 files changed, 251 insertions(+), 126 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..9f0efda5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -53,6 +53,231 @@
>  
>  STRS(applier_state, applier_STATE);
>  
> +/**
> + * Process a no-op request.
> + *
> + * A no-op request does not affect any space, but it
> + * promotes vclock and is written to WAL.
> + */
> +static int
> +process_nop(struct request *request)
> +{
> +	assert(request->type == IPROTO_NOP);
> +	struct txn *txn = in_txn();
> +	if (txn_begin_stmt(txn, NULL) == NULL)
> +		return -1;
> +	return txn_commit_stmt(txn, request);
> +}
> +
> +static int
> +apply_row(struct xrow_header *row)
> +{
> +	struct request request;
> +	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> +		return -1;
> +	if (request.type == IPROTO_NOP)
> +		return process_nop(&request);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		return -1;
> +	if (box_process_rw(&request, space, NULL) != 0) {
> +		say_error("error applying row: %s", request_str(&request));
> +		return -1;
> +	}
> +	return 0;
> +}

Please avoid moving and adding/patching code in the same patch. Split
it if necessary. In this particular case I see no reason for you to
move the code.

> +
> +/**
> + * A helper struct to link xrow objects in a list.
> + */
> +struct applier_tx_row {
> +	/* Next transaction row. */
> +	struct stailq_entry next;
> +	/* xrow_header struct for the current transaction row. */
> +	struct xrow_header row;
> +};
> +
> +struct sequencer {
> +	struct stailq txn_queue;
> +	struct fiber_cond txn_queue_cond;
> +	struct vclock net_vclock;
> +	struct fiber_cond tx_vclock_cond;

This cond is never used...
> +	struct diag diag;
> +	struct rlist on_fail;
> +};
> +
> +static struct sequencer sequencer;
> +
> +static int
> +sequencer_collect_f(va_list ap)
> +{
> +	(void) ap;
> +	while (!fiber_is_cancelled()) {
> +		while (stailq_empty(&sequencer.txn_queue)) {
> +			if (!diag_is_empty(&sequencer.diag)) {
> +				diag_clear(&sequencer.diag);
> +				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
> +			}
> +			fiber_cond_wait(&sequencer.txn_queue_cond);
> +			continue;
> +		}
> +		struct txn *txn =
> +			stailq_shift_entry(&sequencer.txn_queue, struct txn,
> +					   in_txn_cache);
> +		if (txn_wait(txn) == 0) {
> +			continue;
> +		}
> +		if (diag_is_empty(&sequencer.diag)) {
> +			diag_move(&fiber()->diag, &sequencer.diag);
> +			trigger_run(&sequencer.on_fail, NULL);
> +		}
> +	}
> +	return 0;
> +}
> +
> +void
> +applier_init()
> +{
> +	stailq_create(&sequencer.txn_queue);
> +	fiber_cond_create(&sequencer.txn_queue_cond);
> +
> +	rlist_create(&sequencer.on_fail);
> +
> +	vclock_create(&sequencer.net_vclock);
> +	fiber_cond_create(&sequencer.tx_vclock_cond);
> +	diag_create(&sequencer.diag);
> +	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
> +	if (collector == NULL)
> +		panic("Failed to create a sequencer collector fiber");
> +	fiber_start(collector, NULL);
> +}
> +
> +static inline void
> +sequencer_on_fail(struct trigger *on_fail)
> +{
> +	trigger_add(&sequencer.on_fail, on_fail);
> +}
> +
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &sequencer.diag);
> +	trigger_run(&sequencer.on_fail, &sequencer);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	struct txn *txn = (struct txn *)event;
> +
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
> +}
> +
> +static inline int
> +sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			       &replicaset.applier.order_latch);
> +
> +	latch_lock(latch);
> +	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
> +		/* Nothing to do. */
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct trigger *on_rollback;
> +	struct trigger *on_commit;
> +	/**
> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		goto fail;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto rollback;
> +	}
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "distributed transactions");
> +		goto rollback;
> +	}
> +
> +	/* We are ready to submit txn to wal. */
> +	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) != 0)
> +		goto fail;
> +
> +	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
> +	latch_unlock(latch);
> +	return 0;
> +
> +rollback:
> +	txn_rollback(txn);
> +
> +fail:
> +	latch_unlock(latch);
> +	if (diag_is_empty(&sequencer.diag)) {
> +		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
> +		trigger_run(&sequencer.on_fail, NULL);
> +	}
> +	return -1;
> +}

The code is quite complex and without comments it's easy to get lost.
Please next time write proper comments explaining what all the actors
do.

Just so I understand what's going on here:

1. applier performs requests comprising a transaction.
2. applier submits WAL write asynchronously.
3. wal appends the journal entry to its queue.
4. applier sets on_commit/rollback triggers to add the completed tx to
   the sequencer queue.
5. applier continues to the next transaction (goes to step 1).
6. wal writes the request and performs the txn_complete callback. It
   also wakes up the fiber waiting for the transaction (hell, there's
   no such fiber in this case!)
7. txn_complete callback runs the on_commit trigger.
8. on_commit trigger set by the applier adds the transaction to the
   sequencer queue.
9. sequencer calls txn_wait. Since the transaction is complete, it
   just frees the transaction.
This is a good example of over-engineering IMHO :)

In particular, I find the sequencer entity redundant, because WAL
already acts as a request sequencer so we should reuse its queue
rather than introducing a new one.

Let's try to simplify things a bit:

 - Instead of having the journal_async_write/journal_async_wait pair,
   let's introduce a single method journal_submit that would take a
   callback to execute upon write completion. The method doesn't need
   to wake up the initiating fiber - that should be done by the
   callback itself, if necessary. No other methods are defined for
   journal.

 - Then add txn_commit and txn_commit_async that would do something
   like this:

   txn_commit:
     txn->is_async = false
     txn_do_commit()
     while (!txn_is_complete())
       fiber_yield_timeout()
     txn_free()

   txn_commit_async:
     txn->is_async = true
     txn_do_commit()

   txn_do_commit:
     do preparatory engine stuff
     journal_submit(txn_complete)

   txn_complete:
     do completing engine stuff
     run on_commit/rollback triggers
     if (txn->is_async)
       txn_free()
     else
       wakeup awaiting fiber

 - Now the applier just sets an on_rollback trigger to stop in case of
   a WAL write error, then fires txn_commit_async and forgets about
   the txn - it'll get freed by the WAL callback automatically upon
   completion.

No sequencer with a separate queue, no new triggers, no new journal
methods. Overall, should look much simpler.

Have you considered this? Are there any problems? Please check it out.