[PATCH] Send relay heartbeat if wal changes won't be sent

Georgy Kirichenko georgy at tarantool.org
Mon Feb 26 19:04:18 MSK 2018


If a replica receives some changes, the corresponding WAL events are
generated and the relay fiber is woken up before the heartbeat timeout
expires. But there may be nothing to send if all the changes originate
from the current relay peer. In this case the applier doesn't receive
anything and breaks the connection.

Fixes #3160
Branch gh-3160-relay-heartbeat-on-applier-changes
---
 src/box/relay.cc               | 45 ++++++++++++++++++-------------
 test/replication/misc.result   | 60 ++++++++++++++++++++++++++++++++++++++++++
 test/replication/misc.test.lua | 18 +++++++++++++
 3 files changed, 104 insertions(+), 19 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index f9f22d622..ebe2a8167 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -432,37 +432,44 @@ relay_subscribe_f(va_list ap)
 	fiber_set_joinable(reader, true);
 	fiber_start(reader, relay, fiber());
 
-	/*
-	 * If the replica happens to be uptodate on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	if (vclock_compare(&r->vclock, &replicaset.vclock) == 0)
-		relay_send_heartbeat(relay);
+	/* Send the first heartbeat and set a keep-alive timeout. */
+	relay_send_heartbeat(relay);
+	ev_tstamp start, timeout;
+	coio_timeout_init(&start, &timeout, replication_timeout);
 
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
 		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
 					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
+		if (inj != NULL && inj->dparam != 0 && inj->dparam - timeout > 0)
+			fiber_sleep(inj->dparam - timeout);
 
-		if (fiber_cond_wait_timeout(&relay->reader_cond, timeout) != 0) {
-			/*
-			 * Timed out waiting for WAL events.
-			 * Send a heartbeat message to update
-			 * the replication lag on the slave.
-			 */
-			relay_send_heartbeat(relay);
-		}
+		fiber_cond_wait_timeout(&relay->reader_cond, timeout);
 
+		struct vclock old_vclock;
+		vclock_copy(&old_vclock, &r->vclock);
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
 		 * status messaging or by an acknowledge to status message.
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
+
+		/* Update timeout. */
+		coio_timeout_update(start, &timeout);
+		/*
+		 * Reset timeout if there are changes from any instance
+		 * other than the current peer.
+		 */
+		if (relay->replica->id &&
+		    vclock_sum(&r->vclock) - vclock_sum(&old_vclock) >
+		    vclock_get(&r->vclock, relay->replica->id) -
+		    vclock_get(&old_vclock, relay->replica->id))
+			coio_timeout_init(&start, &timeout, replication_timeout);
+		if (timeout < 0) {
+			/* Keep-alive timeout expired: send a heartbeat. */
+			relay_send_heartbeat(relay);
+			coio_timeout_init(&start, &timeout, replication_timeout);
+		}
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
diff --git a/test/replication/misc.result b/test/replication/misc.result
index 070e4ea80..7cd68d329 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -61,6 +61,66 @@ test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 ---
 ...
+test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
+---
+- true
+...
+box.cfg{replication_timeout = 0.05}
+---
+...
+test_run:cmd('start server test_timeout')
+---
+- true
+...
+test_run:cmd('switch test_timeout')
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
+---
+- []
+...
+old_replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication_timeout = 0.05, replication = old_replication}
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+fiber = require'fiber'
+---
+...
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+---
+...
+for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
+---
+...
+box.info.replication[3].upstream.status
+---
+- follow
+...
+box.info.replication[3].upstream.message
+---
+- null
+...
+test_run:cmd('stop server test_timeout')
+---
+- true
+...
+test_run:cmd('cleanup server test_timeout')
+---
+- true
+...
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index d4f714d91..a8f375b53 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -22,4 +22,22 @@ test_run:cmd('stop server test')
 test_run:cmd('cleanup server test')
 box.cfg{read_only = false}
 
+test_run:cmd('create server test_timeout with rpl_master=default, script="replication/replica.lua"')
+box.cfg{replication_timeout = 0.05}
+test_run:cmd('start server test_timeout')
+test_run:cmd('switch test_timeout')
+test_run = require('test_run').new()
+test_run:cmd(string.format('eval default "box.cfg{replication = \'%s\'}"', box.cfg.listen))
+old_replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication_timeout = 0.05, replication = old_replication}
+test_run:cmd('switch default')
+fiber = require'fiber'
+_ = box.schema.space.create('test_timeout'):create_index('pk')
+for i = 0, 22 do box.space.test_timeout:replace({1}) fiber.sleep(0.01) end
+box.info.replication[3].upstream.status
+box.info.replication[3].upstream.message
+test_run:cmd('stop server test_timeout')
+test_run:cmd('cleanup server test_timeout')
+
 box.schema.user.revoke('guest', 'replication')
-- 
2.16.2




More information about the Tarantool-patches mailing list