Tarantool development patches archive
* [tarantool-patches] [PATCH v2 0/2] IProto fix and net_msg_max option
@ 2018-05-03 21:05 Vladislav Shpilevoy
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching Vladislav Shpilevoy
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
  0 siblings, 2 replies; 7+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-03 21:05 UTC
  To: tarantool-patches; +Cc: kostja

Branch: http://github.com/tarantool/tarantool/tree/gh-3320-config-msg-max
Issue: https://github.com/tarantool/tarantool/issues/3320

IPROTO_MSG_MAX is a constant that limits the number of requests in flight. It
keeps the TX thread from spawning too many fibers, which would cause excessive
overhead from fiber switching and stack storage.

But some users run Tarantool on powerful hardware for which the IPROTO_MSG_MAX
constant is too low, or they want to run more long-poll requests. The patch
exposes it as a runtime configuration parameter.

'net_msg_max' is its name. If a user sees that the IProto thread is stuck due
to too many requests, they can raise 'net_msg_max' at runtime, and the IProto
thread immediately starts processing pending requests.

'net_msg_max' can also be decreased, but that cannot stop requests that are
already running. If the request count in the IProto thread exceeds the new
'net_msg_max' value, it takes some time until enough requests finish.
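
The semantics of raising and lowering the limit can be modelled with a small
standalone C sketch. It is not code from the patch: `struct msg_limiter` and
the `limiter_*` functions are hypothetical names used only for illustration,
and the only behaviour assumed from the patchset is that input is blocked
while the in-flight count is strictly greater than the limit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the in-flight limit; not Tarantool code. */
struct msg_limiter {
	size_t in_flight;	/* requests currently being processed */
	size_t max;		/* current limit ('net_msg_max') */
};

/* New input is blocked while the in-flight count exceeds the limit. */
static bool
limiter_is_blocked(const struct msg_limiter *l)
{
	return l->in_flight > l->max;
}

/*
 * Change the limit at runtime. Returns true when the limit grew,
 * which is the case where stopped connections should be resumed.
 * Lowering the limit never cancels requests that already run.
 */
static bool
limiter_set_max(struct msg_limiter *l, size_t new_max)
{
	size_t old = l->max;
	l->max = new_max;
	return new_max > old;
}

/* Account for one finished request. */
static void
limiter_finish_one(struct msg_limiter *l)
{
	assert(l->in_flight > 0);
	l->in_flight--;
}
```

In this model, lowering the limit below the current in-flight count leaves the
count untouched and only keeps new input blocked until enough calls to
`limiter_finish_one()` bring the count back to the limit.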

The maximal IProto message count is closely tied to the TX fiber pool size,
which limits the number of fibers spawned by remote clients and transactions.
It is now configurable too.

The same patchset also fixes a bug where batching could not be stopped and
ignored the message limit on IProto requests, and reworks IProto connection
resuming so that resuming a connection is separate from reading new data.
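
The reworked resume path can be pictured with the following standalone C
sketch. It is a simplified model under stated assumptions, not the code from
the patch: the stopped-connection list is a small fixed array instead of an
rlist, a connection is reduced to a counter of parsed-but-not-enqueued
requests, and all names (`struct net_state`, `conn_resume`, `resume_stopped`)
are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { MAX_STOPPED = 8 };

/* Hypothetical connection: only its backlog and stop flag. */
struct conn {
	int pending;	/* requests read but not yet enqueued */
	bool stopped;
};

struct net_state {
	size_t in_flight;			/* messages in flight */
	size_t max;				/* current limit */
	struct conn *stopped[MAX_STOPPED];	/* FIFO of stopped connections */
	size_t n_stopped;
};

static bool
limit_reached(const struct net_state *s)
{
	return s->in_flight > s->max;
}

static void
conn_stop(struct net_state *s, struct conn *c)
{
	assert(s->n_stopped < MAX_STOPPED);
	c->stopped = true;
	s->stopped[s->n_stopped++] = c;
}

static struct conn *
pop_first_stopped(struct net_state *s)
{
	assert(s->n_stopped > 0);
	struct conn *c = s->stopped[0];
	for (size_t i = 1; i < s->n_stopped; i++)
		s->stopped[i - 1] = s->stopped[i];
	s->n_stopped--;
	c->stopped = false;
	return c;
}

/* Enqueue the backlog; stop again as soon as the limit is hit. */
static void
conn_resume(struct net_state *s, struct conn *c)
{
	while (c->pending > 0 && !limit_reached(s)) {
		c->pending--;
		s->in_flight++;
	}
	if (limit_reached(s))
		conn_stop(s, c);
}

/* Resume stopped connections one by one while there is room. */
static void
resume_stopped(struct net_state *s)
{
	while (!limit_reached(s) && s->n_stopped > 0)
		conn_resume(s, pop_first_stopped(s));
}
```

The loop terminates because a connection is put back on the stopped list only
when the limit has been reached, which is also the loop's exit condition.
Since the check is "strictly greater than", the model lets the in-flight count
reach the limit plus one before input stops.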

Vladislav Shpilevoy (2):
  iproto: fix error with unstoppable batching
  iproto: allow to configure IPROTO_MSG_MAX

 src/box/box.cc                  |  11 +-
 src/box/box.h                   |   1 +
 src/box/iproto.cc               | 220 ++++++++++++++++++++++++++++------------
 src/box/iproto.h                |   3 +
 src/box/lua/cfg.cc              |  12 +++
 src/box/lua/load_cfg.lua        |   3 +
 src/fiber_pool.c                |  14 +++
 src/fiber_pool.h                |  14 ++-
 test/app-tap/init_script.result |  55 +++++-----
 test/box/admin.result           |   2 +
 test/box/cfg.result             |  27 +++++
 test/box/cfg.test.lua           |  10 ++
 test/box/errinj.result          |  69 +++++++++++++
 test/box/errinj.test.lua        |  29 ++++++
 test/box/net_msg_max.result     | 134 +++++++++++++++++++++++-
 test/box/net_msg_max.test.lua   |  72 ++++++++++++-
 16 files changed, 574 insertions(+), 102 deletions(-)

-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching
  2018-05-03 21:05 [tarantool-patches] [PATCH v2 0/2] IProto fix and net_msg_max option Vladislav Shpilevoy
@ 2018-05-03 21:05 ` Vladislav Shpilevoy
  2018-05-04  8:26   ` [tarantool-patches] " Konstantin Osipov
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
  1 sibling, 1 reply; 7+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-03 21:05 UTC
  To: tarantool-patches; +Cc: kostja

An IProto connection stops reading input when the active request
limit is reached. But when multiple requests arrive in a batch,
IProto does not check the limit, so it can be violated.

Let's check the limit during batch parsing after each message too,
not only once before parsing.
---
 src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
 test/box/errinj.result        |  69 ++++++++++++++++++++
 test/box/errinj.test.lua      |  29 +++++++++
 test/box/net_msg_max.result   |  21 ++++++-
 test/box/net_msg_max.test.lua |  13 +++-
 5 files changed, 222 insertions(+), 53 deletions(-)
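
As an illustration of the per-message check (not part of the patch), here is
a minimal standalone C sketch. It assumes a hypothetical framing of one length
byte followed by the payload instead of MessagePack, and `enqueue_batch` is an
illustrative name. The point is only that the limit is tested before every
message of a batch, and the unparsed remainder stays in the buffer.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Enqueue complete messages from buf while the in-flight count does
 * not exceed max. Returns the number of bytes consumed; the rest of
 * the batch stays in the buffer until the connection is resumed.
 * Hypothetical framing: one length byte, then the payload.
 */
static size_t
enqueue_batch(const unsigned char *buf, size_t len,
	      size_t *in_flight, size_t max)
{
	assert(buf != NULL && in_flight != NULL);
	size_t pos = 0;
	/* The limit is checked before each message, not once per batch. */
	while (pos < len && !(*in_flight > max)) {
		size_t msg_len = buf[pos];
		if (pos + 1 + msg_len > len)
			break;	/* partial message, wait for more input */
		(*in_flight)++;
		pos += 1 + msg_len;
	}
	return pos;
}
```

With a limit of 2 and a batch of four complete messages, this sketch enqueues
three of them and leaves the fourth for later, because the comparison is
"greater than" rather than "greater than or equal".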

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9ccaf1dc7..dd7f97ecf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
+	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
 	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
-	msg->connection = con;
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	if (msg != NULL)
+		msg->connection = con;
+	else
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
 	return msg;
 }
 
@@ -384,30 +388,6 @@ iproto_check_msg_max()
 	return request_count > IPROTO_MSG_MAX;
 }
 
-/**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
- */
-static void
-iproto_resume()
-{
-	/*
-	 * Most of the time we have nothing to do here: throttling
-	 * is not active.
-	 */
-	if (rlist_empty(&stopped_connections))
-		return;
-	if (iproto_check_msg_max())
-		return;
-
-	struct iproto_connection *con;
-	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
-				in_stop_list);
-	ev_feed_event(con->loop, &con->input, EV_READ);
-}
-
 /**
  * A connection is idle when the client is gone
  * and there are no outstanding msgs in the msg queue.
@@ -575,20 +555,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
-/** Enqueue all requests which were read up. */
-static inline void
+/**
+ * Enqueue all requests which were read up. If the request limit
+ * is reached, stop the connection input even if not the whole
+ * batch is enqueued. Otherwise try to read more by feeding a
+ * read event to the event loop.
+ * @param con Connection to enqueue in.
+ * @param in Buffer to parse.
+ *
+ * @retval  0 Success.
+ * @retval -1 Invalid MessagePack error.
+ */
+static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 {
 	int n_requests = 0;
 	bool stop_input = false;
-	while (con->parse_size && stop_input == false) {
+	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
 		const char *reqstart = in->wpos - con->parse_size;
 		const char *pos = reqstart;
 		/* Read request length. */
 		if (mp_typeof(*pos) != MP_UINT) {
 			cpipe_flush_input(&tx_pipe);
-			tnt_raise(ClientError, ER_INVALID_MSGPACK,
-				  "packet length");
+			diag_set(ClientError, ER_INVALID_MSGPACK,
+				 "packet length");
+			return -1;
 		}
 		if (mp_check_uint(pos, in->wpos) >= 0)
 			break;
@@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		if (reqend > in->wpos)
 			break;
 		struct iproto_msg *msg = iproto_msg_new(con);
+		if (msg == NULL) {
+			/*
+			 * Do not treat it as an error - just wait
+			 * until some of the requests are finished.
+			 */
+			iproto_connection_stop(con);
+			return 0;
+		}
 		msg->p_ibuf = con->p_ibuf;
 		msg->wpos = con->wpos;
 
@@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 * requests, keep reading input, if only to avoid
 		 * a deadlock on this connection.
 		 */
-		ev_feed_event(con->loop, &con->input, EV_READ);
+		if (iproto_check_msg_max())
+			iproto_connection_stop(con);
+		else
+			ev_feed_event(con->loop, &con->input, EV_READ);
 	}
 	cpipe_flush_input(&tx_pipe);
+	return 0;
+}
+
+/**
+ * Enqueue connection's pending requests. Completely resurrect the
+ * connection, if it has no more requests, and the limit still is
+ * not reached.
+ */
+static void
+iproto_connection_resume(struct iproto_connection *con)
+{
+	assert(! iproto_check_msg_max());
+	rlist_del(&con->in_stop_list);
+	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
+		struct error *e = box_error_last();
+		iproto_write_error(con->input.fd, e, ::schema_version, 0);
+		error_log(e);
+		iproto_connection_close(con);
+	}
+}
+
+/**
+ * Throttle the queue to the tx thread and ensure the fiber pool
+ * in tx thread is not depleted by a flood of incoming requests:
+ * resume a stopped connection only if there is a spare message
+ * object in the message pool.
+ */
+static void
+iproto_resume()
+{
+	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+		struct iproto_connection *con =
+			rlist_first_entry(&stopped_connections,
+					  struct iproto_connection,
+					  in_stop_list);
+		iproto_connection_resume(con);
+	}
 }
 
 static void
@@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		(struct iproto_connection *) watcher->data;
 	int fd = con->input.fd;
 	assert(fd >= 0);
-	if (! rlist_empty(&con->in_stop_list)) {
-		/* Resumed stopped connection. */
-		rlist_del(&con->in_stop_list);
-		/*
-		 * This connection may have no input, so
-		 * resume one more connection which might have
-		 * input.
-		 */
-		iproto_resume();
-	}
+	assert(rlist_empty(&con->in_stop_list));
 	/*
 	 * Throttle if there are too many pending requests,
 	 * otherwise we might deplete the fiber pool in tx
@@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		in->wpos += nrd;
 		con->parse_size += nrd;
 		/* Enqueue all requests which are fully read up. */
-		iproto_enqueue_batch(con, in);
+		if (iproto_enqueue_batch(con, in) != 0)
+			diag_raise();
 	} catch (Exception *e) {
 		/* Best effort at sending the error message to the client. */
 		iproto_write_error(fd, e, ::schema_version, 0);
@@ -801,7 +832,11 @@ static struct iproto_connection *
 iproto_connection_new(int fd)
 {
 	struct iproto_connection *con = (struct iproto_connection *)
-		mempool_alloc_xc(&iproto_connection_pool);
+		mempool_alloc(&iproto_connection_pool);
+	if (con == NULL) {
+		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
+		return NULL;
+	}
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
 	 * Enqueue any messages if they are in the readahead
 	 * queue. Will simply start input otherwise.
 	 */
-	iproto_enqueue_batch(con, msg->p_ibuf);
+	if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
+		iproto_connection_close(con);
 }
 
 static void
@@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 {
 	(void) addr;
 	(void) addrlen;
-	struct iproto_connection *con;
-
-	con = iproto_connection_new(fd);
+	struct iproto_msg *msg;
+	struct iproto_connection *con = iproto_connection_new(fd);
+	if (con == NULL)
+		goto error_conn;
 	/*
 	 * Ignore msg allocation failure - the queue size is
 	 * fixed so there is a limited number of msgs in
 	 * use, all stored in just a few blocks of the memory pool.
 	 */
-	struct iproto_msg *msg = iproto_msg_new(con);
+	msg = iproto_msg_new(con);
+	if (msg == NULL)
+		goto error_msg;
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
 	msg->close_connection = false;
 	cpipe_push(&tx_pipe, &msg->base);
+	return;
+error_msg:
+	mempool_free(&iproto_connection_pool, con);
+error_conn:
+	close(fd);
+	return;
 }
 
 static struct evio_service binary; /* iproto binary listener */
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 5b4bc23a3..5da3e2642 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1150,6 +1150,75 @@ cn:close()
 s:drop()
 ---
 ...
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+---
+...
+finished = 0
+---
+...
+continue = false
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+cn = net_box.connect(box.cfg.listen)
+---
+...
+function long_poll() cn:call('long_poll_f') end
+---
+...
+_ = fiber.create(long_poll)
+---
+...
+while started ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+---
+- ok
+...
+_ = fiber.create(long_poll)
+---
+...
+fiber.sleep(0.1)
+---
+...
+started == 1
+---
+- true
+...
+continue = true
+---
+...
+errinj.set("ERRINJ_TESTING", false)
+---
+- ok
+...
+while finished ~= 2 do fiber.sleep(0.01) end
+---
+...
+cn:close()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 ---
 ...
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index e1460d1b6..365d535c2 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -390,6 +390,35 @@ ok, err
 cn:close()
 s:drop()
 
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+finished = 0
+continue = false
+test_run:cmd('setopt delimiter ";"')
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+test_run:cmd('setopt delimiter ""');
+cn = net_box.connect(box.cfg.listen)
+function long_poll() cn:call('long_poll_f') end
+_ = fiber.create(long_poll)
+while started ~= 1 do fiber.sleep(0.01) end
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+_ = fiber.create(long_poll)
+fiber.sleep(0.1)
+started == 1
+continue = true
+errinj.set("ERRINJ_TESTING", false)
+while finished ~= 2 do fiber.sleep(0.01) end
+cn:close()
+
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 
 --
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index dde2016b7..de22bcbb9 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -70,10 +70,13 @@ end;
 ...
 -- Wait until 'active' stops growing - it means, that the input
 -- is blocked.
+-- No more messages.
 function wait_active(value)
 	while value ~= active do
 		fiber.sleep(0.01)
 	end
+	fiber.sleep(0.01)
+	assert(value == active)
 end;
 ---
 ...
@@ -99,11 +102,23 @@ run_workers(conn2)
 wait_active(run_max * 2)
 ---
 ...
-active == run_max * 2 or active
+wait_finished(active)
 ---
-- true
 ...
-wait_finished(active)
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_active(limit + 1)
+---
+...
+wait_finished(run_max)
 ---
 ...
 conn2:close()
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 560e37017..39f8f53f7 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -44,6 +44,9 @@ function wait_active(value)
 	while value ~= active do
 		fiber.sleep(0.01)
 	end
+	fiber.sleep(0.01)
+-- No more messages.
+	assert(value == active)
 end;
 
 function wait_finished(needed)
@@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
 run_workers(conn)
 run_workers(conn2)
 wait_active(run_max * 2)
-active == run_max * 2 or active
 wait_finished(active)
 
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_active(limit + 1)
+wait_finished(run_max)
+
 conn2:close()
 conn:close()
 
-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX
  2018-05-03 21:05 [tarantool-patches] [PATCH v2 0/2] IProto fix and net_msg_max option Vladislav Shpilevoy
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching Vladislav Shpilevoy
@ 2018-05-03 21:05 ` Vladislav Shpilevoy
  2018-05-04  8:46   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 1 reply; 7+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-03 21:05 UTC
  To: tarantool-patches; +Cc: kostja

IPROTO_MSG_MAX is a constant that limits the number of requests
in flight. It keeps the TX thread from spawning too many fibers,
which would cause excessive overhead from fiber switching and
stack storage.

But some users run Tarantool on powerful hardware for which the
IPROTO_MSG_MAX constant is too low. The patch exposes it as a
runtime configuration parameter.

'net_msg_max' is its name. If a user sees that the IProto thread
is stuck due to too many requests, they can raise 'net_msg_max'
at runtime, and the IProto thread immediately starts processing
pending requests.

'net_msg_max' can also be decreased, but that cannot stop
requests that are already running. If the request count in the
IProto thread exceeds the new 'net_msg_max' value, it takes some
time until enough requests finish.

`net_msg_max` automatically increases the fiber pool size when
needed.

Closes #3320
---
 src/box/box.cc                  |  11 +++-
 src/box/box.h                   |   1 +
 src/box/iproto.cc               |  77 +++++++++++++++++++-------
 src/box/iproto.h                |   3 ++
 src/box/lua/cfg.cc              |  12 +++++
 src/box/lua/load_cfg.lua        |   3 ++
 src/fiber_pool.c                |  14 +++++
 src/fiber_pool.h                |  14 ++++-
 test/app-tap/init_script.result |  55 +++++++++----------
 test/box/admin.result           |   2 +
 test/box/cfg.result             |  27 ++++++++++
 test/box/cfg.test.lua           |  10 ++++
 test/box/net_msg_max.result     | 117 +++++++++++++++++++++++++++++++++++++++-
 test/box/net_msg_max.test.lua   |  59 +++++++++++++++++++-
 14 files changed, 354 insertions(+), 51 deletions(-)
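
For reviewers, the values derived from a new 'net_msg_max' can be summarised
in a small standalone C sketch. It is not part of the patch: `struct
net_limits` and `net_limits_derive` are hypothetical names, while the numbers
mirror IPROTO_MSG_MAX_MIN, FIBER_POOL_SIZE_DEFAULT and the "half of the
limit" pipe input size used in the diff below.

```c
#include <assert.h>
#include <stddef.h>

enum {
	MSG_MAX_MIN = 2,		/* IPROTO_MSG_MAX_MIN in the patch */
	POOL_SIZE_DEFAULT = 4096,	/* FIBER_POOL_SIZE_DEFAULT in the patch */
};

/* Hypothetical bundle of the settings that follow net_msg_max. */
struct net_limits {
	int msg_max;		/* limit of messages in flight */
	int pipe_max_input;	/* max input of the tx and net pipes */
	int fiber_pool_max;	/* max size of the tx fiber pool */
};

/*
 * Validate a new net_msg_max and derive the dependent settings.
 * Returns 0 on success, -1 if the value is below the minimum
 * (in which case out is left untouched).
 */
static int
net_limits_derive(int net_msg_max, struct net_limits *out)
{
	assert(out != NULL);
	if (net_msg_max < MSG_MAX_MIN)
		return -1;
	out->msg_max = net_msg_max;
	out->pipe_max_input = net_msg_max / 2;
	/* The fiber pool never shrinks below its default size. */
	out->fiber_pool_max = net_msg_max > POOL_SIZE_DEFAULT ?
			      net_msg_max : POOL_SIZE_DEFAULT;
	return 0;
}
```

Under these assumptions the default of 768 keeps the fiber pool at 4096,
while a value of 5000 raises the pool to 5000 as well.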

diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..0d1f0e6ae 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,14 @@ box_set_vinyl_timeout(void)
 	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
 }
 
+void
+box_set_net_msg_max(void)
+{
+	int new_iproto_msg_max = cfg_geti("net_msg_max");
+	iproto_set_msg_max(new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+}
+
 /* }}} configuration bindings */
 
 /**
@@ -1711,7 +1719,7 @@ static inline void
 box_cfg_xc(void)
 {
 	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
 			  FIBER_POOL_IDLE_TIMEOUT);
 	/* Add an extra endpoint for WAL wake up/rollback messages. */
 	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
@@ -1735,6 +1743,7 @@ box_cfg_xc(void)
 	box_check_instance_uuid(&instance_uuid);
 	box_check_replicaset_uuid(&replicaset_uuid);
 
+	box_set_net_msg_max();
 	box_set_checkpoint_count();
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..7726cb4f3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
 void box_set_vinyl_timeout(void);
 void box_set_replication_timeout(void);
 void box_set_replication_connect_quorum(void);
+void box_set_net_msg_max(void);
 
 extern "C" {
 #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index dd7f97ecf..3175839cf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
 #include "applier.h"
 #include "cfg.h"
 
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
 
 /**
  * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
  */
 unsigned iproto_readahead = 16320;
 
+/* The maximal number of iproto messages in flight. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -385,7 +390,7 @@ static inline bool
 iproto_check_msg_max()
 {
 	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > IPROTO_MSG_MAX;
+	return request_count > (size_t) iproto_msg_max;
 }
 
 /**
@@ -1666,7 +1671,7 @@ net_cord_f(va_list /* ap */)
 	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
 	/* Create a pipe to "tx" thread. */
 	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
 	/* Process incomming messages. */
 	cbus_loop(&endpoint);
 
@@ -1694,29 +1699,47 @@ iproto_init()
 
 	/* Create a pipe to "net" thread. */
 	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
 }
 
 /**
  * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or the max
+ * count of messages in flight we need to bounce a couple of
+ * messages to and from this thread.
  */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cbus_call_msg
 {
+	/** New URI to bind to. */
 	const char *uri;
+	bool need_update_uri;
+
+	/** New IProto max message count in flight. */
+	int iproto_msg_max;
+	bool need_update_msg_max;
 };
 
+static struct iproto_cfg_msg cfg_msg;
+
 static int
-iproto_do_bind(struct cbus_call_msg *m)
+iproto_do_cfg(struct cbus_call_msg *m)
 {
-	const char *uri  = ((struct iproto_bind_msg *) m)->uri;
+	assert(m == &cfg_msg);
 	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		if (cfg_msg.need_update_uri) {
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg.uri != NULL)
+				evio_service_bind(&binary, cfg_msg.uri);
+		}
+		if (cfg_msg.need_update_msg_max) {
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg.iproto_msg_max / 2);
+			int old = iproto_msg_max;
+			iproto_msg_max = cfg_msg.iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+		}
 	} catch (Exception *e) {
 		return -1;
 	}
@@ -1739,9 +1762,10 @@ iproto_do_listen(struct cbus_call_msg *m)
 void
 iproto_bind(const char *uri)
 {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_uri = true;
+	cfg_msg.uri = uri;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
 		      NULL, TIMEOUT_INFINITY))
 		diag_raise();
 }
@@ -1767,3 +1791,20 @@ iproto_reset_stat(void)
 {
 	rmean_cleanup(rmean_net);
 }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_msg_max = true;
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
+		      NULL, TIMEOUT_INFINITY))
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
 void
 iproto_listen();
 
+void
+iproto_set_msg_max(int iproto_msg_max);
+
 #endif /* defined(__cplusplus) */
 
 #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..629ded626 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_net_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_net_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
 		{"cfg_set_replication_connect_quorum",
 			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
 		{NULL, NULL}
 	};
 
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..5e3efdb4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
     feedback_enabled      = true,
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
+    net_msg_max           = 768,
 }
 
 -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
     feedback_enabled      = 'boolean',
     feedback_host         = 'string',
     feedback_interval     = 'number',
+    net_msg_max           = 'number',
 }
 
 local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
     replication_skip_conflict = function() end,
+    net_msg_max             = private.cfg_set_net_msg_max,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index aa8b19510..3b9718ad3 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -136,6 +136,20 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
 	}
 }
 
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
+{
+	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
+		new_max_size = FIBER_POOL_SIZE_DEFAULT;
+
+	if (new_max_size > pool->max_size) {
+		pool->max_size = new_max_size;
+		cbus_process(&pool->endpoint);
+	} else {
+		pool->max_size = new_max_size;
+	}
+}
+
 void
 fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
 		  float idle_timeout)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..2a04d6063 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,10 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
 
 /**
  * A pool of worker fibers to handle messages,
@@ -83,6 +86,15 @@ void
 fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
 		  float idle_timeout);
 
+/**
+ * Set maximal fiber pool size. If a new size is bigger than the
+ * current, then pending requests processing is started.
+ * @param pool Fiber pool to set size.
+ * @param new_max_size New maximal size.
+ */
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size);
+
 /**
  * Destroy a fiber pool
  */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..7504a2813 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,33 +21,34 @@ box.cfg
 16	memtx_max_tuple_size:1048576
 17	memtx_memory:107374182
 18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..29c8c592d 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
     - 107374182
   - - memtx_min_tuple_size
     - <hidden>
+  - - net_msg_max
+    - 768
   - - pid_file
     - <hidden>
   - - read_only
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..61c5f79af 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -50,6 +50,8 @@ cfg_filter(box.cfg)
     - 107374182
   - - memtx_min_tuple_size
     - <hidden>
+  - - net_msg_max
+    - 768
   - - pid_file
     - <hidden>
   - - read_only
@@ -147,6 +149,8 @@ cfg_filter(box.cfg)
     - 107374182
   - - memtx_min_tuple_size
     - <hidden>
+  - - net_msg_max
+    - 768
   - - pid_file
     - <hidden>
   - - read_only
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
 ---
 - true
 ...
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''net_msg_max'': should be of type number'
+...
+box.cfg{net_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.net_msg_max
+---
+...
+box.cfg{net_msg_max = 2}
+---
+...
+box.cfg{net_msg_max = old + 1000}
+---
+...
+box.cfg{net_msg_max = old}
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2d819c94c 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
 test_run:cmd("stop server cfg_tester")
 test_run:cmd("cleanup server cfg_tester")
 
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+box.cfg{net_msg_max = 0}
+old = box.cfg.net_msg_max
+box.cfg{net_msg_max = 2}
+box.cfg{net_msg_max = old + 1000}
+box.cfg{net_msg_max = old}
+
 test_run:cmd("clear filter")
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index de22bcbb9..3b515bffd 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -25,7 +25,7 @@ finished = 0
 continue = false
 ---
 ...
-limit = 768
+limit = box.cfg.net_msg_max
 ---
 ...
 run_max = (limit - 100) / 2
@@ -121,6 +121,119 @@ wait_active(limit + 1)
 wait_finished(run_max)
 ---
 ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_active(3)
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_active(run_max)
+---
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+---
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(limit + 1)
+---
+...
+box.cfg{net_msg_max = limit * 2}
+---
+...
+wait_active(run_max * 2)
+---
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+---
+...
+box.cfg{net_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(5000)
+---
+...
+wait_finished(run_max * 2)
+---
+...
 conn2:close()
 ---
 ...
@@ -130,6 +243,6 @@ conn:close()
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
 ---
 ...
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 39f8f53f7..887d73ddc 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
 active = 0
 finished = 0
 continue = false
-limit = 768
+limit = box.cfg.net_msg_max
 run_max = (limit - 100) / 2
 
 old_readahead = box.cfg.readahead
@@ -72,8 +72,63 @@ run_workers(conn)
 wait_active(limit + 1)
 wait_finished(run_max)
 
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_active(3)
+active
+wait_finished(run_max)
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_active(run_max)
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+wait_finished(active + 300)
+
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_active(limit + 1)
+box.cfg{net_msg_max = limit * 2}
+wait_active(run_max * 2)
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+box.cfg{net_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_active(5000)
+wait_finished(run_max * 2)
+
 conn2:close()
 conn:close()
 
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
-- 
2.15.1 (Apple Git-101)

* [tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching Vladislav Shpilevoy
@ 2018-05-04  8:26   ` Konstantin Osipov
  2018-05-04 11:56     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 7+ messages in thread
From: Konstantin Osipov @ 2018-05-04  8:26 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 00:07]:
> IProto connection stops input reading, when active request count

Vlad, while the gerund is a valid grammatical form in English, it is
rarely used in colloquial speech. Please try to avoid using it
unless you know what you're doing. Sorry for nitpicking.

Instead of trying to construct a complex sentence, simply try
to split your point into two smaller ones. English, despite being a
member of the Indo-European family along with Russian, lost its
grammatical cases and their endings a long time ago. Without case
endings complex sentences are extremely hard to parse,
so they fell out of use.

> is reached. But when multiple requests are in a batch, the IProto
> does not check the limit, so it can be violated.
> 
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
>  src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>  test/box/errinj.result        |  69 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  29 +++++++++
>  test/box/net_msg_max.result   |  21 ++++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 222 insertions(+), 53 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 9ccaf1dc7..dd7f97ecf 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> +	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>  	struct iproto_msg *msg =
> -		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
> -	msg->connection = con;
> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +	if (msg != NULL)
> +		msg->connection = con;
> +	else
> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");

Nitpick: usually handling the error first and dealing with the
rest later makes it easier to understand what's going on.

This idiom would also make the code easier to parse in future, when more
stuff is added to the "else" branch:

if (msg == NULL) {
    diag_set()
    return NULL;
}

msg->connection = con;
return msg;
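
For illustration, the error-first idiom above can be spelled out as a
self-contained sketch. Everything here is a hypothetical stand-in rather
than the patch's code: `struct message` and `struct connection` replace
the iproto types, `last_error` replaces diag_set(), and `simulate_oom`
plays the role of the error injection.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for iproto_connection and iproto_msg. */
struct connection { int fd; };
struct message { struct connection *connection; };

/* Stand-in for diag_set(): remembers the last error text. */
const char *last_error = NULL;

/* Test-only switch that simulates an allocation failure. */
int simulate_oom = 0;

struct message *
message_new(struct connection *con)
{
	struct message *msg = NULL;
	if (!simulate_oom)
		msg = malloc(sizeof(*msg));
	if (msg == NULL) {
		/* Error path first: record the failure and leave. */
		last_error = "out of memory";
		return NULL;
	}
	/* The success path continues without extra nesting. */
	msg->connection = con;
	return msg;
}
```

New initialization steps can later be appended after the NULL check
without touching the error branch.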


>  }
>  
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a requests limit is

request limit; request is an adjective here - when a noun is used
as an adjective, its singular form is used.

> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval  0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>  	int n_requests = 0;
>  	bool stop_input = false;
> -	while (con->parse_size && stop_input == false) {
> +	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>  		const char *reqstart = in->wpos - con->parse_size;
>  		const char *pos = reqstart;
>  		/* Read request length. */
>  		if (mp_typeof(*pos) != MP_UINT) {
>  			cpipe_flush_input(&tx_pipe);
> -			tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -				  "packet length");
> +			diag_set(ClientError, ER_INVALID_MSGPACK,
> +				 "packet length");
> +			return -1;
>  		}
>  		if (mp_check_uint(pos, in->wpos) >= 0)
>  			break;
> @@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		if (reqend > in->wpos)
>  			break;
>  		struct iproto_msg *msg = iproto_msg_new(con);
> +		if (msg == NULL) {
> +			/*
> +			 * Do not tread it as an error - just wait
> +			 * until some of requests are finished.
> +			 */
> +			iproto_connection_stop(con);
> +			return 0;
> +		}
>  		msg->p_ibuf = con->p_ibuf;
>  		msg->wpos = con->wpos;
>  
> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		 * requests, keep reading input, if only to avoid
>  		 * a deadlock on this connection.
>  		 */
> -		ev_feed_event(con->loop, &con->input, EV_READ);
> +		if (iproto_check_msg_max())
> +			iproto_connection_stop(con);
> +		else
> +			ev_feed_event(con->loop, &con->input, EV_READ);

Looks like a double check for msg max in the same loop. Could you
please try to avoid that?

>  	}
>  	cpipe_flush_input(&tx_pipe);
> +	return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely ressurect the

Please turn on spell checking in your editor.

> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +	assert(! iproto_check_msg_max());
> +	rlist_del(&con->in_stop_list);
> +	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +		struct error *e = box_error_last();
> +		iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +		error_log(e);
> +		iproto_connection_close(con);
> +	}
> +}
> +
> +/**
> + * Throttle the queue to the tx thread and ensure the fiber pool
> + * in tx thread is not depleted by a flood of incoming requests:
> + * resume a stopped connection only if there is a spare message
> + * object in the message pool.
> + */
> +static void
> +iproto_resume()
> +{
> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
> +		struct iproto_connection *con =
> +			rlist_first_entry(&stopped_connections,
> +					  struct iproto_connection,
> +					  in_stop_list);
> +		iproto_connection_resume(con);
> +	}

By the look of it, resume() resumes all stopped connections.

This is not the case, however, because iproto_connection_stop() is called
*after* reading input, so most likely with a request already
sitting in the input buffer. The tricky relationship between
iproto_resume() and the timing of calling iproto_connection_stop()
requires an explanation. The comment needs a brush-up as well.
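
To make that relationship concrete, here is a minimal simulation of it.
It is a sketch under stated assumptions, not the iproto code: the
counter, the array-based stopped queue and all names are hypothetical,
and MSG_MAX is a tiny placeholder. Only the "count > max" comparison
mirrors iproto_check_msg_max().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { MSG_MAX = 2, CONN_MAX = 4 };

struct conn {
	int pending;  /* Requests already sitting in the input buffer. */
	bool stopped; /* True while the connection is in the stopped queue. */
};

int request_count = 0;
struct conn *stopped[CONN_MAX];
int stopped_len = 0;

bool
check_msg_max(void)
{
	return request_count > MSG_MAX;
}

void
conn_stop(struct conn *c)
{
	assert(!c->stopped && stopped_len < CONN_MAX);
	c->stopped = true;
	stopped[stopped_len++] = c;
}

/* Enqueue buffered requests one by one, checking the limit each time. */
void
enqueue_batch(struct conn *c)
{
	while (c->pending > 0 && !check_msg_max()) {
		c->pending--;
		request_count++;
	}
	/* The loop may have ended because of the limit: stop the input. */
	if (check_msg_max())
		conn_stop(c);
}

/* Resume connections in FIFO order until the limit is hit again. */
void
resume(void)
{
	while (!check_msg_max() && stopped_len > 0) {
		struct conn *c = stopped[0];
		for (int i = 1; i < stopped_len; i++)
			stopped[i - 1] = stopped[i];
		stopped_len--;
		c->stopped = false;
		enqueue_batch(c);
	}
}

/* One request finished: free its slot and try to resume somebody. */
void
request_done(void)
{
	request_count--;
	resume();
}
```

In this model a resumed connection first consumes its own buffered
requests, so a single freed slot usually wakes only the head of the
queue, which is then stopped again and moved to the tail.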

>  static void
> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>  		(struct iproto_connection *) watcher->data;
>  	int fd = con->input.fd;
>  	assert(fd >= 0);
> -	if (! rlist_empty(&con->in_stop_list)) {
> -		/* Resumed stopped connection. */
> -		rlist_del(&con->in_stop_list);
> -		/*
> -		 * This connection may have no input, so
> -		 * resume one more connection which might have
> -		 * input.
> -		 */
> -		iproto_resume();
> -	}
> +	assert(rlist_empty(&con->in_stop_list));

Nice.

>  	/*
>  	 * Throttle if there are too many pending requests,
>  	 * otherwise we might deplete the fiber pool in tx
> @@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>  		in->wpos += nrd;
>  		con->parse_size += nrd;
>  		/* Enqueue all requests which are fully read up. */
> -		iproto_enqueue_batch(con, in);
> +		if (iproto_enqueue_batch(con, in) != 0)
> +			diag_raise();
>  	} catch (Exception *e) {
>  		/* Best effort at sending the error message to the client. */
>  		iproto_write_error(fd, e, ::schema_version, 0);
> @@ -801,7 +832,11 @@ static struct iproto_connection *
>  iproto_connection_new(int fd)
>  {
>  	struct iproto_connection *con = (struct iproto_connection *)
> -		mempool_alloc_xc(&iproto_connection_pool);
> +		mempool_alloc(&iproto_connection_pool);
> +	if (con == NULL) {
> +		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
> +		return NULL;
> +	}
>  	con->input.data = con->output.data = con;
>  	con->loop = loop();
>  	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
> @@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
>  	 * Enqueue any messages if they are in the readahead
>  	 * queue. Will simply start input otherwise.
>  	 */
> -	iproto_enqueue_batch(con, msg->p_ibuf);
> +	if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
> +		iproto_connection_close(con);
>  }
>  
>  static void
> @@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>  {
>  	(void) addr;
>  	(void) addrlen;
> -	struct iproto_connection *con;
> -
> -	con = iproto_connection_new(fd);
> +	struct iproto_msg *msg;
> +	struct iproto_connection *con = iproto_connection_new(fd);
> +	if (con == NULL)
> +		goto error_conn;
>  	/*
>  	 * Ignore msg allocation failure - the queue size is
>  	 * fixed so there is a limited number of msgs in
>  	 * use, all stored in just a few blocks of the memory pool.
>  	 */
> -	struct iproto_msg *msg = iproto_msg_new(con);
> +	msg = iproto_msg_new(con);
> +	if (msg == NULL)
> +		goto error_msg;
>  	cmsg_init(&msg->base, connect_route);
>  	msg->p_ibuf = con->p_ibuf;
>  	msg->wpos = con->wpos;
>  	msg->close_connection = false;
>  	cpipe_push(&tx_pipe, &msg->base);
> +	return;
> +error_msg:
> +	mempool_free(&iproto_connection_pool, con);
> +error_conn:
> +	close(fd);
> +	return;
>  }
>  
>  static struct evio_service binary; /* iproto binary listener */
> diff --git a/test/box/errinj.result b/test/box/errinj.result
> index 5b4bc23a3..5da3e2642 100644
> --- a/test/box/errinj.result
> +++ b/test/box/errinj.result
> @@ -1150,6 +1150,75 @@ cn:close()
>  s:drop()
>  ---
>  ...
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +---
> +...
> +finished = 0
> +---
> +...
> +continue = false
> +---
> +...
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +cn = net_box.connect(box.cfg.listen)
> +---
> +...
> +function long_poll() cn:call('long_poll_f') end
> +---
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +while started ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +---
> +- ok
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +fiber.sleep(0.1)

Please avoid 0.1 sleeps entirely. If you're waiting for anything,
please wait in a loop with the wait timeout set to 0.01 seconds.
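
The shape of such a wait loop, sketched in C for illustration only. The
pause is injected as a callback so the sketch does not depend on any
fiber or thread library; in the test it would be a 0.01 second fiber
sleep. `wait_until`, `fake_pause` and `started_at_least` are
hypothetical names, and `fake_pause` pretends that one worker starts
during every pause.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Poll cond(arg) up to max_polls times, pausing between polls.
 * Returns true as soon as the condition holds, false on timeout.
 */
bool
wait_until(bool (*cond)(void *), void *arg, void (*pause_fn)(void),
	   int max_polls)
{
	for (int i = 0; i < max_polls; i++) {
		if (cond(arg))
			return true;
		pause_fn();
	}
	return cond(arg);
}

/* Demo state: number of workers that have started so far. */
int started = 0;

/* Stand-in for a short sleep: one more worker starts meanwhile. */
void
fake_pause(void)
{
	started++;
}

/* Condition: at least *arg workers have started. */
bool
started_at_least(void *arg)
{
	return started >= *(int *)arg;
}
```

Compared with one fixed long sleep, the loop returns as soon as the
condition holds and still has an upper bound on the waiting time.
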
> +---
> +...
> +started == 1
> +---
> +- true
> +...
> +continue = true
> +---
> +...
> +errinj.set("ERRINJ_TESTING", false)
> +---
> +- ok
> +...
> +while finished ~= 2 do fiber.sleep(0.01) end

I don't understand how exactly you're testing that the connection
is stopped when the pool is exhausted.

It seems you simply wait for your long-poll fiber to block, then
resume it, then wait for it to finish.

> +---
> +...
> +cn:close()
> +---
> +...
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>  ---
>  ...
> diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
> index e1460d1b6..365d535c2 100644
> --- a/test/box/errinj.test.lua
> +++ b/test/box/errinj.test.lua
> @@ -390,6 +390,35 @@ ok, err
>  cn:close()
>  s:drop()
>  
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +finished = 0
> +continue = false
> +test_run:cmd('setopt delimiter ";"')
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +test_run:cmd('setopt delimiter ""');
> +cn = net_box.connect(box.cfg.listen)
> +function long_poll() cn:call('long_poll_f') end
> +_ = fiber.create(long_poll)
> +while started ~= 1 do fiber.sleep(0.01) end
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +_ = fiber.create(long_poll)
> +fiber.sleep(0.1)
> +started == 1
> +continue = true
> +errinj.set("ERRINJ_TESTING", false)
> +while finished ~= 2 do fiber.sleep(0.01) end
> +cn:close()
> +
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>  
>  --
> diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
> index dde2016b7..de22bcbb9 100644
> --- a/test/box/net_msg_max.result
> +++ b/test/box/net_msg_max.result
> @@ -70,10 +70,13 @@ end;
>  ...
>  -- Wait until 'active' stops growing - it means, that the input
>  -- is blocked.
> +-- No more messages.
>  function wait_active(value)
>  	while value ~= active do
>  		fiber.sleep(0.01)
>  	end
> +	fiber.sleep(0.01)
> +	assert(value == active)
>  end;
>  ---
>  ...
> @@ -99,11 +102,23 @@ run_workers(conn2)
>  wait_active(run_max * 2)
>  ---
>  ...
> -active == run_max * 2 or active
> +wait_finished(active)
>  ---
> -- true
>  ...
> -wait_finished(active)
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +---
> +...
> +run_workers(conn)
> +---
> +...
> +wait_active(limit + 1)
> +---
> +...
> +wait_finished(run_max)
>  ---
>  ...
>  conn2:close()
> diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
> index 560e37017..39f8f53f7 100644
> --- a/test/box/net_msg_max.test.lua
> +++ b/test/box/net_msg_max.test.lua
> @@ -44,6 +44,9 @@ function wait_active(value)
>  	while value ~= active do
>  		fiber.sleep(0.01)
>  	end
> +	fiber.sleep(0.01)
> +-- No more messages.
> +	assert(value == active)
>  end;
>  
>  function wait_finished(needed)
> @@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
>  run_workers(conn)
>  run_workers(conn2)
>  wait_active(run_max * 2)
> -active == run_max * 2 or active
>  wait_finished(active)
>  
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +run_workers(conn)
> +wait_active(limit + 1)
> +wait_finished(run_max)
> +
>  conn2:close()
>  conn:close()

Thank you for working on this patch,

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] Re: [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX
  2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
@ 2018-05-04  8:46   ` Konstantin Osipov
  2018-05-04 11:56     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 7+ messages in thread
From: Konstantin Osipov @ 2018-05-04  8:46 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 00:07]:

> -struct iproto_bind_msg: public cbus_call_msg
> +struct iproto_cfg_msg: public cbus_call_msg
>  {
> +	/** New URI to bind to. */
>  	const char *uri;
> +	bool need_update_uri;
> +
> +	/** New IProto max message count in fly. */
> +	int iproto_msg_max;
> +	bool need_update_msg_max;
>  };

need_update -> update

> +void
> +fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
> +{
> +	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
> +		new_max_size = FIBER_POOL_SIZE_DEFAULT;
> +
> +	if (new_max_size > pool->max_size) {
> +		pool->max_size = new_max_size;
> +		cbus_process(&pool->endpoint);
> +	} else {
> +		pool->max_size = new_max_size;
> +	}

Why do you need cbus_process here? This is error-prone. Please
remove.
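
A sketch of what the setter might reduce to once the cbus_process()
call is dropped: both branches then perform the same assignment, so
only the clamp remains. `struct pool`, `pool_set_max_size` and the
value of `POOL_SIZE_DEFAULT` are placeholders, not taken from the
patch, and the sketch does not address how already queued messages get
picked up after the pool grows.

```c
#include <assert.h>

/* Placeholder default; the real constant is defined elsewhere. */
enum { POOL_SIZE_DEFAULT = 4096 };

/* Hypothetical reduced pool: only the field the setter touches. */
struct pool {
	int max_size;
};

void
pool_set_max_size(struct pool *pool, int new_max_size)
{
	/* Never let the pool shrink below the default size. */
	if (new_max_size < POOL_SIZE_DEFAULT)
		new_max_size = POOL_SIZE_DEFAULT;
	pool->max_size = new_max_size;
}
```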

> +--
> +-- Test TX fiber pool size limit. It is increased together with
> +-- iproto msg max.
> +--
> +run_max = 2500
> +---
> +...
> +box.cfg{net_msg_max = 5000}

Please reduce the number of messages/workers in your tests to several or
maybe a dozen, not hundreds or thousands. By having a lot of
fibers/messages you don't test anything new, you only make
the test run longer and harder to debug.

The test would benefit from more randomness and more attempts at
changing msg max: e.g. why not increase/decrease msg max a few
times under unceasing load? This would test both growing the fiber
pool and shrinking it.

> +---
> +...
> +run_workers(conn)
> +---
> +...
> +run_workers(conn2)
> +---
> +...
> +wait_active(5000)
> +---
> +...
> +wait_finished(run_max * 2)
> +---
> +...
>  conn2:close()
>  ---
>  ...
> @@ -130,6 +243,6 @@ conn:close()
>  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
>  ---
>  ...
> -box.cfg{readahead = old_readahead}
> +box.cfg{readahead = old_readahead, net_msg_max = limit}
>  ---
>  ...
> diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
> index 39f8f53f7..887d73ddc 100644
> --- a/test/box/net_msg_max.test.lua
> +++ b/test/box/net_msg_max.test.lua
> @@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
>  active = 0
>  finished = 0
>  continue = false
> -limit = 768
> +limit = box.cfg.net_msg_max
>  run_max = (limit - 100) / 2
>  
>  old_readahead = box.cfg.readahead
> @@ -72,8 +72,63 @@ run_workers(conn)
>  wait_active(limit + 1)
>  wait_finished(run_max)
>  
> +--
> +-- gh-3320: allow to change maximal count of messages.
> +--
> +
> +--
> +-- Test minimal iproto msg count.
> +--
> +box.cfg{net_msg_max = 2}
> +conn:ping()
> +#conn.space._space:select{} > 0
> +run_max = 15
> +run_workers(conn)
> +wait_active(3)
> +active
> +wait_finished(run_max)
> +
> +--
> +-- Increate maximal message count when nothing is blocked.
> +--
> +box.cfg{net_msg_max = limit * 2}
> +run_max = limit * 2 - 100
> +run_workers(conn)
> +wait_active(run_max)
> +-- Max can be decreased back even if now the limit is violated.
> +-- But a new input is blocked in such a case.
> +box.cfg{net_msg_max = limit}
> +old_active = active
> +for i = 1, 300 do fiber.create(do_long, conn) end
> +-- Afer time active count is not changed - the input is blocked.
> +wait_active(old_active)
> +wait_finished(active + 300)
> +
> +--
> +-- Check that changing net_msg_max can resume stopped
> +-- connections.
> +--
> +run_max = limit / 2 + 100
> +run_workers(conn)
> +run_workers(conn2)
> +wait_active(limit + 1)
> +box.cfg{net_msg_max = limit * 2}
> +wait_active(run_max * 2)
> +wait_finished(active)
> +
> +--
> +-- Test TX fiber pool size limit. It is increased together with
> +-- iproto msg max.
> +--
> +run_max = 2500
> +box.cfg{net_msg_max = 5000}
> +run_workers(conn)
> +run_workers(conn2)
> +wait_active(5000)
> +wait_finished(run_max * 2)
> +

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
  2018-05-04  8:26   ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-04 11:56     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 7+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-04 11:56 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

Hello. Thanks for the review!

On 04/05/2018 11:26, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 00:07]:
>> IProto connection stops input reading, when active request count
> 
> Vlad, while gerund in English is a valid grammatical form it is
> rarely used in colloquial speech. Please try to avoid using it
> unless you know what you're doing. Sorry for nitpicking.
> 
> Instead of trying to construct a complex sentence, simply try
> to split your point in two smaller ones. English, despite being a
> member of Indo-European family along with Russian, had lost
> grammatical cases and respective endings long time ago. Without case
> endings complex sentences are extremely hard to parse,
> so they fell out of use.

New commit message:

     iproto: fix error with unstoppable batching
     
     An IProto connection stops reading input once the request limit
     is reached. But when multiple requests arrive in a batch, IProto
     does not check the limit, so it can be violated.
     
     Let's check the limit during batch parsing after each message too,
     not only once before parsing.

> 
>> is reached. But when multiple requests are in a batch, the IProto
>> does not check the limit, so it can be violated.
>>
>> Lets check the limit during batch parsing after each message too,
>> not only once before parsing.
>> ---
>>   src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>>   test/box/errinj.result        |  69 ++++++++++++++++++++
>>   test/box/errinj.test.lua      |  29 +++++++++
>>   test/box/net_msg_max.result   |  21 ++++++-
>>   test/box/net_msg_max.test.lua |  13 +++-
>>   5 files changed, 222 insertions(+), 53 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index 9ccaf1dc7..dd7f97ecf 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>>   static struct iproto_msg *
>>   iproto_msg_new(struct iproto_connection *con)
>>   {
>> +	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>>   	struct iproto_msg *msg =
>> -		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
>> -	msg->connection = con;
>> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
>> +	if (msg != NULL)
>> +		msg->connection = con;
>> +	else
>> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> 
> Nitpick: usually handling the error first and dealing with the
> rest later makes it easier to understand what's going on.
> 
> This idiom would also make the code easier to parse in future, when more
> stuff is added to the "else" branch:
> 
> if (msg == NULL) {
>      diag_set()
>      return NULL;
> }
> 
> msg->connection = con;
> return msg;

Done.

         struct iproto_msg *msg =
                 (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-       if (msg != NULL)
-               msg->connection = con;
-       else
+       if (msg == NULL) {
                 diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+               return NULL;
+       }
+       msg->connection = con;
         return msg;
  }

> 
> 
>>   }
>>   
>> -/** Enqueue all requests which were read up. */
>> -static inline void
>> +/**
>> + * Enqueue all requests which were read up. If a requests limit is
> 
> request limit; request is an adjective here - when a noun is used
> as an adjective, its singular form is used.

Done.

- * Enqueue all requests which were read up. If a requests limit is
+ * Enqueue all requests which were read up. If a request limit is

>> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		 * requests, keep reading input, if only to avoid
>>   		 * a deadlock on this connection.
>>   		 */
>> -		ev_feed_event(con->loop, &con->input, EV_READ);
>> +		if (iproto_check_msg_max())
>> +			iproto_connection_stop(con);
>> +		else
>> +			ev_feed_event(con->loop, &con->input, EV_READ);
> 
> Looks like a double check for msg max in the same loop. Could you
> please try to avoid that?

This check is outside the loop. After the loop finishes, I check whether
it stopped because of the limit.

> 
>>   	}
>>   	cpipe_flush_input(&tx_pipe);
>> +	return 0;
>> +}
>> +
>> +/**
>> + * Enqueue connection's pending requests. Completely ressurect the
> 
> Please turn on spell checking in your editor.

Done.

- * Enqueue connection's pending requests. Completely ressurect the
+ * Enqueue connection's pending requests. Completely resurrect the

>> +/**
>> + * Throttle the queue to the tx thread and ensure the fiber pool
>> + * in tx thread is not depleted by a flood of incoming requests:
>> + * resume a stopped connection only if there is a spare message
>> + * object in the message pool.
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>> +		struct iproto_connection *con =
>> +			rlist_first_entry(&stopped_connections,
>> +					  struct iproto_connection,
>> +					  in_stop_list);
>> +		iproto_connection_resume(con);
>> +	}
> 
> By the look of it, resume() resumes all stopped connections.
> 
> This is not the case, however, because iproto_connection_stop() is called
> *after* reading input, so most likely with a request already
> sitting in the input buffer. The tricky relationship between
> iproto_resume() and the timing of calling iproto_connection_stop()
> requires an explanation. The comment needs a brush-up as well.

Done.

@@ -662,6 +663,10 @@ iproto_connection_resume(struct iproto_connection *con)
  {
         assert(! iproto_check_msg_max());
         rlist_del(&con->in_stop_list);
+       /*
+        * Enqueue_batch() stops the connection again, if the
+        * limit is reached again.
+        */
         if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
@@ -671,10 +676,14 @@ iproto_connection_resume(struct iproto_connection *con)
  }
  
  /**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
+ * Resume as many connections as possible until a request limit is
+ * reached. Each connection before to start accept new requests
+ * enqueues pending ones. And the input is resumed only if the
+ * limit still is not reached.
+ *
+ * This global connections resuming is needed when one connection
+ * finished a request, and another connection can get the freed
+ * message.
   */
  static void
  iproto_resume()

> 
>>   static void
>> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>>   		(struct iproto_connection *) watcher->data;
>>   	int fd = con->input.fd;
>>   	assert(fd >= 0);
>> -	if (! rlist_empty(&con->in_stop_list)) {
>> -		/* Resumed stopped connection. */
>> -		rlist_del(&con->in_stop_list);
>> -		/*
>> -		 * This connection may have no input, so
>> -		 * resume one more connection which might have
>> -		 * input.
>> -		 */
>> -		iproto_resume();
>> -	}
>> +	assert(rlist_empty(&con->in_stop_list));
> 
> Nice.

👍👍
>> diff --git a/test/box/errinj.result b/test/box/errinj.result
>> index 5b4bc23a3..5da3e2642 100644
>> --- a/test/box/errinj.result
>> +++ b/test/box/errinj.result
>> @@ -1150,6 +1150,75 @@ cn:close()
>>   s:drop()
>>   ---
>>   ...
>> +--
>> +-- In messages memory pool is over, stop the connection, until the
>> +-- pool has free memory.
>> +--
>> +started = 0
>> +---
>> +...
>> +finished = 0
>> +---
>> +...
>> +continue = false
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ";"')
>> +---
>> +- true
>> +...
>> +function long_poll_f()
>> +    started = started + 1
>> +    f = fiber.self()
>> +    while not continue do fiber.sleep(0.01) end
>> +    finished = finished + 1
>> +end;
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ""');
>> +---
>> +- true
>> +...
>> +cn = net_box.connect(box.cfg.listen)
>> +---
>> +...
>> +function long_poll() cn:call('long_poll_f') end
>> +---
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +while started ~= 1 do fiber.sleep(0.01) end
>> +---
>> +...
>> +-- Simulate OOM for new requests.
>> +errinj.set("ERRINJ_TESTING", true)
>> +---
>> +- ok
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +fiber.sleep(0.1)
> 
> Please avoid 0.1 sleeps, entirely. If you're waiting for anything,
> please wait in a loop with wait timeout set 0.01 seconds.
>> +---
>> +...
>> +started == 1
>> +---
>> +- true
>> +...
>> +continue = true
>> +---
>> +...
>> +errinj.set("ERRINJ_TESTING", false)
>> +---
>> +- ok
>> +...
>> +while finished ~= 2 do fiber.sleep(0.01) end
> 
> I don't understand how exactly you're testing that the connection
> is stopped when the pool is exhausted.
> 
> It seems you simply wait for your long-poll fiber to block, then
> resume it, then wait for it to finish.

Yes, you are right. I refactored the patch to make it clearer.


diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 1e4a88ecc..baa6bb660 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -190,17 +190,7 @@ struct iproto_msg
  static struct mempool iproto_msg_pool;
  
  static struct iproto_msg *
-iproto_msg_new(struct iproto_connection *con)
-{
-	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
-	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-	if (msg != NULL)
-		msg->connection = con;
-	else
-		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
-	return msg;
-}
+iproto_msg_new(struct iproto_connection *con);
  
  /**
   * Resume stopped connections, if any.
@@ -388,6 +378,25 @@ iproto_check_msg_max()
  	return request_count > IPROTO_MSG_MAX;
  }
  
+static struct iproto_msg *
+iproto_msg_new(struct iproto_connection *con)
+{
+	struct iproto_msg *msg =
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	ERROR_INJECT(ERRINJ_TESTING, {
+		mempool_free(&iproto_msg_pool, msg);
+		msg = NULL;
+	});
+	if (msg == NULL) {
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+		say_warn("can not allocate new net_msg on connection %s",
+			 sio_socketname(con->input.fd));
+		return NULL;
+	}
+	msg->connection = con;
+	return msg;
+}
+
  /**
   * A connection is idle when the client is gone
   * and there are no outstanding msgs in the msg queue.
@@ -414,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
  static inline void
  iproto_connection_stop(struct iproto_connection *con)
  {
-	say_warn("net_msg_max limit reached, stopping input on connection %s",
+	say_warn("stopping input on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(rlist_empty(&con->in_stop_list));
  	ev_io_stop(con->loop, &con->input);
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }
  
+static inline void
+iproto_connection_stop_by_limit(struct iproto_connection *con)
+{
+	say_warn("net_msg_max limit reached on connection %s",
+		 sio_socketname(con->input.fd));
+	assert(iproto_check_msg_max());
+	iproto_connection_stop(con);
+}
+
  /**
   * Initiate a connection shutdown. This method may
   * be invoked many times, and does the internal
@@ -644,7 +662,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * a deadlock on this connection.
  		 */
  		if (iproto_check_msg_max())
-			iproto_connection_stop(con);
+			iproto_connection_stop_by_limit(con);
  		else
  			ev_feed_event(con->loop, &con->input, EV_READ);
  	}
@@ -711,7 +729,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  	 * thread and deadlock.
  	 */
  	if (iproto_check_msg_max()) {
-		iproto_connection_stop(con);
+		iproto_connection_stop_by_limit(con);
  		return;
  	}

diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index 365d535c2..8f284f6ab 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -411,11 +411,21 @@ _ = fiber.create(long_poll)
  while started ~= 1 do fiber.sleep(0.01) end
  -- Simulate OOM for new requests.
  errinj.set("ERRINJ_TESTING", true)
+-- This request tries to allocate a memory for request data, and
+-- fails with this. This stops the connection until an existing
+-- request is finished.
+log = require('log')
+-- Fill log with garbage to avoid accidentaly reading previous
+-- test cases result.
+log.info(string.rep('a', 1000))
  _ = fiber.create(long_poll)
-fiber.sleep(0.1)
+while not test_run:grep_log('default', 'can not allocate new net_msg on connection', 1000) do fiber.sleep(0.01) end
+test_run:grep_log('default', 'stopping input on connection', 1000) ~= nil
  started == 1
  continue = true
  errinj.set("ERRINJ_TESTING", false)
+-- Ensure, that when memory is available again, the pending
+-- request is executed.
  while finished ~= 2 do fiber.sleep(0.01) end
  cn:close()


* [tarantool-patches] Re: [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX
  2018-05-04  8:46   ` [tarantool-patches] " Konstantin Osipov
@ 2018-05-04 11:56     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 7+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-04 11:56 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov

Hello. Thanks for the review!

On 04/05/2018 11:46, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 00:07]:
> 
>> -struct iproto_bind_msg: public cbus_call_msg
>> +struct iproto_cfg_msg: public cbus_call_msg
>>   {
>> +	/** New URI to bind to. */
>>   	const char *uri;
>> +	bool need_update_uri;
>> +
>> +	/** New IProto max message count in fly. */
>> +	int iproto_msg_max;
>> +	bool need_update_msg_max;
>>   };
> 
> need_update -> update

Done.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 8f4e27159..83833a2aa 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1738,11 +1738,11 @@ struct iproto_cfg_msg: public cbus_call_msg
  {
         /** New URI to bind to. */
         const char *uri;
-       bool need_update_uri;
+       bool update_uri;
  
         /** New IProto max message count in fly. */
         int iproto_msg_max;
-       bool need_update_msg_max;
+       bool update_msg_max;
  };

> 
>> +void
>> +fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
>> +{
>> +	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
>> +		new_max_size = FIBER_POOL_SIZE_DEFAULT;
>> +
>> +	if (new_max_size > pool->max_size) {
>> +		pool->max_size = new_max_size;
>> +		cbus_process(&pool->endpoint);
>> +	} else {
>> +		pool->max_size = new_max_size;
>> +	}
> 
> Why do you need cbus_process here? This is error-prone. Please
> remove.

Ok.

diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index 3b9718ad3..f60b6f0ff 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -141,13 +141,7 @@ fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
  {
         if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
                 new_max_size = FIBER_POOL_SIZE_DEFAULT;
-
-       if (new_max_size > pool->max_size) {
-               pool->max_size = new_max_size;
-               cbus_process(&pool->endpoint);
-       } else {
-               pool->max_size = new_max_size;
-       }
+       pool->max_size = new_max_size;
  }

> 
>> +--
>> +-- Test TX fiber pool size limit. It is increased together with
>> +-- iproto msg max.
>> +--
>> +run_max = 2500
>> +---
>> +...
>> +box.cfg{net_msg_max = 5000}
> 
> Please reduce the number of messages/workers in your tests to several or
> maybe a dozen, not hundreds or thousands. By having a lot of
> fibers/messages you don't test anything new, only make
> the test run longer and harder to debug.

Here I test the increased fiber pool size. Its minimal value is 4096, and the
only way to test that it has been increased is to send more than 4096
simultaneous requests.

On my laptop the test runs in less than a second. Moreover, it can run in
parallel with other tests, so I do not think that 5k is too many here.

If you want, I can delete the test, or reduce the count from 5k to "maybe a
dozen", but then it will not test anything. Should I do it?
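
To make the constraint concrete, here is a minimal stand-alone sketch in C of
the clamp applied by fiber_pool_set_max_size() in this patch. The struct
`pool_model` and the function `pool_model_set_max_size` are hypothetical
stand-ins, not the real fiber pool; only the constant 4096 and the clamp rule
come from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Same value as FIBER_POOL_SIZE_DEFAULT in the patch. */
enum { POOL_SIZE_DEFAULT = 4096 };

/* Hypothetical stand-in for struct fiber_pool: only the limit. */
struct pool_model {
	int max_size;
};

/* Same clamp rule: the limit never drops below the default. */
static void
pool_model_set_max_size(struct pool_model *pool, int new_max_size)
{
	assert(pool != NULL);
	if (new_max_size < POOL_SIZE_DEFAULT)
		new_max_size = POOL_SIZE_DEFAULT;
	pool->max_size = new_max_size;
}
```

Under this rule any net_msg_max up to 4096 leaves the pool limit at 4096, so a
change in pool size becomes observable only with a larger value, such as the
5000 used in the test.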

> 
> The test would benefit from more randomness and more attempts to
> change msg max: e.g. why not increase/decrease msg max a few
> times under unceasing load? This would test both growing the fiber
> pool and shrinking it.

Randomness makes no sense here. It only makes failures harder to reproduce. I
added a test that decreases the fiber pool size while the pool is stuck. It
turned out that the update hangs until at least one request is finished, so I
reworked the net_msg_max update. The diff became too big, so see the entire
patch below.
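
The limit semantics after the rework can be sketched as a small stand-alone
model in C. The struct `net_model` and its functions are hypothetical and do
not exist in Tarantool; the comparison is copied from iproto_check_msg_max()
and the "resume only on increase" rule from the IPROTO_CFG_MSG_MAX branch of
the patch. The cross-thread message passing is deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the IProto thread state. */
struct net_model {
	/* Current limit, like the static iproto_msg_max. */
	int msg_max;
	/* Requests in flight (mempool_count() in the patch). */
	size_t in_flight;
	/* How many times stopped connections were resumed. */
	int resume_count;
};

/* Same comparison as iproto_check_msg_max(): true means stop input. */
static bool
net_model_limit_reached(const struct net_model *net)
{
	return net->in_flight > (size_t) net->msg_max;
}

/* Apply a new limit; connections are resumed only when it grows. */
static void
net_model_set_msg_max(struct net_model *net, int new_msg_max)
{
	assert(new_msg_max >= 2); /* IPROTO_MSG_MAX_MIN in the patch. */
	int old = net->msg_max;
	net->msg_max = new_msg_max;
	if (old < new_msg_max)
		net->resume_count++;
}
```

In this model, lowering the limit below the current in-flight count cancels
nothing: the check simply keeps returning true, so new input stays blocked
until enough running requests finish.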


diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..0d1f0e6ae 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,14 @@ box_set_vinyl_timeout(void)
  	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
  }
  
+void
+box_set_net_msg_max(void)
+{
+	int new_iproto_msg_max = cfg_geti("net_msg_max");
+	iproto_set_msg_max(new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+}
+
  /* }}} configuration bindings */
  
  /**
@@ -1711,7 +1719,7 @@ static inline void
  box_cfg_xc(void)
  {
  	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
  			  FIBER_POOL_IDLE_TIMEOUT);
  	/* Add an extra endpoint for WAL wake up/rollback messages. */
  	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
@@ -1735,6 +1743,7 @@ box_cfg_xc(void)
  	box_check_instance_uuid(&instance_uuid);
  	box_check_replicaset_uuid(&replicaset_uuid);
  
+	box_set_net_msg_max();
  	box_set_checkpoint_count();
  	box_set_too_long_threshold();
  	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..7726cb4f3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
  void box_set_vinyl_timeout(void);
  void box_set_replication_timeout(void);
  void box_set_replication_connect_quorum(void);
+void box_set_net_msg_max(void);
  
  extern "C" {
  #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index baa6bb660..3212d9697 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
  #include "applier.h"
  #include "cfg.h"
  
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
  
  /**
   * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
   */
  unsigned iproto_readahead = 16320;
  
+/* The maximal number of iproto messages in fly. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
  /**
   * How big is a buffer which needs to be shrunk before
   * it is put back into buffer cache.
@@ -375,7 +380,7 @@ static inline bool
  iproto_check_msg_max()
  {
  	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > IPROTO_MSG_MAX;
+	return request_count > (size_t) iproto_msg_max;
  }
  
  static struct iproto_msg *
@@ -1692,7 +1697,7 @@ net_cord_f(va_list /* ap */)
  	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
  	/* Create a pipe to "tx" thread. */
  	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
  	/* Process incomming messages. */
  	cbus_loop(&endpoint);
  
@@ -1720,65 +1725,130 @@ iproto_init()
  
  	/* Create a pipe to "net" thread. */
  	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  }
  
+/** IProto configuration change result codes. */
+enum {
+	IPROTO_CFG_OK,
+	IPROTO_CFG_ERROR,
+	IPROTO_CFG_NOT_FINISHED,
+};
+
+/** Available IProto configuration changes. */
+enum iproto_cfg_op {
+	IPROTO_CFG_BIND,
+	IPROTO_CFG_MSG_MAX,
+	IPROTO_CFG_LISTEN
+};
+
  /**
   * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or max
+ * message count in fly it is needed to send a special message to
+ * IProto thread.
   */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cmsg
  {
-	const char *uri;
+	/** Operation to execute in IProto thread. */
+	enum iproto_cfg_op op;
+	union {
+		/** New URI to bind to. */
+		const char *uri;
+
+		/** New IProto max message count in fly. */
+		int iproto_msg_max;
+	};
+	/**
+	 * Cfg result code that can be read atomically by
+	 * different threads - TX and IProto, because it consists
+	 * of single byte.
+	 */
+	int8_t rc;
+	/**
+	 * Diag can be read by TX thread only when rc becomes
+	 * not IPROTO_CFG_NOT_FINISHED.
+	 */
+	struct diag diag;
  };
  
-static int
-iproto_do_bind(struct cbus_call_msg *m)
+static inline void
+iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
+{
+	memset(msg, 0, sizeof(*msg));
+	msg->rc = IPROTO_CFG_NOT_FINISHED;
+	msg->op = op;
+}
+
+static void
+iproto_do_cfg_f(struct cmsg *m)
  {
-	const char *uri  = ((struct iproto_bind_msg *) m)->uri;
+	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
+	int old;
  	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		switch (cfg_msg->op) {
+		case IPROTO_CFG_BIND:
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg->uri != NULL)
+				evio_service_bind(&binary, cfg_msg->uri);
+			break;
+		case IPROTO_CFG_MSG_MAX:
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg->iproto_msg_max / 2);
+			old = iproto_msg_max;
+			iproto_msg_max = cfg_msg->iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+			break;
+		case IPROTO_CFG_LISTEN:
+			if (evio_service_is_active(&binary))
+				evio_service_listen(&binary);
+			break;
+		default:
+			unreachable();
+		}
+		cfg_msg->rc = IPROTO_CFG_OK;
  	} catch (Exception *e) {
-		return -1;
+		diag_move(diag_get(), &cfg_msg->diag);
+		cfg_msg->rc = IPROTO_CFG_ERROR;
  	}
-	return 0;
  }
  
-static int
-iproto_do_listen(struct cbus_call_msg *m)
+static inline int
+iproto_do_cfg(struct iproto_cfg_msg *msg)
  {
-	(void) m;
-	try {
-		if (evio_service_is_active(&binary))
-			evio_service_listen(&binary);
-	} catch (Exception *e) {
+	const struct cmsg_hop cfg_route[] = {
+		{ iproto_do_cfg_f, NULL },
+	};
+	cmsg_init(msg, cfg_route);
+	cpipe_push(&net_pipe, msg);
+	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
+		fiber_sleep(0.001);
+	if (msg->rc == IPROTO_CFG_ERROR) {
+		diag_move(&msg->diag, diag_get());
  		return -1;
  	}
+	assert(msg->rc == IPROTO_CFG_OK);
  	return 0;
  }
  
  void
  iproto_bind(const char *uri)
  {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
-		      NULL, TIMEOUT_INFINITY))
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
+	cfg_msg.uri = uri;
+	if (iproto_do_cfg(&cfg_msg) != 0)
  		diag_raise();
  }
  
  void
  iproto_listen()
  {
-	/* Declare static to avoid stack corruption on fiber cancel. */
-	static struct cbus_call_msg m;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_listen,
-		      NULL, TIMEOUT_INFINITY))
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
+	if (iproto_do_cfg(&cfg_msg) != 0)
  		diag_raise();
  }
  
@@ -1793,3 +1863,19 @@ iproto_reset_stat(void)
  {
  	rmean_cleanup(rmean_net);
  }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	struct iproto_cfg_msg cfg_msg;
+	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (iproto_do_cfg(&cfg_msg) != 0)
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
  void
  iproto_listen();
  
+void
+iproto_set_msg_max(int iproto_msg_max);
+
  #endif /* defined(__cplusplus) */
  
  #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..629ded626 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
  	return 0;
  }
  
+static int
+lbox_cfg_set_net_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_net_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
  static int
  lbox_cfg_set_worker_pool_threads(struct lua_State *L)
  {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
  		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
  		{"cfg_set_replication_connect_quorum",
  			lbox_cfg_set_replication_connect_quorum},
+		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
  		{NULL, NULL}
  	};
  
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..5e3efdb4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
      feedback_enabled      = true,
      feedback_host         = "https://feedback.tarantool.io",
      feedback_interval     = 3600,
+    net_msg_max           = 768,
  }
  
  -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
      feedback_enabled      = 'boolean',
      feedback_host         = 'string',
      feedback_interval     = 'number',
+    net_msg_max           = 'number',
  }
  
  local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
      replication_timeout     = private.cfg_set_replication_timeout,
      replication_connect_quorum = private.cfg_set_replication_connect_quorum,
      replication_skip_conflict = function() end,
+    net_msg_max             = private.cfg_set_net_msg_max,
  }
  
  local dynamic_cfg_skip_at_load = {
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index aa8b19510..f60b6f0ff 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -136,6 +136,14 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
  	}
  }
  
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
+{
+	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
+		new_max_size = FIBER_POOL_SIZE_DEFAULT;
+	pool->max_size = new_max_size;
+}
+
  void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..2a04d6063 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,10 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
  
  /**
   * A pool of worker fibers to handle messages,
@@ -83,6 +86,15 @@ void
  fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout);
  
+/**
+ * Set maximal fiber pool size. A value smaller than
+ * FIBER_POOL_SIZE_DEFAULT is raised to FIBER_POOL_SIZE_DEFAULT.
+ * @param pool Fiber pool to set size.
+ * @param new_max_size New maximal size.
+ */
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size);
+
  /**
   * Destroy a fiber pool
   */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..7504a2813 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,33 +21,34 @@ box.cfg
  16	memtx_max_tuple_size:1048576
  17	memtx_memory:107374182
  18	memtx_min_tuple_size:16
-19	pid_file:box.pid
-20	read_only:false
-21	readahead:16320
-22	replication_connect_timeout:4
-23	replication_skip_conflict:false
-24	replication_sync_lag:10
-25	replication_timeout:1
-26	rows_per_wal:500000
-27	slab_alloc_factor:1.05
-28	too_long_threshold:0.5
-29	vinyl_bloom_fpr:0.05
-30	vinyl_cache:134217728
-31	vinyl_dir:.
-32	vinyl_max_tuple_size:1048576
-33	vinyl_memory:134217728
-34	vinyl_page_size:8192
-35	vinyl_range_size:1073741824
-36	vinyl_read_threads:1
-37	vinyl_run_count_per_level:2
-38	vinyl_run_size_ratio:3.5
-39	vinyl_timeout:60
-40	vinyl_write_threads:2
-41	wal_dir:.
-42	wal_dir_rescan_delay:2
-43	wal_max_size:268435456
-44	wal_mode:write
-45	worker_pool_threads:4
+19	net_msg_max:768
+20	pid_file:box.pid
+21	read_only:false
+22	readahead:16320
+23	replication_connect_timeout:4
+24	replication_skip_conflict:false
+25	replication_sync_lag:10
+26	replication_timeout:1
+27	rows_per_wal:500000
+28	slab_alloc_factor:1.05
+29	too_long_threshold:0.5
+30	vinyl_bloom_fpr:0.05
+31	vinyl_cache:134217728
+32	vinyl_dir:.
+33	vinyl_max_tuple_size:1048576
+34	vinyl_memory:134217728
+35	vinyl_page_size:8192
+36	vinyl_range_size:1073741824
+37	vinyl_read_threads:1
+38	vinyl_run_count_per_level:2
+39	vinyl_run_size_ratio:3.5
+40	vinyl_timeout:60
+41	vinyl_write_threads:2
+42	wal_dir:.
+43	wal_dir_rescan_delay:2
+44	wal_max_size:268435456
+45	wal_mode:write
+46	worker_pool_threads:4
  --
  -- Test insert from detached fiber
  --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..29c8c592d 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..61c5f79af 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -50,6 +50,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
@@ -147,6 +149,8 @@ cfg_filter(box.cfg)
      - 107374182
    - - memtx_min_tuple_size
      - <hidden>
+  - - net_msg_max
+    - 768
    - - pid_file
      - <hidden>
    - - read_only
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
  ---
  - true
  ...
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''net_msg_max'': should be of type number'
+...
+box.cfg{net_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.net_msg_max
+---
+...
+box.cfg{net_msg_max = 2}
+---
+...
+box.cfg{net_msg_max = old + 1000}
+---
+...
+box.cfg{net_msg_max = old}
+---
+...
  test_run:cmd("clear filter")
  ---
  - true
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2d819c94c 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
  test_run:cmd("stop server cfg_tester")
  test_run:cmd("cleanup server cfg_tester")
  
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+box.cfg{net_msg_max = 0}
+old = box.cfg.net_msg_max
+box.cfg{net_msg_max = 2}
+box.cfg{net_msg_max = old + 1000}
+box.cfg{net_msg_max = old}
+
  test_run:cmd("clear filter")
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index de22bcbb9..a5315bd3d 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -25,7 +25,7 @@ finished = 0
  continue = false
  ---
  ...
-limit = 768
+limit = box.cfg.net_msg_max
  ---
  ...
  run_max = (limit - 100) / 2
@@ -121,6 +121,134 @@ wait_active(limit + 1)
  wait_finished(run_max)
  ---
  ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_active(3)
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_active(run_max)
+---
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+---
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(limit + 1)
+---
+...
+box.cfg{net_msg_max = limit * 2}
+---
+...
+wait_active(run_max * 2)
+---
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+---
+...
+box.cfg{net_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(5000)
+---
+...
+-- Allow to decrease tx fiber pool size even if is full already.
+box.cfg{net_msg_max = 3000}
+---
+...
+wait_active(5000)
+---
+...
+-- More workers can be run, but they will be blocked until older
+-- requests are finished.
+run_max = 100
+---
+...
+run_workers(conn)
+---
+...
+wait_finished(5100)
+---
+...
  conn2:close()
  ---
  ...
@@ -130,6 +258,6 @@ conn:close()
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
  ---
  ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
  ---
  ...
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 39f8f53f7..378b8ab2a 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
  active = 0
  finished = 0
  continue = false
-limit = 768
+limit = box.cfg.net_msg_max
  run_max = (limit - 100) / 2
  
  old_readahead = box.cfg.readahead
@@ -72,8 +72,70 @@ run_workers(conn)
  wait_active(limit + 1)
  wait_finished(run_max)
  
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_active(3)
+active
+wait_finished(run_max)
+
+--
+-- Increate maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_active(run_max)
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- Afer time active count is not changed - the input is blocked.
+wait_active(old_active)
+wait_finished(active + 300)
+
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_active(limit + 1)
+box.cfg{net_msg_max = limit * 2}
+wait_active(run_max * 2)
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+box.cfg{net_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_active(5000)
+-- Allow to decrease tx fiber pool size even if is full already.
+box.cfg{net_msg_max = 3000}
+wait_active(5000)
+-- More workers can be run, but they will be blocked until older
+-- requests are finished.
+run_max = 100
+run_workers(conn)
+wait_finished(5100)
+
  conn2:close()
  conn:close()
  
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}

Thread overview: 7+ messages
2018-05-03 21:05 [tarantool-patches] [PATCH v2 0/2] IProto fix and net_msg_max option Vladislav Shpilevoy
2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching Vladislav Shpilevoy
2018-05-04  8:26   ` [tarantool-patches] " Konstantin Osipov
2018-05-04 11:56     ` Vladislav Shpilevoy
2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
2018-05-04  8:46   ` [tarantool-patches] " Konstantin Osipov
2018-05-04 11:56     ` Vladislav Shpilevoy
