Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information
@ 2021-06-17 15:48 Cyrill Gorcunov via Tarantool-patches
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
  0 siblings, 2 replies; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-17 15:48 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Guys, take a look once time permit, hopefully manage to address
all comments. Previous series at

https://lists.tarantool.org/tarantool-patches/20210607155519.109626-1-gorcunov@gmail.com/

v4 (by Vlad):
 - add a test case
 - add docbot request
 - dropped off xrow_encode_vclock_timed, we use opencoded assignment
   for tm value when send ack
 - struct awstat renamed to applier_wal_stat. Vlad I think this is
   better name than "applier_lag" because this is statistics on WAL,
   we simply track remote WAL propagation here, so more general name
   is better for grep sake and for future extensions
 - instead of passing applier structure we pass replica_id
 - the real keeper of this statistics comes into "replica" structure
   thus unbound of applier itself
 - for synchro entries we pass a pointer to the applier_wal_stat instead
   of using replica_id = 0 as a sign that we don't need to update statistics
   for initial and final join cases
 - to write and read statistics we provide wal_stat_update and wal_stat_ack
   helpers to cover the case where single ACK spans several transactions

v8:
 - make one branch less in apply_synchro_row()
 - keep applier_txn_start_tm inside replica stucture
 - rename wal_stat to replica_cb_data since this is more
   logical for case where we have no general stat engine
 - make applier to send timestamp so that relay will compute
   delta upon the read, the lag is kept permanently until new
   write happens
 - extend doc and changelog a bit
 - keep reading of relay's lag from TX thread without any modifications
   because relay get deleted from TX thread and set to non-RELAY_FOLLOW
   state, thus any attempt to read it won't success. To be honest there
   is a small race window present: write doubles are not atomic operation
   thus we might read partially updated timestamp similarly as we have with
   @idle field already. I think this should be addressed separately and better
   without heavy cmsg engine involved but with rw lock instead or plain atomics.

v9 (Vlad and Serge):
 - update of transaction lag for reading by TX thread done via cbus message
 - use last timestamp from transaction to account
 - verify that we really need to test for replica being non-nil in
   applier reader
 - update docs
 - update a testcase

branch gorcunov/gh-5447-relay-lag-9
issue https://github.com/tarantool/tarantool/issues/5447

Cyrill Gorcunov (2):
  applier: send transaction's first row WAL time in the applier_writer_f
  relay: provide information about downstream lag

 .../unreleased/gh-5447-downstream-lag.md      |   6 +
 src/box/applier.cc                            |  97 +++++++++++--
 src/box/lua/info.c                            |   3 +
 src/box/relay.cc                              |  94 ++++++++++++-
 src/box/relay.h                               |   6 +
 src/box/replication.cc                        |   1 +
 src/box/replication.h                         |   5 +
 .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
 .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
 9 files changed, 378 insertions(+), 19 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
 create mode 100644 test/replication/gh-5447-downstream-lag.result
 create mode 100644 test/replication/gh-5447-downstream-lag.test.lua


base-commit: b5f0dc4db9aef9618f56b0bcb4a7b82a59591784
-- 
2.31.1


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
  2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
