[Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
Serge Petrenko
sergepetrenko at tarantool.org
Fri Jun 18 12:51:02 MSK 2021
On 17.06.2021 18:48, Cyrill Gorcunov wrote:
> The applier fiber sends the node's current vclock to the remote relay
> reader, indicating the current state of the fetched WAL data, so the
> relay knows which new data should be sent. The packet the applier sends
> carries the xrow_header::tm field as zero, but we can reuse it to
> provide the timestamp of the first row of a transaction we wrote to our
> WAL. Since old Tarantool instances simply ignore this field, such an
> extension won't cause any problems.
>
> The timestamp will be needed to account for the lag of downstream
> replicas, which is useful for informational purposes and cluster
> health monitoring.
>
> We update the applier statistics in WAL callbacks, but since both
> apply_synchro_row and apply_plain_tx are used not only for real data
> application but also at the final join stage (where we are not writing
> the data yet), apply_synchro_row is extended with a replica_id argument
> which is non-zero when the applier is subscribed.
>
> The calculation of the downstream lag itself will be addressed in the
> next patch, because sending the timestamp and observing it are
> independent actions.
>
> Part-of #5447
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
Thanks for the patch! LGTM!
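A side note, since the actual lag calculation is deferred to the next
patch: once tm is filled in the ACK, the receiving side only needs
plain wall-clock arithmetic to derive the downstream lag. A minimal
standalone sketch (toy names of my own, not the code of patch 2/2):

#include <stdio.h>
#include <time.h>

/* Wall clock in seconds, the same scale as xrow_header::tm. */
static double
wall_clock(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_REALTIME, &ts);
	return ts.tv_sec + ts.tv_nsec * 1e-9;
}

/*
 * ack_tm is the tm field received in the applier's ACK.
 * Zero means no transaction has been reported yet.
 */
static double
downstream_lag(double ack_tm)
{
	return ack_tm == 0 ? 0 : wall_clock() - ack_tm;
}

int
main(void)
{
	/* Pretend the replica wrote the transaction 0.5 sec ago. */
	double ack_tm = wall_clock() - 0.5;
	printf("lag = %.3f sec\n", downstream_lag(ack_tm));
	return 0;
}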
> ---
> src/box/applier.cc | 97 +++++++++++++++++++++++++++++++++++-------
> src/box/replication.cc | 1 +
> src/box/replication.h | 5 +++
> 3 files changed, 88 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10cea26a7..0782be513 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
> struct ev_io io;
> coio_create(&io, applier->io.fd);
>
> + /* The ID is permanent while the applier is alive. */
> + uint32_t replica_id = applier->instance_id;
> +
> while (!fiber_is_cancelled()) {
> /*
> * Tarantool >= 1.7.7 sends periodic heartbeat
> @@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
> applier->has_acks_to_send = false;
> struct xrow_header xrow;
> xrow_encode_vclock(&xrow, &replicaset.vclock);
> + /*
> + * For relay lag statistics we report the timestamp
> + * of the last written transaction in the tm field.
> + *
> + * If the user deletes the node from the _cluster
> + * space, we obtain a NULL pointer here.
> + */
> + struct replica *r = replica_by_id(replica_id);
> + if (likely(r != NULL))
> + xrow.tm = r->applier_txn_last_tm;
> coio_write_xrow(&io, &xrow);
> ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
> fiber_sleep(0.01);
> @@ -490,7 +503,7 @@ static uint64_t
> applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
>
> static int
> -apply_final_join_tx(struct stailq *rows);
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
>
> /**
> * A helper struct to link xrow objects in a list.
> @@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
> next)->row);
> break;
> }
> - if (apply_final_join_tx(&rows) != 0)
> + if (apply_final_join_tx(applier->instance_id, &rows) != 0)
> diag_raise();
> }
>
> @@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
> return 0;
> }
>
> +struct replica_cb_data {
> + /** Replica ID the data belongs to. */
> + uint32_t replica_id;
> + /**
> + * Timestamp of a transaction, to be accounted in
> + * the relay lag statistics. Usually it is the
> + * timestamp of the last row in the transaction.
> + */
> + double txn_last_tm;
> +};
> +
> +/** Update replica associated data once write is complete. */
> +static void
> +replica_txn_wal_write_cb(struct replica_cb_data *rcb)
> +{
> + struct replica *r = replica_by_id(rcb->replica_id);
> + if (likely(r != NULL))
> + r->applier_txn_last_tm = rcb->txn_last_tm;
> +}
> +
> static int
> applier_txn_wal_write_cb(struct trigger *trigger, void *event)
> {
> - (void) trigger;
> (void) event;
> +
> + struct replica_cb_data *rcb =
> + (struct replica_cb_data *)trigger->data;
> + replica_txn_wal_write_cb(rcb);
> +
> /* Broadcast the WAL write across all appliers. */
> trigger_run(&replicaset.applier.on_wal_write, NULL);
> return 0;
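To make the trigger->data plumbing above easier to follow, here is a
standalone toy version (my own minimal types, not src/lib/core/trigger.h):
the WAL-write callback recovers its per-replica context from the
trigger's data pointer and publishes the transaction timestamp.

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

struct toy_trigger {
	int (*run)(struct toy_trigger *trigger, void *event);
	void *data;
};

struct toy_cb_data {
	uint32_t replica_id;
	double txn_last_tm;
};

static int
toy_wal_write_cb(struct toy_trigger *trigger, void *event)
{
	(void)event;
	struct toy_cb_data *rcb = (struct toy_cb_data *)trigger->data;
	printf("replica %" PRIu32 ": txn_last_tm = %.3f\n",
	       rcb->replica_id, rcb->txn_last_tm);
	return 0;
}

int
main(void)
{
	struct toy_cb_data rcb = { .replica_id = 2, .txn_last_tm = 100.5 };
	struct toy_trigger trigger = { toy_wal_write_cb, &rcb };
	/* This is what trigger_run() would do once the WAL write lands. */
	trigger.run(&trigger, NULL);
	return 0;
}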
> @@ -766,6 +803,8 @@ struct synchro_entry {
> struct synchro_request *req;
> /** Fiber created the entry. To wakeup when WAL write is done. */
> struct fiber *owner;
> + /** Replica associated data. */
> + struct replica_cb_data *rcb;
> /**
> * The base journal entry. It has unsized array and then must be the
> * last entry in the structure. But can workaround it via a union
> @@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
> if (entry->res < 0) {
> applier_rollback_by_wal_io(entry->res);
> } else {
> + replica_txn_wal_write_cb(synchro_entry->rcb);
> txn_limbo_process(&txn_limbo, synchro_entry->req);
> trigger_run(&replicaset.applier.on_wal_write, NULL);
> }
> @@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>
> /** Process a synchro request. */
> static int
> -apply_synchro_row(struct xrow_header *row)
> +apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
> {
> assert(iproto_type_is_synchro_request(row->type));
>
> @@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
> if (xrow_decode_synchro(row, &req) != 0)
> goto err;
>
> + struct replica_cb_data rcb_data;
> struct synchro_entry entry;
> /*
> * Rows array is cast from *[] to **, because otherwise g++ complains
> @@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
> apply_synchro_row_cb, &entry);
> entry.req = &req;
> entry.owner = fiber();
> +
> + rcb_data.replica_id = replica_id;
> + rcb_data.txn_last_tm = row->tm;
> + entry.rcb = &rcb_data;
> +
> /*
> * The WAL write is blocking. Otherwise it might happen that a CONFIRM
> * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
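Worth spelling out why rcb_data may live on the stack here: as the
comment above says, the WAL write is blocking, so apply_synchro_row_cb()
fires before apply_synchro_row() returns and the pointer never outlives
its frame. A toy illustration of the same pattern (names are mine):

#include <stdio.h>

struct toy_ctx {
	double txn_last_tm;
};

/* A blocking "journal write": the completion runs before we return. */
static void
toy_journal_write_blocking(void (*cb)(struct toy_ctx *), struct toy_ctx *ctx)
{
	/* ... the row is written to the WAL synchronously ... */
	cb(ctx);
}

static void
toy_write_cb(struct toy_ctx *ctx)
{
	printf("committed, tm = %.3f\n", ctx->txn_last_tm);
}

int
main(void)
{
	/* Stack storage is safe: the callback runs within this frame. */
	struct toy_ctx ctx = { .txn_last_tm = 100.5 };
	toy_journal_write_blocking(toy_write_cb, &ctx);
	return 0;
}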
> @@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
> return box_raft_process(&req, applier->instance_id);
> }
>
> -static inline int
> -apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +static int
> +apply_plain_tx(uint32_t replica_id, struct stailq *rows,
> + bool skip_conflict, bool use_triggers)
> {
> /*
> * Explicitly begin the transaction so that we can
> @@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> goto fail;
> }
>
> + struct replica_cb_data *rcb;
> + rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
> + if (rcb == NULL) {
> + diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
> + goto fail;
> + }
> +
> trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
> txn_on_rollback(txn, on_rollback);
>
> - trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> + /*
> + * We use the *last* entry's timestamp because the ack
> + * covers everything up to the last entry in the
> + * transaction. At the same time this gives a more
> + * precise result, because we are interested in how long
> + * the transaction traversed the network + remote WAL
> + * bundle before the ack got received.
> + */
> + item = stailq_last_entry(rows, struct applier_tx_row, next);
> + rcb->replica_id = replica_id;
> + rcb->txn_last_tm = item->row.tm;
> +
> + trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
> txn_on_wal_write(txn, on_wal_write);
> }
>
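And the arithmetic behind the comment above, as a standalone toy (my
own types): the ack confirms the whole transaction, so measuring the
lag from the last row captures the full network + remote-WAL round
trip without overcounting the earlier rows of the same transaction.

#include <stdio.h>

struct toy_row {
	double tm;	/* as stamped in xrow_header::tm */
};

/* The timestamp the ACK will report: the last row of the txn. */
static double
toy_txn_ack_tm(const struct toy_row *rows, int count)
{
	return rows[count - 1].tm;
}

int
main(void)
{
	struct toy_row rows[] = { { 100.000 }, { 100.002 }, { 100.005 } };
	double ack_received_at = 100.105;
	double lag = ack_received_at - toy_txn_ack_tm(rows, 3);
	printf("lag = %.3f sec\n", lag);	/* prints 0.100 */
	return 0;
}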
> @@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>
> /** A simpler version of applier_apply_tx() for final join stage. */
> static int
> -apply_final_join_tx(struct stailq *rows)
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
> {
> struct xrow_header *first_row =
> &stailq_first_entry(rows, struct applier_tx_row, next)->row;
> @@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
> vclock_follow_xrow(&replicaset.vclock, last_row);
> if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> assert(first_row == last_row);
> - rc = apply_synchro_row(first_row);
> + rc = apply_synchro_row(replica_id, first_row);
> } else {
> - rc = apply_plain_tx(rows, false, false);
> + rc = apply_plain_tx(replica_id, rows, false, false);
> }
> fiber_gc();
> return rc;
> @@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
> * each other.
> */
> assert(first_row == last_row);
> - if ((rc = apply_synchro_row(first_row)) != 0)
> - goto finish;
> - } else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
> - true)) != 0) {
> - goto finish;
> + rc = apply_synchro_row(applier->instance_id, first_row);
> + } else {
> + rc = apply_plain_tx(applier->instance_id, rows,
> + replication_skip_conflict, true);
> }
> + if (rc != 0)
> + goto finish;
> +
> vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> last_row->lsn);
> finish:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 903390686..a0b3e0186 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -184,6 +184,7 @@ replica_new(void)
> trigger_create(&replica->on_applier_state,
> replica_on_applier_state_f, NULL, NULL);
> replica->applier_sync_state = APPLIER_DISCONNECTED;
> + replica->applier_txn_last_tm = 0;
> latch_create(&replica->order_latch);
> return replica;
> }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 5cc380373..57e0f10ae 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -331,6 +331,11 @@ struct replica {
> * separate from applier.
> */
> enum applier_state applier_sync_state;
> + /**
> + * Timestamp of the last transaction the applier has
> + * written to the WAL. Needed for relay lag statistics.
> + */
> + double applier_txn_last_tm;
> /* The latch is used to order replication requests. */
> struct latch order_latch;
> };
--
Serge Petrenko