From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path:
Date: Thu, 20 Jun 2019 19:37:09 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
Message-ID: <20190620163709.6hlxnsyekyouygde@esperanza>
References: <4226262.a1cURWbef2@home.lan>
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Disposition: inline
Content-Transfer-Encoding: 8bit
In-Reply-To: <4226262.a1cURWbef2@home.lan>
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org
List-ID:

On Thu, Jun 20, 2019 at 10:41:10AM +0300, Georgy Kirichenko wrote:
> I'm sorry, here is the proper version of the commit:
> 
> Appliers use asynchronous transactions to batch journal writes. All
> appliers share replicaset.applier.tx_vclock, which tracks the vclock
> that has been applied but not necessarily written to the journal.
> Appliers use a trigger to coordinate in case of failure, when a
> transaction is going to be rolled back. Also, the applier writer
> condition is shared across all appliers and is signaled on every
> commit or heartbeat message.
> 
> Closes: #1254
> ---
>  src/box/applier.cc     | 156 +++++++++++++++++++++++++++++------------
>  src/box/applier.h      |   9 ++-
>  src/box/replication.cc |   7 ++
>  src/box/replication.h  |  14 ++++
>  4 files changed, 138 insertions(+), 48 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..fee49d8ca 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -50,6 +50,7 @@
>  #include "schema.h"
>  #include "txn.h"
>  #include "box.h"
> +#include "scoped_guard.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
>  		 * replication_timeout seconds any more.
>  		 */
>  		if (applier->version_id >= version_id(1, 7, 7))
> -			fiber_cond_wait_timeout(&applier->writer_cond,
> +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
>  						TIMEOUT_INFINITY);
>  		else
> -			fiber_cond_wait_timeout(&applier->writer_cond,
> +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
>  						replication_timeout);

Why replace applier->writer_cond with replicaset.applier.commit_cond?
This means that even if only one applier is active, we will wake up all
of the writers on each commit, which looks strange.

> +		/*
> +		 * Stay 'orphan' until appliers catch up with
> +		 * the remote vclock at the time of SUBSCRIBE
> +		 * and the lag is less than configured.
> +		 */
> +		if (applier->state == APPLIER_SYNC &&
> +		    applier->lag <= replication_sync_lag &&
> +		    vclock_compare(&applier->remote_vclock_at_subscribe,
> +				   &replicaset.vclock) <= 0) {
> +			/* Applier is synced, switch to "follow". */
> +			applier_set_state(applier, APPLIER_FOLLOW);
> +		}
> +

A writer is supposed to send ACKs, not change the applier state. How did
this wind up here? Can't we do this right from the on_commit trigger?

>  		/* Send ACKs only when in FOLLOW mode ,*/
>  		if (applier->state != APPLIER_SYNC &&
>  		    applier->state != APPLIER_FOLLOW)
> @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>  			     next)->row.is_commit);
>  }
>  
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)

There's no sequencer object so the names are confusing. Let's call them
applier_on_rollback/commit?
> +{
> +	(void) trigger;
> +	(void) event;
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &replicaset.applier.diag);
> +	trigger_run(&replicaset.applier.on_replication_fail, NULL);
> +	vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	fiber_cond_broadcast(&replicaset.applier.commit_cond);
> +}
> +
> +static void
> +applier_on_fail(struct trigger *trigger, void *event)
> +{
> +	(void) event;
> +	struct applier *applier = (struct applier *)trigger->data;
> +	if (!diag_is_empty(&replicaset.applier.diag))
> +		diag_add_error(&applier->diag,
> +			       diag_last_error(&replicaset.applier.diag));
> +	fiber_cancel(applier->reader);
> +
> +}
> +
>  /**
>   * Apply all rows in the rows queue as a single transaction.
>   *
> @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>  static int
>  applier_apply_tx(struct stailq *rows)
>  {
> +	struct xrow_header *first_row =
> +		&stailq_first_entry(rows, struct applier_tx_row,
> +				    next)->row;
> +	struct replica *replica = replica_by_id(first_row->replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			       &replicaset.applier.order_latch);
> +	latch_lock(latch);
> +	if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> +	    first_row->lsn) {
> +		/* Check there is a heathbeat message and wake a writers up. */
> +		if (first_row->lsn == 0)
> +			fiber_cond_broadcast(&replicaset.applier.commit_cond);

