Tarantool development patches archive
 help / color / mirror / Atom feed
From: Cyrill Gorcunov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
Date: Thu, 17 Jun 2021 18:48:35 +0300	[thread overview]
Message-ID: <20210617154835.315576-3-gorcunov@gmail.com> (raw)
In-Reply-To: <20210617154835.315576-1-gorcunov@gmail.com>

We already have `box.replication.upstream.lag` entry for monitoring
sake. Same time in synchronous replication timeouts are key properties
for quorum gathering procedure. Thus we would like to know how long
it took of a transaction to traverse `initiator WAL -> network ->
remote applier -> initiator ACK reception` path.

Typical output is

 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.61753897101153
 |   vclock: {1: 147}
 |   lag: 0
 | ...
 | tarantool> box.space.sync:insert{69}
 | ---
 | - [69]
 | ...
 |
 | tarantool> box.info.replication[2].downstream
 | ---
 | - status: follow
 |   idle: 0.75324084801832
 |   vclock: {1: 151}
 |   lag: 0.0011014938354492
 | ...

Closes #5447

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>

@TarantoolBot document
Title: Add `box.info.replication[n].downstream.lag` entry

`replication[n].downstream.lag` represents a lag between the main
node writes a certain transaction to it's own WAL and a moment it
receives an ack for this transaction from a replica.
---
 .../unreleased/gh-5447-downstream-lag.md      |   6 +
 src/box/lua/info.c                            |   3 +
 src/box/relay.cc                              |  94 ++++++++++++-
 src/box/relay.h                               |   6 +
 .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
 .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
 6 files changed, 290 insertions(+), 4 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
 create mode 100644 test/replication/gh-5447-downstream-lag.result
 create mode 100644 test/replication/gh-5447-downstream-lag.test.lua

diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
new file mode 100644
index 000000000..f937ce35e
--- /dev/null
+++ b/changelogs/unreleased/gh-5447-downstream-lag.md
@@ -0,0 +1,6 @@
+#feature/replication
+
+ * Introduced `box.info.replication[n].downstream.lag` field to monitor
+   state of replication. This member represents a lag between the main
+   node writes a certain transaction to it's own WAL and a moment it
+   receives an ack for this transaction from a replica (gh-5447).
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 0eb48b823..f201b25e3 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
 		lua_pushnumber(L, ev_monotonic_now(loop()) -
 			       relay_last_row_time(relay));
 		lua_settable(L, -3);
+		lua_pushstring(L, "lag");
+		lua_pushnumber(L, relay_txn_lag(relay));
+		lua_settable(L, -3);
 		break;
 	case RELAY_STOPPED:
 	{
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b1571b361..ed6dd193d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -70,6 +70,18 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Cbus message to update relay reader fiber status in tx thread.
+ */
+struct relay_reader_msg {
+	/** Parent */
+	struct cmsg msg;
+	/** Relay instance */
+	struct relay *relay;
+	/** Transaction lag value. */
+	double txn_lag;
+};
+
 /**
  * Cbus message to update replica gc state in tx thread.
  */
@@ -151,6 +163,8 @@ struct relay {
 	struct cpipe relay_pipe;
 	/** Status message */
 	struct relay_status_msg status_msg;
+	/** Reader fiber message */
+	struct relay_reader_msg reader_msg;
 	/**
 	 * List of garbage collection messages awaiting
 	 * confirmation from the replica.
@@ -158,6 +172,11 @@ struct relay {
 	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
+	/**
+	 * Last timestamp observed from remote node to
+	 * compute @a tx.txn_lag value.
+	 */
+	double txn_acked_tm;
 	/** Relay sync state. */
 	enum relay_state state;
 
@@ -166,6 +185,14 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * A time difference between the moment when we
+		 * wrote a transaction to the local WAL and when
+		 * this transaction has been replicated to remote
+		 * node (ie written to node's WAL) so that ACK get
+		 * received.
+		 */
+		double txn_lag;
 		/**
 		 * True if the relay needs Raft updates. It can live fine
 		 * without sending Raft updates, if it is a relay to an
@@ -217,6 +244,12 @@ relay_last_row_time(const struct relay *relay)
 	return relay->last_row_time;
 }
 
+double
+relay_txn_lag(const struct relay *relay)
+{
+	return relay->tx.txn_lag;
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet);
 static void
@@ -284,6 +317,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 	relay->row_count = 0;
 	relay->last_row_time = ev_monotonic_now(loop());
+	/*
+	 * We assume that previously written rows in WAL
+	 * are older than current node real time which allows
+	 * to simplify @a tx.txn_lag calculation. In worst
+	 * scenario when runtime has been adjusted backwards
+	 * between restart we simply get some big value in
+	 * @a tx.txn_lag until next transaction get replicated.
+	 */
+	relay->txn_acked_tm = ev_now(loop());
 }
 
 void
@@ -336,6 +378,12 @@ relay_stop(struct relay *relay)
 	 * upon cord_create().
 	 */
 	relay->cord.id = 0;
+	/*
+	 * If relay is stopped then lag statistics should
+	 * be updated on next new ACK packets obtained.
+	 */
+	relay->txn_acked_tm = 0;
+	relay->tx.txn_lag = 0;
 }
 
 void
@@ -466,11 +514,10 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 }
 
 /**
- * The message which updated tx thread with a new vclock has returned back
- * to the relay.
+ * The message which updated tx thread returns to the relay thread.
  */
 static void
