[PATCH] Send relay heartbeat if wal changes won't be sent

Georgy Kirichenko georgy at tarantool.org
Wed Feb 28 11:24:02 MSK 2018


If a replica receives some changes, the corresponding WAL events are
generated and the relay fiber is woken up before the heartbeat timeout.
But there may be nothing to send if all the changes come from the current
relay peer. In this case the applier doesn't receive anything and breaks
the connection.

Fixes #3160
Branch gh-3160-relay-heartbeat-on-applier-changes-v2

---
 src/box/relay.cc               | 38 +++++++++++++++++---------
 test/replication/misc.result   | 61 ++++++++++++++++++++++++++++++++++++++++++
 test/replication/misc.test.lua | 19 +++++++++++++
 3 files changed, 105 insertions(+), 13 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index f9f22d622..3de6fd070 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -124,6 +124,10 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/**
+	 * A time when last row was send.
+	 */
+	double last_row_tstamp;
 
 	struct {
 		/* Align to prevent false-sharing with tx thread */
@@ -433,36 +437,43 @@ relay_subscribe_f(va_list ap)
 	fiber_start(reader, relay, fiber());
 
 	/*
-	 * If the replica happens to be uptodate on subscribe,
+	 * If the replica happens to be up to date on subscribe,
 	 * don't wait for timeout to happen - send a heartbeat
 	 * message right away to update the replication lag as
 	 * soon as possible.
 	 */
-	if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
-		relay_send_heartbeat(relay);
+	relay_send_heartbeat(relay);
 
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
 		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
 					    ERRINJ_DOUBLE);
 		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
+			fiber_sleep(inj->dparam);
 
-		if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
-			/*
-			 * Timed out waiting for WAL events.
-			 * Send a heartbeat message to update
-			 * the replication lag on the slave.
-			 */
-			relay_send_heartbeat(relay);
-		}
+		double timeout = replication_timeout -
+				 (fiber_time() - relay->last_row_tstamp);
+		fiber_cond_wait_timeout(&relay->reader_cond, timeout);
 
+		struct vclock old_vclock;
+		vclock_copy(&old_vclock, &r->vclock);
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
 		 * status messaging or by an acknowledge to status message.
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
+		/*
+		 * If the processed rows belong to the peer,
+		 * cbus_process() has filtered them out instead of
+		 * sending to the peer, so the peer got no
+		 * message. In this case we should honor the
+		 * previous heartbeat timeout, rather than reset
+		 * it.
+		 */
+		if (fiber_time() - relay->last_row_tstamp > replication_timeout) {
+			/* It is time to send a heartbeat. */
+			relay_send_heartbeat(relay);
+		}
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -589,5 +600,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	if (relay->replica == NULL ||
 	    packet->replica_id != relay->replica->id) {
 		relay_send(relay, packet);
+		relay->last_row_tstamp = fiber_time();
 	}
 }
diff --git a/test/replication/misc.result b/test/replication/misc.result
index 070e4ea80..c4621b8cd 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -61,6 +61,67 @@ test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 ---
 ...
+-- gh-3160 - Send heartbeats if there are changes from a remote master only
+test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
+---
+- true
+...
+box.cfg{replication_timeout = 0.05}
+---
+...
+test_run:cmd('start server test_timeout')
+---
+- true
+...
+test_run:cmd('switch test_timeout')
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
+---
+- []
+...
+old_replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication_timeout = 0.05, replication = old_replication}
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+fiber = require'fiber'
+---
+...
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+---
+...
+for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
+---
+...
+box.info.replication[3].upstream.status
+---
+- follow
+...
+box.info.replication[3].upstream.message
+---
+- null
+...
+test_run:cmd('stop server test_timeout')
+---
+- true
+...
+test_run:cmd('cleanup server test_timeout')
+---
+- true
+...
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index d4f714d91..e2649fd7c 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -22,4 +22,23 @@ test_run:cmd('stop server test')
 test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 
+-- gh-3160 - Send heartbeats if there are changes from a remote master only
+test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
+box.cfg{replication_timeout = 0.05}
+test_run:cmd('start server test_timeout')
+test_run:cmd('switch test_timeout')
+test_run = require('test_run').new()
+test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
+old_replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication_timeout = 0.05, replication = old_replication}
+test_run:cmd('switch default')
+fiber = require'fiber'
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
+box.info.replication[3].upstream.status
+box.info.replication[3].upstream.message
+test_run:cmd('stop server test_timeout')
+test_run:cmd('cleanup server test_timeout')
+
 box.schema.user.revoke('guest', 'replication')
-- 
2.16.2




More information about the Tarantool-patches mailing list