[PATCH] Send relay heartbeat if wal changes won't be sent

Vladimir Davydov vdavydov.dev at gmail.com
Tue Feb 27 12:20:12 MSK 2018


On Mon, Feb 26, 2018 at 07:04:18PM +0300, Georgy Kirichenko wrote:
> If a replica receives some changes then corresponding wal events are
> generated and relay fiber is waken up before heartbeat timeout.
> But there may be nothing to send if all changes are from the current
> relay peer. In this case an applier doesn't receive anything and break a
> connection.
> 
> Fixes #3160
> Branch gh-3160-relay-heartbeat-on-applier-changes
> ---
>  src/box/relay.cc               | 45 ++++++++++++++++++-------------
>  test/replication/misc.result   | 60 ++++++++++++++++++++++++++++++++++++++++++
>  test/replication/misc.test.lua | 18 +++++++++++++
>  3 files changed, 104 insertions(+), 19 deletions(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f9f22d622..ebe2a8167 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -432,37 +432,44 @@ relay_subscribe_f(va_list ap)
>  	fiber_set_joinable(reader, true);
>  	fiber_start(reader, relay, fiber());
>  
> -	/*
> -	 * If the replica happens to be uptodate on subscribe,
> -	 * don't wait for timeout to happen - send a heartbeat
> -	 * message right away to update the replication lag as
> -	 * soon as possible.
> -	 */
> -	if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
> -		relay_send_heartbeat(relay);
> +	/* Send a first one heartbeat and set a keep-alive timeout. */

I don't really understand this new comment. What's keep-alive? Why do we
need to send the very first heartbeat? The old version of the comment
tried to explain that.

> +	relay_send_heartbeat(relay);
> +	ev_tstamp start, timeout;
> +	coio_timeout_init(&start, &timeout, replication_timeout);
>  
>  	while (!fiber_is_cancelled()) {
> -		double timeout = replication_timeout;
>  		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
>  					    ERRINJ_DOUBLE);
> -		if (inj != NULL && inj->dparam != 0)
> -			timeout = inj->dparam;
> +		if (inj != NULL && inj->dparam != 0 && inj->dparam - timeout > 0)
> +			fiber_sleep(inj->dparam - timeout);

Why did you change this?

>  
> -		if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
> -			/*
> -			 * Timed out waiting for WAL events.
> -			 * Send a heartbeat message to update
> -			 * the replication lag on the slave.
> -			 */
> -			relay_send_heartbeat(relay);
> -		}
> +		fiber_cond_wait_timeout(&relay->reader_cond, timeout);
>  
> +		struct vclock old_vclock;
> +		vclock_copy(&old_vclock, &r->vclock);
>  		/*
>  		 * The fiber can be woken by IO cancel, by a timeout of
>  		 * status messaging or by an acknowledge to status message.
>  		 * Handle cbus messages first.
>  		 */
>  		cbus_process(&relay->endpoint);
> +				relay_send_heartbeat(relay);

Malformed indentation.

> +		/* Update timeout. */
> +		coio_timeout_update(start, &timeout);
> +		/*
> +		 * Reset timeout if there are changes from any other
> +		 * that current peer instance.
> +		 */
> +		if (relay->replica->id &&
> +		    vclock_sum(&r->vclock) - vclock_sum(&old_vclock) >
> +		    vclock_get(&r->vclock, relay->replica->id) -
> +		    vclock_get(&old_vclock, relay->replica->id))
> +			coio_timeout_init(&start, &timeout, replication_timeout);
> +		if (timeout < 0) {
> +			/* It is time to send heartbeat. */
> +			relay_send_heartbeat(relay);
> +			coio_timeout_init(&start, &timeout, replication_timeout);
> +		}

I don't like these checks, because they duplicate the check from
relay_send. Maybe we could keep track of the time when the last
row was sent (in relay_send), and use something like this:

  fiber_cond_wait_deadline(last_row_ts + timeout)
  cbus_process()
  if (now >= last_row_ts + timeout)
      send_heartbeat()
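
To spell the idea out a bit more, here is a rough, self-contained sketch
in plain C. It is only an illustration under my own assumptions: struct
relay_sketch and all sketch_* names are made up and are not the existing
relay API, time is passed in explicitly instead of being read from the
event loop, and has_row stands for "relay_send actually put a row on the
wire", i.e. the row was not filtered out as the peer's own change.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relay state, not the real struct relay. */
struct relay_sketch {
	/* Time when the last row or heartbeat was sent to the peer. */
	double last_row_ts;
	int rows_sent;
	int heartbeats_sent;
};

/* Would be called from relay_send: remember when the peer last heard from us. */
static void
sketch_send_row(struct relay_sketch *relay, double now)
{
	relay->rows_sent++;
	relay->last_row_ts = now;
}

/* A heartbeat also counts as traffic, so it resets the deadline too. */
static void
sketch_send_heartbeat(struct relay_sketch *relay, double now)
{
	relay->heartbeats_sent++;
	relay->last_row_ts = now;
}

/*
 * One loop iteration after the wait returned at time `now`
 * (either on a WAL event or on the deadline).
 */
static void
sketch_relay_step(struct relay_sketch *relay, double now,
		  double timeout, bool has_row)
{
	assert(timeout > 0);
	/* Stands for cbus_process(): rows may or may not reach the peer. */
	if (has_row)
		sketch_send_row(relay, now);
	/* Nothing was sent for a whole timeout: keep the applier alive. */
	if (now >= relay->last_row_ts + timeout)
		sketch_send_heartbeat(relay, now);
}
```

With this shape the loop doesn't need to compare vclocks at all: a wakeup
caused only by the peer's own changes leaves last_row_ts untouched, so the
heartbeat still goes out on time.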

>  		/*
>  		 * Check that the vclock has been updated and the previous
>  		 * status message is delivered
> diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
> index d4f714d91..a8f375b53 100644
> --- a/test/replication/misc.test.lua
> +++ b/test/replication/misc.test.lua
> @@ -22,4 +22,22 @@ test_run:cmd('stop server test')
>  test_run:cmd('cleanup server test')
>  box.cfg{read_only = false}
>  

Please add a comment explaining what you're testing. Without it, in a
month we won't remember what this test is for.

> +test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
> +box.cfg{replication_timeout = 0.05}
> +test_run:cmd('start server test_timeout')
> +test_run:cmd('switch test_timeout')
> +test_run = require('test_run').new()
> +test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
> +old_replication = box.cfg.replication
> +box.cfg{replication = {}}
> +box.cfg{replication_timeout = 0.05, replication = old_replication}
> +test_run:cmd('switch default')
> +fiber = require'fiber'
> +_ = box.schema.space.create('test_timeout'):create_index('pk')
> +for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
> +box.info.replication[3].upstream.status
> +box.info.replication[3].upstream.message
> +test_run:cmd('stop server test_timeout')
> +test_run:cmd('cleanup server test_timeout')
> +
>  box.schema.user.revoke('guest', 'replication')