-relay_status_update(struct cmsg *msg)
+tx_update_complete(struct cmsg *msg)
 {
 	msg->route = NULL;
 }
@@ -500,7 +547,7 @@ tx_status_update(struct cmsg *msg)
 	trigger_run(&replicaset.on_ack, &ack);
 
 	static const struct cmsg_hop route[] = {
-		{relay_status_update, NULL}
+		{tx_update_complete, NULL}
 	};
 	cmsg_init(msg, route);
 	cpipe_push(&status->relay->relay_pipe, msg);
@@ -607,6 +654,20 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	}
 }
 
+static void
+tx_reader_update(struct cmsg *msg)
+{
+	struct relay_reader_msg *rmsg = (struct relay_reader_msg *)msg;
+	rmsg->relay->tx.txn_lag = rmsg->txn_lag;
+
+	static const struct cmsg_hop route[] = {
+		{tx_update_complete, NULL}
+	};
+
+	cmsg_init(msg, route);
+	cpipe_push(&rmsg->relay->relay_pipe, msg);
+}
+
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -616,6 +677,7 @@ relay_reader_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct fiber *relay_f = va_arg(ap, struct fiber *);
+	struct relay_reader_msg *rmsg = &relay->reader_msg;
 
 	struct ibuf ibuf;
 	struct ev_io io;
@@ -629,6 +691,30 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
+			/*
+			 * Replica send us last replicated transaction
+			 * timestamp which is needed for relay lag
+			 * monitoring. Note that this transaction has
+			 * been written to WAL with our current realtime
+			 * clock value, thus when it get reported back we
+			 * can compute time spent regardless of the clock
+			 * value on remote replica. Since the communication
+			 * goes between thread make sure the previous update
+			 * to TX is complete.
+			 */
+			if (relay->txn_acked_tm < xrow.tm &&
+			    rmsg->msg.route == NULL) {
+				relay->txn_acked_tm = xrow.tm;
+
+				static const struct cmsg_hop route[] = {
+					{tx_reader_update, NULL}
+				};
+
+				cmsg_init(&rmsg->msg, route);
+				rmsg->txn_lag = ev_now(loop()) - xrow.tm;
+				rmsg->relay = relay;
+				cpipe_push(&relay->tx_pipe, &rmsg->msg);
+			}
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
diff --git a/src/box/relay.h b/src/box/relay.h
index b32e2ea2a..615ffb75d 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Returns relay's transaction's lag.
+ */
+double
+relay_txn_lag(const struct relay *relay);
+
 /**
  * Send a Raft update request to the relay channel. It is not
  * guaranteed that it will be delivered. The connection may break.
diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
new file mode 100644
index 000000000..2cc020451
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.result
@@ -0,0 +1,128 @@
+-- test-run result file version 2
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+replica_id = test_run:get_server_id('replica')
+ | ---
+ | ...
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+ | ---
+ | - true
+ | ...
+
+s = box.schema.space.create('test', {engine = engine})
+ | ---
+ | ...
+_ = s:create_index('pk')
+ | ---
+ | ...
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+box.space.test:insert({1})
+ | ---
+ | - [1]
+ | ...
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+ | ---
+ | - true
+ | ...
+-- The record is written on the master node.
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+assert(box.info.replication[replica_id].downstream.lag > 0)
+ | ---
+ | - true
+ | ...
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
new file mode 100644
index 000000000..3096e2ac3
--- /dev/null
+++ b/test/replication/gh-5447-downstream-lag.test.lua
@@ -0,0 +1,57 @@
+--
+-- gh-5447: Test for box.info.replication[n].downstream.lag.
+-- We need to be sure that slow ACKs delivery might be
+-- caught by monitoring tools.
+--
+
+fiber = require('fiber')
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, \
+             script="replication/replica.lua"')
+test_run:cmd('start server replica')
+
+replica_id = test_run:get_server_id('replica')
+
+--
+-- Upon replica startup there is no ACKs to process.
+assert(box.info.replication[replica_id].downstream.lag == 0)
+
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+
+--
+-- The replica should wait some time before writing data
+-- to the WAL, otherwise we might not even notice the lag
+-- if media is too fast. Before disabling WAL we need to
+-- wait the space get propagated.
+test_run:switch('replica')
+test_run:wait_lsn('replica', 'default')
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+--
+-- Insert a record and wakeup replica's WAL to process data.
+test_run:switch('default')
+lsn = box.info.lsn
+box.space.test:insert({1})
+test_run:wait_cond(function() return box.info.lsn > lsn end)
+-- The record is written on the master node.
+test_run:switch('replica')
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+--
+-- Wait the record to be ACKed, the lag value should be nonzero.
+test_run:switch('default')
+test_run:wait_lsn('replica', 'default')
+assert(box.info.replication[replica_id].downstream.lag > 0)
+
+--
+-- Cleanup everything.
+test_run:switch('default')
+box.schema.user.revoke('guest', 'replication')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
-- 
2.31.1


  parent reply	other threads:[~2021-06-17 15:49 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21  8:35       ` Serge Petrenko via Tarantool-patches
2021-06-17 15:48 ` Cyrill Gorcunov via Tarantool-patches [this message]
2021-06-18  9:50   ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Serge Petrenko via Tarantool-patches
2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 21:16       ` Vladislav Shpilevoy via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210617154835.315576-3-gorcunov@gmail.com \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox