Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>,
	tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
Date: Fri, 18 Jun 2021 12:50:01 +0300
Message-ID: <816930c9-c890-68e5-c25e-53543beb5df7@tarantool.org> (raw)
In-Reply-To: <20210617154835.315576-3-gorcunov@gmail.com>



17.06.2021 18:48, Cyrill Gorcunov пишет:
> We already have the `box.replication.upstream.lag` entry for monitoring
> purposes. At the same time, in synchronous replication timeouts are key
> properties of the quorum gathering procedure. Thus we would like to know
> how long it takes a transaction to traverse the `initiator WAL -> network
> -> remote applier -> initiator ACK reception` path.
>
> Typical output is
>
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.61753897101153
>   |   vclock: {1: 147}
>   |   lag: 0
>   | ...
>   | tarantool> box.space.sync:insert{69}
>   | ---
>   | - [69]
>   | ...
>   |
>   | tarantool> box.info.replication[2].downstream
>   | ---
>   | - status: follow
>   |   idle: 0.75324084801832
>   |   vclock: {1: 151}
>   |   lag: 0.0011014938354492
>   | ...
>
> Closes #5447

Hi! Thanks for the fixes!

Please find a couple of comments below.

>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>
> @TarantoolBot document
> Title: Add `box.info.replication[n].downstream.lag` entry
>
> `replication[n].downstream.lag` represents the lag between the moment the
> main node writes a certain transaction to its own WAL and the moment it
> receives an ack for this transaction from a replica.
> ---
>   .../unreleased/gh-5447-downstream-lag.md      |   6 +
>   src/box/lua/info.c                            |   3 +
>   src/box/relay.cc                              |  94 ++++++++++++-
>   src/box/relay.h                               |   6 +
>   .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
>   .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
>   6 files changed, 290 insertions(+), 4 deletions(-)
>   create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
>   create mode 100644 test/replication/gh-5447-downstream-lag.result
>   create mode 100644 test/replication/gh-5447-downstream-lag.test.lua
>
> diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
> new file mode 100644
> index 000000000..f937ce35e
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5447-downstream-lag.md
> @@ -0,0 +1,6 @@
> +#feature/replication
> +
> + * Introduced `box.info.replication[n].downstream.lag` field to monitor
> +   state of replication. This member represents a lag between the main
> +   node writes a certain transaction to it's own WAL and a moment it
> +   receives an ack for this transaction from a replica (gh-5447).
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index 0eb48b823..f201b25e3 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
>   		lua_pushnumber(L, ev_monotonic_now(loop()) -
>   			       relay_last_row_time(relay));
>   		lua_settable(L, -3);
> +		lua_pushstring(L, "lag");
> +		lua_pushnumber(L, relay_txn_lag(relay));
> +		lua_settable(L, -3);
>   		break;
>   	case RELAY_STOPPED:
>   	{
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b1571b361..ed6dd193d 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -70,6 +70,18 @@ struct relay_status_msg {
>   	struct vclock vclock;
>   };
>   
> +/**
> + * Cbus message to update relay reader fiber status in tx thread.
> + */
> +struct relay_reader_msg {
> +	/** Parent */
> +	struct cmsg msg;
> +	/** Relay instance */
> +	struct relay *relay;
> +	/** Transaction lag value. */
> +	double txn_lag;
> +};
> +
>   /**
>    * Cbus message to update replica gc state in tx thread.
>    */
> @@ -151,6 +163,8 @@ struct relay {
>   	struct cpipe relay_pipe;
>   	/** Status message */
>   	struct relay_status_msg status_msg;
> +	/** Reader fiber message */
> +	struct relay_reader_msg reader_msg;
>   	/**
>   	 * List of garbage collection messages awaiting
>   	 * confirmation from the replica.
> @@ -158,6 +172,11 @@ struct relay {
>   	struct stailq pending_gc;
>   	/** Time when last row was sent to peer. */
>   	double last_row_time;
> +	/**
> +	 * Last timestamp observed from remote node to
> +	 * compute @a tx.txn_lag value.
> +	 */
> +	double txn_acked_tm;
>   	/** Relay sync state. */
>   	enum relay_state state;
>   
> @@ -166,6 +185,14 @@ struct relay {
>   		alignas(CACHELINE_SIZE)
>   		/** Known relay vclock. */
>   		struct vclock vclock;
> +		/**
> +		 * A time difference between the moment when we
> +		 * wrote a transaction to the local WAL and when
> +		 * this transaction has been replicated to remote
> +		 * node (ie written to node's WAL) so that ACK get
> +		 * received.
> +		 */
> +		double txn_lag;
>   		/**
>   		 * True if the relay needs Raft updates. It can live fine
>   		 * without sending Raft updates, if it is a relay to an
> @@ -217,6 +244,12 @@ relay_last_row_time(const struct relay *relay)
>   	return relay->last_row_time;
>   }
>   
> +double
> +relay_txn_lag(const struct relay *relay)
> +{
> +	return relay->tx.txn_lag;
> +}
> +
>   static void
>   relay_send(struct relay *relay, struct xrow_header *packet);
>   static void
> @@ -284,6 +317,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>   	relay->state = RELAY_FOLLOW;
>   	relay->row_count = 0;
>   	relay->last_row_time = ev_monotonic_now(loop());
> +	/*
> +	 * We assume that previously written rows in WAL
> +	 * are older than current node real time which allows
> +	 * to simplify @a tx.txn_lag calculation. In worst
> +	 * scenario when runtime has been adjusted backwards
> +	 * between restart we simply get some big value in
> +	 * @a tx.txn_lag until next transaction get replicated.
> +	 */
> +	relay->txn_acked_tm = ev_now(loop());
>   }
>   
>   void
> @@ -336,6 +378,12 @@ relay_stop(struct relay *relay)
>   	 * upon cord_create().
>   	 */
>   	relay->cord.id = 0;
> +	/*
> +	 * If relay is stopped then lag statistics should
> +	 * be updated on next new ACK packets obtained.
> +	 */
> +	relay->txn_acked_tm = 0;
> +	relay->tx.txn_lag = 0;
>   }
>   
>   void
> @@ -466,11 +514,10 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>   }
>   
>   /**
> - * The message which updated tx thread with a new vclock has returned back
> - * to the relay.
> + * The message which updated tx thread returns to the relay thread.
>    */
>   static void
> -relay_status_update(struct cmsg *msg)
> +tx_update_complete(struct cmsg *msg)

