[PATCH] Send relay heartbeat if wal changes won't be sent
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Feb 27 12:20:12 MSK 2018
On Mon, Feb 26, 2018 at 07:04:18PM +0300, Georgy Kirichenko wrote:
> If a replica receives some changes then corresponding wal events are
> generated and relay fiber is waken up before heartbeat timeout.
> But there may be nothing to send if all changes are from the current
> relay peer. In this case an applier doesn't receive anything and break a
> connection.
>
> Fixes #3160
> Branch gh-3160-relay-heartbeat-on-applier-changes
> ---
> src/box/relay.cc | 45 ++++++++++++++++++-------------
> test/replication/misc.result | 60 ++++++++++++++++++++++++++++++++++++++++++
> test/replication/misc.test.lua | 18 +++++++++++++
> 3 files changed, 104 insertions(+), 19 deletions(-)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f9f22d622..ebe2a8167 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -432,37 +432,44 @@ relay_subscribe_f(va_list ap)
> fiber_set_joinable(reader, true);
> fiber_start(reader, relay, fiber());
>
> - /*
> - * If the replica happens to be uptodate on subscribe,
> - * don't wait for timeout to happen - send a heartbeat
> - * message right away to update the replication lag as
> - * soon as possible.
> - */
> - if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
> - relay_send_heartbeat(relay);
> + /* Send a first one heartbeat and set a keep-alive timeout. */
I don't really understand this new comment. What's keep-alive? Why do we
need to send the very first heartbeat? The old version of the comment
tried to explain that.
> + relay_send_heartbeat(relay);
> + ev_tstamp start, timeout;
> + coio_timeout_init(&start, &timeout, replication_timeout);
>
> while (!fiber_is_cancelled()) {
> - double timeout = replication_timeout;
> struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> ERRINJ_DOUBLE);
> - if (inj != NULL && inj->dparam != 0)
> - timeout = inj->dparam;
> + if (inj != NULL && inj->dparam != 0 && inj->dparam - timeout > 0)
> + fiber_sleep(inj->dparam - timeout);
Why did you change this?
>
> - if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
> - /*
> - * Timed out waiting for WAL events.
> - * Send a heartbeat message to update
> - * the replication lag on the slave.
> - */
> - relay_send_heartbeat(relay);
> - }
> + fiber_cond_wait_timeout(&relay->reader_cond, timeout);
>
> + struct vclock old_vclock;
> + vclock_copy(&old_vclock, &r->vclock);
> /*
> * The fiber can be woken by IO cancel, by a timeout of
> * status messaging or by an acknowledge to status message.
> * Handle cbus messages first.
> */
> cbus_process(&relay->endpoint);
> + relay_send_heartbeat(relay);
Malformed indentation.
> + /* Update timeout. */
> + coio_timeout_update(start, &timeout);
> + /*
> + * Reset timeout if there are changes from any other
> + * that current peer instance.
> + */
> + if (relay->replica->id &&
> + vclock_sum(&r->vclock) - vclock_sum(&old_vclock) >
> + vclock_get(&r->vclock, relay->replica->id) -
> + vclock_get(&old_vclock, relay->replica->id))
> + coio_timeout_init(&start, &timeout, replication_timeout);
> + if (timeout < 0) {
> + /* It is time to send heartbeat. */
> + relay_send_heartbeat(relay);
> + coio_timeout_init(&start, &timeout, replication_timeout);
> + }
I don't like these checks because they duplicate the check from
relay_send. Maybe we could keep track of the time when the last
row was sent (in relay_send) and use something like this:

	fiber_cond_wait_deadline(last_row_ts + timeout)
	cbus_process()
	if (now >= last_row_ts + timeout)
		send_heartbeat()
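
To make the idea above a bit more concrete, here is a minimal,
self-contained sketch of the deadline bookkeeping only. It is not code
from relay.cc: struct relay_sketch and all sketch_* names are
hypothetical, time is passed in as a plain double instead of being read
from the event loop, and "sending" a heartbeat is reduced to bumping a
counter. It also assumes that a heartbeat itself counts as traffic and
so moves the deadline forward.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relay state, not the real struct relay. */
struct relay_sketch {
	double last_row_ts;	/* when the last row or heartbeat was sent */
	double timeout;		/* plays the role of replication_timeout */
	int heartbeats_sent;	/* counter instead of real network IO */
};

/* Would be called from the row-sending path each time a row goes out. */
void
sketch_on_row_sent(struct relay_sketch *r, double now)
{
	r->last_row_ts = now;
}

/* Deadline to hand to a wait-with-deadline primitive. */
double
sketch_wait_deadline(const struct relay_sketch *r)
{
	return r->last_row_ts + r->timeout;
}

/*
 * Called after the wait returns and pending messages are processed.
 * Sends a heartbeat only if nothing was sent for a whole timeout.
 * Returns true if a heartbeat was "sent".
 */
bool
sketch_maybe_heartbeat(struct relay_sketch *r, double now)
{
	assert(r->timeout > 0);
	if (now < sketch_wait_deadline(r))
		return false;
	r->heartbeats_sent++;
	/* Assumption: a heartbeat resets the deadline like a row does. */
	r->last_row_ts = now;
	return true;
}
```

With this shape the main loop needs no vclock comparison of its own:
rows filtered out for the current peer never touch last_row_ts, so the
deadline expires and a heartbeat goes out.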
> /*
> * Check that the vclock has been updated and the previous
> * status message is delivered
> diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
> index d4f714d91..a8f375b53 100644
> --- a/test/replication/misc.test.lua
> +++ b/test/replication/misc.test.lua
> @@ -22,4 +22,22 @@ test_run:cmd('stop server test')
> test_run:cmd('cleanup server test')
> box.cfg{read_only = false}
>
Please add a comment explaining what you're testing. Without it, in a
month we won't remember what this test is for.
> +test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
> +box.cfg{replication_timeout = 0.05}
> +test_run:cmd('start server test_timeout')
> +test_run:cmd('switch test_timeout')
> +test_run = require('test_run').new()
> +test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
> +old_replication = box.cfg.replication
> +box.cfg{replication = {}}
> +box.cfg{replication_timeout = 0.05, replication = old_replication}
> +test_run:cmd('switch default')
> +fiber = require'fiber'
> +_ = box.schema.space.create('test_timeout'):create_index('pk')
> +for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
> +box.info.replication[3].upstream.status
> +box.info.replication[3].upstream.message
> +test_run:cmd('stop server test_timeout')
> +test_run:cmd('cleanup server test_timeout')
> +
> box.schema.user.revoke('guest', 'replication')