[PATCH] Break connection on timeout

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Thu Feb 8 19:42:46 MSK 2018


In a replication setup, if one of the instances is powered off, the other
instances do not detect it and the connection hangs: the alive instances
keep showing the 'follow' state.
Add a timeout to solve this issue. It is safe because the applier and the
relay both send messages every replication_timeout, so if we read nothing
for longer than that, we can assume the connection is broken.
Use replication_disconnect_timeout, which is replication_timeout * 4 for now.

Closes #3025
---
 src/box/applier.cc               |  7 ++++++-
 src/box/box.cc                   | 23 ++++++++++++++++++++-
 src/box/box.h                    |  1 +
 src/box/lua/cfg.cc               | 13 ++++++++++++
 src/box/lua/load_cfg.lua         |  4 ++++
 src/box/relay.cc                 |  2 +-
 src/box/replication.cc           |  1 +
 src/box/replication.h            |  7 ++-----
 test/app-tap/init_script.result  | 43 ++++++++++++++++++++--------------------
 test/box/admin.result            |  2 ++
 test/box/cfg.result              |  4 ++++
 test/replication/errinj.result   | 13 ++++++++++++
 test/replication/errinj.test.lua |  7 +++++++
 13 files changed, 98 insertions(+), 29 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index f0073bada..bc1f8fc5e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -418,7 +418,12 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		coio_read_xrow(coio, ibuf, &row);
+		if (applier->version_id < version_id(1, 7, 7)) {
+			coio_read_xrow(coio, ibuf, &row);
+		} else {
+			coio_read_xrow_timeout_xc(coio, ibuf, &row,
+						  replication_disconnect_timeout);
+		}
 
 		if (iproto_type_is_error(row.type))
 			xrow_decode_error_xc(&row);  /* error */
diff --git a/src/box/box.cc b/src/box/box.cc
index ec6c26342..039b8ba95 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -357,6 +357,17 @@ box_check_replication_timeout(void)
 	return timeout;
 }
 