Let's name this `relay_update_complete` or something similar, please.

Otherwise it looks like something executed in the tx thread, while this
callback actually runs when the message returns to the relay thread.

>   {
>   	msg->route = NULL;
>   }
> @@ -500,7 +547,7 @@ tx_status_update(struct cmsg *msg)
>   	trigger_run(&replicaset.on_ack, &ack);
>   
>   	static const struct cmsg_hop route[] = {
> -		{relay_status_update, NULL}
> +		{tx_update_complete, NULL}
>   	};
>   	cmsg_init(msg, route);
>   	cpipe_push(&status->relay->relay_pipe, msg);
> @@ -607,6 +654,20 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>   	}
>   }
>   
> +static void
> +tx_reader_update(struct cmsg *msg)
> +{
> +	struct relay_reader_msg *rmsg = (struct relay_reader_msg *)msg;
> +	rmsg->relay->tx.txn_lag = rmsg->txn_lag;
> +
> +	static const struct cmsg_hop route[] = {
> +		{tx_update_complete, NULL}
> +	};
> +
> +	cmsg_init(msg, route);
> +	cpipe_push(&rmsg->relay->relay_pipe, msg);
> +}
> +
>   /*
>    * Relay reader fiber function.
>    * Read xrow encoded vclocks sent by the replica.
> @@ -616,6 +677,7 @@ relay_reader_f(va_list ap)
>   {
>   	struct relay *relay = va_arg(ap, struct relay *);
>   	struct fiber *relay_f = va_arg(ap, struct fiber *);
> +	struct relay_reader_msg *rmsg = &relay->reader_msg;
>   
>   	struct ibuf ibuf;
>   	struct ev_io io;
> @@ -629,6 +691,30 @@ relay_reader_f(va_list ap)
>   			/* vclock is followed while decoding, zeroing it. */
>   			vclock_create(&relay->recv_vclock);
>   			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> +			/*
> +			 * Replica send us last replicated transaction
> +			 * timestamp which is needed for relay lag
> +			 * monitoring. Note that this transaction has
> +			 * been written to WAL with our current realtime
> +			 * clock value, thus when it get reported back we
> +			 * can compute time spent regardless of the clock
> +			 * value on remote replica. Since the communication
> +			 * goes between thread make sure the previous update
> +			 * to TX is complete.
> +			 */
> +			if (relay->txn_acked_tm < xrow.tm &&
> +			    rmsg->msg.route == NULL) {
> +				relay->txn_acked_tm = xrow.tm;
> +
> +				static const struct cmsg_hop route[] = {
> +					{tx_reader_update, NULL}
> +				};
> +
> +				cmsg_init(&rmsg->msg, route);
> +				rmsg->txn_lag = ev_now(loop()) - xrow.tm;
> +				rmsg->relay = relay;
> +				cpipe_push(&relay->tx_pipe, &rmsg->msg);
> +			}