@ 2021-06-17 15:48 ` Cyrill Gorcunov via Tarantool-patches
  2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
  1 sibling, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-17 15:48 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Applier fiber sends current vclock of the node to remote relay reader,
pointing current state of fetched WAL data so the relay will know which
new data should be sent. The packet applier sends carries xrow_header::tm
field as a zero but we can reuse it to provide information about first
timestamp in a transaction we wrote to our WAL. Since old instances of
Tarantool simply ignore this field such extension won't cause any
problems.

The timestamp will be needed to account lag of downstream replicas
suitable for information purpose and cluster health monitoring.

We update applier statistics in WAL callbacks but since both
apply_synchro_row and apply_plain_tx are used not only in real data
application but in final join stage as well (in this stage we're not
writing the data yet) the apply_synchro_row is extended with replica_id
argument which is non zero when applier is subscribed.

The calculation of the downstream lag itself lag will be addressed
in next patch because sending the timestamp and its observation
are independent actions.

Part-of #5447

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc     | 97 +++++++++++++++++++++++++++++++++++-------
 src/box/replication.cc |  1 +
 src/box/replication.h  |  5 +++
 3 files changed, 88 insertions(+), 15 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 10cea26a7..0782be513 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
 	struct ev_io io;
 	coio_create(&io, applier->io.fd);
 
+	/* ID is permanent while applier is alive */
+	uint32_t replica_id = applier->instance_id;
+
 	while (!fiber_is_cancelled()) {
 		/*
 		 * Tarantool >= 1.7.7 sends periodic heartbeat
@@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
 			applier->has_acks_to_send = false;
 			struct xrow_header xrow;
 			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			/*
+			 * For relay lag statistics we report last
+			 * written transaction timestamp in tm field.
+			 *
+			 * If user delete the node from _cluster space,
+			 * we obtain a nil pointer here.
+			 */
+			struct replica *r = replica_by_id(replica_id);
+			if (likely(r != NULL))
+				xrow.tm = r->applier_txn_last_tm;
 			coio_write_xrow(&io, &xrow);
 			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 				fiber_sleep(0.01);
@@ -490,7 +503,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						  next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
 	}
 
@@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+struct replica_cb_data {
+	/** Replica ID the data belongs to. */
+	uint32_t replica_id;
+	/**
+	 * Timestamp of a transaction to be accounted
+	 * for relay lag. Usually it is a last row in
+	 * a transaction.
+	 */
+	double txn_last_tm;
+};
+
+/** Update replica associated data once write is complete. */
+static void
+replica_txn_wal_write_cb(struct replica_cb_data *rcb)
+{
+	struct replica *r = replica_by_id(rcb->replica_id);
+	if (likely(r != NULL))
+		r->applier_txn_last_tm = rcb->txn_last_tm;
+}
+
 static int
 applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
-	(void) trigger;
 	(void) event;
+
+	struct replica_cb_data *rcb =
+		(struct replica_cb_data *)trigger->data;
+	replica_txn_wal_write_cb(rcb);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
@@ -766,6 +803,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** Replica associated data. */
+	struct replica_cb_data *rcb;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io(entry->res);
 	} else {
+		replica_txn_wal_write_cb(synchro_entry->rcb);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
 	if (xrow_decode_synchro(row, &req) != 0)
 		goto err;
 
+	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
 	 * Rows array is cast from *[] to **, because otherwise g++ complains
@@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
 			     apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+
+	rcb_data.replica_id = replica_id;
+	rcb_data.txn_last_tm = row->tm;
+	entry.rcb = &rcb_data;
+
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(uint32_t replica_id, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct replica_cb_data *rcb;
+		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
+		if (rcb == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		/*
+		 * We use *last* entry timestamp because ack comes up to
+		 * last entry in transaction. Same time this shows more
+		 * precise result because we're interested in how long
+		 * transaction traversed network + remote WAL bundle before
+		 * ack get received.
+		 */
+		item = stailq_last_entry(rows, struct applier_tx_row, next);
+		rcb->replica_id = replica_id;
+		rcb->txn_last_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
 
@@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
 {
 	struct xrow_header *first_row =
 		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
@@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(replica_id, first_row);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(replica_id, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
@@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
-			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
+		rc = apply_synchro_row(applier->instance_id, first_row);
+	} else {
+		rc = apply_plain_tx(applier->instance_id, rows,
+				    replication_skip_conflict, true);
 	}
+	if (rc != 0)
+		goto finish;
+
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
 finish:
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 903390686..a0b3e0186 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -184,6 +184,7 @@ replica_new(void)
 	trigger_create(&replica->on_applier_state,
 		       replica_on_applier_state_f, NULL, NULL);
 	replica->applier_sync_state = APPLIER_DISCONNECTED;
+	replica->applier_txn_last_tm = 0;
 	latch_create(&replica->order_latch);
 	return replica;
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index 5cc380373..57e0f10ae 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -331,6 +331,11 @@ struct replica {
 	 * separate from applier.
 	 */
 	enum applier_state applier_sync_state;
+	/**
+	 * Applier's last written to WAL transaction timestamp.
+	 * Needed for relay lagging statistics.
+	 */
+	double applier_txn_last_tm;
 	/* The latch is used to order replication requests. */
 	struct latch order_latch;
 };
-- 
2.31.1


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
@ 2021-06-17 15:48 ` Cyrill Gorcunov via Tarantool-patches
  2021-06-18  9:50   ` Serge Petrenko via Tarantool-patches
  2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 2 replies; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-17 15:48 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

We already have `box.replication.upstream.lag` entry for monitoring
sake. Same time in synchronous replication timeouts are key properties
for quorum gathering procedure. Thus we would like to know how long
it took of a transaction to traverse `initiator WAL -> network ->
remote applier -> initiator ACK reception` path.

Typical output is

 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.61753897101153
 |   vclock: {1: 147}
 |   lag: 0
 | ...
 | tarantool> box.space.sync:insert{69}
 | ---
 | - [69]
 | ...
 |
 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.75324084801832
 |   vclock: {1: 151}
 |   lag: 0.0011014938354492
 | ...

Closes #5447

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>

@TarantoolBot document
Title: Add `box.info.replication[n].downstream.lag` entry

`replication[n].downstream.lag` represents a lag between the main
node writes a certain transaction to it's own WAL and a moment it
receives an ack for this transaction from a replica.
---
 .../unreleased/gh-5447-downstream-lag.md      |   6 +
 src/box/lua/info.c                            |   3 +
 src/box/relay.cc                              |  94 ++++++++++++-
 src/box/relay.h                               |   6 +
 .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
 .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
 6 files changed, 290 insertions(+), 4 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
 create mode 100644 test/replication/gh-5447-downstream-lag.result
 create mode 100644 test/replication/gh-5447-downstream-lag.test.lua

diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
new file mode 100644
index 000000000..f937ce35e
--- /dev/null
+++ b/changelogs/unreleased/gh-5447-downstream-lag.md
@@ -0,0 +1,6 @@
+#feature/replication
+
+ * Introduced `box.info.replication[n].downstream.lag` field to monitor
+   state of replication. This member represents a lag between the main
+   node writes a certain transaction to it's own WAL and a moment it
+   receives an ack for this transaction from a replica (gh-5447).
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 0eb48b823..f201b25e3 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
 		lua_pushnumber(L, ev_monotonic_now(loop()) -
 			       relay_last_row_time(relay));
 		lua_settable(L, -3);
+		lua_pushstring(L, "lag");
+		lua_pushnumber(L, relay_txn_lag(relay));
+		lua_settable(L, -3);
 		break;
 	case RELAY_STOPPED:
 	{
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b1571b361..ed6dd193d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -70,6 +70,18 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Cbus message to update relay reader fiber status in tx thread.
+ */
+struct relay_reader_msg {
+	/** Parent */
+	struct cmsg msg;
+	/** Relay instance */
+	struct relay *relay;
+	/** Transaction lag value. */
+	double txn_lag;
+};
+
 /**
  * Cbus message to update replica gc state in tx thread.
  */
@@ -151,6 +163,8 @@ struct relay {
 	struct cpipe relay_pipe;
 	/** Status message */
 	struct relay_status_msg status_msg;
+	/** Reader fiber message */
+	struct relay_reader_msg reader_msg;
 	/**
 	 * List of garbage collection messages awaiting
 	 * confirmation from the replica.
@@ -158,6 +172,11 @@ struct relay {
 	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
+	/**
+	 * Last timestamp observed from remote node to
+	 * compute @a tx.txn_lag value.
+	 */
+	double txn_acked_tm;
 	/** Relay sync state. */
 	enum relay_state state;
 
@@ -166,6 +185,14 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * A time difference between the moment when we
+		 * wrote a transaction to the local WAL and when
+		 * this transaction has been replicated to remote
+		 * node (ie written to node's WAL) so that ACK get
+		 * received.
+		 */
+		double txn_lag;
 		/**
 		 * True if the relay needs Raft updates. It can live fine
 		 * without sending Raft updates, if it is a relay to an
@@ -217,6 +244,12 @@ relay_last_row_time(const struct relay *relay)
 	return relay->last_row_time;
 }
 
+double
+relay_txn_lag(const struct relay *relay)
+{
+	return relay->tx.txn_lag;
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet);
 static void
@@ -284,6 +317,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 	relay->row_count = 0;
 	relay->last_row_time = ev_monotonic_now(loop());
+	/*
+	 * We assume that previously written rows in WAL
+	 * are older than current node real time which allows
+	 * to simplify @a tx.txn_lag calculation. In worst
+	 * scenario when runtime has been adjusted backwards
+	 * between restart we simply get some big value in
+	 * @a tx.txn_lag until next transaction get replicated.
+	 */
+	relay->txn_acked_tm = ev_now(loop());
 }
 
 void
@@ -336,6 +378,12 @@ relay_stop(struct relay *relay)
 	 * upon cord_create().
 	 */
 	relay->cord.id = 0;
+	/*
+	 * If relay is stopped then lag statistics should
+	 * be updated on next new ACK packets obtained.
+	 */
+	relay->txn_acked_tm = 0;
+	relay->tx.txn_lag = 0;
 }
 
 void
@@ -466,11 +514,10 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 }
 
 /**
- * The message which updated tx thread with a new vclock has returned back
- * to the relay.
+ * The message which updated tx thread returns to the relay thread.
  */
 static void
-relay_status_update(struct cmsg *msg)
+tx_update_complete(struct cmsg *msg)
 {
 	msg->route = NULL;
 }
@@ -500,7 +547,7 @@ tx_status_update(struct cmsg *msg)
 	trigger_run(&replicaset.on_ack, &ack);
 
 	static const struct cmsg_hop route[] = {
-		{relay_status_update, NULL}
+		{tx_update_complete, NULL}
 	};
 	cmsg_init(msg, route);
 	cpipe_push(&status->relay->relay_pipe, msg);
@@ -607,6 +654,20 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	}
 }
 
+static void
+tx_reader_update(struct cmsg *msg)
+{
+	struct relay_reader_msg *rmsg = (struct relay_reader_msg *)msg;
+	rmsg->relay->tx.txn_lag = rmsg->txn_lag;
+
+	static const struct cmsg_hop route[] = {
+		{tx_update_complete, NULL}
+	};
+
+	cmsg_init(msg, route);
+	cpipe_push(&rmsg->relay->relay_pipe, msg);
+}
+
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -616,6 +677,7 @@ relay_reader_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct fiber *relay_f = va_arg(ap, struct fiber *);
+	struct relay_reader_msg *rmsg = &relay->reader_msg;
 
 	struct ibuf ibuf;
 	struct ev_io io;
@@ -629,6 +691,30 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
+			/*
+			 * Replica send us last replicated transaction
+			 * timestamp which is needed for relay lag
+			 * monitoring. Note that this transaction has
+			 * been written to WAL with our current realtime
+			 * clock value, thus when it get reported back we
+			 * can compute time spent regardless of the clock
+			 * value on remote replica. Since the communication
+			 * goes between thread make sure the previous update
+			 * to TX is complete.
+			 */
+			if (relay->txn_acked_tm < xrow.tm &&
+			    rmsg->msg.route == NULL) {
+				relay->txn_acked_tm = xrow.tm;
+
+				static const struct cmsg_hop route[] = {
+					{tx_reader_update, NULL}
+				};
+
+				cmsg_init(&rmsg->msg, route);
+				rmsg->txn_lag = ev_now(loop()) - xrow.tm;
+				rmsg->relay = relay;
+				cpipe_push(&relay->tx_pipe, &rmsg->msg);
+			}
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..615ffb75d 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Returns relay's transaction's lag.
+ */
+double
+relay_txn_lag(const struct relay *relay);
+
 /**
  * Send a Raft update request to the relay channel. It is not
  * guaranteed that it will be delivered. The connection may break.
diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
new file mode 100644
index 000000000..2cc020451
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.result
@@ -0,0 +1,128 @@
+-- test-run result file version 2
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+replica_id = test_run:get_server_id('replica')
+ | ---
+ | ...
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+ | ---
+ | - true
+ | ...
+
+s = box.schema.space.create('test', {engine = engine})
+ | ---
+ | ...
+_ = s:create_index('pk')
+ | ---
+ | ...
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+box.space.test:insert({1})
+ | ---
+ | - [1]
+ | ...
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+ | ---
+ | - true
+ | ...
+-- The record is written on the master node.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+assert(box.info.replication[replica_id].downstream.lag > 0)
+ | ---
+ | - true
+ | ...
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
new file mode 100644
index 000000000..3096e2ac3
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.test.lua
@@ -0,0 +1,57 @@
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+test_run:cmd('start server replica')
+
+replica_id = test_run:get_server_id('replica')
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+test_run:wait_lsn('replica', 'default')
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+lsn = box.info.lsn
+box.space.test:insert({1})
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+-- The record is written on the master node.
+test_run:switch('replica')
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+test_run:wait_lsn('replica', 'default')
+assert(box.info.replication[replica_id].downstream.lag > 0)
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+box.schema.user.revoke('guest', 'replication')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
-- 
2.31.1


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
@ 2021-06-18  9:50   ` Serge Petrenko via Tarantool-patches
  2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 0 replies; 11+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-06-18  9:50 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml; +Cc: Vladislav Shpilevoy



