[PATCH 3/3] iproto: allow to configure IPROTO_MSG_MAX
    Vladislav Shpilevoy 
    v.shpilevoy at tarantool.org
       
    Sat Apr 21 01:52:11 MSK 2018
    
    
  
IPROTO_MSG_MAX is a constant that restricts count of requests in
fly. It allows to do not produce too many fibers in TX thread,
that would lead to too big overhead on fibers switching, their
stack storing.
But some users have powerful metal on which Tarantool
IPROTO_MSG_MAX constant is not serious. The patch exposes it as
a configuration runtime parameter.
'iproto_msg_max' is its name. If a user sees that IProto thread
is stuck due to too many requests, it can change iproto_msg_max
in runtime, and IProto thread immediately starts processing
pending requests.
'iproto_msg_max' can be decreased, but obviously it can not stop
already runned requests, so if now in IProto thread request count
is > new 'iproto_msg_max' value, then it takes some time until
some requests will be finished.
Closes #3320
---
 src/box/box.cc                  |   7 +++
 src/box/box.h                   |   1 +
 src/box/iproto.cc               |  70 +++++++++++++++++------
 src/box/iproto.h                |   3 +
 src/box/lua/cfg.cc              |  12 ++++
 src/box/lua/load_cfg.lua        |   3 +
 test/app-tap/init_script.result |  73 ++++++++++++------------
 test/box/admin.result           |   2 +
 test/box/cfg.result             |  24 ++++++++
 test/box/cfg.test.lua           |   9 +++
 test/box/request_limit.result   | 119 +++++++++++++++++++++++++++++++++++++++-
 test/box/request_limit.test.lua |  55 ++++++++++++++++++-
 12 files changed, 322 insertions(+), 56 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..80684ad48 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,12 @@ box_set_vinyl_timeout(void)
 	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
 }
 
+void
+box_set_iproto_msg_max(void)
+{
+	iproto_set_msg_max(cfg_geti("iproto_msg_max"));
+}
+
 /* }}} configuration bindings */
 
 /**
@@ -1735,6 +1741,7 @@ box_cfg_xc(void)
 	box_check_instance_uuid(&instance_uuid);
 	box_check_replicaset_uuid(&replicaset_uuid);
 
+	box_set_iproto_msg_max();
 	box_set_checkpoint_count();
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..712e21191 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
 void box_set_vinyl_timeout(void);
 void box_set_replication_timeout(void);
 void box_set_replication_connect_quorum(void);
+void box_set_iproto_msg_max(void);
 
 extern "C" {
 #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 585e8cc83..1266eb255 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,7 @@
 #include "applier.h"
 #include "cfg.h"
 
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum { IPROTO_MSG_MAX_MIN = 768 };
 
 /**
  * Network readahead. A signed integer to avoid
@@ -83,6 +82,9 @@ enum { IPROTO_MSG_MAX = 768 };
  */
 unsigned iproto_readahead = 16320;
 
+/* The maximal number of iproto messages in flight. */
+static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -381,7 +383,7 @@ iproto_must_stop_input()
 {
 	size_t connection_count = mempool_count(&iproto_connection_pool);
 	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > connection_count + IPROTO_MSG_MAX;
+	return request_count > connection_count + iproto_msg_max;
 }
 
 /**
@@ -1623,7 +1625,7 @@ net_cord_f(va_list /* ap */)
 	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
 	/* Create a pipe to "tx" thread. */
 	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
 	/* Process incomming messages. */
 	cbus_loop(&endpoint);
 
@@ -1651,29 +1653,47 @@ iproto_init()
 
 	/* Create a pipe to "net" thread. */
 	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
 }
 
 /**
  * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or max
+ * message count in fly it is needed to bounce a couple of
+ * messages to and from this thread.
  */
 struct iproto_cfg_msg: public cbus_call_msg
 {
+	/** New URI to bind to. */
 	const char *uri;
+	bool need_update_uri;
+
+	/** New IProto max message count in fly. */
+	int iproto_msg_max;
+	bool need_update_msg_max;
 };
 
+static struct iproto_cfg_msg cfg_msg;
+
 static int
 iproto_do_cfg(struct cbus_call_msg *m)
 {
-	const char *uri  = ((struct iproto_cfg_msg *) m)->uri;
+	assert(m == &cfg_msg);
 	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		if (cfg_msg.need_update_uri) {
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg.uri != NULL)
+				evio_service_bind(&binary, cfg_msg.uri);
+		}
+		if (cfg_msg.need_update_msg_max) {
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg.iproto_msg_max / 2);
+			int old = iproto_msg_max;
+			iproto_msg_max = cfg_msg.iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+		}
 	} catch (Exception *e) {
 		return -1;
 	}
@@ -1696,9 +1716,10 @@ iproto_do_listen(struct cbus_call_msg *m)
 void
 iproto_bind(const char *uri)
 {
-	static struct iproto_cfg_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_cfg,
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_uri = true;
+	cfg_msg.uri = uri;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
 		      NULL, TIMEOUT_INFINITY))
 		diag_raise();
 }
