Tarantool development patches archive
* [PATCH v2 0/4] IProto fixes and iproto_msg_max option
@ 2018-04-23 17:05 Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 1/4] iproto: fix error with input discarding Vladislav Shpilevoy
                   ` (3 more replies)
  0 siblings, 4 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 17:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: vdavydov.dev

Branch: http://github.com/tarantool/tarantool/tree/gh-3320-config-msg-max
Issue: https://github.com/tarantool/tarantool/issues/3320

IPROTO_MSG_MAX is a constant that limits the number of requests in flight. It
keeps the TX thread from creating too many fibers, which would cause excessive
overhead from fiber switching and from storing their stacks.

But some users run Tarantool on hardware powerful enough that the
IPROTO_MSG_MAX limit is too low, or they want to run more long-poll requests.
The patch exposes the limit as a runtime configuration parameter.

The parameter is named 'iproto_msg_max'. If a user sees that the IProto thread
is stuck due to too many requests, they can raise iproto_msg_max at runtime, and
the IProto thread immediately starts processing pending requests.

'iproto_msg_max' can also be decreased, but this cannot stop requests that are
already running. If the current request count in the IProto thread exceeds the
new 'iproto_msg_max' value, new input stays blocked until enough requests
finish.
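
The limit semantics above can be sketched with a toy model. This is only an
illustration, not the Tarantool code: the NetState struct and both function
names are hypothetical, and the stop condition is modeled on the
iproto_must_stop_input() check from patch 3/4 (request count greater than
connection count plus the limit).

```cpp
#include <cassert>
#include <cstddef>

// Toy model of the in-flight request limit; not Tarantool code.
struct NetState {
    std::size_t connection_count = 0; // open connections
    std::size_t request_count = 0;    // requests currently in flight
    int msg_max = 768;                // models 'iproto_msg_max'
    bool resume_requested = false;    // set when stopped input should be retried
};

// Modeled on the check in the patch: input stops once the number of
// requests exceeds connection_count + msg_max.
bool must_stop_input(const NetState &s)
{
    return s.request_count >
           s.connection_count + static_cast<std::size_t>(s.msg_max);
}

// Apply a new limit. Running requests are never cancelled; stopped
// input is retried only when the limit grows.
void set_msg_max(NetState &s, int new_max)
{
    assert(new_max >= 2); // the patch rejects smaller values with an error
    int old = s.msg_max;
    s.msg_max = new_max;
    s.resume_requested = old < new_max;
}
```

Lowering the limit below the current request count changes nothing for the
running requests: must_stop_input() simply keeps returning true until enough
of them finish.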

The maximal IProto message count is closely tied to the TX fiber pool size,
which limits the number of fibers created for remote clients and transactions.
The pool size is now configurable too.
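
As a rough sketch of how the two limits interact, the number of concurrently
running requests is capped by whichever limit is smaller. The function below is
a hypothetical estimate, not Tarantool code, and it ignores the small
per-connection slack of the real check.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Toy estimate, not Tarantool code: how many requests can run at once
// when both the message limit and the fiber pool size cap concurrency.
std::size_t max_active(std::size_t sent, std::size_t msg_max,
                       std::size_t fiber_pool_size)
{
    assert(msg_max > 0 && fiber_pool_size > 0);
    return std::min({sent, msg_max, fiber_pool_size});
}
```

This matches the last case in test/box/request_limit.test.lua: with
iproto_msg_max = 5000, a fiber pool of 4096 and 5000 requests sent, 4096
requests are active.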

The same patchset fixes a bug in discarding the input of long-poll requests.
Discard just calls iproto_resume(), trying to notify libev that it can read new
data. But that makes no sense, because iproto_resume() resumes only connections
stopped because the limit on requests in flight was reached, not connections
whose buffer overflowed. Such stopped connections can be resumed only after a
request completes.

To continue reading from a connection whose buffer was freed by a long-poll
discard, it is necessary to explicitly feed it an EV_READ event. On this event
iproto_connection_on_input reads the blocked data, if any.
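
The difference between the two wake-up paths can be illustrated with a toy
model. It uses neither libev nor Tarantool code: the StopReason enum, the Conn
struct and both functions are hypothetical stand-ins for iproto_resume() and
for feeding an EV_READ event to a single connection.

```cpp
#include <cassert>
#include <vector>

// Toy model, not Tarantool or libev code.
enum class StopReason { None, MsgLimit, BufferFull };

struct Conn {
    StopReason stopped = StopReason::None;
    int pending_bytes = 0; // data waiting to be read
    int read_bytes = 0;    // data already consumed
};

// Stand-in for iproto_resume(): wakes only connections that were
// stopped by the message limit.
void resume_limit_stopped(std::vector<Conn> &conns)
{
    for (Conn &c : conns) {
        if (c.stopped == StopReason::MsgLimit)
            c.stopped = StopReason::None;
    }
}

// Stand-in for feeding a read event to one connection after its
// buffer was freed: the connection reads whatever was blocked.
void feed_read_event(Conn &c)
{
    assert(c.pending_bytes >= 0);
    c.stopped = StopReason::None;
    c.read_bytes += c.pending_bytes;
    c.pending_bytes = 0;
}
```

In this model a connection stopped with BufferFull is untouched by
resume_limit_stopped() and makes progress only after feed_read_event(), which
is the behavior the fix relies on.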

Another fixed bug is unstoppable batching, which ignored the message limit on
IProto requests.
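
A minimal sketch of the batching problem, under simplifying assumptions: the
function name, its parameters and the plain "in_flight >= limit" condition are
hypothetical, and real parsing is replaced by a counter.

```cpp
#include <cassert>
#include <cstddef>

// Toy model, not Tarantool code: enqueue parsed messages from one batch.
// 'in_flight' is the number of requests already running, 'limit' is the cap.
// Returns how many messages of the batch were enqueued.
std::size_t enqueue_batch(std::size_t batch_size, std::size_t &in_flight,
                          std::size_t limit, bool check_each_message)
{
    std::size_t enqueued = 0;
    // The old behavior: the limit is checked only once, before parsing.
    if (in_flight >= limit)
        return 0;
    while (enqueued < batch_size) {
        // The fix: re-check the limit before every message.
        if (check_each_message && in_flight >= limit)
            break;
        ++in_flight;
        ++enqueued;
    }
    // With the per-message check the limit is never exceeded.
    assert(!check_each_message || in_flight <= limit);
    return enqueued;
}
```

With check_each_message set to false, a single large batch pushes in_flight far
past the limit; with it set to true, the rest of the batch stays unparsed until
requests finish.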

Vladislav Shpilevoy (4):
  iproto: fix error with input discarding
  iproto: fix error with unstoppable batching
  iproto: allow to configure IPROTO_MSG_MAX
  Allow to configure TX fiber pool size

 src/box/box.cc                  |  16 +-
 src/box/box.h                   |   1 +
 src/box/iproto.cc               | 111 ++++++++++----
 src/box/iproto.h                |   3 +
 src/box/lua/cfg.cc              |  12 ++
 src/box/lua/load_cfg.lua        |   5 +
 src/fiber_pool.h                |   6 +-
 test/app-tap/init_script.result |  73 ++++-----
 test/box/admin.result           |   2 +
 test/box/cfg.result             |  27 ++++
 test/box/cfg.test.lua           |  10 ++
 test/box/fat_tx.lua             |  30 ++++
 test/box/request_limit.result   | 321 ++++++++++++++++++++++++++++++++++++++++
 test/box/request_limit.test.lua | 155 +++++++++++++++++++
 14 files changed, 704 insertions(+), 68 deletions(-)
 create mode 100644 test/box/fat_tx.lua
 create mode 100644 test/box/request_limit.result
 create mode 100644 test/box/request_limit.test.lua

-- 
2.15.1 (Apple Git-101)


* [PATCH v2 1/4] iproto: fix error with input discarding
  2018-04-23 17:05 [PATCH v2 0/4] IProto fixes and iproto_msg_max option Vladislav Shpilevoy
@ 2018-04-23 17:05 ` Vladislav Shpilevoy
  2018-04-24 15:35   ` Vladimir Davydov
  2018-04-23 17:05 ` [PATCH v2 2/4] iproto: fix error with unstoppable batching Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  3 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 17:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: vdavydov.dev

A long-polling request can discard the input buffer of its
connection before the main request is finished.

Before the patch, discard just called iproto_resume(), trying to
notify libev that it can read new data. But that makes no sense,
because iproto_resume() resumes only connections stopped because
the limit on requests in flight was reached, not connections
whose buffer overflowed. Such stopped connections can be resumed
only after a request completes.

To continue reading from a connection whose buffer was freed by
a long-poll discard, it is necessary to explicitly feed it an
EV_READ event. On this event iproto_connection_on_input reads
the blocked data, if any.
---
 src/box/iproto.cc               |   5 +-
 test/box/request_limit.result   | 122 ++++++++++++++++++++++++++++++++++++++++
 test/box/request_limit.test.lua |  70 +++++++++++++++++++++++
 3 files changed, 195 insertions(+), 2 deletions(-)
 create mode 100644 test/box/request_limit.result
 create mode 100644 test/box/request_limit.test.lua

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 37026984d..a88226a9f 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1084,10 +1084,11 @@ net_discard_input(struct cmsg *m)
 {
 	struct iproto_msg *msg = container_of(m, struct iproto_msg,
 					      discard_input);
+	struct iproto_connection *conn = msg->connection;
 	msg->p_ibuf->rpos += msg->len;
 	msg->len = 0;
-	msg->connection->long_poll_requests++;
-	iproto_resume();
+	conn->long_poll_requests++;
+	ev_feed_event(conn->loop, &conn->input, EV_READ);
 }
 
 static void
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
new file mode 100644
index 000000000..bef998b91
--- /dev/null
+++ b/test/box/request_limit.result
@@ -0,0 +1,122 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+net_box = require('net.box')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+conn = net_box.connect(box.cfg.listen)
+---
+...
+conn2 = net_box.connect(box.cfg.listen)
+---
+...
+active = 0
+---
+...
+finished = 0
+---
+...
+continue = false
+---
+...
+limit = 768
+---
+...
+run_max = (limit - 100) / 2
+---
+...
+old_readahead = box.cfg.readahead
+---
+...
+box.cfg{readahead = 9000}
+---
+...
+long_str = string.rep('a', 1000)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_long_f(...)
+	active = active + 1
+	while not continue do
+		fiber.sleep(0.1)
+	end
+	active = active - 1
+	finished = finished + 1
+end;
+---
+...
+function do_long(c)
+	c:call('do_long_f', {long_str})
+end;
+---
+...
+function run_workers(c)
+	finished = 0
+	continue = false
+	for i = 1, run_max do
+		fiber.create(do_long, c)
+	end
+end;
+---
+...
+-- Wait until 'active' stops growing - it means, that the input
+-- is blocked.
+function wait_block()
+	local old_val = -1
+	while old_val ~= active do
+		old_val = active
+		fiber.sleep(0.1)
+	end
+end;
+---
+...
+function wait_finished(needed)
+	continue = true
+	while finished ~= needed do fiber.sleep(0.01) end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Test that message count limit is reachable.
+--
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active == run_max * 2 or active
+---
+- true
+...
+wait_finished(active)
+---
+...
+conn2:close()
+---
+...
+conn:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+box.cfg{readahead = old_readahead}
+---
+...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
new file mode 100644
index 000000000..2bc35d8fa
--- /dev/null
+++ b/test/box/request_limit.test.lua
@@ -0,0 +1,70 @@
+test_run = require('test_run').new()
+
+fiber = require('fiber')
+net_box = require('net.box')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+conn = net_box.connect(box.cfg.listen)
+conn2 = net_box.connect(box.cfg.listen)
+active = 0
+finished = 0
+continue = false
+limit = 768
+run_max = (limit - 100) / 2
+
+old_readahead = box.cfg.readahead
+box.cfg{readahead = 9000}
+long_str = string.rep('a', 1000)
+
+test_run:cmd("setopt delimiter ';'")
+function do_long_f(...)
+	active = active + 1
+	while not continue do
+		fiber.sleep(0.1)
+	end
+	active = active - 1
+	finished = finished + 1
+end;
+
+function do_long(c)
+	c:call('do_long_f', {long_str})
+end;
+
+function run_workers(c)
+	finished = 0
+	continue = false
+	for i = 1, run_max do
+		fiber.create(do_long, c)
+	end
+end;
+
+-- Wait until 'active' stops growing - it means, that the input
+-- is blocked.
+function wait_block()
+	local old_val = -1
+	while old_val ~= active do
+		old_val = active
+		fiber.sleep(0.1)
+	end
+end;
+
+function wait_finished(needed)
+	continue = true
+	while finished ~= needed do fiber.sleep(0.01) end
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Test that message count limit is reachable.
+--
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active == run_max * 2 or active
+wait_finished(active)
+
+conn2:close()
+conn:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+box.cfg{readahead = old_readahead}
-- 
2.15.1 (Apple Git-101)


* [PATCH v2 2/4] iproto: fix error with unstoppable batching
  2018-04-23 17:05 [PATCH v2 0/4] IProto fixes and iproto_msg_max option Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 1/4] iproto: fix error with input discarding Vladislav Shpilevoy
@ 2018-04-23 17:05 ` Vladislav Shpilevoy
  2018-04-24 15:36   ` Vladimir Davydov
  2018-04-23 17:05 ` [PATCH v2 3/4] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 4/4] Allow to configure TX fiber pool size Vladislav Shpilevoy
  3 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 17:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: vdavydov.dev

An IProto connection stops reading input when the active request
limit is reached. But when multiple requests arrive in a batch,
IProto does not check the limit, so it can be violated.

Let's check the limit after each message during batch parsing
too, not only once before parsing.
---
 src/box/iproto.cc               | 29 +++++++++++++++++++----------
 test/box/request_limit.result   | 20 ++++++++++++++++++++
 test/box/request_limit.test.lua | 10 ++++++++++
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a88226a9f..be3c5a1a6 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -582,7 +582,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 {
 	int n_requests = 0;
 	bool stop_input = false;
-	while (con->parse_size && stop_input == false) {
+	while (con->parse_size != 0 && stop_input == false &&
+	       !iproto_must_stop_input()) {
 		const char *reqstart = in->wpos - con->parse_size;
 		const char *pos = reqstart;
 		/* Read request length. */
@@ -691,18 +692,26 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		int nrd = sio_read(fd, in->wpos, ibuf_unused(in));
 		if (nrd < 0) {                  /* Socket is not ready. */
 			ev_io_start(loop, &con->input);
-			return;
-		}
-		if (nrd == 0) {                 /* EOF */
+			/*
+			 * Socket has no data, but there can be
+			 * non-parsed requests, stopped by
+			 * requests limit. Try to enqueue them, if
+			 * exist.
+			 */
+			if (con->parse_size == 0)
+				return;
+		} else if (nrd == 0) {
+			/* EOF */
 			iproto_connection_close(con);
 			return;
-		}
-		/* Count statistics */
-		rmean_collect(rmean_net, IPROTO_RECEIVED, nrd);
+		} else {
+			/* Count statistics */
+			rmean_collect(rmean_net, IPROTO_RECEIVED, nrd);
 
-		/* Update the read position and connection state. */
-		in->wpos += nrd;
-		con->parse_size += nrd;
+			/* Update the read position and connection state. */
+			in->wpos += nrd;
+			con->parse_size += nrd;
+		}
 		/* Enqueue all requests which are fully read up. */
 		iproto_enqueue_batch(con, in);
 	} catch (Exception *e) {
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index bef998b91..2691aa329 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -108,6 +108,26 @@ active == run_max * 2 or active
 wait_finished(active)
 ---
 ...
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 769
+...
+wait_finished(run_max)
+---
+...
 conn2:close()
 ---
 ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index 2bc35d8fa..bff7b5282 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -63,6 +63,16 @@ wait_block()
 active == run_max * 2 or active
 wait_finished(active)
 
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_block()
+active
+wait_finished(run_max)
+
 conn2:close()
 conn:close()
 
-- 
2.15.1 (Apple Git-101)


* [PATCH v2 3/4] iproto: allow to configure IPROTO_MSG_MAX
  2018-04-23 17:05 [PATCH v2 0/4] IProto fixes and iproto_msg_max option Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 1/4] iproto: fix error with input discarding Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 2/4] iproto: fix error with unstoppable batching Vladislav Shpilevoy
@ 2018-04-23 17:05 ` Vladislav Shpilevoy
  2018-04-23 17:05 ` [PATCH v2 4/4] Allow to configure TX fiber pool size Vladislav Shpilevoy
  3 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 17:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: vdavydov.dev

IPROTO_MSG_MAX is a constant that limits the number of requests
in flight. It keeps the TX thread from creating too many fibers,
which would cause excessive overhead from fiber switching and
from storing their stacks.

But some users run Tarantool on hardware powerful enough that
the IPROTO_MSG_MAX limit is too low. The patch exposes the limit
as a runtime configuration parameter.

The parameter is named 'iproto_msg_max'. If a user sees that the
IProto thread is stuck due to too many requests, they can raise
iproto_msg_max at runtime, and the IProto thread immediately
starts processing pending requests.

'iproto_msg_max' can also be decreased, but this cannot stop
requests that are already running. If the current request count
in the IProto thread exceeds the new 'iproto_msg_max' value, new
input stays blocked until enough requests finish.

Closes #3320
---
 src/box/box.cc                  |   7 ++
 src/box/box.h                   |   1 +
 src/box/iproto.cc               |  77 ++++++++++++++++------
 src/box/iproto.h                |   3 +
 src/box/lua/cfg.cc              |  12 ++++
 src/box/lua/load_cfg.lua        |   3 +
 test/app-tap/init_script.result |  73 ++++++++++-----------
 test/box/admin.result           |   2 +
 test/box/cfg.result             |  27 ++++++++
 test/box/cfg.test.lua           |  10 +++
 test/box/request_limit.result   | 140 +++++++++++++++++++++++++++++++++++++++-
 test/box/request_limit.test.lua |  64 +++++++++++++++++-
 12 files changed, 361 insertions(+), 58 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..80684ad48 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,12 @@ box_set_vinyl_timeout(void)
 	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
 }
 
+void
+box_set_iproto_msg_max(void)
+{
+	iproto_set_msg_max(cfg_geti("iproto_msg_max"));
+}
+
 /* }}} configuration bindings */
 
 /**
@@ -1735,6 +1741,7 @@ box_cfg_xc(void)
 	box_check_instance_uuid(&instance_uuid);
 	box_check_replicaset_uuid(&replicaset_uuid);
 
+	box_set_iproto_msg_max();
 	box_set_checkpoint_count();
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..712e21191 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
 void box_set_vinyl_timeout(void);
 void box_set_replication_timeout(void);
 void box_set_replication_connect_quorum(void);
+void box_set_iproto_msg_max(void);
 
 extern "C" {
 #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index be3c5a1a6..2bf07c99f 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
 #include "applier.h"
 #include "cfg.h"
 
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
 
 /**
  * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
  */
 unsigned iproto_readahead = 16320;
 
+/* The maximal number of iproto messages in flight. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -381,7 +386,7 @@ iproto_must_stop_input()
 {
 	size_t connection_count = mempool_count(&iproto_connection_pool);
 	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > connection_count + IPROTO_MSG_MAX;
+	return request_count > connection_count + iproto_msg_max;
 }
 
 /**
@@ -1632,7 +1637,7 @@ net_cord_f(va_list /* ap */)
 	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
 	/* Create a pipe to "tx" thread. */
 	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
 	/* Process incomming messages. */
 	cbus_loop(&endpoint);
 
@@ -1660,29 +1665,47 @@ iproto_init()
 
 	/* Create a pipe to "net" thread. */
 	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
 }
 
 /**
  * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or max
+ * message count in fly it is needed to bounce a couple of
+ * messages to and from this thread.
  */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cbus_call_msg
 {
+	/** New URI to bind to. */
 	const char *uri;
+	bool need_update_uri;
+
+	/** New IProto max message count in fly. */
+	int iproto_msg_max;
+	bool need_update_msg_max;
 };
 
+static struct iproto_cfg_msg cfg_msg;
+
 static int
-iproto_do_bind(struct cbus_call_msg *m)
+iproto_do_cfg(struct cbus_call_msg *m)
 {
-	const char *uri  = ((struct iproto_bind_msg *) m)->uri;
+	assert(m == &cfg_msg);
 	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		if (cfg_msg.need_update_uri) {
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg.uri != NULL)
+				evio_service_bind(&binary, cfg_msg.uri);
+		}
+		if (cfg_msg.need_update_msg_max) {
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg.iproto_msg_max / 2);
+			int old = iproto_msg_max;
+			iproto_msg_max = cfg_msg.iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+		}
 	} catch (Exception *e) {
 		return -1;
 	}
@@ -1705,9 +1728,10 @@ iproto_do_listen(struct cbus_call_msg *m)
 void
 iproto_bind(const char *uri)
 {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_uri = true;
+	cfg_msg.uri = uri;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
 		      NULL, TIMEOUT_INFINITY))
 		diag_raise();
 }
@@ -1733,3 +1757,20 @@ iproto_reset_stat(void)
 {
 	rmean_cleanup(rmean_net);
 }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_msg_max = true;
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
+		      NULL, TIMEOUT_INFINITY))
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
 void
 iproto_listen();
 
+void
+iproto_set_msg_max(int iproto_msg_max);
+
 #endif /* defined(__cplusplus) */
 
 #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..1fd953011 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_iproto_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_iproto_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
 		{"cfg_set_replication_connect_quorum",
 			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_iproto_msg_max", lbox_cfg_set_iproto_msg_max},
 		{NULL, NULL}
 	};
 
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..6ed30e016 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
+    iproto_msg_max        = 768,
 }
 
 -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
+    iproto_msg_max        = 'number',
 }
 
 local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
     replication_skip_conflict = function() end,
+    iproto_msg_max          = private.cfg_set_iproto_msg_max,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..741f764af 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -12,42 +12,43 @@ box.cfg
 7	feedback_interval:3600
 8	force_recovery:false
 9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	log_nonblock:true
-15	memtx_dir:.
-16	memtx_max_tuple_size:1048576
-17	memtx_memory:107374182
-18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+10	iproto_msg_max:768
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	log_nonblock:true
+16	memtx_dir:.
+17	memtx_max_tuple_size:1048576
+18	memtx_memory:107374182
+19	memtx_min_tuple_size:16
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..7a1432a76 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -36,6 +36,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..c309847ef 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
@@ -129,6 +131,8 @@ cfg_filter(box.cfg)
     - false
   - - hot_standby
     - false
+  - - iproto_msg_max
+    - 768
   - - listen
     - <hidden>
   - - log
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
 ---
 - true
 ...
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': should be of type number'
+...
+box.cfg{iproto_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.iproto_msg_max
+---
+...
+box.cfg{iproto_msg_max = 2}
+---
+...
+box.cfg{iproto_msg_max = old + 1000}
+---
+...
+box.cfg{iproto_msg_max = old}
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2c40346fe 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
 test_run:cmd("stop server cfg_tester")
 test_run:cmd("cleanup server cfg_tester")
 
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+box.cfg{iproto_msg_max = 0}
+old = box.cfg.iproto_msg_max
+box.cfg{iproto_msg_max = 2}
+box.cfg{iproto_msg_max = old + 1000}
+box.cfg{iproto_msg_max = old}
+
 test_run:cmd("clear filter")
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index 2691aa329..8f722b1b9 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -25,7 +25,7 @@ finished = 0
 continue = false
 ---
 ...
-limit = 768
+limit = box.cfg.iproto_msg_max
 ---
 ...
 run_max = (limit - 100) / 2
@@ -128,6 +128,142 @@ active
 wait_finished(run_max)
 ---
 ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{iproto_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active == run_max
+---
+- true
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active count is not changed - the input is blocked.
+wait_block()
+---
+...
+active == old_active
+---
+- true
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active >= limit
+---
+- true
+...
+active < run_max * 2
+---
+- true
+...
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+wait_block()
+---
+...
+active == run_max * 2
+---
+- true
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit.
+--
+run_max = 2500
+---
+...
+box.cfg{iproto_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 4096
+...
+wait_finished(run_max * 2)
+---
+...
 conn2:close()
 ---
 ...
@@ -137,6 +273,6 @@ conn:close()
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}
 ---
 ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index bff7b5282..286df0455 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
 active = 0
 finished = 0
 continue = false
-limit = 768
+limit = box.cfg.iproto_msg_max
 run_max = (limit - 100) / 2
 
 old_readahead = box.cfg.readahead
@@ -73,8 +73,68 @@ wait_block()
 active
 wait_finished(run_max)
 
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{iproto_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_block()
+active
+wait_finished(run_max)
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_block()
+active == run_max
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active count is not changed - the input is blocked.
+wait_block()
+active == old_active
+wait_finished(active + 300)
+
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active >= limit
+active < run_max * 2
+box.cfg{iproto_msg_max = limit * 2}
+wait_block()
+active == run_max * 2
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit.
+--
+run_max = 2500
+box.cfg{iproto_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active
+wait_finished(run_max * 2)
+
 conn2:close()
 conn:close()
 
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}
-- 
2.15.1 (Apple Git-101)


* [PATCH v2 4/4] Allow to configure TX fiber pool size
  2018-04-23 17:05 [PATCH v2 0/4] IProto fixes and iproto_msg_max option Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2018-04-23 17:05 ` [PATCH v2 3/4] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
@ 2018-04-23 17:05 ` Vladislav Shpilevoy
  2018-04-24 15:43   ` Vladimir Davydov
  3 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-23 17:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: vdavydov.dev

The TX fiber pool provides fibers to execute transactions and
remote requests, so its size is linked with the maximal remote
request count, which became configurable in the previous patch.
Let's make the fiber pool size configurable too.

Follow up #3320
---
 src/box/box.cc                  |  9 ++++++++-
 src/box/lua/load_cfg.lua        |  2 ++
 src/fiber_pool.h                |  6 +++++-
 test/box/fat_tx.lua             | 30 ++++++++++++++++++++++++++++
 test/box/request_limit.result   | 43 +++++++++++++++++++++++++++++++++++++++++
 test/box/request_limit.test.lua | 15 ++++++++++++++
 6 files changed, 103 insertions(+), 2 deletions(-)
 create mode 100644 test/box/fat_tx.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 80684ad48..d2552709a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1716,8 +1716,15 @@ box_is_configured(void)
 static inline void
 box_cfg_xc(void)
 {
+	int fiber_pool_size =
+		cfg_geti_default("fiber_pool_size", FIBER_POOL_SIZE_DEFAULT);
+	if (fiber_pool_size < FIBER_POOL_SIZE_MIN) {
+		tnt_raise(ClientError, ER_CFG, "fiber_pool_size",
+			  tt_sprintf("minimal value is %d",
+				     FIBER_POOL_SIZE_MIN));
+	}
 	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", fiber_pool_size,
 			  FIBER_POOL_IDLE_TIMEOUT);
 	/* Add an extra endpoint for WAL wake up/rollback messages. */
 	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 6ed30e016..b8af58450 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -64,6 +64,7 @@ local default_cfg = {
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
     iproto_msg_max        = 768,
+    fiber_pool_size       = 4096,
 }
 
 -- types of available options
@@ -125,6 +126,7 @@ local template_cfg = {
     feedback_host         = 'string',
     feedback_interval     = 'number',
     iproto_msg_max        = 'number',
+    fiber_pool_size       = 'number',
 }
 
 local function normalize_uri(port)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..9f99da5ea 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,11 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_SIZE_MIN = 100,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
 
 /**
  * A pool of worker fibers to handle messages,
diff --git a/test/box/fat_tx.lua b/test/box/fat_tx.lua
new file mode 100644
index 000000000..6ef5c8f96
--- /dev/null
+++ b/test/box/fat_tx.lua
@@ -0,0 +1,30 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+box.cfg{
+    listen = os.getenv("LISTEN"),
+    pid_file = "tarantool.pid",
+    fiber_pool_size = 5000,
+    iproto_msg_max = 5000
+}
+
+require('console').listen(os.getenv('ADMIN'))
+net_box = require('net.box')
+fiber = require('fiber')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+conn = net_box.connect(box.cfg.listen)
+net_box = require('net.box')
+active = 0
+continue = false
+function do_long_f()
+	active = active + 1
+	while not continue do
+		fiber.sleep(0.1)
+	end
+	active = active - 1
+end
+
+function do_long(c)
+	c:call('do_long_f')
+end
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index 8f722b1b9..068ba3766 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -270,6 +270,49 @@ conn2:close()
 conn:close()
 ---
 ...
+--
+-- Test that new fiber pool limit can be reached.
+--
+test_run:cmd('create server fat_tx with script = "box/fat_tx.lua"')
+---
+- true
+...
+test_run:cmd("start server fat_tx")
+---
+- true
+...
+test_run:cmd('switch fat_tx')
+---
+- true
+...
+box.cfg.fiber_pool_size
+---
+- 5000
+...
+for i = 1, 5000 do fiber.create(do_long, conn) end
+---
+...
+while active ~= 5000 do fiber.sleep(0.01) end
+---
+...
+continue = true
+---
+...
+while active ~= 0 do fiber.sleep(0.01) end
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server fat_tx")
+---
+- true
+...
+test_run:cmd("cleanup server fat_tx")
+---
+- true
+...
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index 286df0455..f02d0dcdd 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -136,5 +136,20 @@ wait_finished(run_max * 2)
 conn2:close()
 conn:close()
 
+--
+-- Test that new fiber pool limit can be reached.
+--
+test_run:cmd('create server fat_tx with script = "box/fat_tx.lua"')
+test_run:cmd("start server fat_tx")
+test_run:cmd('switch fat_tx')
+box.cfg.fiber_pool_size
+for i = 1, 5000 do fiber.create(do_long, conn) end
+while active ~= 5000 do fiber.sleep(0.01) end
+continue = true
+while active ~= 0 do fiber.sleep(0.01) end
+test_run:cmd("switch default")
+test_run:cmd("stop server fat_tx")
+test_run:cmd("cleanup server fat_tx")
+
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 box.cfg{readahead = old_readahead, iproto_msg_max = limit}
-- 
2.15.1 (Apple Git-101)

* Re: [PATCH v2 1/4] iproto: fix error with input discarding
  2018-04-23 17:05 ` [PATCH v2 1/4] iproto: fix error with input discarding Vladislav Shpilevoy
@ 2018-04-24 15:35   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2018-04-24 15:35 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

On Mon, Apr 23, 2018 at 08:05:01PM +0300, Vladislav Shpilevoy wrote:
> Long-polling request has a feature allowing to discard an input
> buffer in a connection before the main request is finished.
> 
> Before the patch discard just calls iproto_resume() trying to
> notify libev, that is can get new data. But is makes no sense,
> because iproto_resume() continues only connections stopped due
> to reached limit of maximal requests in fly, but not connections
> whose buffer is overflowed. Such stopped connections can be
> continued only after a request is complete.
> 
> To continue read from the connection whose buffer was freed by
> discarding long-poll it is necessary to explicitly feed to it
> EV_READ event. On this event iproto_connection_on_input will read
> a blocked data if it exists.
> ---
>  src/box/iproto.cc               |   5 +-
>  test/box/request_limit.result   | 122 ++++++++++++++++++++++++++++++++++++++++
>  test/box/request_limit.test.lua |  70 +++++++++++++++++++++++
>  3 files changed, 195 insertions(+), 2 deletions(-)
>  create mode 100644 test/box/request_limit.result
>  create mode 100644 test/box/request_limit.test.lua

Ack

* Re: [PATCH v2 2/4] iproto: fix error with unstoppable batching
  2018-04-23 17:05 ` [PATCH v2 2/4] iproto: fix error with unstoppable batching Vladislav Shpilevoy
@ 2018-04-24 15:36   ` Vladimir Davydov
  0 siblings, 0 replies; 9+ messages in thread
From: Vladimir Davydov @ 2018-04-24 15:36 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

On Mon, Apr 23, 2018 at 08:05:02PM +0300, Vladislav Shpilevoy wrote:
> IProto connection stops input reading, when active request count
> is reached. But when multiple requests are in a batch, the IProto
> does not check the limit, so it can be violated.
> 
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
>  src/box/iproto.cc               | 29 +++++++++++++++++++----------
>  test/box/request_limit.result   | 20 ++++++++++++++++++++
>  test/box/request_limit.test.lua | 10 ++++++++++
>  3 files changed, 49 insertions(+), 10 deletions(-)

Ack

* Re: [PATCH v2 4/4] Allow to configure TX fiber pool size
  2018-04-23 17:05 ` [PATCH v2 4/4] Allow to configure TX fiber pool size Vladislav Shpilevoy
@ 2018-04-24 15:43   ` Vladimir Davydov
  2018-04-24 17:33     ` [tarantool-patches] " Vladislav Shpilevoy
  0 siblings, 1 reply; 9+ messages in thread
From: Vladimir Davydov @ 2018-04-24 15:43 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

On Mon, Apr 23, 2018 at 08:05:04PM +0300, Vladislav Shpilevoy wrote:
> TX fiber pool size provides fibers to execute transactions and
> remote requests, so it is linked with maximal remote request
> count, that is allowed to be altered in the previous patch. Lets
> do the same for fiber pool size.
> 
> Follow up #3320
> ---

I don't think that having a separate option for configuring the fiber
pool size is reasonable, because iproto_msg_max and fiber_pool_size are
interconnected. For example, it's pointless to set iproto_msg_max to
1000 and fiber_pool_size to 100 or vice versa. I guess we should leave
just one option that would tweak both iproto_msg_max and fiber_pool_size
under the hood. Not sure about that though.
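
To make the single-option idea concrete, here is a minimal sketch in C of
one setter that drives both limits. All names (struct sketch_limits,
sketch_limits_set, the SKETCH_* constants) are hypothetical and are not part
of the Tarantool code base; only the values 768 and 4096 are taken from the
patch. Rejecting non-positive values is also an assumption made for the
sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical defaults; the values mirror the ones in the patch. */
enum {
	SKETCH_MSG_MAX_DEFAULT = 768,
	SKETCH_FIBER_POOL_DEFAULT = 4096,
};

/* Hypothetical pair of limits driven by one user-visible option. */
struct sketch_limits {
	int msg_max;
	int fiber_pool_size;
};

/*
 * Apply a new message limit and derive the fiber pool size from
 * it: the pool never goes below its default and grows when the
 * message limit exceeds that default.
 * Returns 0 on success, -1 if the value is not positive (an
 * assumed lower bound, chosen only for this sketch).
 */
static int
sketch_limits_set(struct sketch_limits *l, int new_msg_max)
{
	assert(l != NULL);
	if (new_msg_max <= 0)
		return -1;
	l->msg_max = new_msg_max;
	l->fiber_pool_size = new_msg_max > SKETCH_FIBER_POOL_DEFAULT ?
			     new_msg_max : SKETCH_FIBER_POOL_DEFAULT;
	return 0;
}
```

With one knob, a combination like msg_max = 1000 and a pool of 100 fibers
cannot be expressed at all, which is the point of the suggestion.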

>  src/box/box.cc                  |  9 ++++++++-
>  src/box/lua/load_cfg.lua        |  2 ++
>  src/fiber_pool.h                |  6 +++++-
>  test/box/fat_tx.lua             | 30 ++++++++++++++++++++++++++++
>  test/box/request_limit.result   | 43 +++++++++++++++++++++++++++++++++++++++++
>  test/box/request_limit.test.lua | 15 ++++++++++++++
>  6 files changed, 103 insertions(+), 2 deletions(-)
>  create mode 100644 test/box/fat_tx.lua
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 80684ad48..d2552709a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1716,8 +1716,15 @@ box_is_configured(void)
>  static inline void
>  box_cfg_xc(void)
>  {
> +	int fiber_pool_size =
> +		cfg_geti_default("fiber_pool_size", FIBER_POOL_SIZE_DEFAULT);
> +	if (fiber_pool_size < FIBER_POOL_SIZE_MIN) {
> +		tnt_raise(ClientError, ER_CFG, "fiber_pool_size",
> +			  tt_sprintf("minimal value is %d",
> +				     FIBER_POOL_SIZE_MIN));
> +	}
>  	/* Join the cord interconnect as "tx" endpoint. */
> -	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
> +	fiber_pool_create(&tx_fiber_pool, "tx", fiber_pool_size,
>  			  FIBER_POOL_IDLE_TIMEOUT);

Can't you make this option dynamic?

* Re: [tarantool-patches] Re: [PATCH v2 4/4] Allow to configure TX fiber pool size
  2018-04-24 15:43   ` Vladimir Davydov
@ 2018-04-24 17:33     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-04-24 17:33 UTC (permalink / raw)
  To: tarantool-patches, Vladimir Davydov

Hello. Thanks for review!

On 24/04/2018 18:43, Vladimir Davydov wrote:
> On Mon, Apr 23, 2018 at 08:05:04PM +0300, Vladislav Shpilevoy wrote:
>> TX fiber pool size provides fibers to execute transactions and
>> remote requests, so it is linked with maximal remote request
>> count, that is allowed to be altered in the previous patch. Lets
>> do the same for fiber pool size.
>>
>> Follow up #3320
>> ---
> 
> I don't think that having a separate option for configuring the fiber
> pool size is reasonable, because iproto_msg_max and fiber_pool_size are
> interconnected. For example, it's pointless to set iproto_msg_max to
> 1000 and fiber_pool_size to 100 or vice versa. I guess we should leave
> just one option that would tweak both iproto_msg_max and fiber_pool_size
> under the hood. Not sure about that though.

You are right. I squashed the last 2 commits on the branch and removed
the fiber_pool_size option.

> 
>>   src/box/box.cc                  |  9 ++++++++-
>>   src/box/lua/load_cfg.lua        |  2 ++
>>   src/fiber_pool.h                |  6 +++++-
>>   test/box/fat_tx.lua             | 30 ++++++++++++++++++++++++++++
>>   test/box/request_limit.result   | 43 +++++++++++++++++++++++++++++++++++++++++
>>   test/box/request_limit.test.lua | 15 ++++++++++++++
>>   6 files changed, 103 insertions(+), 2 deletions(-)
>>   create mode 100644 test/box/fat_tx.lua
>>
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 80684ad48..d2552709a 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1716,8 +1716,15 @@ box_is_configured(void)
>>   static inline void
>>   box_cfg_xc(void)
>>   {
>> +	int fiber_pool_size =
>> +		cfg_geti_default("fiber_pool_size", FIBER_POOL_SIZE_DEFAULT);
>> +	if (fiber_pool_size < FIBER_POOL_SIZE_MIN) {
>> +		tnt_raise(ClientError, ER_CFG, "fiber_pool_size",
>> +			  tt_sprintf("minimal value is %d",
>> +				     FIBER_POOL_SIZE_MIN));
>> +	}
>>   	/* Join the cord interconnect as "tx" endpoint. */
>> -	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
>> +	fiber_pool_create(&tx_fiber_pool, "tx", fiber_pool_size,
>>   			  FIBER_POOL_IDLE_TIMEOUT);
> 
> Can't you make this option dynamic?

Sure, done on the branch. The full diff of the last 2 squashed commits is below.

iproto: allow to configure IPROTO_MSG_MAX

IPROTO_MSG_MAX is a constant that restricts the count of requests
in flight. It prevents producing too many fibers in the TX thread,
which would lead to too much overhead on fiber switching and on
storing their stacks.

But some users have powerful hardware for which the Tarantool
IPROTO_MSG_MAX constant is too restrictive. The patch exposes it
as a runtime configuration parameter.

'iproto_msg_max' is its name. If a user sees that the IProto thread
is stuck due to too many requests, they can change iproto_msg_max
at runtime, and the IProto thread immediately starts processing
pending requests.

'iproto_msg_max' can be decreased, but obviously that cannot stop
requests that are already running, so if the request count in the
IProto thread is > the new 'iproto_msg_max' value, it takes some
time until enough requests finish.
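
The increase and decrease cases can be sketched in C as follows. The struct
and function names are hypothetical and exist only for illustration; the
comparison is the one used by iproto_must_stop_input() in the diff, and the
"resume only when the limit grew" rule follows iproto_do_cfg(). The real
code reads the counters from memory pools, which the sketch replaces with
plain fields.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the IProto thread counters. */
struct sketch_iproto {
	size_t connection_count;
	size_t request_count;
	int msg_max;
};

/* Input must stop once requests exceed connections + limit. */
static bool
sketch_must_stop_input(const struct sketch_iproto *p)
{
	return p->request_count >
	       p->connection_count + (size_t)p->msg_max;
}

/*
 * Change the limit. Returns true when the caller should resume
 * stopped connections, which is needed only when the limit grew.
 * After a decrease nothing is cancelled: input simply stays
 * blocked until enough running requests finish.
 */
static bool
sketch_set_msg_max(struct sketch_iproto *p, int new_msg_max)
{
	assert(p != NULL && new_msg_max > 0);
	int old = p->msg_max;
	p->msg_max = new_msg_max;
	return old < new_msg_max;
}
```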

`iproto_msg_max` automatically increases the fiber pool size when
needed.

Closes #3320
---
  src/box/box.cc                  |  11 +++-
  src/box/box.h                   |   1 +
  src/box/iproto.cc               |  77 +++++++++++++++++-----
  src/box/iproto.h                |   3 +
  src/box/lua/cfg.cc              |  12 ++++
  src/box/lua/load_cfg.lua        |   3 +
  src/fiber_pool.c                |  14 ++++
  src/fiber_pool.h                |  14 +++-
  test/app-tap/init_script.result |  73 +++++++++++----------
  test/box/admin.result           |   2 +
  test/box/cfg.result             |  27 ++++++++
  test/box/cfg.test.lua           |  10 +++
  test/box/request_limit.result   | 141 +++++++++++++++++++++++++++++++++++++++-
  test/box/request_limit.test.lua |  65 +++++++++++++++++-
  14 files changed, 393 insertions(+), 60 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..6e2dddcf8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,14 @@ box_set_vinyl_timeout(void)
  	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
  }
  
+void
+box_set_iproto_msg_max(void)
+{
+	int new_iproto_msg_max = cfg_geti("iproto_msg_max");
+	iproto_set_msg_max(new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+}
+
  /* }}} configuration bindings */
  
  /**
@@ -1711,7 +1719,7 @@ static inline void
  box_cfg_xc(void)
  {
  	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
  			  FIBER_POOL_IDLE_TIMEOUT);
  	/* Add an extra endpoint for WAL wake up/rollback messages. */
  	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
@@ -1735,6 +1743,7 @@ box_cfg_xc(void)
  	box_check_instance_uuid(&instance_uuid);
  	box_check_replicaset_uuid(&replicaset_uuid);
  
+	box_set_iproto_msg_max();
  	box_set_checkpoint_count();
  	box_set_too_long_threshold();
  	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..712e21191 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
  void box_set_vinyl_timeout(void);
  void box_set_replication_timeout(void);
  void box_set_replication_connect_quorum(void);
+void box_set_iproto_msg_max(void);
  
  extern "C" {
  #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index be3c5a1a6..51add7d41 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
  #include "applier.h"
  #include "cfg.h"
  
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
  
  /**
   * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
   */
  unsigned iproto_readahead = 16320;
  
+/* The maximal number of iproto messages in fly. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
  /**
   * How big is a buffer which needs to be shrunk before
   * it is put back into buffer cache.
@@ -381,7 +386,7 @@ iproto_must_stop_input()
  {
  	size_t connection_count = mempool_count(&iproto_connection_pool);
  	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > connection_count + IPROTO_MSG_MAX;
+	return request_count > connection_count + iproto_msg_max;
  }
  
  /**
@@ -1632,7 +1637,7 @@ net_cord_f(va_list /* ap */)
  	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
  	/* Create a pipe to "tx" thread. */
  	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
  	/* Process incomming messages. */
  	cbus_loop(&endpoint);
  
@@ -1660,29 +1665,47 @@ iproto_init()
  
  	/* Create a pipe to "net" thread. */
  	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  }
  
  /**
   * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or max
+ * message count in fly it is needed to bounce a couple of
+ * messages to and from this thread.
   */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cbus_call_msg
  {
+	/** New URI to bind to. */
  	const char *uri;
+	bool need_update_uri;
+
+	/** New IProto max message count in fly. */
+	int iproto_msg_max;
+	bool need_update_msg_max;
  };
  
+static struct iproto_cfg_msg cfg_msg;
+
  static int
-iproto_do_bind(struct cbus_call_msg *m)
+iproto_do_cfg(struct cbus_call_msg *m)
  {
-	const char *uri  = ((struct iproto_bind_msg *) m)->uri;
+	assert(m == &cfg_msg);
  	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		if (cfg_msg.need_update_uri) {
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg.uri != NULL)
+				evio_service_bind(&binary, cfg_msg.uri);
+		}
+		if (cfg_msg.need_update_msg_max) {
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg.iproto_msg_max / 2);
+			int old = iproto_msg_max;
+			iproto_msg_max = cfg_msg.iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+		}
  	} catch (Exception *e) {
  		return -1;
  	}
@@ -1705,9 +1728,10 @@ iproto_do_listen(struct cbus_call_msg *m)
  void
  iproto_bind(const char *uri)
  {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_uri = true;
+	cfg_msg.uri = uri;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
  		      NULL, TIMEOUT_INFINITY))
  		diag_raise();
  }
@@ -1733,3 +1757,20 @@ iproto_reset_stat(void)
  {
  	rmean_cleanup(rmean_net);
  }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_msg_max = true;
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
+		      NULL, TIMEOUT_INFINITY))
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
  void
  iproto_listen();
  
+void
+iproto_set_msg_max(int iproto_msg_max);
+
  #endif /* defined(__cplusplus) */
  
  #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..1fd953011 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
  	return 0;
  }
  
+static int
+lbox_cfg_set_iproto_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_iproto_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
  static int
  lbox_cfg_set_worker_pool_threads(struct lua_State *L)
  {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
  		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
  		{"cfg_set_replication_connect_quorum",
  			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_iproto_msg_max", lbox_cfg_set_iproto_msg_max},
  		{NULL, NULL}
  	};
  
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..6ed30e016 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
      feedback_enabled      = true,
      feedback_host         = "https://feedback.tarantool.io",
      feedback_interval     = 3600,
+    iproto_msg_max        = 768,
  }
  
  -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
      feedback_enabled      = 'boolean',
      feedback_host         = 'string',
      feedback_interval     = 'number',
+    iproto_msg_max        = 'number',
  }
  
  local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
      replication_timeout     = private.cfg_set_replication_timeout,
      replication_connect_quorum = private.cfg_set_replication_connect_quorum,
      replication_skip_conflict = function() end,
+    iproto_msg_max          = private.cfg_set_iproto_msg_max,
  }
  
  local dynamic_cfg_skip_at_load = {
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index aa8b19510..3b9718ad3 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -136,6 +136,20 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
  	}
  }
  
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
+{
+	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
+		new_max_size = FIBER_POOL_SIZE_DEFAULT;
+
+	if (new_max_size > pool->max_size) {
+		pool->max_size = new_max_size;
+		cbus_process(&pool->endpoint);
+	} else {
+		pool->max_size = new_max_size;
+	}
+}
+
  void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..2a04d6063 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,10 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
  
  /**
   * A pool of worker fibers to handle messages,
@@ -83,6 +86,15 @@ void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout);
  
+/**
+ * Set maximal fiber pool size. If a new size is bigger than the
+ * current, then pending requests processing is started.
+ * @param pool Fiber pool to set size.
+ * @param new_max_size New maximal size.
+ */
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size);
+
  /**
   * Destroy a fiber pool
   */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..741f764af 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -12,42 +12,43 @@ box.cfg
  7	feedback_interval:3600
  8	force_recovery:false
  9	hot_standby:false
-10	listen:port
-11	log:tarantool.log
-12	log_format:plain
-13	log_level:5
-14	log_nonblock:true
-15	memtx_dir:.
-16	memtx_max_tuple_size:1048576
-17	memtx_memory:107374182
-18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+10	iproto_msg_max:768
+11	listen:port
+12	log:tarantool.log
+13	log_format:plain
+14	log_level:5
+15	log_nonblock:true
+16	memtx_dir:.
+17	memtx_max_tuple_size:1048576
+18	memtx_memory:107374182
+19	memtx_min_tuple_size:16
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
  --
  -- Test insert from detached fiber
  --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..7a1432a76 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -36,6 +36,8 @@ cfg_filter(box.cfg)
      - false
    - - hot_standby
      - false
+  - - iproto_msg_max
+    - 768
    - - listen
      - <hidden>
    - - log
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..c309847ef 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -32,6 +32,8 @@ cfg_filter(box.cfg)
      - false
    - - hot_standby
      - false
+  - - iproto_msg_max
+    - 768
    - - listen
      - <hidden>
    - - log
@@ -129,6 +131,8 @@ cfg_filter(box.cfg)
      - false
    - - hot_standby
      - false
+  - - iproto_msg_max
+    - 768
    - - listen
      - <hidden>
    - - log
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
  ---
  - true
  ...
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': should be of type number'
+...
+box.cfg{iproto_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.iproto_msg_max
+---
+...
+box.cfg{iproto_msg_max = 2}
+---
+...
+box.cfg{iproto_msg_max = old + 1000}
+---
+...
+box.cfg{iproto_msg_max = old}
+---
+...
  test_run:cmd("clear filter")
  ---
  - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2c40346fe 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
  test_run:cmd("stop server cfg_tester")
  test_run:cmd("cleanup server cfg_tester")
  
+--
+-- gh-3320: box.cfg{iproto_msg_max}.
+--
+box.cfg{iproto_msg_max = 'invalid'}
+box.cfg{iproto_msg_max = 0}
+old = box.cfg.iproto_msg_max
+box.cfg{iproto_msg_max = 2}
+box.cfg{iproto_msg_max = old + 1000}
+box.cfg{iproto_msg_max = old}
+
  test_run:cmd("clear filter")
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index 2691aa329..5d8ba4e9a 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -25,7 +25,7 @@ finished = 0
  continue = false
  ---
  ...
-limit = 768
+limit = box.cfg.iproto_msg_max
  ---
  ...
  run_max = (limit - 100) / 2
@@ -128,6 +128,143 @@ active
  wait_finished(run_max)
  ---
  ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{iproto_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active == run_max
+---
+- true
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active count is not changed - the input is blocked.
+wait_block()
+---
+...
+active == old_active
+---
+- true
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active >= limit
+---
+- true
+...
+active < run_max * 2
+---
+- true
+...
+box.cfg{iproto_msg_max = limit * 2}
+---
+...
+wait_block()
+---
+...
+active == run_max * 2
+---
+- true
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+---
+...
+box.cfg{iproto_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 5000
+...
+wait_finished(run_max * 2)
+---
+...
  conn2:close()
  ---
  ...
@@ -137,6 +274,6 @@ conn:close()
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
  ---
  ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}
  ---
  ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index bff7b5282..3e7ddf79d 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
  active = 0
  finished = 0
  continue = false
-limit = 768
+limit = box.cfg.iproto_msg_max
  run_max = (limit - 100) / 2
  
  old_readahead = box.cfg.readahead
@@ -73,8 +73,69 @@ wait_block()
  active
  wait_finished(run_max)
  
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{iproto_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_block()
+active
+wait_finished(run_max)
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{iproto_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_block()
+active == run_max
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{iproto_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active count is not changed - the input is blocked.
+wait_block()
+active == old_active
+wait_finished(active + 300)
+
+--
+-- Check that changing iproto_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active >= limit
+active < run_max * 2
+box.cfg{iproto_msg_max = limit * 2}
+wait_block()
+active == run_max * 2
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+box.cfg{iproto_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active
+wait_finished(run_max * 2)
+
  conn2:close()
  conn:close()
  
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, iproto_msg_max = limit}

end of thread, other threads:[~2018-04-24 17:33 UTC | newest]

Thread overview: 9+ messages
2018-04-23 17:05 [PATCH v2 0/4] IProto fixes and iproto_msg_max option Vladislav Shpilevoy
2018-04-23 17:05 ` [PATCH v2 1/4] iproto: fix error with input discarding Vladislav Shpilevoy
2018-04-24 15:35   ` Vladimir Davydov
2018-04-23 17:05 ` [PATCH v2 2/4] iproto: fix error with unstoppable batching Vladislav Shpilevoy
2018-04-24 15:36   ` Vladimir Davydov
2018-04-23 17:05 ` [PATCH v2 3/4] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
2018-04-23 17:05 ` [PATCH v2 4/4] Allow to configure TX fiber pool size Vladislav Shpilevoy
2018-04-24 15:43   ` Vladimir Davydov
2018-04-24 17:33     ` [tarantool-patches] " Vladislav Shpilevoy
