[tarantool-patches] Re: [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri May 4 14:56:23 MSK 2018


Hello. Thanks for review!

On 04/05/2018 11:46, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/05/04 00:07]:
> 
>> -struct iproto_bind_msg: public cbus_call_msg
>> +struct iproto_cfg_msg: public cbus_call_msg
>>   {
>> +	/** New URI to bind to. */
>>   	const char *uri;
>> +	bool need_update_uri;
>> +
>> +	/** New IProto max message count in fly. */
>> +	int iproto_msg_max;
>> +	bool need_update_msg_max;
>>   };
> 
> need_update -> update

Done.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 8f4e27159..83833a2aa 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1738,11 +1738,11 @@ struct iproto_cfg_msg: public cbus_call_msg
  {
         /** New URI to bind to. */
         const char *uri;
-       bool need_update_uri;
+       bool update_uri;
  
         /** New IProto max message count in fly. */
         int iproto_msg_max;
-       bool need_update_msg_max;
+       bool update_msg_max;
  };

> 
>> +void
>> +fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
>> +{
>> +	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
>> +		new_max_size = FIBER_POOL_SIZE_DEFAULT;
>> +
>> +	if (new_max_size > pool->max_size) {
>> +		pool->max_size = new_max_size;
>> +		cbus_process(&pool->endpoint);
>> +	} else {
>> +		pool->max_size = new_max_size;
>> +	}
> 
> Why do you need cbus_process here? This is error-prone. Please
> remove.

Ok.

diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index 3b9718ad3..f60b6f0ff 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -141,13 +141,7 @@ fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
  {
         if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
                 new_max_size = FIBER_POOL_SIZE_DEFAULT;
-
-       if (new_max_size > pool->max_size) {
-               pool->max_size = new_max_size;
-               cbus_process(&pool->endpoint);
-       } else {
-               pool->max_size = new_max_size;
-       }
+       pool->max_size = new_max_size;
  }

> 
>> +--
>> +-- Test TX fiber pool size limit. It is increased together with
>> +-- iproto msg max.
>> +--
>> +run_max = 2500
>> +---
>> +...
>> +box.cfg{net_msg_max = 5000}
> 
> Please reduce the number of messages/workers in your tests to several or
> maybe a dozen, not hundreds or thousands. By having a lot of
> fibers/messages you don't test anything new, only make
> the test run longer and harder to debug.

Here I test the increased fiber pool size. Its minimal value is 4096, and the
only way to test that it is increased is to send more than 4096 simultaneous
requests.

On my laptop it runs in less than a second. Moreover, it can be run in parallel with
other tests, so I do not think that 5k here is too many.

If you want, I can delete the test. Or reduce the count from 5k to "maybe a dozen",
but then it would not test anything. Should I do it?

> 
> The test would win from adding more randomness and more tries to
> changes in msg max: e.g. why not increase/decrease msg max a few
> times under unceasing load? This would test both growing the fiber
> pool and shrinking it.

There is no sense in randomness. It just makes the test harder to reproduce. I added
a test on fiber pool decreasing while it is stuck. It appears that it hangs until
at least one request is finished. So I reworked the net_msg_max update. The diff became
too big, so see the entire patch below.


diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..0d1f0e6ae 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,14 @@ box_set_vinyl_timeout(void)
  	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
  }
  
+void
+box_set_net_msg_max(void)
+{
+	int new_iproto_msg_max = cfg_geti("net_msg_max");
+	iproto_set_msg_max(new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+}
+
  /* }}} configuration bindings */
  
  /**
@@ -1711,7 +1719,7 @@ static inline void
  box_cfg_xc(void)
  {
  	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
  			  FIBER_POOL_IDLE_TIMEOUT);
  	/* Add an extra endpoint for WAL wake up/rollback messages. */
  	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
@@ -1735,6 +1743,7 @@ box_cfg_xc(void)
  	box_check_instance_uuid(&instance_uuid);
  	box_check_replicaset_uuid(&replicaset_uuid);
  
+	box_set_net_msg_max();
  	box_set_checkpoint_count();
  	box_set_too_long_threshold();
  	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..7726cb4f3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
  void box_set_vinyl_timeout(void);
  void box_set_replication_timeout(void);
  void box_set_replication_connect_quorum(void);
+void box_set_net_msg_max(void);
  
  extern "C" {
  #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index baa6bb660..3212d9697 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
  #include "applier.h"
  #include "cfg.h"
  
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
  
  /**
   * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
   */
  unsigned iproto_readahead = 16320;
  
+/* The maximal number of iproto messages in fly. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
  /**
   * How big is a buffer which needs to be shrunk before
   * it is put back into buffer cache.
@@ -375,7 +380,7 @@ static inline bool
  iproto_check_msg_max()
  {
  	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > IPROTO_MSG_MAX;
+	return request_count > (size_t) iproto_msg_max;
  }
  
  static struct iproto_msg *
@@ -1692,7 +1697,7 @@ net_cord_f(va_list /* ap */)
  	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
  	/* Create a pipe to "tx" thread. */
  	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
  	/* Process incomming messages. */
  	cbus_loop(&endpoint);
  
@@ -1720,65 +1725,130 @@ iproto_init()
  
  	/* Create a pipe to "net" thread. */
  	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  }
  
