[Tarantool-patches] [RFC v3 2/3] applier: send first row's WAL time in the applier_writer_f

Cyrill Gorcunov gorcunov at gmail.com
Fri Apr 30 18:39:39 MSK 2021


The applier writer fiber sends the current vclock of the node to the
remote relay reader. This packet carries the xrow_header::tm field as
zero, but we can reuse it to provide the timestamp of the first row of
a transaction we wrote to our WAL. Since old instances of Tarantool
simply ignore this field, such an extension won't cause any problems.

This timestamp will be needed to account for the lag of downstream
replicas, which is useful for informational purposes and cluster
health monitoring.

We update applier statistics in WAL callbacks. However, both
apply_synchro_row and apply_plain_tx are used not only for real data
application but in the final join stage as well (at this stage we're
not yet writing the data), so apply_synchro_row is extended with a
flag indicating whether the applier update is needed.

At the same time, apply_plain_tx uses asynchronous WAL write
completion, and by the moment the write procedure is finished the
applier might already be removed from the replicaset; thus we use the
applier's instance id to look up whether it is still alive.

The calculation of the downstream lag itself will be addressed in the
next patch because sending the timestamp and observing it are
independent actions.

Part-of #5447

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/applier.cc | 84 ++++++++++++++++++++++++++++++++++++++--------
 src/box/applier.h  |  5 +++
 2 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 33181fdbf..626dc0324 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -192,7 +192,8 @@ applier_writer_f(va_list ap)
 		try {
 			applier->has_acks_to_send = false;
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock_timed(&xrow, &replicaset.vclock,
+						 applier->first_row_wal_time);
 			coio_write_xrow(&io, &xrow);
 			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 				fiber_sleep(0.01);
@@ -490,7 +491,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(struct applier *applier, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +536,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						  next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier, &rows) != 0)
 			diag_raise();
 	}
 
@@ -751,11 +752,41 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+/** Applier WAL related statistics, consumed by awstat_update(). */
+struct awstat {
+	uint32_t instance_id;	/* replica id to look up; 0 means skip the update */
+	double first_row_tm;	/* value copied to applier->first_row_wal_time */
+};
+
+/**
+ * Store awstat->first_row_tm in the applier of replica
+ * awstat->instance_id; no-op if id is 0 or applier is gone.
+ */
+static void
+awstat_update(struct awstat *awstat)
+{
+	if (awstat->instance_id == 0)
+		return;
+	/*
+	 * WAL writes complete both synchronously and
+	 * asynchronously; in the latter case the applier
+	 * may already be stopped and removed.
+	 */
+	struct replica *r = replica_by_id(awstat->instance_id);
+	if (r == NULL || r->applier == NULL)
+		return;
+	r->applier->first_row_wal_time = awstat->first_row_tm;
+}
+
 static int
 applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+
+	struct awstat *awstat = (struct awstat *)trigger->data;
+	awstat_update(awstat);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
@@ -766,6 +797,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** WAL bound statistics. */
+	struct awstat awstat;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +822,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io();
 	} else {
+		awstat_update(&synchro_entry->awstat);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +831,8 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(struct applier *applier, struct xrow_header *row,
+		  bool use_awstat)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -817,6 +852,12 @@ apply_synchro_row(struct xrow_header *row)
 			     apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+	if (use_awstat) {
+		entry.awstat.instance_id = applier->instance_id;
+		entry.awstat.first_row_tm = row->tm;
+	} else {
+		entry.awstat.instance_id = 0;
+	}
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -862,8 +903,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(struct applier *applier, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -931,10 +973,21 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct awstat *awstat;
+		awstat = region_alloc_object(&txn->region, typeof(*awstat), &size);
+		if (awstat == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "awstat");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		item = stailq_first_entry(rows, struct applier_tx_row, next);
+		awstat->instance_id = applier->instance_id;
+		awstat->first_row_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, awstat, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
 
@@ -946,7 +999,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(struct applier *applier, struct stailq *rows)
 {
 	struct xrow_header *first_row =
 		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
@@ -957,9 +1010,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(applier, first_row, false);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(applier, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
@@ -1088,11 +1141,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
+		rc = apply_synchro_row(applier, first_row, true);
+		if (rc != 0)
+			goto finish;
+	} else {
+		rc = apply_plain_tx(applier, rows,
+				    replication_skip_conflict, true);
+		if (rc != 0)
 			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
 	}
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
diff --git a/src/box/applier.h b/src/box/applier.h
index 15ca1fcfd..bd98827e7 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -93,6 +93,11 @@ struct applier {
 	ev_tstamp last_row_time;
 	/** Number of seconds this replica is behind the remote master */
 	ev_tstamp lag;
+	/**
+	 * WAL time of first applied row in a transaction.
+	 * For relay statistics sake.
+	 */
+	double first_row_wal_time;
 	/** The last box_error_code() logged to avoid log flooding */
 	uint32_t last_logged_errcode;
 	/** Remote instance ID. */
-- 
2.30.2



More information about the Tarantool-patches mailing list