From: Георгий Кириченко (Georgy Kirichenko)
Subject: Re: [tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
Date: Thu, 20 Jun 2019 23:33:50 +0300
In-Reply-To: <20190620163709.6hlxnsyekyouygde@esperanza>
To: Vladimir Davydov
Cc: tarantool-patches@freelists.org

On Thursday, June 20, 2019 7:37:09 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> > I'm sorry, here is the proper version of the commit message:
> > 
> > Appliers use asynchronous transactions to batch journal writes. All
> > appliers share replicaset.applier.tx_vclock, which tracks the vclock
> > that has been applied but not necessarily written to the journal.
> > Appliers use a trigger to coordinate in case of failure - when a
> > transaction is going to be rolled back. The applier writer condition
> > is also shared across all appliers and is signaled on commit or on a
> > heartbeat message.
> > 
> > Closes: #1254
> > ---
> > 
> >  src/box/applier.cc     | 156 +++++++++++++++++++++++++++++------------
> >  src/box/applier.h      |   9 ++-
> >  src/box/replication.cc |   7 ++
> >  src/box/replication.h  |  14 ++++
> >  4 files changed, 138 insertions(+), 48 deletions(-)
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 5a92f6109..fee49d8ca 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -50,6 +50,7 @@
> >  #include "schema.h"
> >  #include "txn.h"
> >  #include "box.h"
> > +#include "scoped_guard.h"
> > 
> >  STRS(applier_state, applier_STATE);
> > 
> > @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> >       * replication_timeout seconds any more.
> >       */
> >      if (applier->version_id >= version_id(1, 7, 7))
> > -        fiber_cond_wait_timeout(&applier->writer_cond,
> > +        fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> >                                  TIMEOUT_INFINITY);
> >      else
> > -        fiber_cond_wait_timeout(&applier->writer_cond,
> > +        fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> >                                  replication_timeout);
> 
> Why replace applier->writer_cond with replicaset.applier.commit_cond?
> This means that even if only one applier is active, we will wake up all
> of the writers on each commit, which looks strange.

I did it because an applier has no control over how a transaction is
finished other than through an on_commit/on_rollback trigger. Besides, even
if an applier sent nothing to commit (for instance, it could be behind the
others), it still should send an ACK. Also, I think we should update this
state for any transaction processed (even for local ones).

> 
> > +    /*
> > +     * Stay 'orphan' until appliers catch up with
> > +     * the remote vclock at the time of SUBSCRIBE
> > +     * and the lag is less than configured.
> > +     */
> > +    if (applier->state == APPLIER_SYNC &&
> > +        applier->lag <= replication_sync_lag &&
> > +        vclock_compare(&applier->remote_vclock_at_subscribe,
> > +                       &replicaset.vclock) <= 0) {
> > +        /* Applier is synced, switch to "follow". */
> > +        applier_set_state(applier, APPLIER_FOLLOW);
> > +    }
> > +
> 
> A writer is supposed to send ACKs, not change the applier state.
> How did this wind up here? Can't we do this right from the on_commit
> trigger?

The same case as above: if an applier didn't send anything to commit (it is
behind another applier), where is a better point to update its state?

> 
> >      /* Send ACKs only when in FOLLOW mode. */
> >      if (applier->state != APPLIER_SYNC &&
> >          applier->state != APPLIER_FOLLOW)
> > 
> > @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> >                   next)->row.is_commit);
> >  }
> > 
> > +static void
> > +sequencer_rollback_cb(struct trigger *trigger, void *event)
> 
> There's no sequencer object so the names are confusing. Let's call them
> applier_on_rollback/commit?

Accepted.

> 
> > +{
> > +    (void) trigger;
> > +    (void) event;
> > +    diag_set(ClientError, ER_WAL_IO);
> > +    diag_move(&fiber()->diag, &replicaset.applier.diag);
> > +    trigger_run(&replicaset.applier.on_replication_fail, NULL);
> > +    vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> > +}
> > +
> > +static void
> > +sequencer_commit_cb(struct trigger *trigger, void *event)
> > +{
> > +    (void) trigger;
> > +    (void) event;
> > +    fiber_cond_broadcast(&replicaset.applier.commit_cond);
> > +}
> > +
> > +static void
> > +applier_on_fail(struct trigger *trigger, void *event)
> > +{
> > +    (void) event;
> > +    struct applier *applier = (struct applier *)trigger->data;
> > +    if (!diag_is_empty(&replicaset.applier.diag))
> > +        diag_add_error(&applier->diag,
> > +                       diag_last_error(&replicaset.applier.diag));
> > +    fiber_cancel(applier->reader);
> > +
> > +}
> > +
> >  /**
> >   * Apply all rows in the rows queue as a single transaction.
> >   *
> > @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> >  static int
> >  applier_apply_tx(struct stailq *rows)
> >  {
> > +    struct xrow_header *first_row =
> > +        &stailq_first_entry(rows, struct applier_tx_row,
> > +                            next)->row;
> > +    struct replica *replica = replica_by_id(first_row->replica_id);
> > +    struct latch *latch = (replica ? &replica->order_latch :
> > +                           &replicaset.applier.order_latch);
> > +    latch_lock(latch);
> > +    if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> > +        first_row->lsn) {
> > +        /* Check whether this is a heartbeat message and wake the writers up. */
> > +        if (first_row->lsn == 0)
> > +            fiber_cond_broadcast(&replicaset.applier.commit_cond);
> 
> Would be better to check that before taking the latch. We don't need the
> latch to reply to a heartbeat message, do we?

Accepted.
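Something like this at the top of applier_apply_tx() (a rough sketch of the
rework I have in mind, reusing only the names already present in this patch):

static int
applier_apply_tx(struct stailq *rows)
{
    struct xrow_header *first_row =
        &stailq_first_entry(rows, struct applier_tx_row,
                            next)->row;
    /*
     * A heartbeat message carries no data (lsn == 0), so just wake
     * the writers up right away, without taking the latch.
     */
    if (first_row->lsn == 0) {
        fiber_cond_broadcast(&replicaset.applier.commit_cond);
        return 0;
    }
    struct replica *replica = replica_by_id(first_row->replica_id);
    struct latch *latch = (replica ? &replica->order_latch :
                           &replicaset.applier.order_latch);
    latch_lock(latch);
    if (vclock_get(&replicaset.applier.net_vclock,
                   first_row->replica_id) >= first_row->lsn) {
        /* Already written via another applier, skip the transaction. */
        latch_unlock(latch);
        return 0;
    }
    /* ... then txn_begin() and apply the rows exactly as below. */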
> 
> > +        latch_unlock(latch);
> > +        return 0;
> > +    }
> > +
> >      /**
> >       * Explicitly begin the transaction so that we can
> >       * control fiber->gc life cycle and, in case of apply
> > @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
> >       */
> >      struct txn *txn = txn_begin();
> >      struct applier_tx_row *item;
> > -    if (txn == NULL)
> > -        diag_raise();
> > +    if (txn == NULL) {
> > +        latch_unlock(latch);
> > +        return -1;
> > +    }
> >      stailq_foreach_entry(item, rows, next) {
> >          struct xrow_header *row = &item->row;
> >          int res = apply_row(row);
> > @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
> >               "Replication", "distributed transactions");
> >          goto rollback;
> >      }
> > -    return txn_commit(txn);
> > +    /* We are ready to submit txn to wal. */
> > +    struct trigger *on_rollback, *on_commit;
> > +    on_rollback = (struct trigger *)region_alloc(&txn->region,
> > +                                                 sizeof(struct trigger));
> > +    on_commit = (struct trigger *)region_alloc(&txn->region,
> > +                                               sizeof(struct trigger));
> > +    if (on_rollback == NULL || on_commit == NULL)
> > +        goto rollback;
> > +
> > +    trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> > +    txn_on_rollback(txn, on_rollback);
> > +
> > +    trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> > +    txn_on_commit(txn, on_commit);
> > +
> > +    if (txn_write(txn) < 0)
> > +        goto fail;
> > +    /* Transaction was sent to journal so promote vclock. */
> > +    vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> > +                  first_row->lsn);
> > +    latch_unlock(latch);
> > +
> > +    return 0;
> >  rollback:
> >      txn_rollback(txn);
> > +fail:
> > +    latch_unlock(latch);
> >      fiber_gc();
> >      return -1;
> >  }
> > 
> > @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
> >      struct ev_io *coio = &applier->io;
> >      struct ibuf *ibuf = &applier->ibuf;
> >      struct xrow_header row;
> > -    struct vclock remote_vclock_at_subscribe;
> >      struct tt_uuid cluster_id = uuid_nil;
> >      struct vclock vclock;
> > 
> > @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
> >       * the replica, and replica has to check whether
> >       * its and master's cluster ids match.
> >       */
> > -    vclock_create(&remote_vclock_at_subscribe);
> > +    vclock_create(&applier->remote_vclock_at_subscribe);
> >      xrow_decode_subscribe_response_xc(&row,
> >                                        &cluster_id,
> > -                                      &remote_vclock_at_subscribe);
> > +                                      &applier->remote_vclock_at_subscribe);
> >      /*
> >       * If master didn't send us its cluster id
> >       * assume that it has done all the checks.
> > @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
> >      say_info("subscribed");
> >      say_info("remote vclock %s local vclock %s",
> > -             vclock_to_string(&remote_vclock_at_subscribe),
> > +             vclock_to_string(&applier->remote_vclock_at_subscribe),
> >               vclock_to_string(&vclock));
> >      }
> >      /*
> > 
> > @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
> >      applier->lag = TIMEOUT_INFINITY;
> > 
> > +    /* Register a trigger to handle replication failures. */
> > +    struct trigger on_fail;
> > +    trigger_create(&on_fail, applier_on_fail, applier, NULL);
> > +    trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
> 
> Why do we need on_replication_fail trigger? AFAICS it is called from
> on_rollback callback.
Can't we call applier_on_fail right from there,
> without the use of the intermediary?

Because we should cancel all appliers if anything failed. For instance, an
applier could skip a transaction and start on the next one, and then it still
should be cancelled if another applier failed. We could track the applier list
instead, but I'm not sure it would be better.

> 
> > +    auto trigger_guard = make_scoped_guard([&] {
> > +        trigger_clear(&on_fail);
> > +    });
> > +
> > +
> >      /*
> >       * Process a stream of rows from the binary log.
> >       */
> > @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
> >              applier_set_state(applier, APPLIER_FOLLOW);
> >          }
> > 
> > -        /*
> > -         * Stay 'orphan' until appliers catch up with
> > -         * the remote vclock at the time of SUBSCRIBE
> > -         * and the lag is less than configured.
> > -         */
> > -        if (applier->state == APPLIER_SYNC &&
> > -            applier->lag <= replication_sync_lag &&
> > -            vclock_compare(&remote_vclock_at_subscribe,
> > -                           &replicaset.vclock) <= 0) {
> > -            /* Applier is synced, switch to "follow". */
> > -            applier_set_state(applier, APPLIER_FOLLOW);
> > -        }
> > -
> >          struct stailq rows;
> >          applier_read_tx(applier, &rows);
> > 
> > -        struct xrow_header *first_row =
> > -            &stailq_first_entry(&rows, struct applier_tx_row,
> > -                                next)->row;
> >          applier->last_row_time = ev_monotonic_now(loop());
> > 
> > -        struct replica *replica = replica_by_id(first_row->replica_id);
> > -        struct latch *latch = (replica ? &replica->order_latch :
> > -                               &replicaset.applier.order_latch);
> > -        /*
> > -         * In a full mesh topology, the same set of changes
> > -         * may arrive via two concurrently running appliers.
> > -         * Hence we need a latch to strictly order all changes
> > -         * that belong to the same server id.
> > -         */
> 
> Why did you remove this comment? (You didn't move it, I checked).

Accepted.

> 
> > -        latch_lock(latch);
> > -        if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> > -            first_row->lsn &&
> > -            applier_apply_tx(&rows) != 0) {
> > -            latch_unlock(latch);
> > +        if (applier_apply_tx(&rows) != 0)
> >              diag_raise();
> > -        }
> > -        latch_unlock(latch);
> > 
> > -        if (applier->state == APPLIER_SYNC ||
> > -            applier->state == APPLIER_FOLLOW)
> > -            fiber_cond_signal(&applier->writer_cond);
> >          if (ibuf_used(ibuf) == 0)
> >              ibuf_reset(ibuf);
> >          fiber_gc();
> > 
> > @@ -872,6 +932,11 @@ applier_f(va_list ap)
> >                  return -1;
> >              }
> >          } catch (FiberIsCancelled *e) {
> > +            if (!diag_is_empty(&applier->diag)) {
> > +                diag_move(&applier->diag, &fiber()->diag);
> 
> Hmm, AFAIK we only need this diag for box.info.replication. May be,
> better patch box.info.replication to use applier->diag instead of
> applier->reader->diag so that we don't need to move errors around?

If one applier fails, then every other one is going to be cancelled with the
same error. As applier_subscribe() uses an exception, I have to populate the
diag in order to raise it.
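To spell the chain out (a rough sketch; applier_on_rollback is
sequencer_rollback_cb after the rename agreed above):

/* on_rollback trigger of a replicated transaction. */
static void
applier_on_rollback(struct trigger *trigger, void *event)
{
    (void) trigger;
    (void) event;
    /* Remember the WAL error in the shared diag... */
    diag_set(ClientError, ER_WAL_IO);
    diag_move(&fiber()->diag, &replicaset.applier.diag);
    /* ...and ask every applier to stop. */
    trigger_run(&replicaset.applier.on_replication_fail, NULL);
    /* (net_vclock is also reset here, omitted for brevity.) */
}

/* on_replication_fail trigger, registered per applier. */
static void
applier_on_fail(struct trigger *trigger, void *event)
{
    (void) event;
    struct applier *applier = (struct applier *)trigger->data;
    /* Copy the shared error and cancel the reader fiber. */
    if (!diag_is_empty(&replicaset.applier.diag))
        diag_add_error(&applier->diag,
                       diag_last_error(&replicaset.applier.diag));
    fiber_cancel(applier->reader);
}

The FiberIsCancelled handler in applier_f() then moves applier->diag into
fiber()->diag, so each applier stops with the original WAL error instead of a
bare cancellation.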
> 
> > +                applier_disconnect(applier, APPLIER_STOPPED);
> > +                break;
> > +            }
> >              applier_disconnect(applier, APPLIER_OFF);
> >              break;
> >          } catch (SocketError *e) {
> > 
> > @@ -959,7 +1024,7 @@ applier_new(const char *uri)
> >      applier->last_row_time = ev_monotonic_now(loop());
> >      rlist_create(&applier->on_state);
> >      fiber_cond_create(&applier->resume_cond);
> > -    fiber_cond_create(&applier->writer_cond);
> > +    diag_create(&applier->diag);
> >      return applier;
> >  }
> > 
> > @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
> >      assert(applier->io.fd == -1);
> >      trigger_destroy(&applier->on_state);
> >      fiber_cond_destroy(&applier->resume_cond);
> > -    fiber_cond_destroy(&applier->writer_cond);
> >      free(applier);
> >  }
> > 
> > diff --git a/src/box/applier.h b/src/box/applier.h
> > index 5bff90031..716da32e2 100644
> > --- a/src/box/applier.h
> > +++ b/src/box/applier.h
> > @@ -74,8 +74,6 @@ struct applier {
> >      struct fiber *reader;
> >      /** Background fiber to reply with vclock */
> >      struct fiber *writer;
> > -    /** Writer cond. */
> > -    struct fiber_cond writer_cond;
> >      /** Finite-state machine */
> >      enum applier_state state;
> >      /** Local time of this replica when the last row has been received */
> > 
> > @@ -114,8 +112,15 @@ struct applier {
> >      bool is_paused;
> >      /** Condition variable signaled to resume the applier. */
> >      struct fiber_cond resume_cond;
> > +    /* Diag to raise an error. */
> > +    struct diag diag;
> > +    /* The master's vclock at the time of subscribe. */
> > +    struct vclock remote_vclock_at_subscribe;
> >  };
> > 
> > +void
> > +applier_init();
> > +
> 
> This function isn't defined anywhere.
> 
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 8c8a9927e..a4830f5b5 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -232,6 +232,20 @@ struct replicaset {
> >       * struct replica object).
> >       */
> >      struct latch order_latch;
> > +    /*
> > +     * A vclock of the last transaction which was read
> > +     * from an applier connection.
> > +     */
> > +    struct vclock net_vclock;
> 
> Please elaborate. Can it be less than replicaset.vclock? Can it be
> greater? Why?

Let's discuss it f2f.

> 
> > +    /* Signaled on replicated transaction commit. */
> > +    struct fiber_cond commit_cond;
> > +    /*
> > +     * Trigger to fire when replication stops in case
> > +     * of an error.
> > +     */
> > +    struct rlist on_replication_fail;
> > +    /* Diag to populate an error across all appliers. */
> > +    struct diag diag;
> >  } applier;
> >  /** Map of all known replica_id's to corresponding replica's. */
> >  struct replica **replica_by_id;