+/** IProto configuration change result codes. */
+enum {
+	IPROTO_CFG_OK,
+	IPROTO_CFG_ERROR,
+	IPROTO_CFG_NOT_FINISHED,
+};
+
+/** Available IProto configuration changes. */
+enum iproto_cfg_op {
+	IPROTO_CFG_BIND,
+	IPROTO_CFG_MSG_MAX,
+	IPROTO_CFG_LISTEN
+};
+
  /**
   * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or max
+ * message count in fly it is needed to send a special message to
+ * IProto thread.
   */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cmsg
  {
-	const char *uri;
+	/** Operation to execute in IProto thread. */
+	enum iproto_cfg_op op;
+	union {
+		/** New URI to bind to. */
+		const char *uri;
+
+		/** New IProto max message count in fly. */
+		int iproto_msg_max;
+	};
+	/**
+	 * Cfg result code that can be read atomically by
+	 * different threads - TX and IProto, because it consists
+	 * of single byte.
+	 */
+	int8_t rc;
+	/**
+	 * Diag can be read by TX thread only when rc becomes
+	 * not IPROTO_CFG_NOT_FINISHED.
+	 */
+	struct diag diag;
  };
  
-static int
-iproto_do_bind(struct cbus_call_msg *m)
+static inline void
+iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
+{
+	memset(msg, 0, sizeof(*msg));
+	msg->rc = IPROTO_CFG_NOT_FINISHED;
+	msg->op = op;
+}
+
+static void
+iproto_do_cfg_f(struct cmsg *m)
  {
-	const char *uri  = ((struct iproto_bind_msg *) m)->uri;
+	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
+	int old;
  	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		switch (cfg_msg->op) {
+		case IPROTO_CFG_BIND:
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg->uri != NULL)
+				evio_service_bind(&binary, cfg_msg->uri);
+			break;
+		case IPROTO_CFG_MSG_MAX:
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg->iproto_msg_max / 2);
+			old = iproto_msg_max;
+			iproto_msg_max = cfg_msg->iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+			break;
+		case IPROTO_CFG_LISTEN:
+			if (evio_service_is_active(&binary))
+				evio_service_listen(&binary);
+			break;
+		default:
+			unreachable();
+		}
+		cfg_msg->rc = IPROTO_CFG_OK;
  	} catch (Exception *e) {
-		return -1;
+		diag_move(diag_get(), &cfg_msg->diag);
+		cfg_msg->rc = IPROTO_CFG_ERROR;
  	}
-	return 0;
  }
  
-static int
-iproto_do_listen(struct cbus_call_msg *m)
+static inline int
+iproto_do_cfg(struct iproto_cfg_msg *msg)
  {
-	(void) m;
-	try {
-		if (evio_service_is_active(&binary))
-			evio_service_listen(&binary);
-	} catch (Exception *e) {
+	const struct cmsg_hop cfg_route[] = {
+		{ iproto_do_cfg_f, NULL },
+	};
+	cmsg_init(msg, cfg_route);
+	cpipe_push(&net_pipe, msg);
+	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
+		fiber_sleep(0.001);
+	if (msg->rc == IPROTO_CFG_ERROR) {
+		diag_move(&msg->diag, diag_get());
  		return -1;
  	}
+	assert(msg->rc == IPROTO_CFG_OK);
  	return 0;
  }
  
  void
  iproto_bind(const char *uri)
  {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
-		      NULL, TIMEOUT_INFINITY))
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
+	cfg_msg.uri = uri;
+	if (iproto_do_cfg(&cfg_msg) != 0)
  		diag_raise();
  }
  
  void
  iproto_listen()
  {
-	/* Declare static to avoid stack corruption on fiber cancel. */
-	static struct cbus_call_msg m;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_listen,
-		      NULL, TIMEOUT_INFINITY))
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
+	if (iproto_do_cfg(&cfg_msg) != 0)
  		diag_raise();
  }
  
@@ -1793,3 +1863,19 @@ iproto_reset_stat(void)
  {
  	rmean_cleanup(rmean_net);
  }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (iproto_do_cfg(&cfg_msg) != 0)
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
  void
  iproto_listen();
  
+void
+iproto_set_msg_max(int iproto_msg_max);
+
  #endif /* defined(__cplusplus) */
  
  #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..629ded626 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
  	return 0;
  }
  
