[Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f

Serge Petrenko sergepetrenko at tarantool.org
Fri Jun 18 12:51:02 MSK 2021



17.06.2021 18:48, Cyrill Gorcunov пишет:
> Applier fiber sends current vclock of the node to remote relay reader,
> pointing current state of fetched WAL data so the relay will know which
> new data should be sent. The packet applier sends carries xrow_header::tm
> field as a zero but we can reuse it to provide information about first
> timestamp in a transaction we wrote to our WAL. Since old instances of
> Tarantool simply ignore this field such extension won't cause any
> problems.
>
> The timestamp will be needed to account lag of downstream replicas
> suitable for information purpose and cluster health monitoring.
>
> We update applier statistics in WAL callbacks but since both
> apply_synchro_row and apply_plain_tx are used not only in real data
> application but in final join stage as well (in this stage we're not
> writing the data yet) the apply_synchro_row is extended with replica_id
> argument which is non zero when applier is subscribed.
>
> The calculation of the downstream lag itself lag will be addressed
> in next patch because sending the timestamp and its observation
> are independent actions.
>
> Part-of #5447
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>

Thanks for the patch! LGTM!
> ---
>   src/box/applier.cc     | 97 +++++++++++++++++++++++++++++++++++-------
>   src/box/replication.cc |  1 +
>   src/box/replication.h  |  5 +++
>   3 files changed, 88 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10cea26a7..0782be513 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
>   	struct ev_io io;
>   	coio_create(&io, applier->io.fd);
>   
> +	/* ID is permanent while applier is alive */
> +	uint32_t replica_id = applier->instance_id;
> +
>   	while (!fiber_is_cancelled()) {
>   		/*
>   		 * Tarantool >= 1.7.7 sends periodic heartbeat
> @@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
>   			applier->has_acks_to_send = false;
>   			struct xrow_header xrow;
>   			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			/*
> +			 * For relay lag statistics we report last
> +			 * written transaction timestamp in tm field.
> +			 *
> +			 * If user delete the node from _cluster space,
> +			 * we obtain a nil pointer here.
> +			 */
> +			struct replica *r = replica_by_id(replica_id);
> +			if (likely(r != NULL))
> +				xrow.tm = r->applier_txn_last_tm;
>   			coio_write_xrow(&io, &xrow);
>   			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
>   				fiber_sleep(0.01);
> @@ -490,7 +503,7 @@ static uint64_t
>   applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
>   
>   static int
> -apply_final_join_tx(struct stailq *rows);
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
>   
>   /**
>    * A helper struct to link xrow objects in a list.
> @@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>   						  next)->row);
>   			break;
>   		}
> -		if (apply_final_join_tx(&rows) != 0)
> +		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
>   			diag_raise();
>   	}
>   
> @@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>   	return 0;
>   }
>   
> +struct replica_cb_data {
> +	/** Replica ID the data belongs to. */
> +	uint32_t replica_id;
> +	/**
> +	 * Timestamp of a transaction to be accounted
> +	 * for relay lag. Usually it is a last row in
> +	 * a transaction.
> +	 */
> +	double txn_last_tm;
> +};
> +
> +/** Update replica associated data once write is complete. */
> +static void
> +replica_txn_wal_write_cb(struct replica_cb_data *rcb)
> +{
> +	struct replica *r = replica_by_id(rcb->replica_id);
> +	if (likely(r != NULL))
> +		r->applier_txn_last_tm = rcb->txn_last_tm;
> +}
> +
>   static int
>   applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>   {
> -	(void) trigger;
>   	(void) event;
> +
> +	struct replica_cb_data *rcb =
> +		(struct replica_cb_data *)trigger->data;
> +	replica_txn_wal_write_cb(rcb);
> +
>   	/* Broadcast the WAL write across all appliers. */
>   	trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	return 0;
> @@ -766,6 +803,8 @@ struct synchro_entry {
>   	struct synchro_request *req;
>   	/** Fiber created the entry. To wakeup when WAL write is done. */
>   	struct fiber *owner;
> +	/** Replica associated data. */
> +	struct replica_cb_data *rcb;
>   	/**
>   	 * The base journal entry. It has unsized array and then must be the
>   	 * last entry in the structure. But can workaround it via a union
> @@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   	if (entry->res < 0) {
>   		applier_rollback_by_wal_io(entry->res);
>   	} else {
> +		replica_txn_wal_write_cb(synchro_entry->rcb);
>   		txn_limbo_process(&txn_limbo, synchro_entry->req);
>   		trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	}
> @@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   
>   /** Process a synchro request. */
>   static int
> -apply_synchro_row(struct xrow_header *row)
> +apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   {
>   	assert(iproto_type_is_synchro_request(row->type));
>   
> @@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
>   	if (xrow_decode_synchro(row, &req) != 0)
>   		goto err;
>   
> +	struct replica_cb_data rcb_data;
>   	struct synchro_entry entry;
>   	/*
>   	 * Rows array is cast from *[] to **, because otherwise g++ complains
> @@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
>   			     apply_synchro_row_cb, &entry);
>   	entry.req = &req;
>   	entry.owner = fiber();
> +
> +	rcb_data.replica_id = replica_id;
> +	rcb_data.txn_last_tm = row->tm;
> +	entry.rcb = &rcb_data;
> +
>   	/*
>   	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
>   	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
> @@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   	return box_raft_process(&req, applier->instance_id);
>   }
>   
> -static inline int
> -apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +static int
> +apply_plain_tx(uint32_t replica_id, struct stailq *rows,
> +	       bool skip_conflict, bool use_triggers)
>   {
>   	/*
>   	 * Explicitly begin the transaction so that we can
> @@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>   			goto fail;
>   		}
>   
> +		struct replica_cb_data *rcb;
> +		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
> +		if (rcb == NULL) {
> +			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
> +			goto fail;
> +		}
> +
>   		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>   		txn_on_rollback(txn, on_rollback);
>   
> -		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> +		/*
> +		 * We use *last* entry timestamp because ack comes up to
> +		 * last entry in transaction. Same time this shows more
> +		 * precise result because we're interested in how long
> +		 * transaction traversed network + remote WAL bundle before
> +		 * ack get received.
> +		 */
> +		item = stailq_last_entry(rows, struct applier_tx_row, next);
> +		rcb->replica_id = replica_id;
> +		rcb->txn_last_tm = item->row.tm;
> +
> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
>   		txn_on_wal_write(txn, on_wal_write);
>   	}
>   
> @@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>   
>   /** A simpler version of applier_apply_tx() for final join stage. */
>   static int
> -apply_final_join_tx(struct stailq *rows)
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
>   {
>   	struct xrow_header *first_row =
>   		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> @@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
>   	vclock_follow_xrow(&replicaset.vclock, last_row);
>   	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>   		assert(first_row == last_row);
> -		rc = apply_synchro_row(first_row);
> +		rc = apply_synchro_row(replica_id, first_row);
>   	} else {
> -		rc = apply_plain_tx(rows, false, false);
> +		rc = apply_plain_tx(replica_id, rows, false, false);
>   	}
>   	fiber_gc();
>   	return rc;
> @@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   		 * each other.
>   		 */
>   		assert(first_row == last_row);
> -		if ((rc = apply_synchro_row(first_row)) != 0)
> -			goto finish;
> -	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
> -					true)) != 0) {
> -		goto finish;
> +		rc = apply_synchro_row(applier->instance_id, first_row);
> +	} else {
> +		rc = apply_plain_tx(applier->instance_id, rows,
> +				    replication_skip_conflict, true);
>   	}
> +	if (rc != 0)
> +		goto finish;
> +
>   	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>   		      last_row->lsn);
>   finish:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 903390686..a0b3e0186 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -184,6 +184,7 @@ replica_new(void)
>   	trigger_create(&replica->on_applier_state,
>   		       replica_on_applier_state_f, NULL, NULL);
>   	replica->applier_sync_state = APPLIER_DISCONNECTED;
> +	replica->applier_txn_last_tm = 0;
>   	latch_create(&replica->order_latch);
>   	return replica;
>   }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 5cc380373..57e0f10ae 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -331,6 +331,11 @@ struct replica {
>   	 * separate from applier.
>   	 */
>   	enum applier_state applier_sync_state;
> +	/**
> +	 * Applier's last written to WAL transaction timestamp.
> +	 * Needed for relay lagging statistics.
> +	 */
> +	double applier_txn_last_tm;
>   	/* The latch is used to order replication requests. */
>   	struct latch order_latch;
>   };

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list