17.06.2021 18:48, Cyrill Gorcunov пишет:
> We already have `box.replication.upstream.lag` entry for monitoring
> sake. Same time in synchronous replication timeouts are key properties
> for quorum gathering procedure. Thus we would like to know how long
> it took of a transaction to traverse `initiator WAL -> network ->
> remote applier -> initiator ACK reception` path.
>
> Typical output is
>
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.61753897101153
>   |   vclock: {1: 147}
>   |   lag: 0
>   | ...
>   | tarantool> box.space.sync:insert{69}
>   | ---
>   | - [69]
>   | ...
>   |
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.75324084801832
>   |   vclock: {1: 151}
>   |   lag: 0.0011014938354492
>   | ...
>
> Closes #5447

Hi! Thanks for the fixes!

Please find a couple of comments below.

>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>
> @TarantoolBot document
> Title: Add `box.info.replication[n].downstream.lag` entry
>
> `replication[n].downstream.lag` represents a lag between the main
> node writes a certain transaction to it's own WAL and a moment it
> receives an ack for this transaction from a replica.
> ---
>   .../unreleased/gh-5447-downstream-lag.md      |   6 +
>   src/box/lua/info.c                            |   3 +
>   src/box/relay.cc                              |  94 ++++++++++++-
>   src/box/relay.h                               |   6 +
>   .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
>   .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
>   6 files changed, 290 insertions(+), 4 deletions(-)
>   create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
>   create mode 100644 test/replication/gh-5447-downstream-lag.result
>   create mode 100644 test/replication/gh-5447-downstream-lag.test.lua
>
> diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
> new file mode 100644
> index 000000000..f937ce35e
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5447-downstream-lag.md
> @@ -0,0 +1,6 @@
> +#feature/replication
> +
> + * Introduced `box.info.replication[n].downstream.lag` field to monitor
> +   state of replication. This member represents a lag between the main
> +   node writes a certain transaction to it's own WAL and a moment it
> +   receives an ack for this transaction from a replica (gh-5447).
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index 0eb48b823..f201b25e3 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
>   		lua_pushnumber(L, ev_monotonic_now(loop()) -
>   			       relay_last_row_time(relay));
>   		lua_settable(L, -3);
> +		lua_pushstring(L, "lag");
> +		lua_pushnumber(L, relay_txn_lag(relay));
> +		lua_settable(L, -3);
>   		break;
>   	case RELAY_STOPPED:
>   	{
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b1571b361..ed6dd193d 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -70,6 +70,18 @@ struct relay_status_msg {
>   	struct vclock vclock;
>   };
>   
> +/**
> + * Cbus message to update relay reader fiber status in tx thread.
> + */
> +struct relay_reader_msg {
> +	/** Parent */
> +	struct cmsg msg;
> +	/** Relay instance */
> +	struct relay *relay;
> +	/** Transaction lag value. */
> +	double txn_lag;
> +};
> +
>   /**
>    * Cbus message to update replica gc state in tx thread.
>    */
> @@ -151,6 +163,8 @@ struct relay {
>   	struct cpipe relay_pipe;
>   	/** Status message */
>   	struct relay_status_msg status_msg;
> +	/** Reader fiber message */
> +	struct relay_reader_msg reader_msg;
>   	/**
>   	 * List of garbage collection messages awaiting
>   	 * confirmation from the replica.
> @@ -158,6 +172,11 @@ struct relay {
>   	struct stailq pending_gc;
>   	/** Time when last row was sent to peer. */
>   	double last_row_time;
> +	/**
> +	 * Last timestamp observed from remote node to
> +	 * compute @a tx.txn_lag value.
> +	 */
> +	double txn_acked_tm;
>   	/** Relay sync state. */
>   	enum relay_state state;
>   
> @@ -166,6 +185,14 @@ struct relay {
>   		alignas(CACHELINE_SIZE)
>   		/** Known relay vclock. */
>   		struct vclock vclock;
> +		/**
> +		 * A time difference between the moment when we
> +		 * wrote a transaction to the local WAL and when
> +		 * this transaction has been replicated to remote
> +		 * node (ie written to node's WAL) so that ACK get
> +		 * received.
> +		 */
> +		double txn_lag;
>   		/**
>   		 * True if the relay needs Raft updates. It can live fine
>   		 * without sending Raft updates, if it is a relay to an
> @@ -217,6 +244,12 @@ relay_last_row_time(const struct relay *relay)
>   	return relay->last_row_time;
>   }
>   
> +double
> +relay_txn_lag(const struct relay *relay)
> +{
> +	return relay->tx.txn_lag;
> +}
> +
>   static void
>   relay_send(struct relay *relay, struct xrow_header *packet);
>   static void
> @@ -284,6 +317,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>   	relay->state = RELAY_FOLLOW;
>   	relay->row_count = 0;
>   	relay->last_row_time = ev_monotonic_now(loop());
> +	/*
> +	 * We assume that previously written rows in WAL
> +	 * are older than current node real time which allows
> +	 * to simplify @a tx.txn_lag calculation. In worst
> +	 * scenario when runtime has been adjusted backwards
> +	 * between restart we simply get some big value in
> +	 * @a tx.txn_lag until next transaction get replicated.
> +	 */
> +	relay->txn_acked_tm = ev_now(loop());
>   }
>   
>   void
> @@ -336,6 +378,12 @@ relay_stop(struct relay *relay)
>   	 * upon cord_create().
>   	 */
>   	relay->cord.id = 0;
> +	/*
> +	 * If relay is stopped then lag statistics should
> +	 * be updated on next new ACK packets obtained.
> +	 */
> +	relay->txn_acked_tm = 0;
> +	relay->tx.txn_lag = 0;
>   }
>   
>   void
> @@ -466,11 +514,10 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>   }
>   
>   /**
> - * The message which updated tx thread with a new vclock has returned back
> - * to the relay.
> + * The message which updated tx thread returns to the relay thread.
>    */
>   static void
> -relay_status_update(struct cmsg *msg)
> +tx_update_complete(struct cmsg *msg)