It bothers me a bit that we now have two places that issue cbus messages
from relay to tx: relay_reader_f and relay_subscribe_f.

It would be nice if you could unify these two messages. For example,
enrich the tx_status_update message with the new txn_lag value.

I think it should be fine to send the cbus message a bit later than the
actual ACK was received. This is how it's done for the replica's vclock.

So, I propose either moving the vclock update push (relay->status_msg) to
relay_reader_f, or moving the time update (relay->reader_msg) to
relay_subscribe_f, and unifying the two messages.

This way we'll have a single source for all relay->tx updates and less
code duplication.

P.S. Moving the relay->status_msg push to relay_reader_f sounds more logical
to me.

>   			fiber_cond_signal(&relay->reader_cond);
>   		}
>   	} catch (Exception *e) {
> diff --git a/src/box/relay.h b/src/box/relay.h
> index b32e2ea2a..615ffb75d 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
>   double
>   relay_last_row_time(const struct relay *relay);
>   
> +/**
> + * Returns relay's transaction's lag.
> + */
> +double
> +relay_txn_lag(const struct relay *relay);
> +
>   /**
>    * Send a Raft update request to the relay channel. It is not
>    * guaranteed that it will be delivered. The connection may break.
> diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
> new file mode 100644
> index 000000000..2cc020451
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.result
> @@ -0,0 +1,128 @@
> +-- test-run result file version 2
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> + | ---
> + | ...
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +             script="replication/replica.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica')
> + | ---
> + | - true
> + | ...
> +
> +replica_id = test_run:get_server_id('replica')
> + | ---
> + | ...
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> + | ---
> + | - true
> + | ...
> +
> +s = box.schema.space.create('test', {engine = engine})
> + | ---
> + | ...
> +_ = s:create_index('pk')
> + | ---
> + | ...
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +lsn = box.info.lsn
> + | ---
> + | ...
> +box.space.test:insert({1})
> + | ---
> + | - [1]
> + | ...
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> + | ---
> + | - true
> + | ...
> +-- The record is written on the master node.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('cleanup server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
> diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
> new file mode 100644
> index 000000000..3096e2ac3
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.test.lua
> @@ -0,0 +1,57 @@
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> +test_run = require('test_run').new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'replication')
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +             script="replication/replica.lua"')
> +test_run:cmd('start server replica')
> +
> +replica_id = test_run:get_server_id('replica')
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> +
> +s = box.schema.space.create('test', {engine = engine})
> +_ = s:create_index('pk')
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> +test_run:wait_lsn('replica', 'default')
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> +lsn = box.info.lsn
> +box.space.test:insert({1})
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> +-- The record is written on the master node.
> +test_run:switch('replica')
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> +test_run:wait_lsn('replica', 'default')
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> +box.schema.user.revoke('guest', 'replication')
> +test_run:cmd('stop server replica')
> +test_run:cmd('cleanup server replica')
> +test_run:cmd('delete server replica')

-- 
Serge Petrenko


  reply	other threads:[~2021-06-18  9:50 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-17 15:48 [Tarantool-patches] [PATCH v9 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:51   ` Serge Petrenko via Tarantool-patches
2021-06-18 18:06     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21  8:35       ` Serge Petrenko via Tarantool-patches
2021-06-17 15:48 ` [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
2021-06-18  9:50   ` Serge Petrenko via Tarantool-patches [this message]
2021-06-20 14:37   ` Vladislav Shpilevoy via Tarantool-patches
2021-06-21  8:44     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 16:17     ` Cyrill Gorcunov via Tarantool-patches
2021-06-21 21:16       ` Vladislav Shpilevoy via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=816930c9-c890-68e5-c25e-53543beb5df7@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=sergepetrenko@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Tarantool development patches archive

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://lists.tarantool.org/tarantool-patches/0 tarantool-patches/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 tarantool-patches tarantool-patches/ https://lists.tarantool.org/tarantool-patches \
		tarantool-patches@dev.tarantool.org.
	public-inbox-index tarantool-patches

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git