[PATCH] Send relay heartbeat if wal changes won't be send

Vladimir Davydov vdavydov.dev at gmail.com
Wed Feb 28 14:25:52 MSK 2018


On Wed, Feb 28, 2018 at 11:24:02AM +0300, Georgy Kirichenko wrote:
> If a replica receives some changes then corresponding wal events are
> generated and relay fiber is waken up before heartbeat timeout.
> But there may be nothing to send if all changes are from the current
> relay peer. In this case an applier doesn't receive anything and break a
> connection.
> 
> Fixes #3160

> Branch gh-3160-relay-heartbeat-on-applier-changes-v2

Some comments regarding the patch submission process:

 - A pointer to the branch should be given as a hyperlink to
   github.com, and it should go after '---' (it is ignored by
   `git am` then).

 - Since this is v2, there should also be a brief change log.

 - There's no need to create a new branch. You can force-update
   the old one.

> 
> ---
>  src/box/relay.cc               | 38 +++++++++++++++++---------
>  test/replication/misc.result   | 61 ++++++++++++++++++++++++++++++++++++++++++
>  test/replication/misc.test.lua | 19 +++++++++++++
>  3 files changed, 105 insertions(+), 13 deletions(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f9f22d622..3de6fd070 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -124,6 +124,10 @@ struct relay {
>  	 * confirmation from the replica.
>  	 */
>  	struct stailq pending_gc;
> +	/**
> +	 * A time when last row was send.
> +	 */
> +	double last_row_tstamp;

We use 'tm' as the abbreviation for timestamp throughout the code,
so I think we should call it `last_row_tm`.

>  
>  	struct {
>  		/* Align to prevent false-sharing with tx thread */
> @@ -433,36 +437,43 @@ relay_subscribe_f(va_list ap)
>  	fiber_start(reader, relay, fiber());
>  
>  	/*
> -	 * If the replica happens to be uptodate on subscribe,
> +	 * If the replica happens to be up to date on subscribe,
>  	 * don't wait for timeout to happen - send a heartbeat
>  	 * message right away to update the replication lag as
>  	 * soon as possible.
>  	 */
> -	if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
> -		relay_send_heartbeat(relay);
> +	relay_send_heartbeat(relay);
>  
>  	while (!fiber_is_cancelled()) {
> -		double timeout = replication_timeout;
>  		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
>  					    ERRINJ_DOUBLE);
>  		if (inj != NULL && inj->dparam != 0)
> -			timeout = inj->dparam;
> +			fiber_sleep(inj->dparam);

I asked this before but didn't get an answer, so I'll ask again.
Why did you change this?

>  
> -		if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
> -			/*
> -			 * Timed out waiting for WAL events.
> -			 * Send a heartbeat message to update
> -			 * the replication lag on the slave.
> -			 */
> -			relay_send_heartbeat(relay);
> -		}
> +		double timeout = replication_timeout -
> +				 (fiber_time() - relay->last_row_tstamp);
> +		fiber_cond_wait_timeout(&relay->reader_cond, timeout);

Use fiber_cond_wait_deadline(last_row_tm + timeout) here instead.
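
The point of the deadline form is that the wake-up time depends only on
when the last row was sent, not on when the wait happens to start. A
small arithmetic sketch in plain C, not Tarantool code (both helper names
are hypothetical and only model the two ways of expressing the wait):

```c
#include <assert.h>

/* Relative form: how long to sleep, given the current time. */
static double
relative_timeout(double now, double last_row_tm, double timeout)
{
	assert(timeout > 0);
	return timeout - (now - last_row_tm);
}

/* Absolute form: when to wake up, independent of the current time. */
static double
heartbeat_deadline(double last_row_tm, double timeout)
{
	assert(timeout > 0);
	return last_row_tm + timeout;
}
```

Both forms name the same wake-up moment as long as `now` is sampled right
before the wait starts. The relative value also goes negative once the
deadline has passed, which the caller would have to handle, while a
deadline in the past simply means "do not wait".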

>  
> +		struct vclock old_vclock;
> +		vclock_copy(&old_vclock, &r->vclock);

What's this doing here?

>  		/*
>  		 * The fiber can be woken by IO cancel, by a timeout of
>  		 * status messaging or by an acknowledge to status message.
>  		 * Handle cbus messages first.
>  		 */
>  		cbus_process(&relay->endpoint);
> +		/*
> +		 * If the processed rows belong to the peer,
> +		 * cbus_process() has filtered them out instead of
> +		 * sending to the peer, so the peer got no
> +		 * message. In this case we should honor the
> +		 * previous heartbeat timeout, rather than reset
> +		 * it.
> +		 */

This comment is overkill IMHO - it goes too deep into details, but
at the same time doesn't explain why we send heartbeat messages at all.
Let's just copy the comment we used to have, shall we?

> +		if (fiber_time() - relay->last_row_tstamp > replication_timeout) {

> +			/* It is time to send a heartbeat. */

This is a pointless comment. The function name says practically
the same.

> +			relay_send_heartbeat(relay);
> +		}
>  		/*
>  		 * Check that the vclock has been updated and the previous
>  		 * status message is delivered
> @@ -589,5 +600,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  	if (relay->replica == NULL ||
>  	    packet->replica_id != relay->replica->id) {
>  		relay_send(relay, packet);
> +		relay->last_row_tstamp = fiber_time();

The timestamp must be updated in relay_send! Otherwise heartbeats will
be sent non-stop when there are no WAL events on the master, consuming
100% of CPU. Just run master-master and see what `top` shows.
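
To illustrate, here is a minimal model of the loop in plain C. It is not
Tarantool code: `relay_model`, `model_send` and `count_heartbeats` are
made-up names, virtual time stands in for fiber_time(), and the master is
assumed to be idle, so every wait ends by timeout.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the relay state, not the real struct relay. */
struct relay_model {
	double last_row_tm;    /* time the last message was stamped */
	bool stamp_heartbeats; /* true: stamp in the common send path */
};

/* Stands for the common send path; rows are always stamped. */
static void
model_send(struct relay_model *r, double now, bool is_heartbeat)
{
	if (!is_heartbeat || r->stamp_heartbeats)
		r->last_row_tm = now;
}

/* Count heartbeats sent over `duration` seconds of virtual time. */
static int
count_heartbeats(bool stamp_heartbeats, double timeout, double duration,
		 int max_iter)
{
	assert(timeout > 0 && duration > 0 && max_iter > 0);
	struct relay_model r = { 0.0, stamp_heartbeats };
	double now = 0.0;
	int heartbeats = 0;
	for (int i = 0; i < max_iter && now < duration; i++) {
		/* No WAL events arrive, so the wait runs to its end. */
		double wait = timeout - (now - r.last_row_tm);
		if (wait > 0)
			now += wait;
		if (now - r.last_row_tm >= timeout) {
			model_send(&r, now, true);
			heartbeats++;
		}
	}
	return heartbeats;
}
```

With timeout = 1 and duration = 10, the variant that stamps heartbeats
sends 10 of them. The variant that stamps only rows never gets a positive
wait after the first timeout, so virtual time stays at 1 and every one of
the max_iter iterations sends a heartbeat.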

>  	}
>  }
> diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
> index d4f714d91..e2649fd7c 100644
> --- a/test/replication/misc.test.lua
> +++ b/test/replication/misc.test.lua
> @@ -22,4 +22,23 @@ test_run:cmd('stop server test')
>  test_run:cmd('cleanup server test')
>  box.cfg{read_only = false}
>  
> +-- gh-3160 - Send heartbeats if there are changes from a remote master only
> +test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
> +box.cfg{replication_timeout = 0.05}
> +test_run:cmd('start server test_timeout')
> +test_run:cmd('switch test_timeout')
> +test_run = require('test_run').new()
> +test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))

IMHO it would be better to create a separate script for setting up
master-master (master.lua). It might be useful for other tests as well.

> +old_replication = box.cfg.replication
> +box.cfg{replication = {}}
> +box.cfg{replication_timeout = 0.05, replication = old_replication}
> +test_run:cmd('switch default')
> +fiber = require'fiber'

A space is missing. Also, we usually wrap the `require` argument in
parentheses.

> +_ = box.schema.space.create('test_timeout'):create_index('pk')
> +for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
> +box.info.replication[3].upstream.status
> +box.info.replication[3].upstream.message

Kind of an unreliable way to test this, if you ask me. IMHO it would be
better to check the replication status in the loop after inserting each record.
Then we could insert 100 records so that a passerby wouldn't have to
scratch their head trying to figure out where 22 comes from :-)

Also, this fails:

  ./test-run.py -j -1 replication/misc.test.lua replication/misc.test.lua

Please fix this, as it may affect parallel test runs.

> +test_run:cmd('stop server test_timeout')
> +test_run:cmd('cleanup server test_timeout')
> +
>  box.schema.user.revoke('guest', 'replication')