Let's name this `relay_update_complete` or something similar, please.

Otherwise it looks like something executed in tx thread.

>   {
>   	msg->route = NULL;
>   }
> @@ -500,7 +547,7 @@ tx_status_update(struct cmsg *msg)
>   	trigger_run(&replicaset.on_ack, &ack);
>   
>   	static const struct cmsg_hop route[] = {
> -		{relay_status_update, NULL}
> +		{tx_update_complete, NULL}
>   	};
>   	cmsg_init(msg, route);
>   	cpipe_push(&status->relay->relay_pipe, msg);
> @@ -607,6 +654,20 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>   	}
>   }
>   
> +static void
> +tx_reader_update(struct cmsg *msg)
> +{
> +	struct relay_reader_msg *rmsg = (struct relay_reader_msg *)msg;
> +	rmsg->relay->tx.txn_lag = rmsg->txn_lag;
> +
> +	static const struct cmsg_hop route[] = {
> +		{tx_update_complete, NULL}
> +	};
> +
> +	cmsg_init(msg, route);
> +	cpipe_push(&rmsg->relay->relay_pipe, msg);
> +}
> +
>   /*
>    * Relay reader fiber function.
>    * Read xrow encoded vclocks sent by the replica.
> @@ -616,6 +677,7 @@ relay_reader_f(va_list ap)
>   {
>   	struct relay *relay = va_arg(ap, struct relay *);
>   	struct fiber *relay_f = va_arg(ap, struct fiber *);
> +	struct relay_reader_msg *rmsg = &relay->reader_msg;
>   
>   	struct ibuf ibuf;
>   	struct ev_io io;
> @@ -629,6 +691,30 @@ relay_reader_f(va_list ap)
>   			/* vclock is followed while decoding, zeroing it. */
>   			vclock_create(&relay->recv_vclock);
>   			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> +			/*
> +			 * Replica send us last replicated transaction
> +			 * timestamp which is needed for relay lag
> +			 * monitoring. Note that this transaction has
> +			 * been written to WAL with our current realtime
> +			 * clock value, thus when it get reported back we
> +			 * can compute time spent regardless of the clock
> +			 * value on remote replica. Since the communication
> +			 * goes between thread make sure the previous update
> +			 * to TX is complete.
> +			 */
> +			if (relay->txn_acked_tm < xrow.tm &&
> +			    rmsg->msg.route == NULL) {
> +				relay->txn_acked_tm = xrow.tm;
> +
> +				static const struct cmsg_hop route[] = {
> +					{tx_reader_update, NULL}
> +				};
> +
> +				cmsg_init(&rmsg->msg, route);
> +				rmsg->txn_lag = ev_now(loop()) - xrow.tm;
> +				rmsg->relay = relay;
> +				cpipe_push(&relay->tx_pipe, &rmsg->msg);
> +			}

This bothers me a bit that now we have 2 places which issue cbus 
messages from
relay to tx. I mean relay_reader_f and relay_subscribe_f.

It would be nice if you could unify these two messages. For example,
enrich tx_status_update message with new txn_lag value.

I think this should be fine to send the cbus message a bit later than 
the actual
ACK was received. This is how it's done for replica's vclock.

So, I propose to either move vclock update (relay->status_msg) push to 
relay_reader_f
or move time update (relay->reader_msg) to relay_subscribe_f. And unify 
the messages.

This way we'll have a single source for all relay->tx updates and less 
code duplication.

P.S. moving relay->status_msg push to relay_reader_f sounds more logical 
to me.