@@ -1724,3 +1745,20 @@ iproto_reset_stat(void)
 {
 	rmean_cleanup(rmean_net);
 }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_msg_max = true;
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
+		      NULL, TIMEOUT_INFINITY))
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
 void
 iproto_listen();
 
+void
+iproto_set_msg_max(int iproto_msg_max);
+
 #endif /* defined(__cplusplus) */
 
 #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..1fd953011 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_iproto_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_iproto_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
 		{"cfg_set_replication_connect_quorum",
 			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_iproto_msg_max", lbox_cfg_set_iproto_msg_max},
 		{NULL, NULL}
 	};
 
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..6ed30e016 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
+    iproto_msg_max        = 768,
 }
 
 -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
+    iproto_msg_max        = 'number',
 }
 
 local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
     replication_skip_conflict = function() end,
+    iproto_msg_max          = private.cfg_set_iproto_msg_max,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..741f764af 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -12,42 +12,43 @@ box.cfg
 7	feedback_interval:3600
 8	force_recovery:false
 9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	log_nonblock:true
-15	memtx_dir:.
-16	memtx_max_tuple_size:1048576
-17	memtx_memory:107374182
-18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+10	iproto_msg_max:768
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	log_nonblock:true
+16	memtx_dir:.
+17	memtx_max_tuple_size:1048576
+18	memtx_memory:107374182
+19	memtx_min_tuple_size:16
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..7a1432a76 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -36,6 +36,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..af3de4537 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
@@ -129,6 +131,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
@@ -411,6 +415,26 @@ test_run:cmd("cleanup server cfg_tester")
 ---
 - true
 ...
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': should be of type number'
+...
+box.cfg{iproto_msg_max = 10}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 768'
+...
+old = box.cfg.iproto_msg_max
+---
+...
+box.cfg{iproto_msg_max = old + 1000}
+---
+...
+box.cfg{iproto_msg_max = old}
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..c2ac91c74 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,13 @@ test_run:cmd("switch default")
 test_run:cmd("stop server cfg_tester")
 test_run:cmd("cleanup server cfg_tester")
 
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+box.cfg{iproto_msg_max = 10}
+old = box.cfg.iproto_msg_max
+box.cfg{iproto_msg_max = old + 1000}
+box.cfg{iproto_msg_max = old}
+
 test_run:cmd("clear filter")
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index 464528b41..acdac2493 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -22,7 +22,7 @@ active = 0
 continue = false
 ---
 ...
-limit = 768
+limit = box.cfg.iproto_msg_max
 ---
 ...
 run_max = (limit - 100) / 2
@@ -100,6 +100,121 @@ continue = true
 while active ~= 0 do fiber.sleep(0.01) end
 ---
 ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active == run_max
+---
+- true
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active cout is not changed - the input is blocked.
+wait_block()
+---
+...
+active == old_active
+---
+- true
+...
+continue = true
+---
+...
+while active ~= 0 do fiber.sleep(0.01) end
+---
+...
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active >= limit
+---
+- true
+...
+active < run_max * 2
+---
+- true
+...
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+wait_block()
+---
+...
+active == run_max * 2
+---
+- true
+...
+continue = true
+---
+...
+while active ~= 0 do fiber.sleep(0.01) end
+---
+...
+--
+-- Test TX fiber pool size limit.
+--
+run_max = 2500
+---
+...
+box.cfg{iproto_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 4096
+...
+continue = true
+---
+...
+while active ~= 0 do fiber.sleep(0.01) end
+---
+...
 conn2:close()
 ---
 ...
@@ -109,6 +224,6 @@ conn:close()
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}
 ---
 ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index 81298d91d..f002cea04 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -8,7 +8,7 @@ conn = net_box.connect(box.cfg.listen)
 conn2 = net_box.connect(box.cfg.listen)
 active = 0
 continue = false
-limit = 768
+limit = box.cfg.iproto_msg_max
 run_max = (limit - 100) / 2
 
 old_readahead = box.cfg.readahead
@@ -56,8 +56,59 @@ active == run_max * 2 or active
 continue = true
 while active ~= 0 do fiber.sleep(0.01) end
 
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_block()
+active == run_max
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active cout is not changed - the input is blocked.
+wait_block()
+active == old_active
+continue = true
+while active ~= 0 do fiber.sleep(0.01) end
+
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active >= limit
+active < run_max * 2
+box.cfg{iproto_msg_max = limit * 2}
+wait_block()
+active == run_max * 2
+continue = true
+while active ~= 0 do fiber.sleep(0.01) end
+
+--
+-- Test TX fiber pool size limit.
+--
+run_max = 2500
+box.cfg{iproto_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active
+continue = true
+while active ~= 0 do fiber.sleep(0.01) end
+
 conn2:close()
 conn:close()
 
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}
-- 
2.15.1 (Apple Git-101)
    
    
More information about the Tarantool-patches
mailing list