+static int
+lbox_cfg_set_net_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_net_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
  static int
  lbox_cfg_set_worker_pool_threads(struct lua_State *L)
  {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
  		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
  		{"cfg_set_replication_connect_quorum",
  			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
  		{NULL, NULL}
  	};
  
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..5e3efdb4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
      feedback_enabled      = true,
      feedback_host         = "https://feedback.tarantool.io",
      feedback_interval     = 3600,
+    net_msg_max           = 768,
  }
  
  -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
      feedback_enabled      = 'boolean',
      feedback_host         = 'string',
      feedback_interval     = 'number',
+    net_msg_max           = 'number',
  }
  
  local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
      replication_timeout     = private.cfg_set_replication_timeout,
      replication_connect_quorum = private.cfg_set_replication_connect_quorum,
      replication_skip_conflict = function() end,
+    net_msg_max             = private.cfg_set_net_msg_max,
  }
  
  local dynamic_cfg_skip_at_load = {
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index aa8b19510..f60b6f0ff 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -136,6 +136,14 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
  	}
  }
  
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
+{
+	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
+		new_max_size = FIBER_POOL_SIZE_DEFAULT;
+	pool->max_size = new_max_size;
+}
+
  void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..2a04d6063 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,10 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
  
  /**
   * A pool of worker fibers to handle messages,
@@ -83,6 +86,15 @@ void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout);
  
+/**
+ * Set maximal fiber pool size. If a new size is bigger than the
+ * current, then pending requests processing is started.
+ * @param pool Fiber pool to set size.
+ * @param new_max_size New maximal size.
+ */
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size);
+
  /**
   * Destroy a fiber pool
   */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..7504a2813 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,33 +21,34 @@ box.cfg
  16	memtx_max_tuple_size:1048576
  17	memtx_memory:107374182
  18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
  --
  -- Test insert from detached fiber
  --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..29c8c592d 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..61c5f79af 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -50,6 +50,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
@@ -147,6 +149,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
  ---
  - true
  ...
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''net_msg_max'': should be of type number'
+...
+box.cfg{net_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.net_msg_max
+---
+...
+box.cfg{net_msg_max = 2}
+---
+...
+box.cfg{net_msg_max = old + 1000}
+---
+...
+box.cfg{net_msg_max = old}
+---
+...
  test_run:cmd("clear filter")
  ---
  - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2d819c94c 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
  test_run:cmd("stop server cfg_tester")
  test_run:cmd("cleanup server cfg_tester")
  
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+box.cfg{net_msg_max = 0}
+old = box.cfg.net_msg_max
+box.cfg{net_msg_max = 2}
+box.cfg{net_msg_max = old + 1000}
+box.cfg{net_msg_max = old}
+
  test_run:cmd("clear filter")
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index de22bcbb9..a5315bd3d 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -25,7 +25,7 @@ finished = 0
  continue = false
  ---
  ...
-limit = 768
+limit = box.cfg.net_msg_max
  ---
  ...
  run_max = (limit - 100) / 2
@@ -121,6 +121,134 @@ wait_active(limit + 1)
  wait_finished(run_max)
  ---
  ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_active(3)
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_active(run_max)
+---
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+---
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(limit + 1)
+---
+...
+box.cfg{net_msg_max = limit * 2}
+---
+...
+wait_active(run_max * 2)
+---
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+---
+...
+box.cfg{net_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(5000)
+---
+...
+-- Allow to decrease tx fiber pool size even if is full already.
+box.cfg{net_msg_max = 3000}
+---
+...
+wait_active(5000)
+---
+...
+-- More workers can be run, but they will be blocked until older
+-- requests are finished.
+run_max = 100
+---
+...
+run_workers(conn)
+---
+...
+wait_finished(5100)
+---
+...
  conn2:close()
  ---
  ...
@@ -130,6 +258,6 @@ conn:close()
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
  ---
  ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
  ---
  ...
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 39f8f53f7..378b8ab2a 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
  active = 0
  finished = 0
  continue = false
-limit = 768
+limit = box.cfg.net_msg_max
  run_max = (limit - 100) / 2
  
  old_readahead = box.cfg.readahead
@@ -72,8 +72,70 @@ run_workers(conn)
  wait_active(limit + 1)
  wait_finished(run_max)
  
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_active(3)
+active
+wait_finished(run_max)
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_active(run_max)
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+wait_finished(active + 300)
+
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_active(limit + 1)
+box.cfg{net_msg_max = limit * 2}
+wait_active(run_max * 2)
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+box.cfg{net_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_active(5000)
+-- Allow to decrease tx fiber pool size even if is full already.
+box.cfg{net_msg_max = 3000}
+wait_active(5000)
+-- More workers can be run, but they will be blocked until older
+-- requests are finished.
+run_max = 100
+run_workers(conn)
+wait_finished(5100)
+
  conn2:close()
  conn:close()
  
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}






More information about the Tarantool-patches mailing list