>   			fiber_cond_signal(&relay->reader_cond);
>   		}
>   	} catch (Exception *e) {
> diff --git a/src/box/relay.h b/src/box/relay.h
> index b32e2ea2a..615ffb75d 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
>   double
>   relay_last_row_time(const struct relay *relay);
>   
> +/**
> + * Returns relay's transaction's lag.
> + */
> +double
> +relay_txn_lag(const struct relay *relay);
> +
>   /**
>    * Send a Raft update request to the relay channel. It is not
>    * guaranteed that it will be delivered. The connection may break.
> diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
> new file mode 100644
> index 000000000..2cc020451
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.result
> @@ -0,0 +1,128 @@
> +-- test-run result file version 2
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> + | ---
> + | ...
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +             script="replication/replica.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica')
> + | ---
> + | - true
> + | ...
> +
> +replica_id = test_run:get_server_id('replica')
> + | ---
> + | ...
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> + | ---
> + | - true
> + | ...
> +
> +s = box.schema.space.create('test', {engine = engine})
> + | ---
> + | ...
> +_ = s:create_index('pk')
> + | ---
> + | ...
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +lsn = box.info.lsn
> + | ---
> + | ...
> +box.space.test:insert({1})
> + | ---
> + | - [1]
> + | ...
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> + | ---
> + | - true
> + | ...
> +-- The record is written on the master node.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('cleanup server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
> diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
> new file mode 100644
> index 000000000..3096e2ac3
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.test.lua
> @@ -0,0 +1,57 @@
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> +test_run = require('test_run').new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'replication')
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +             script="replication/replica.lua"')
> +test_run:cmd('start server replica')
> +
> +replica_id = test_run:get_server_id('replica')
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> +
> +s = box.schema.space.create('test', {engine = engine})
> +_ = s:create_index('pk')
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> +test_run:wait_lsn('replica', 'default')
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> +lsn = box.info.lsn
> +box.space.test:insert({1})
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> +-- The record is written on the master node.
> +test_run:switch('replica')
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> +test_run:wait_lsn('replica', 'default')
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> +box.schema.user.revoke('guest', 'replication')
> +test_run:cmd('stop server replica')
> +test_run:cmd('cleanup server replica')
> +test_run:cmd('delete server replica')

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
@ 2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
  2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
  0 siblings, 1 reply; 11+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-06-18  9:51 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml; +Cc: Vladislav Shpilevoy



17.06.2021 18:48, Cyrill Gorcunov пишет:
> Applier fiber sends current vclock of the node to remote relay reader,
> pointing current state of fetched WAL data so the relay will know which
> new data should be sent. The packet applier sends carries xrow_header::tm
> field as a zero but we can reuse it to provide information about first
> timestamp in a transaction we wrote to our WAL. Since old instances of
> Tarantool simply ignore this field such extension won't cause any
> problems.
>
> The timestamp will be needed to account lag of downstream replicas
> suitable for information purpose and cluster health monitoring.
>
> We update applier statistics in WAL callbacks but since both
> apply_synchro_row and apply_plain_tx are used not only in real data
> application but in final join stage as well (in this stage we're not
> writing the data yet) the apply_synchro_row is extended with replica_id
> argument which is non zero when applier is subscribed.
>
> The calculation of the downstream lag itself lag will be addressed
> in next patch because sending the timestamp and its observation
> are independent actions.
>
> Part-of #5447
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>

Thanks for the patch! LGTM!
> ---
>   src/box/applier.cc     | 97 +++++++++++++++++++++++++++++++++++-------
>   src/box/replication.cc |  1 +
>   src/box/replication.h  |  5 +++
>   3 files changed, 88 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10cea26a7..0782be513 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
>   	struct ev_io io;
>   	coio_create(&io, applier->io.fd);
>   
> +	/* ID is permanent while applier is alive */
> +	uint32_t replica_id = applier->instance_id;
> +
>   	while (!fiber_is_cancelled()) {
>   		/*
>   		 * Tarantool >= 1.7.7 sends periodic heartbeat
> @@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
>   			applier->has_acks_to_send = false;
>   			struct xrow_header xrow;
>   			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			/*
> +			 * For relay lag statistics we report last
> +			 * written transaction timestamp in tm field.
> +			 *
> +			 * If user delete the node from _cluster space,
> +			 * we obtain a nil pointer here.
> +			 */
> +			struct replica *r = replica_by_id(replica_id);
> +			if (likely(r != NULL))
> +				xrow.tm = r->applier_txn_last_tm;
>   			coio_write_xrow(&io, &xrow);
>   			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
>   				fiber_sleep(0.01);
> @@ -490,7 +503,7 @@ static uint64_t
>   applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
>   
>   static int
> -apply_final_join_tx(struct stailq *rows);
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
>   
>   /**
>    * A helper struct to link xrow objects in a list.
> @@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>   						  next)->row);
>   			break;
>   		}
> -		if (apply_final_join_tx(&rows) != 0)
> +		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
>   			diag_raise();
>   	}
>   
> @@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>   	return 0;
>   }
>   
> +struct replica_cb_data {
> +	/** Replica ID the data belongs to. */
> +	uint32_t replica_id;
> +	/**
> +	 * Timestamp of a transaction to be accounted
> +	 * for relay lag. Usually it is a last row in
> +	 * a transaction.
> +	 */
> +	double txn_last_tm;
> +};
> +
> +/** Update replica associated data once write is complete. */
> +static void
> +replica_txn_wal_write_cb(struct replica_cb_data *rcb)
> +{
> +	struct replica *r = replica_by_id(rcb->replica_id);
> +	if (likely(r != NULL))
> +		r->applier_txn_last_tm = rcb->txn_last_tm;
> +}
> +
>   static int
>   applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>   {
> -	(void) trigger;
>   	(void) event;
> +
> +	struct replica_cb_data *rcb =
> +		(struct replica_cb_data *)trigger->data;
> +	replica_txn_wal_write_cb(rcb);
> +
>   	/* Broadcast the WAL write across all appliers. */
>   	trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	return 0;
> @@ -766,6 +803,8 @@ struct synchro_entry {
>   	struct synchro_request *req;
>   	/** Fiber created the entry. To wakeup when WAL write is done. */
>   	struct fiber *owner;
> +	/** Replica associated data. */
> +	struct replica_cb_data *rcb;
>   	/**
>   	 * The base journal entry. It has unsized array and then must be the
>   	 * last entry in the structure. But can workaround it via a union
> @@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   	if (entry->res < 0) {
>   		applier_rollback_by_wal_io(entry->res);
>   	} else {
> +		replica_txn_wal_write_cb(synchro_entry->rcb);
>   		txn_limbo_process(&txn_limbo, synchro_entry->req);
>   		trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	}
> @@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   
>   /** Process a synchro request. */
>   static int
> -apply_synchro_row(struct xrow_header *row)
> +apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   {
>   	assert(iproto_type_is_synchro_request(row->type));
>   
> @@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
>   	if (xrow_decode_synchro(row, &req) != 0)
>   		goto err;
>   
> +	struct replica_cb_data rcb_data;
>   	struct synchro_entry entry;
>   	/*
>   	 * Rows array is cast from *[] to **, because otherwise g++ complains
> @@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
>   			     apply_synchro_row_cb, &entry);
>   	entry.req = &req;
>   	entry.owner = fiber();
> +
> +	rcb_data.replica_id = replica_id;
> +	rcb_data.txn_last_tm = row->tm;
> +	entry.rcb = &rcb_data;
> +
>   	/*
>   	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
>   	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
> @@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   	return box_raft_process(&req, applier->instance_id);
>   }
>   
> -static inline int
> -apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +static int
> +apply_plain_tx(uint32_t replica_id, struct stailq *rows,
> +	       bool skip_conflict, bool use_triggers)
>   {
>   	/*
>   	 * Explicitly begin the transaction so that we can
> @@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>   			goto fail;
>   		}
>   
> +		struct replica_cb_data *rcb;
> +		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
> +		if (rcb == NULL) {
> +			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
> +			goto fail;
> +		}
> +
>   		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>   		txn_on_rollback(txn, on_rollback);
>   
> -		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> +		/*
> +		 * We use *last* entry timestamp because ack comes up to
> +		 * last entry in transaction. Same time this shows more
> +		 * precise result because we're interested in how long
> +		 * transaction traversed network + remote WAL bundle before
> +		 * ack get received.
> +		 */
> +		item = stailq_last_entry(rows, struct applier_tx_row, next);
> +		rcb->replica_id = replica_id;
> +		rcb->txn_last_tm = item->row.tm;
> +
> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
>   		txn_on_wal_write(txn, on_wal_write);
>   	}
>   
> @@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>   
>   /** A simpler version of applier_apply_tx() for final join stage. */
>   static int
> -apply_final_join_tx(struct stailq *rows)
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
>   {
>   	struct xrow_header *first_row =
>   		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> @@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
>   	vclock_follow_xrow(&replicaset.vclock, last_row);
>   	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>   		assert(first_row == last_row);
> -		rc = apply_synchro_row(first_row);
> +		rc = apply_synchro_row(replica_id, first_row);
>   	} else {
> -		rc = apply_plain_tx(rows, false, false);
> +		rc = apply_plain_tx(replica_id, rows, false, false);
>   	}
>   	fiber_gc();
>   	return rc;
> @@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   		 * each other.
>   		 */
>   		assert(first_row == last_row);
> -		if ((rc = apply_synchro_row(first_row)) != 0)
> -			goto finish;
> -	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
> -					true)) != 0) {
> -		goto finish;
> +		rc = apply_synchro_row(applier->instance_id, first_row);
> +	} else {
> +		rc = apply_plain_tx(applier->instance_id, rows,
> +				    replication_skip_conflict, true);
>   	}
> +	if (rc != 0)
> +		goto finish;
> +
>   	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>   		      last_row->lsn);
>   finish:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 903390686..a0b3e0186 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -184,6 +184,7 @@ replica_new(void)
>   	trigger_create(&replica->on_applier_state,
>   		       replica_on_applier_state_f, NULL, NULL);
>   	replica->applier_sync_state = APPLIER_DISCONNECTED;
> +	replica->applier_txn_last_tm = 0;
>   	latch_create(&replica->order_latch);
>   	return replica;
>   }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 5cc380373..57e0f10ae 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -331,6 +331,11 @@ struct replica {
>   	 * separate from applier.
>   	 */
>   	enum applier_state applier_sync_state;
> +	/**
> +	 * Applier's last written to WAL transaction timestamp.
> +	 * Needed for relay lagging statistics.
> +	 */
> +	double applier_txn_last_tm;
>   	/* The latch is used to order replication requests. */
>   	struct latch order_latch;
>   };

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
  2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
