[PATCH] Send relay heartbeat if WAL changes won't be sent

Georgy Kirichenko georgy at tarantool.org
Wed Feb 28 20:36:45 MSK 2018


If a replica receives some changes, the corresponding WAL events are
generated and the relay fiber is woken up before the heartbeat timeout
expires. However, there may be nothing to send if all the changes came
from the current relay peer. In that case the applier doesn't receive
anything and breaks the connection.

Fixes #3160
---
 src/box/relay.cc               | 23 ++++++------
 test/replication/misc.result   | 81 ++++++++++++++++++++++++++++++++++++++++++
 test/replication/misc.test.lua | 36 +++++++++++++++++++
 3 files changed, 129 insertions(+), 11 deletions(-)

Branch: https://github.com/tarantool/tarantool/tree/gh-3160-relay-heartbeat-on-applier-changes

Changes against the previous patch:
* use the last message timestamp for the heartbeat deadline calculation
* use fiber_cond_wait_deadline + ev_monotonic_now instead of
  fiber_cond_wait_timeout
* fix the test
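
For clarity, here is roughly what the relevant part of relay_subscribe_f()
looks like with this patch applied (a simplified sketch assembled from the
hunks below; the error-injection timeout override, pending gc and
status-update code are omitted):

	while (!fiber_is_cancelled()) {
		double timeout = replication_timeout;
		/*
		 * Wait for a WAL event, but no longer than until the
		 * heartbeat deadline measured from the last sent row.
		 */
		fiber_cond_wait_deadline(&relay->reader_cond,
					 relay->last_row_tm + timeout);
		cbus_process(&relay->endpoint);
		/*
		 * Even if we were woken up by a WAL event, there may be
		 * nothing to send (all rows are from this peer), so check
		 * the heartbeat deadline explicitly.
		 */
		if (ev_monotonic_now(loop()) - relay->last_row_tm > timeout)
			relay_send_heartbeat(relay);
		/* ... send the new rows, if any ... */
	}

Every outgoing row (including heartbeats) refreshes the deadline base in
relay_send():

	packet->sync = relay->sync;
	relay->last_row_tm = ev_monotonic_now(loop());
	coio_write_xrow(&relay->io, packet);
	fiber_gc();
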
diff --git a/src/box/relay.cc b/src/box/relay.cc
index f9f22d622..5ace53d4d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -124,6 +124,10 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/**
+	 * Time when the last row was sent.
+	 */
+	double last_row_tm;
 
 	struct {
 		/* Align to prevent false-sharing with tx thread */
@@ -433,13 +437,12 @@ relay_subscribe_f(va_list ap)
 	fiber_start(reader, relay, fiber());
 
 	/*
-	 * If the replica happens to be uptodate on subscribe,
+	 * If the replica happens to be up to date on subscribe,
 	 * don't wait for timeout to happen - send a heartbeat
 	 * message right away to update the replication lag as
 	 * soon as possible.
 	 */
-	if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
-		relay_send_heartbeat(relay);
+	relay_send_heartbeat(relay);
 
 	while (!fiber_is_cancelled()) {
 		double timeout = replication_timeout;
@@ -448,14 +451,8 @@ relay_subscribe_f(va_list ap)
 		if (inj != NULL && inj->dparam != 0)
 			timeout = inj->dparam;
 
-		if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
-			/*
-			 * Timed out waiting for WAL events.
-			 * Send a heartbeat message to update
-			 * the replication lag on the slave.
-			 */
-			relay_send_heartbeat(relay);
-		}
+		fiber_cond_wait_deadline(&relay->reader_cond,
+					 relay->last_row_tm + timeout);
 
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
@@ -463,6 +460,9 @@ relay_subscribe_f(va_list ap)
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
+		/* Check for a heartbeat timeout. */
+		if (ev_monotonic_now(loop()) - relay->last_row_tm > timeout)
+			relay_send_heartbeat(relay);
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -560,6 +560,7 @@ static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
 	packet->sync = relay->sync;
+	relay->last_row_tm = ev_monotonic_now(loop());
 	coio_write_xrow(&relay->io, packet);
 	fiber_gc();
 
diff --git a/test/replication/misc.result b/test/replication/misc.result
index 070e4ea80..879c7fe3e 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -61,6 +61,87 @@ test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 ---
 ...
+-- gh-3160 - Send heartbeats if there are changes from a remote master only
+SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
+---
+...
+-- Deploy a cluster.
+test_run:create_cluster(SERVERS)
+---
+...
+test_run:wait_fullmesh(SERVERS)
+---
+...
+test_run:cmd("switch autobootstrap1")
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+box.cfg{replication_timeout = 0.01}
+---
+...
+test_run:cmd("switch autobootstrap2")
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+box.cfg{replication_timeout = 0.01}
+---
+...
+test_run:cmd("switch autobootstrap3")
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+fiber=require('fiber')
+---
+...
+box.cfg{replication_timeout = 0.01}
+---
+...
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function test_timeout()
+    for i = 0, 99 do 
+        box.space.test_timeout:replace({1})
+        fiber.sleep(0.005)
+        local rinfo = box.info.replication
+        if rinfo[1].upstream and rinfo[1].upstream.status ~= 'follow' or
+           rinfo[2].upstream and rinfo[2].upstream.status ~= 'follow' or
+           rinfo[3].upstream and rinfo[3].upstream.status ~= 'follow' then
+            return error('Replication broken')
+        end
+    end
+    return true
+end ;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+test_timeout()
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(SERVERS)
+---
+...
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index d4f714d91..8752182b7 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -22,4 +22,40 @@ test_run:cmd('stop server test')
 test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 
+-- gh-3160 - Send heartbeats if there are changes from a remote master only
+SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
+
+-- Deploy a cluster.
+test_run:create_cluster(SERVERS)
+test_run:wait_fullmesh(SERVERS)
+test_run:cmd("switch autobootstrap1")
+test_run = require('test_run').new()
+box.cfg{replication_timeout = 0.01}
+test_run:cmd("switch autobootstrap2")
+test_run = require('test_run').new()
+box.cfg{replication_timeout = 0.01}
+test_run:cmd("switch autobootstrap3")
+test_run = require('test_run').new()
+fiber=require('fiber')
+box.cfg{replication_timeout = 0.01}
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+test_run:cmd("setopt delimiter ';'")
+function test_timeout()
+    for i = 0, 99 do 
+        box.space.test_timeout:replace({1})
+        fiber.sleep(0.005)
+        local rinfo = box.info.replication
+        if rinfo[1].upstream and rinfo[1].upstream.status ~= 'follow' or
+           rinfo[2].upstream and rinfo[2].upstream.status ~= 'follow' or
+           rinfo[3].upstream and rinfo[3].upstream.status ~= 'follow' then
+            return error('Replication broken')
+        end
+    end
+    return true
+end ;
+test_run:cmd("setopt delimiter ''");
+test_timeout()
+test_run:cmd("switch default")
+test_run:drop_cluster(SERVERS)
+
 box.schema.user.revoke('guest', 'replication')
-- 
2.16.2