+static double
+box_check_replication_disconnect_timeout(double replication_timeout)
+{
+	double disconnect_timeout = cfg_getd("replication_disconnect_timeout");
+	if (disconnect_timeout <= replication_timeout) {
+		tnt_raise(ClientError, ER_CFG, "replication_disconnect_timeout",
+			  "the value must be greater than replication_timeout");
+	}
+	return disconnect_timeout;
+}
+
 static int
 box_check_replication_connect_quorum(void)
 {
@@ -449,7 +460,7 @@ box_check_config()
 	box_check_instance_uuid(&uuid);
 	box_check_replicaset_uuid(&uuid);
 	box_check_replication();
-	box_check_replication_timeout();
+	box_check_replication_disconnect_timeout(box_check_replication_timeout());
 	box_check_replication_connect_quorum();
 	box_check_readahead(cfg_geti("readahead"));
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
@@ -549,6 +560,15 @@ void
 box_set_replication_timeout(void)
 {
 	replication_timeout = box_check_replication_timeout();
+	replication_disconnect_timeout = replication_timeout * 4;
+}
+
+void
+box_set_replication_disconnect_timeout(void)
+{
+	double timeout = box_check_replication_timeout();
+	replication_disconnect_timeout =
+		box_check_replication_disconnect_timeout(timeout);
 }
 
 void
@@ -1622,6 +1642,7 @@ box_cfg_xc(void)
 	box_set_checkpoint_count();
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
+	box_set_replication_disconnect_timeout();
 	box_set_replication_connect_quorum();
 	xstream_create(&join_stream, apply_initial_join_row);
 	xstream_create(&subscribe_stream, apply_row);
diff --git a/src/box/box.h b/src/box/box.h
index dc5d88679..0dd9a6de8 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -155,6 +155,7 @@ void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_max_tuple_size(void);
 void box_set_vinyl_timeout(void);
 void box_set_replication_timeout(void);
+void box_set_replication_disconnect_timeout(void);
 void box_set_replication_connect_quorum(void);
 
 extern "C" {
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index be7c46929..9fc7ee538 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -229,6 +229,17 @@ lbox_cfg_set_replication_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_replication_disconnect_timeout(struct lua_State *L)
+{
+	try {
+		box_set_replication_disconnect_timeout();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_connect_quorum(struct lua_State *L)
 {
@@ -261,6 +272,8 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_vinyl_max_tuple_size", lbox_cfg_set_vinyl_max_tuple_size},
 		{"cfg_set_vinyl_timeout", lbox_cfg_set_vinyl_timeout},
 		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
+		{"cfg_set_replication_disconnect_timeout",
+			lbox_cfg_set_replication_disconnect_timeout},
 		{"cfg_set_replication_connect_quorum",
 			lbox_cfg_set_replication_connect_quorum},
 		{NULL, NULL}
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 1f42bef8b..78f06122c 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -56,6 +56,7 @@ local default_cfg = {
     checkpoint_count    = 2,
     worker_pool_threads = 4,
     replication_timeout = 1,
+    replication_disconnect_timeout = 4,
     replication_connect_quorum = nil
 }
 
@@ -110,6 +111,7 @@ local template_cfg = {
     hot_standby         = 'boolean',
     worker_pool_threads = 'number',
     replication_timeout = 'number',
+    replication_disconnect_timeout = 'number',
     replication_connect_quorum = 'number',
 }
 
@@ -177,6 +179,7 @@ local dynamic_cfg = {
     end,
     force_recovery          = function() end,
     replication_timeout     = private.cfg_set_replication_timeout,
+    replication_disconnect_timeout = private.cfg_set_replication_disconnect_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
 }
 
@@ -185,6 +188,7 @@ local dynamic_cfg_skip_at_load = {
     listen                  = true,
     replication             = true,
     replication_timeout     = true,
+    replication_disconnect_timeout = true,
     replication_connect_quorum = true,
     wal_dir_rescan_delay    = true,
     custom_proc_title       = true,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7102a5734..8d7e2745f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -362,7 +362,7 @@ relay_reader_f(va_list ap)
 		while (!fiber_is_cancelled()) {
 			struct xrow_header xrow;
 			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
+					replication_disconnect_timeout);
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index f901c365c..2a6ed7173 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -52,6 +52,7 @@ uint32_t instance_id = REPLICA_ID_NIL;
 struct tt_uuid REPLICASET_UUID;
 
 double replication_timeout = 1.0; /* seconds */
+double replication_disconnect_timeout = replication_timeout * 4; /* seconds */
 
 typedef rb_tree(struct replica) replicaset_t;
 rb_proto(, replicaset_, replicaset_t, struct replica)
diff --git a/src/box/replication.h b/src/box/replication.h
index f2f113f38..add134b8a 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -109,12 +109,9 @@ replication_reconnect_timeout(void)
 /**
  * Disconnect a replica if no heartbeat message has been
  * received from it within the given period.
+ * Set by box.cfg.replication_disconnect_timeout.
  */
-static inline double
-replication_disconnect_timeout(void)
-{
-	return replication_timeout * 4;
-}
+extern double replication_disconnect_timeout;
 
 /**
  * Fail box.cfg() if the quorum hasn't been assembled within
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 73dea399d..8488387da 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,27 +21,28 @@ box.cfg
 16	pid_file:box.pid
 17	read_only:false
 18	readahead:16320
-19	replication_timeout:1
-20	rows_per_wal:500000
-21	slab_alloc_factor:1.05
-22	too_long_threshold:0.5
-23	vinyl_bloom_fpr:0.05
-24	vinyl_cache:134217728
-25	vinyl_dir:.
-26	vinyl_max_tuple_size:1048576
-27	vinyl_memory:134217728
-28	vinyl_page_size:8192
-29	vinyl_range_size:1073741824
-30	vinyl_read_threads:1
-31	vinyl_run_count_per_level:2
-32	vinyl_run_size_ratio:3.5
-33	vinyl_timeout:60
-34	vinyl_write_threads:2
-35	wal_dir:.
-36	wal_dir_rescan_delay:2
-37	wal_max_size:268435456
-38	wal_mode:write
-39	worker_pool_threads:4
+19	replication_disconnect_timeout:4
+20	replication_timeout:1
+21	rows_per_wal:500000
+22	slab_alloc_factor:1.05
+23	too_long_threshold:0.5
+24	vinyl_bloom_fpr:0.05
+25	vinyl_cache:134217728
+26	vinyl_dir:.
+27	vinyl_max_tuple_size:1048576
+28	vinyl_memory:134217728
+29	vinyl_page_size:8192
+30	vinyl_range_size:1073741824
+31	vinyl_read_threads:1
+32	vinyl_run_count_per_level:2
+33	vinyl_run_size_ratio:3.5
+34	vinyl_timeout:60
+35	vinyl_write_threads:2
+36	wal_dir:.
+37	wal_dir_rescan_delay:2
+38	wal_max_size:268435456
+39	wal_mode:write
+40	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index cb1c7d4ab..06c61a99c 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
     - false
   - - readahead
     - 16320
+  - - replication_disconnect_timeout
+    - 4
   - - replication_timeout
     - 1
   - - rows_per_wal
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 7be132e33..881e8a4c5 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -50,6 +50,8 @@ cfg_filter(box.cfg)
     - false
   - - readahead
     - 16320
+  - - replication_disconnect_timeout
+    - 4
   - - replication_timeout
     - 1
   - - rows_per_wal
@@ -135,6 +137,8 @@ cfg_filter(box.cfg)
     - false
   - - readahead
     - 16320
+  - - replication_disconnect_timeout
+    - 4
   - - replication_timeout
     - 1
   - - rows_per_wal
diff --git a/test/replication/errinj.result b/test/replication/errinj.result
index d1f1dbe91..d0121e209 100644
--- a/test/replication/errinj.result
+++ b/test/replication/errinj.result
@@ -426,6 +426,19 @@ test_run:cmd("switch replica_ack")
 ---
 - true
 ...
+-- By default the applier disconnect timeout is proportional to replication_timeout,
+-- which is very low in this test, so set it explicitly to let the connection stay up.
+-- It must be greater than the master's replication_timeout (0.1 here).
+box.cfg{replication_disconnect_timeout = 1.}
+---
+...
+fiber = require('fiber')
+---
+...
+-- Wait a few of the master's replication_timeout periods (0.1) so the applier reaches 'follow'.
+fiber.sleep(0.5)
+---
+...
 box.info.replication[1].upstream.status
 ---
 - follow
diff --git a/test/replication/errinj.test.lua b/test/replication/errinj.test.lua
index ba83481fe..6555c7f2f 100644
--- a/test/replication/errinj.test.lua
+++ b/test/replication/errinj.test.lua
@@ -175,6 +175,13 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
 test_run:cmd("create server replica_ack with rpl_master=default, script='replication/replica_ack.lua'")
 test_run:cmd("start server replica_ack")
 test_run:cmd("switch replica_ack")
+-- By default the applier disconnect timeout is proportional to replication_timeout,
+-- which is very low in this test, so set it explicitly to let the connection stay up.
+-- It must be greater than the master's replication_timeout (0.1 here).
+box.cfg{replication_disconnect_timeout = 1.}
+fiber = require('fiber')
+-- Wait a few of the master's replication_timeout periods (0.1) so the applier reaches 'follow'.
+fiber.sleep(0.5)
 box.info.replication[1].upstream.status
 
 test_run:cmd("stop server default")
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list