@ 2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
  2021-06-21  8:35       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-18 18:06 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tml, Vladislav Shpilevoy

Serge, I've update the patch and force pushed it, please take a look.
---
From: Cyrill Gorcunov <gorcunov@gmail.com>
Subject: [PATCH] relay: provide information about downstream lag

We already have `box.replication.upstream.lag` entry for monitoring
sake. Same time in synchronous replication timeouts are key properties
for quorum gathering procedure. Thus we would like to know how long
it took of a transaction to traverse `initiator WAL -> network ->
remote applier -> initiator ACK reception` path.

Typical output is

 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.61753897101153
 |   vclock: {1: 147}
 |   lag: 0
 | ...
 | tarantool> box.space.sync:insert{69}
 | ---
 | - [69]
 | ...
 |
 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.75324084801832
 |   vclock: {1: 151}
 |   lag: 0.0011014938354492
 | ...

Closes #5447

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>

@TarantoolBot document
Title: Add `box.info.replication[n].downstream.lag` entry

`replication[n].downstream.lag` represents a lag between the main
node writes a certain transaction to it's own WAL and a moment it
receives an ack for this transaction from a replica.
---
 .../unreleased/gh-5447-downstream-lag.md      |   6 +
 src/box/lua/info.c                            |   3 +
 src/box/relay.cc                              |  58 ++++++++
 src/box/relay.h                               |   6 +
 .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
 .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
 6 files changed, 258 insertions(+)
 create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
 create mode 100644 test/replication/gh-5447-downstream-lag.result
 create mode 100644 test/replication/gh-5447-downstream-lag.test.lua

diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
new file mode 100644
index 000000000..f937ce35e
--- /dev/null
+++ b/changelogs/unreleased/gh-5447-downstream-lag.md
@@ -0,0 +1,6 @@
+#feature/replication
+
+ * Introduced `box.info.replication[n].downstream.lag` field to monitor
+   state of replication. This member represents a lag between the main
+   node writes a certain transaction to it's own WAL and a moment it
+   receives an ack for this transaction from a replica (gh-5447).
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 0eb48b823..f201b25e3 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
 		lua_pushnumber(L, ev_monotonic_now(loop()) -
 			       relay_last_row_time(relay));
 		lua_settable(L, -3);
+		lua_pushstring(L, "lag");
+		lua_pushnumber(L, relay_txn_lag(relay));
+		lua_settable(L, -3);
 		break;
 	case RELAY_STOPPED:
 	{
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b1571b361..14c9b0f03 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -68,6 +68,8 @@ struct relay_status_msg {
 	struct relay *relay;
 	/** Replica vclock. */
 	struct vclock vclock;
+	/** Last replicated transaction timestamp. */
+	double txn_lag;
 };
 
 /**
@@ -158,6 +160,19 @@ struct relay {
 	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
+	/**
+	 * Last timestamp observed from remote node to
+	 * compute @a txn_lag value.
+	 */
+	double txn_acked_tm;
+	/**
+	 * A time difference between the moment when we
+	 * wrote a transaction to the local WAL and when
+	 * this transaction has been replicated to remote
+	 * node (ie written to node's WAL) so that ACK get
+	 * received.
+	 */
+	double txn_lag;
 	/** Relay sync state. */
 	enum relay_state state;
 
@@ -166,6 +181,11 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * Transaction downstream lag to be accessed
+		 * from TX thread only.
+		 */
+		double txn_lag;
 		/**
 		 * True if the relay needs Raft updates. It can live fine
 		 * without sending Raft updates, if it is a relay to an
@@ -217,6 +237,12 @@ relay_last_row_time(const struct relay *relay)
 	return relay->last_row_time;
 }
 
+double
+relay_txn_lag(const struct relay *relay)
+{
+	return relay->tx.txn_lag;
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet);
 static void
@@ -284,6 +310,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 	relay->row_count = 0;
 	relay->last_row_time = ev_monotonic_now(loop());
+	/*
+	 * We assume that previously written rows in WAL
+	 * are older than current node real time which allows
+	 * to simplify @a tx.txn_lag calculation. In worst
+	 * scenario when runtime has been adjusted backwards
+	 * between restart we simply get some big value in
+	 * @a tx.txn_lag until next transaction get replicated.
+	 */
+	relay->txn_acked_tm = ev_now(loop());
 }
 
 void
@@ -336,6 +371,13 @@ relay_stop(struct relay *relay)
 	 * upon cord_create().
 	 */
 	relay->cord.id = 0;
+	/*
+	 * If relay is stopped then lag statistics should
+	 * be updated on next new ACK packets obtained.
+	 */
+	relay->txn_acked_tm = 0;
+	relay->txn_lag = 0;
+	relay->tx.txn_lag = 0;
 }
 
 void
@@ -483,6 +525,8 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	status->relay->tx.txn_lag = status->txn_lag;
+
 	struct replication_ack ack;
 	ack.source = status->relay->replica->id;
 	ack.vclock = &status->vclock;
@@ -629,6 +673,19 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
+			/*
+			 * Replica send us last replicated transaction
+			 * timestamp which is needed for relay lag
+			 * monitoring. Note that this transaction has
+			 * been written to WAL with our current realtime
+			 * clock value, thus when it get reported back we
+			 * can compute time spent regardless of the clock
+			 * value on remote replica.
+			 */
+			if (relay->txn_acked_tm < xrow.tm) {
+				relay->txn_acked_tm = xrow.tm;
+				relay->txn_lag = ev_now(loop()) - xrow.tm;
+			}
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
@@ -838,6 +895,7 @@ relay_subscribe_f(va_list ap)
 		};
 		cmsg_init(&relay->status_msg.msg, route);
 		vclock_copy(&relay->status_msg.vclock, send_vclock);
