From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>, tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
Date: Fri, 18 Jun 2021 12:51:02 +0300
Message-ID: <3ebc9966-6c9f-5206-4cd4-1fff97530709@tarantool.org>
In-Reply-To: <20210617154835.315576-2-gorcunov@gmail.com>

17.06.2021 18:48, Cyrill Gorcunov wrote:
> Applier fiber sends current vclock of the node to remote relay reader,
> pointing current state of fetched WAL data so the relay will know which
> new data should be sent. The packet applier sends carries xrow_header::tm
> field as a zero but we can reuse it to provide information about first
> timestamp in a transaction we wrote to our WAL. Since old instances of
> Tarantool simply ignore this field such extension won't cause any
> problems.
>
> The timestamp will be needed to account lag of downstream replicas
> suitable for information purpose and cluster health monitoring.
>
> We update applier statistics in WAL callbacks but since both
> apply_synchro_row and apply_plain_tx are used not only in real data
> application but in final join stage as well (in this stage we're not
> writing the data yet) the apply_synchro_row is extended with replica_id
> argument which is non zero when applier is subscribed.
>
> The calculation of the downstream lag itself will be addressed
> in next patch because sending the timestamp and its observation
> are independent actions.
>
> Part-of #5447
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>

Thanks for the patch! LGTM!
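As a rough illustration of the mechanism described in the commit message, here is a minimal C sketch that models an ACK whose `tm` field carries the replica's last written transaction timestamp, plus one way the relay side could turn it into a lag value. All names (`ack_header`, `replica_state`, `ack_fill_tm`, `downstream_lag`) are hypothetical simplifications, not Tarantool's API. The lag formula in particular is an assumption, since the commit message defers the actual calculation to the next patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the ACK row: only the tm field is modelled. */
struct ack_header {
	/* Zero means "no timestamp"; before the patch the field was always zero. */
	double tm;
};

/* Hypothetical stand-in for the new struct replica field. */
struct replica_state {
	double applier_txn_last_tm;
};

/*
 * Applier side: copy the last WAL-written transaction timestamp into
 * the ACK. A NULL replica (e.g. deleted from _cluster) leaves tm at 0.
 */
static void
ack_fill_tm(struct ack_header *ack, const struct replica_state *r)
{
	assert(ack != NULL);
	ack->tm = 0;
	if (r != NULL)
		ack->tm = r->applier_txn_last_tm;
}

/*
 * Relay side, assumed formula: lag is the time elapsed between the
 * original WAL write of the acknowledged transaction and "now".
 * Returns -1.0 when the ACK carries no timestamp.
 */
static double
downstream_lag(const struct ack_header *ack, double now)
{
	if (ack->tm == 0)
		return -1.0;
	return now - ack->tm;
}
```

Treating a zero `tm` as "unknown" keeps the sketch compatible with senders that never fill the field, which is the situation the commit message relies on when it says the extension is harmless for old instances.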
> ---
>  src/box/applier.cc     | 97 +++++++++++++++++++++++++++++++++++-------
>  src/box/replication.cc |  1 +
>  src/box/replication.h  |  5 +++
>  3 files changed, 88 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10cea26a7..0782be513 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
>  	struct ev_io io;
>  	coio_create(&io, applier->io.fd);
>
> +	/* ID is permanent while applier is alive */
> +	uint32_t replica_id = applier->instance_id;
> +
>  	while (!fiber_is_cancelled()) {
>  		/*
>  		 * Tarantool >= 1.7.7 sends periodic heartbeat
> @@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
>  			applier->has_acks_to_send = false;
>  			struct xrow_header xrow;
>  			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			/*
> +			 * For relay lag statistics we report last
> +			 * written transaction timestamp in tm field.
> +			 *
> +			 * If user delete the node from _cluster space,
> +			 * we obtain a nil pointer here.
> +			 */
> +			struct replica *r = replica_by_id(replica_id);
> +			if (likely(r != NULL))
> +				xrow.tm = r->applier_txn_last_tm;
>  			coio_write_xrow(&io, &xrow);
>  			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
>  				fiber_sleep(0.01);
> @@ -490,7 +503,7 @@ static uint64_t
>  applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
>
>  static int
> -apply_final_join_tx(struct stailq *rows);
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
>
>  /**
>   * A helper struct to link xrow objects in a list.
> @@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>  						   next)->row);
>  			break;
>  		}
> -		if (apply_final_join_tx(&rows) != 0)
> +		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
>  			diag_raise();
>  	}
>
> @@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>  	return 0;
>  }
>
> +struct replica_cb_data {
> +	/** Replica ID the data belongs to. */
> +	uint32_t replica_id;
> +	/**
> +	 * Timestamp of a transaction to be accounted
> +	 * for relay lag. Usually it is a last row in
> +	 * a transaction.
> +	 */
> +	double txn_last_tm;
> +};
> +
> +/** Update replica associated data once write is complete. */
> +static void
> +replica_txn_wal_write_cb(struct replica_cb_data *rcb)
> +{
> +	struct replica *r = replica_by_id(rcb->replica_id);
> +	if (likely(r != NULL))
> +		r->applier_txn_last_tm = rcb->txn_last_tm;
> +}
> +
>  static int
>  applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>  {
> -	(void) trigger;
>  	(void) event;
> +
> +	struct replica_cb_data *rcb =
> +		(struct replica_cb_data *)trigger->data;
> +	replica_txn_wal_write_cb(rcb);
> +
>  	/* Broadcast the WAL write across all appliers. */
>  	trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	return 0;
> @@ -766,6 +803,8 @@ struct synchro_entry {
>  	struct synchro_request *req;
>  	/** Fiber created the entry. To wakeup when WAL write is done. */
>  	struct fiber *owner;
> +	/** Replica associated data. */
> +	struct replica_cb_data *rcb;
>  	/**
>  	 * The base journal entry. It has unsized array and then must be the
>  	 * last entry in the structure. But can workaround it via a union
> @@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>  	if (entry->res < 0) {
>  		applier_rollback_by_wal_io(entry->res);
>  	} else {
> +		replica_txn_wal_write_cb(synchro_entry->rcb);
>  		txn_limbo_process(&txn_limbo, synchro_entry->req);
>  		trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	}
> @@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>
>  /** Process a synchro request. */
>  static int
> -apply_synchro_row(struct xrow_header *row)
> +apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  {
>  	assert(iproto_type_is_synchro_request(row->type));
>
> @@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
>  	if (xrow_decode_synchro(row, &req) != 0)
>  		goto err;
>
> +	struct replica_cb_data rcb_data;
>  	struct synchro_entry entry;
>  	/*
>  	 * Rows array is cast from *[] to **, because otherwise g++ complains
> @@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
>  			     apply_synchro_row_cb, &entry);
>  	entry.req = &req;
>  	entry.owner = fiber();
> +
> +	rcb_data.replica_id = replica_id;
> +	rcb_data.txn_last_tm = row->tm;
> +	entry.rcb = &rcb_data;
> +
>  	/*
>  	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
>  	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
> @@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>  	return box_raft_process(&req, applier->instance_id);
>  }
>
> -static inline int
> -apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +static int
> +apply_plain_tx(uint32_t replica_id, struct stailq *rows,
> +	       bool skip_conflict, bool use_triggers)
>  {
>  	/*
>  	 * Explicitly begin the transaction so that we can
> @@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>  			goto fail;
>  		}
>
> +		struct replica_cb_data *rcb;
> +		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
> +		if (rcb == NULL) {
> +			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
> +			goto fail;
> +		}
> +
>  		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>  		txn_on_rollback(txn, on_rollback);
>
> -		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> +		/*
> +		 * We use *last* entry timestamp because ack comes up to
> +		 * last entry in transaction. Same time this shows more
> +		 * precise result because we're interested in how long
> +		 * transaction traversed network + remote WAL bundle before
> +		 * ack get received.
> +		 */
> +		item = stailq_last_entry(rows, struct applier_tx_row, next);
> +		rcb->replica_id = replica_id;
> +		rcb->txn_last_tm = item->row.tm;
> +
> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
>  		txn_on_wal_write(txn, on_wal_write);
>  	}
>
> @@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>
>  /** A simpler version of applier_apply_tx() for final join stage. */
>  static int
> -apply_final_join_tx(struct stailq *rows)
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
>  {
>  	struct xrow_header *first_row =
>  		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> @@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
>  	vclock_follow_xrow(&replicaset.vclock, last_row);
>  	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>  		assert(first_row == last_row);
> -		rc = apply_synchro_row(first_row);
> +		rc = apply_synchro_row(replica_id, first_row);
>  	} else {
> -		rc = apply_plain_tx(rows, false, false);
> +		rc = apply_plain_tx(replica_id, rows, false, false);
>  	}
>  	fiber_gc();
>  	return rc;
> @@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		 * each other.
>  		 */
>  		assert(first_row == last_row);
> -		if ((rc = apply_synchro_row(first_row)) != 0)
> -			goto finish;
> -	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
> -					true)) != 0) {
> -		goto finish;
> +		rc = apply_synchro_row(applier->instance_id, first_row);
> +	} else {
> +		rc = apply_plain_tx(applier->instance_id, rows,
> +				    replication_skip_conflict, true);
>  	}
> +	if (rc != 0)
> +		goto finish;
> +
>  	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>  		      last_row->lsn);
>  finish:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 903390686..a0b3e0186 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -184,6 +184,7 @@ replica_new(void)
>  	trigger_create(&replica->on_applier_state,
>  		       replica_on_applier_state_f, NULL, NULL);
>  	replica->applier_sync_state = APPLIER_DISCONNECTED;
> +	replica->applier_txn_last_tm = 0;
>  	latch_create(&replica->order_latch);
>  	return replica;
>  }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 5cc380373..57e0f10ae 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -331,6 +331,11 @@ struct replica {
>  	 * separate from applier.
>  	 */
>  	enum applier_state applier_sync_state;
> +	/**
> +	 * Applier's last written to WAL transaction timestamp.
> +	 * Needed for relay lagging statistics.
> +	 */
> +	double applier_txn_last_tm;
>  	/* The latch is used to order replication requests. */
>  	struct latch order_latch;
>  };

-- 
Serge Petrenko
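The diff never holds a `struct replica *` across the asynchronous WAL write. The callback data stores only the replica ID and resolves it again with `replica_by_id()` when the write completes, so a replica deleted from `_cluster` in the meantime is skipped rather than dereferenced. The following is a minimal, self-contained C sketch of that pattern. The fixed-size `registry` table, `REPLICA_MAX`, and `replica_cb_data_init()` are hypothetical stand-ins for illustration; the other names only mirror the patch and are simplified re-implementations, not Tarantool's real code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define REPLICA_MAX 32 /* hypothetical size of the ID-indexed table */

struct replica {
	/* Timestamp of the last transaction written to WAL, as in the patch. */
	double applier_txn_last_tm;
};

/* Hypothetical registry indexed by replica ID; NULL means "not registered". */
static struct replica *registry[REPLICA_MAX];

static struct replica *
replica_by_id(uint32_t id)
{
	return id < REPLICA_MAX ? registry[id] : NULL;
}

/* Captured when the transaction is submitted, before the WAL write. */
struct replica_cb_data {
	uint32_t replica_id;
	double txn_last_tm;
};

/*
 * Remember the timestamp of the *last* row: the ACK acknowledges the
 * transaction up to its last row.
 */
static void
replica_cb_data_init(struct replica_cb_data *rcb, uint32_t replica_id,
		     const double *row_tm, size_t row_count)
{
	assert(row_tm != NULL || row_count == 0);
	rcb->replica_id = replica_id;
	rcb->txn_last_tm = row_count > 0 ? row_tm[row_count - 1] : 0;
}

/*
 * WAL-write completion: resolve the ID again instead of caching a
 * pointer, so a replica removed meanwhile is skipped, not dereferenced.
 */
static void
replica_txn_wal_write_cb(const struct replica_cb_data *rcb)
{
	struct replica *r = replica_by_id(rcb->replica_id);
	if (r != NULL)
		r->applier_txn_last_tm = rcb->txn_last_tm;
}
```

In this sketch ID 0 is never registered, so a callback carrying a zero ID is a no-op. Where the callback data lives differs between the two paths in the diff. `apply_synchro_row()` keeps `rcb_data` on its stack because it waits for the WAL write. `apply_plain_tx()` allocates `rcb` on `txn->region`, which outlives the function, because its `on_wal_write` trigger can run after the function has returned.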
Thread overview: 11+ messages
2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:51 ` Serge Petrenko via Tarantool-patches [this message]
2021-06-18 18:06 ` Cyrill Gorcunov via Tarantool-patches
2021-06-21  8:35 ` Serge Petrenko via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:50 ` Serge Petrenko via Tarantool-patches
2021-06-20 14:37 ` Vladislav Shpilevoy via Tarantool-patches
2021-06-21  8:44 ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 16:17 ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 21:16 ` Vladislav Shpilevoy via Tarantool-patches