Would be better to check that before taking the latch. We don't need the
latch to reply to a heartbeat message, do we?
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
>  	/**
>  	 * Explicitly begin the transaction so that we can
>  	 * control fiber->gc life cycle and, in case of apply
> @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
>  	 */
>  	struct txn *txn = txn_begin();
>  	struct applier_tx_row *item;
> -	if (txn == NULL)
> -		diag_raise();
> +	if (txn == NULL) {
> +		latch_unlock(latch);
> +		return -1;
> +	}
>  	stailq_foreach_entry(item, rows, next) {
>  		struct xrow_header *row = &item->row;
>  		int res = apply_row(row);
> @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
>  			 "Replication", "distributed transactions");
>  		goto rollback;
>  	}
> -	return txn_commit(txn);
>  
> +	/* We are ready to submit txn to wal. */
> +	struct trigger *on_rollback, *on_commit;
> +	on_rollback = (struct trigger *)region_alloc(&txn->region,
> +						     sizeof(struct trigger));
> +	on_commit = (struct trigger *)region_alloc(&txn->region,
> +						   sizeof(struct trigger));
> +	if (on_rollback == NULL || on_commit == NULL)
> +		goto rollback;
> +
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) < 0)
> +		goto fail;
> +	/* Transaction was sent to journal so promote vclock.
> +	 */
> +	vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> +		      first_row->lsn);
> +	latch_unlock(latch);
> +
> +	return 0;
>  rollback:
>  	txn_rollback(txn);
> +fail:
> +	latch_unlock(latch);
>  	fiber_gc();
>  	return -1;
>  }
> @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
>  	struct ev_io *coio = &applier->io;
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
> -	struct vclock remote_vclock_at_subscribe;
>  	struct tt_uuid cluster_id = uuid_nil;
>  
>  	struct vclock vclock;
> @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
>  		 * the replica, and replica has to check whether
>  		 * its and master's cluster ids match.
>  		 */
> -		vclock_create(&remote_vclock_at_subscribe);
> +		vclock_create(&applier->remote_vclock_at_subscribe);
>  		xrow_decode_subscribe_response_xc(&row,
>  						  &cluster_id,
> -						  &remote_vclock_at_subscribe);
> +						  &applier->remote_vclock_at_subscribe);
>  		/*
>  		 * If master didn't send us its cluster id
>  		 * assume that it has done all the checks.
> @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
>  
>  		say_info("subscribed");
>  		say_info("remote vclock %s local vclock %s",
> -			 vclock_to_string(&remote_vclock_at_subscribe),
> +			 vclock_to_string(&applier->remote_vclock_at_subscribe),
>  			 vclock_to_string(&vclock));
>  	}
>  	/*
> @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
>  
>  	applier->lag = TIMEOUT_INFINITY;
>  
> +	/* Register a trigger to handle replication failures. */
> +	struct trigger on_fail;
> +	trigger_create(&on_fail, applier_on_fail, applier, NULL);
> +	trigger_add(&replicaset.applier.on_replication_fail, &on_fail);

Why do we need the on_replication_fail trigger? AFAICS it is called from
the on_rollback callback. Can't we call applier_on_fail right from there,
without the use of the intermediary?