+		relay->status_msg.txn_lag = relay->txn_lag;
 		relay->status_msg.relay = relay;
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..615ffb75d 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Returns relay's transaction's lag.
+ */
+double
+relay_txn_lag(const struct relay *relay);
+
 /**
  * Send a Raft update request to the relay channel. It is not
  * guaranteed that it will be delivered. The connection may break.
diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
new file mode 100644
index 000000000..2cc020451
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.result
@@ -0,0 +1,128 @@
+-- test-run result file version 2
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+replica_id = test_run:get_server_id('replica')
+ | ---
+ | ...
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+ | ---
+ | - true
+ | ...
+
+s = box.schema.space.create('test', {engine = engine})
+ | ---
+ | ...
+_ = s:create_index('pk')
+ | ---
+ | ...
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+box.space.test:insert({1})
+ | ---
+ | - [1]
+ | ...
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+ | ---
+ | - true
+ | ...
+-- The record is written on the master node.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+assert(box.info.replication[replica_id].downstream.lag > 0)
+ | ---
+ | - true
+ | ...
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
new file mode 100644
index 000000000..3096e2ac3
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.test.lua
@@ -0,0 +1,57 @@
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+test_run:cmd('start server replica')
+
+replica_id = test_run:get_server_id('replica')
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+test_run:wait_lsn('replica', 'default')
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+lsn = box.info.lsn
+box.space.test:insert({1})
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+-- The record is written on the master node.
+test_run:switch('replica')
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+test_run:wait_lsn('replica', 'default')
+assert(box.info.replication[replica_id].downstream.lag > 0)
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+box.schema.user.revoke('guest', 'replication')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
-- 
2.31.1


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
  2021-06-18  9:50   ` Serge Petrenko via Tarantool-patches
@ 2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
  2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
  2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
  1 sibling, 2 replies; 11+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-06-20 14:37 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Hi! Thanks for the patch!

The test fails when I run it multiple times:

[014] Test failed! Result content mismatch:
[014] --- replication/gh-5447-downstream-lag.result	Sun Jun 20 16:10:26 2021
[014] +++ var/rejects/replication/gh-5447-downstream-lag.reject	Sun Jun 20 16:33:01 2021
[014] @@ -37,7 +37,7 @@
[014]  -- Upon replica startup there is no ACKs to process.
[014]  assert(box.info.replication[replica_id].downstream.lag == 0)
[014]   | ---
[014] - | - true
[014] + | - error: assertion failed!

See 4 comments below.

> @@ -629,6 +673,19 @@ relay_reader_f(va_list ap)
>  			/* vclock is followed while decoding, zeroing it. */
>  			vclock_create(&relay->recv_vclock);
>  			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> +			/*
> +			 * Replica send us last replicated transaction
> +			 * timestamp which is needed for relay lag
> +			 * monitoring. Note that this transaction has
> +			 * been written to WAL with our current realtime
> +			 * clock value, thus when it get reported back we
> +			 * can compute time spent regardless of the clock
> +			 * value on remote replica.
> +			 */
> +			if (relay->txn_acked_tm < xrow.tm) {

1. Why do you need this `if`? Why xrow.tm != 0 is not enough? (It is 0
when replicate to old versions.) ACKs are sent in the same order as the
rows, so there can't be any reordering.

If it is intended to be used against time changes, this check won't work
it seems. If time is moved far into the future, the check passes and the
lag will be huge for some time. No protection. And there can't be.

If the time is moved far into the past, the check will freeze for the
time shift size. Even when all the old transactions are acked and new
ones are coming. Because you cached txn_acked_tm in the old time system.
No protection either. Looks even like a bug, because the lag freezes
regardless of whether there are new transactions ACKed with the new time
system or not. It will wait for the new time system to catch up with the
old txn_acked_tm.

If the timestamp is not needed, you can drop txn_acked_tm member from
struct relay.

> diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
> new file mode 100644
> index 000000000..2cc020451
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.result
> @@ -0,0 +1,128 @@

<...>

> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +lsn = box.info.lsn
> + | ---
> + | ...
> +box.space.test:insert({1})
> + | ---
> + | - [1]
> + | ...
> +test_run:wait_cond(function() return box.info.lsn > lsn end)

2. You don't need it. You did blocking insert(), which returns only
after the WAL write is done.

> + | ---
> + | - true
> + | ...
> +-- The record is written on the master node.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.schema.user.revoke('guest', 'replication')

3. You didn't drop the space.

> + | ---
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('cleanup server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
4. Your test uses error injections. It means it must be configured
not to run in Release build. See replication/suite.ini.


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
  2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
@ 2021-06-21  8:35       ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 11+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-06-21  8:35 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy



18.06.2021 21:06, Cyrill Gorcunov пишет:
> Serge, I've update the patch and force pushed it, please take a look.

Thanks for the fixes!
LGTM.
> ---
> From: Cyrill Gorcunov <gorcunov@gmail.com>
> Subject: [PATCH] relay: provide information about downstream lag
>
> We already have `box.replication.upstream.lag` entry for monitoring
> sake. Same time in synchronous replication timeouts are key properties
> for quorum gathering procedure. Thus we would like to know how long
> it took of a transaction to traverse `initiator WAL -> network ->
> remote applier -> initiator ACK reception` path.
>
> Typical output is
>
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.61753897101153
>   |   vclock: {1: 147}
>   |   lag: 0
>   | ...
>   | tarantool> box.space.sync:insert{69}
>   | ---
>   | - [69]
>   | ...
>   |
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.75324084801832
>   |   vclock: {1: 151}
>   |   lag: 0.0011014938354492
>   | ...
>
> Closes #5447
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>
> @TarantoolBot document
> Title: Add `box.info.replication[n].downstream.lag` entry
>
> `replication[n].downstream.lag` represents a lag between the main
> node writes a certain transaction to it's own WAL and a moment it
> receives an ack for this transaction from a replica.
> ---
>

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
  2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
  1 sibling, 0 replies; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-21  8:44 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sun, Jun 20, 2021 at 04:37:21PM +0200, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
> 
> The test fails when I run it multiple times:
> 
> [014] Test failed! Result content mismatch:
> [014] --- replication/gh-5447-downstream-lag.result	Sun Jun 20 16:10:26 2021
> [014] +++ var/rejects/replication/gh-5447-downstream-lag.reject	Sun Jun 20 16:33:01 2021
> [014] @@ -37,7 +37,7 @@
> [014]  -- Upon replica startup there is no ACKs to process.
> [014]  assert(box.info.replication[replica_id].downstream.lag == 0)
> [014]   | ---
> [014] - | - true
> [014] + | - error: assertion failed!
> 
> See 4 comments below.

Hmm, didn't trigger on my machine. Gimme some time I'll try to hit this problem.

> 
> > @@ -629,6 +673,19 @@ relay_reader_f(va_list ap)
> >  			/* vclock is followed while decoding, zeroing it. */
> >  			vclock_create(&relay->recv_vclock);
> >  			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> > +			/*
> > +			 * Replica send us last replicated transaction
> > +			 * timestamp which is needed for relay lag
> > +			 * monitoring. Note that this transaction has
> > +			 * been written to WAL with our current realtime
> > +			 * clock value, thus when it get reported back we
> > +			 * can compute time spent regardless of the clock
> > +			 * value on remote replica.
> > +			 */
> > +			if (relay->txn_acked_tm < xrow.tm) {
> 
> 1. Why do you need this `if`? Why xrow.tm != 0 is not enough? (It is 0
> when replicate to old versions.) ACKs are sent in the same order as the
> rows, so there can't be any reordering.

The main purpose is to prevent the case where a peer sends us negative value,
for some reason. Nit sure though maybe we should not hide such case but
rather point out that there some weird node spamming us. I tend to agree
that comparision with zero might be more straightforward, will do.

> If it is intended to be used against time changes, this check won't work
> it seems. If time is moved far into the future, the check passes and the
> lag will be huge for some time. No protection. And there can't be.
> 
> If the time is moved far into the past, the check will freeze for the
> time shift size. Even when all the old transactions are acked and new
> ones are coming. Because you cached txn_acked_tm in the old time system.
> No protection either. Looks even like a bug, because the lag freezes
> regardless of whether there are new transactions ACKed with the new time
> system or not. It will wait for the new time system to catch up with the
> old txn_acked_tm.
> 
> If the timestamp is not needed, you can drop txn_acked_tm member from
> struct relay.

I'll drop this variable, and yes, there is no protection from clocks adjustments,
and it can't be fixed (for current code base).

> > +-- Insert a record and wakeup replica's WAL to process data.
> > +test_run:switch('default')
> > + | ---
> > + | - true
> > + | ...
> > +lsn = box.info.lsn
> > + | ---
> > + | ...
> > +box.space.test:insert({1})
> > + | ---
> > + | - [1]
> > + | ...
> > +test_run:wait_cond(function() return box.info.lsn > lsn end)
> 
> 2. You don't need it. You did blocking insert(), which returns only
> after the WAL write is done.

Yeah, it's redundant but harmless check, thanks.

> > +-- Cleanup everything.
> > +test_run:switch('default')
> > + | ---
> > + | - true
> > + | ...
> > +box.schema.user.revoke('guest', 'replication')
> 
> 3. You didn't drop the space.

Good catch, thanks!

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
  2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
@ 2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
  2021-06-21 21:16       ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-06-21 16:17 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sun, Jun 20, 2021 at 04:37:21PM +0200, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
> 
> The test fails when I run it multiple times:
> 
> [014] Test failed! Result content mismatch:
> [014] --- replication/gh-5447-downstream-lag.result	Sun Jun 20 16:10:26 2021
> [014] +++ var/rejects/replication/gh-5447-downstream-lag.reject	Sun Jun 20 16:33:01 2021
> [014] @@ -37,7 +37,7 @@
> [014]  -- Upon replica startup there is no ACKs to process.
> [014]  assert(box.info.replication[replica_id].downstream.lag == 0)
> [014]   | ---
> [014] - | - true
> [014] + | - error: assertion failed!
> 
> See 4 comments below.

Vlad, here is an update, I force pushed it into the same branch.
I'll fix the error injection nit. Could you please retry the
test to run simultaneously (I did it locally with 200 tests
but it didn't trigger anything). I rebased the series on top
of master.
---
From da969da89beab720c91c7e895613ab9cf6ab2ea7 Mon Sep 17 00:00:00 2001
From: Cyrill Gorcunov <gorcunov@gmail.com>
Date: Mon, 21 Jun 2021 14:30:52 +0300
Subject: [PATCH] Update

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/relay.cc                              | 19 +------------------
 .../replication/gh-5447-downstream-lag.result | 10 +++-------
 .../gh-5447-downstream-lag.test.lua           |  3 +--
 3 files changed, 5 insertions(+), 27 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 14c9b0f03..115037fc3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -160,11 +160,6 @@ struct relay {
 	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
-	/**
-	 * Last timestamp observed from remote node to
-	 * compute @a txn_lag value.
-	 */
-	double txn_acked_tm;
 	/**
 	 * A time difference between the moment when we
 	 * wrote a transaction to the local WAL and when
@@ -310,15 +305,6 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 	relay->row_count = 0;
 	relay->last_row_time = ev_monotonic_now(loop());
-	/*
-	 * We assume that previously written rows in WAL
-	 * are older than current node real time which allows
-	 * to simplify @a tx.txn_lag calculation. In worst
-	 * scenario when runtime has been adjusted backwards
-	 * between restart we simply get some big value in
-	 * @a tx.txn_lag until next transaction get replicated.
-	 */
-	relay->txn_acked_tm = ev_now(loop());
 }
 
 void
@@ -375,7 +361,6 @@ relay_stop(struct relay *relay)
 	 * If relay is stopped then lag statistics should
 	 * be updated on next new ACK packets obtained.
 	 */
-	relay->txn_acked_tm = 0;
 	relay->txn_lag = 0;
 	relay->tx.txn_lag = 0;
 }
@@ -682,10 +667,8 @@ relay_reader_f(va_list ap)
 			 * can compute time spent regardless of the clock
 			 * value on remote replica.
 			 */
-			if (relay->txn_acked_tm < xrow.tm) {
-				relay->txn_acked_tm = xrow.tm;
+			if (xrow.tm != 0)
 				relay->txn_lag = ev_now(loop()) - xrow.tm;
-			}
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
index 2cc020451..0d5de2564 100644
--- a/test/replication/gh-5447-downstream-lag.result
+++ b/test/replication/gh-5447-downstream-lag.result
@@ -70,17 +70,10 @@ test_run:switch('default')
  | ---
  | - true
  | ...
-lsn = box.info.lsn
- | ---
- | ...
 box.space.test:insert({1})
  | ---
  | - [1]
  | ...
-test_run:wait_cond(function() return box.info.lsn > lsn end)
- | ---
- | - true
- | ...
 -- The record is written on the master node.
 test_run:switch('replica')
  | ---
@@ -111,6 +104,9 @@ test_run:switch('default')
  | ---
  | - true
  | ...
+box.space.test:drop()
+ | ---
+ | ...
 box.schema.user.revoke('guest', 'replication')
  | ---
  | ...
diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
index 3096e2ac3..dd1d2e2c9 100644
--- a/test/replication/gh-5447-downstream-lag.test.lua
+++ b/test/replication/gh-5447-downstream-lag.test.lua
@@ -35,9 +35,7 @@ box.error.injection.set("ERRINJ_WAL_DELAY", true)
 --
 -- Insert a record and wakeup replica's WAL to process data.
 test_run:switch('default')
-lsn = box.info.lsn
 box.space.test:insert({1})
-test_run:wait_cond(function() return box.info.lsn > lsn end)
 -- The record is written on the master node.
 test_run:switch('replica')
 box.error.injection.set("ERRINJ_WAL_DELAY", false)
@@ -51,6 +49,7 @@ assert(box.info.replication[replica_id].downstream.lag > 0)
 --
 -- Cleanup everything.
 test_run:switch('default')
+box.space.test:drop()
 box.schema.user.revoke('guest', 'replication')
 test_run:cmd('stop server replica')
 test_run:cmd('cleanup server replica')
-- 
2.31.1


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
  2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
@ 2021-06-21 21:16       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 11+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-06-21 21:16 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml

Thanks for the fixes!

On 21.06.2021 18:17, Cyrill Gorcunov wrote:
> On Sun, Jun 20, 2021 at 04:37:21PM +0200, Vladislav Shpilevoy wrote:
>> Hi! Thanks for the patch!
>>
>> The test fails when I run it multiple times:
>>
>> [014] Test failed! Result content mismatch:
>> [014] --- replication/gh-5447-downstream-lag.result	Sun Jun 20 16:10:26 2021
>> [014] +++ var/rejects/replication/gh-5447-downstream-lag.reject	Sun Jun 20 16:33:01 2021
>> [014] @@ -37,7 +37,7 @@
>> [014]  -- Upon replica startup there is no ACKs to process.
>> [014]  assert(box.info.replication[replica_id].downstream.lag == 0)
>> [014]   | ---
>> [014] - | - true
>> [014] + | - error: assertion failed!
>>
>> See 4 comments below.
> 
> Vlad, here is an update, I force pushed it into the same branch.
> I'll fix the error injection nit. Could you please retry the
> test to run simultaneously (I did it locally with 200 tests
> but it didn't trigger anything). I rebased the series on top
> of master.

Strangely, I couldn't reproduce it again. Neither with the old
version, nor with the updated one.

The fixes look good. Please, squash, and fix the error injection
comment.

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2021-06-21 21:16 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21  8:35       ` Serge Petrenko via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:50   ` Serge Petrenko via Tarantool-patches
2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 21:16       ` Vladislav Shpilevoy via Tarantool-patches

Tarantool development patches archive

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://lists.tarantool.org/tarantool-patches/0 tarantool-patches/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 tarantool-patches tarantool-patches/ https://lists.tarantool.org/tarantool-patches \
		tarantool-patches@dev.tarantool.org.
	public-inbox-index tarantool-patches

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git