> +	auto trigger_guard = make_scoped_guard([&] {
> +		trigger_clear(&on_fail);
> +	});
> +
> +
>  	/*
>  	 * Process a stream of rows from the binary log.
>  	 */
> @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Stay 'orphan' until appliers catch up with
> -		 * the remote vclock at the time of SUBSCRIBE
> -		 * and the lag is less than configured.
> -		 */
> -		if (applier->state == APPLIER_SYNC &&
> -		    applier->lag <= replication_sync_lag &&
> -		    vclock_compare(&remote_vclock_at_subscribe,
> -				   &replicaset.vclock) <= 0) {
> -			/* Applier is synced, switch to "follow". */
> -			applier_set_state(applier, APPLIER_FOLLOW);
> -		}
> -
>  		struct stailq rows;
>  		applier_read_tx(applier, &rows);
>  
> -		struct xrow_header *first_row =
> -			&stailq_first_entry(&rows, struct applier_tx_row,
> -					    next)->row;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(first_row->replica_id);
> -		struct latch *latch = (replica ? &replica->order_latch :
> -				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set of changes
> -		 * may arrive via two concurrently running appliers.
> -		 * Hence we need a latch to strictly order all changes
> -		 * that belong to the same server id.
> -		 */

Why did you remove this comment? (You didn't move it, I checked.)

> -		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> -		    first_row->lsn &&
> -		    applier_apply_tx(&rows) != 0) {
> -			latch_unlock(latch);
> +		if (applier_apply_tx(&rows) != 0)
>  			diag_raise();
> -		}
> -		latch_unlock(latch);
>  
> -		if (applier->state == APPLIER_SYNC ||
> -		    applier->state == APPLIER_FOLLOW)
> -			fiber_cond_signal(&applier->writer_cond);
>  		if (ibuf_used(ibuf) == 0)
>  			ibuf_reset(ibuf);
>  		fiber_gc();
> @@ -872,6 +932,11 @@ applier_f(va_list ap)
>  			return -1;
>  		}
>  	} catch (FiberIsCancelled *e) {
> +		if (!diag_is_empty(&applier->diag)) {
> +			diag_move(&applier->diag, &fiber()->diag);

Hmm, AFAIK we only need this diag for box.info.replication.
Maybe it would be better to patch box.info.replication to use
applier->diag instead of applier->reader->diag so that we don't need to
move errors around?

> +			applier_disconnect(applier, APPLIER_STOPPED);
> +			break;
> +		}
>  		applier_disconnect(applier, APPLIER_OFF);
>  		break;
>  	} catch (SocketError *e) {
> @@ -959,7 +1024,7 @@ applier_new(const char *uri)
>  	applier->last_row_time = ev_monotonic_now(loop());
>  	rlist_create(&applier->on_state);
>  	fiber_cond_create(&applier->resume_cond);
> -	fiber_cond_create(&applier->writer_cond);
> +	diag_create(&applier->diag);
>  
>  	return applier;
>  }
> @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
>  	assert(applier->io.fd == -1);
>  	trigger_destroy(&applier->on_state);
>  	fiber_cond_destroy(&applier->resume_cond);
> -	fiber_cond_destroy(&applier->writer_cond);
>  	free(applier);
>  }
>  
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 5bff90031..716da32e2 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -74,8 +74,6 @@ struct applier {
>  	struct fiber *reader;
>  	/** Background fiber to reply with vclock */
>  	struct fiber *writer;
> -	/** Writer cond. */
> -	struct fiber_cond writer_cond;
>  	/** Finite-state machine */
>  	enum applier_state state;
>  	/** Local time of this replica when the last row has been received */
> @@ -114,8 +112,15 @@ struct applier {
>  	bool is_paused;
>  	/** Condition variable signaled to resume the applier. */
>  	struct fiber_cond resume_cond;
> +	/* Diag to raise an error. */
> +	struct diag diag;
> +	/* The masters vclock while subscribe. */
> +	struct vclock remote_vclock_at_subscribe;
>  };
>  
> +void
> +applier_init();
> +

This function isn't defined anywhere.

> diff --git a/src/box/replication.h b/src/box/replication.h
> index 8c8a9927e..a4830f5b5 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -232,6 +232,20 @@ struct replicaset {
>  	 * struct replica object).
>  	 */
>  	struct latch order_latch;
> +	/*
> +	 * A vclock of the last transaction wich was read
> +	 * from an applier connection.
> +	 */
> +	struct vclock net_vclock;

Please elaborate. Can it be less than replicaset.vclock? Can it be
greater? Why?

> +	/* Signaled on replicated transaction commit. */
> +	struct fiber_cond commit_cond;
> +	/*
> +	 * Trigger to fire when replication stops in case
> +	 * of an error.
> +	 */
> +	struct rlist on_replication_fail;
> +	/* Diag to populate an error acros all appliers. */
> +	struct diag diag;
>  } applier;
>  /** Map of all known replica_id's to correspponding replica's. */
>  struct replica **replica_by_id;