[tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri May 4 00:05:19 MSK 2018


An IProto connection stops reading input when the active request
limit is reached. But when multiple requests arrive in a batch,
IProto does not check the limit per request, so it can be violated.

Let's check the limit during batch parsing after each message too,
not only once before parsing.
---
 src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
 test/box/errinj.result        |  69 ++++++++++++++++++++
 test/box/errinj.test.lua      |  29 +++++++++
 test/box/net_msg_max.result   |  21 ++++++-
 test/box/net_msg_max.test.lua |  13 +++-
 5 files changed, 222 insertions(+), 53 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9ccaf1dc7..dd7f97ecf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
+	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
 	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
-	msg->connection = con;
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	if (msg != NULL)
+		msg->connection = con;
+	else
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
 	return msg;
 }
 
@@ -384,30 +388,6 @@ iproto_check_msg_max()
 	return request_count > IPROTO_MSG_MAX;
 }
 
-/**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
- */
-static void
-iproto_resume()
-{
-	/*
-	 * Most of the time we have nothing to do here: throttling
-	 * is not active.
-	 */
-	if (rlist_empty(&stopped_connections))
-		return;
-	if (iproto_check_msg_max())
-		return;
-
-	struct iproto_connection *con;
-	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
-				in_stop_list);
-	ev_feed_event(con->loop, &con->input, EV_READ);
-}
-
 /**
  * A connection is idle when the client is gone
  * and there are no outstanding msgs in the msg queue.
@@ -575,20 +555,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
-/** Enqueue all requests which were read up. */
-static inline void
+/**
+ * Enqueue all requests which were read up. If the request limit is
+ * reached, stop the connection input even if not the whole batch
+ * is enqueued. Otherwise try to read more by feeding a read event
+ * to the event loop.
+ * @param con Connection to enqueue in.
+ * @param in Buffer to parse.
+ *
+ * @retval  0 Success.
+ * @retval -1 Invalid MessagePack error.
+ */
+static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 {
 	int n_requests = 0;
 	bool stop_input = false;
-	while (con->parse_size && stop_input == false) {
+	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
 		const char *reqstart = in->wpos - con->parse_size;
 		const char *pos = reqstart;
 		/* Read request length. */
 		if (mp_typeof(*pos) != MP_UINT) {
 			cpipe_flush_input(&tx_pipe);
-			tnt_raise(ClientError, ER_INVALID_MSGPACK,
-				  "packet length");
+			diag_set(ClientError, ER_INVALID_MSGPACK,
+				 "packet length");
+			return -1;
 		}
 		if (mp_check_uint(pos, in->wpos) >= 0)
 			break;
@@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		if (reqend > in->wpos)
 			break;
 		struct iproto_msg *msg = iproto_msg_new(con);
+		if (msg == NULL) {
+			/*
+			 * Do not treat it as an error - just wait
+			 * until some of requests are finished.
+			 */
+			iproto_connection_stop(con);
+			return 0;
+		}
 		msg->p_ibuf = con->p_ibuf;
 		msg->wpos = con->wpos;
 
@@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 * requests, keep reading input, if only to avoid
 		 * a deadlock on this connection.
 		 */
-		ev_feed_event(con->loop, &con->input, EV_READ);
+		if (iproto_check_msg_max())
+			iproto_connection_stop(con);
+		else
+			ev_feed_event(con->loop, &con->input, EV_READ);
 	}
 	cpipe_flush_input(&tx_pipe);
+	return 0;
+}
+
+/**
+ * Enqueue the connection's pending requests. Completely resurrect
+ * the connection if it has no more requests and the limit is
+ * still not reached.
+ */
+static void
+iproto_connection_resume(struct iproto_connection *con)
+{
+	assert(! iproto_check_msg_max());
+	rlist_del(&con->in_stop_list);
+	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
+		struct error *e = box_error_last();
+		iproto_write_error(con->input.fd, e, ::schema_version, 0);
+		error_log(e);
+		iproto_connection_close(con);
+	}
+}
+
+/**
+ * Throttle the queue to the tx thread and ensure the fiber pool
+ * in tx thread is not depleted by a flood of incoming requests:
+ * resume a stopped connection only if there is a spare message
+ * object in the message pool.
+ */
+static void
+iproto_resume()
+{
+	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+		struct iproto_connection *con =
+			rlist_first_entry(&stopped_connections,
+					  struct iproto_connection,
+					  in_stop_list);
+		iproto_connection_resume(con);
+	}
 }
 
 static void
@@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		(struct iproto_connection *) watcher->data;
 	int fd = con->input.fd;
 	assert(fd >= 0);
-	if (! rlist_empty(&con->in_stop_list)) {
-		/* Resumed stopped connection. */
-		rlist_del(&con->in_stop_list);
-		/*
-		 * This connection may have no input, so
-		 * resume one more connection which might have
-		 * input.
-		 */
-		iproto_resume();
-	}
+	assert(rlist_empty(&con->in_stop_list));
 	/*
 	 * Throttle if there are too many pending requests,
 	 * otherwise we might deplete the fiber pool in tx
@@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		in->wpos += nrd;
 		con->parse_size += nrd;
 		/* Enqueue all requests which are fully read up. */
-		iproto_enqueue_batch(con, in);
+		if (iproto_enqueue_batch(con, in) != 0)
+			diag_raise();
 	} catch (Exception *e) {
 		/* Best effort at sending the error message to the client. */
 		iproto_write_error(fd, e, ::schema_version, 0);
@@ -801,7 +832,11 @@ static struct iproto_connection *
 iproto_connection_new(int fd)
 {
 	struct iproto_connection *con = (struct iproto_connection *)
-		mempool_alloc_xc(&iproto_connection_pool);
+		mempool_alloc(&iproto_connection_pool);
+	if (con == NULL) {
+		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
+		return NULL;
+	}
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
 	 * Enqueue any messages if they are in the readahead
 	 * queue. Will simply start input otherwise.
 	 */
-	iproto_enqueue_batch(con, msg->p_ibuf);
+	if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
+		iproto_connection_close(con);
 }
 
 static void
@@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 {
 	(void) addr;
 	(void) addrlen;
-	struct iproto_connection *con;
-
-	con = iproto_connection_new(fd);
+	struct iproto_msg *msg;
+	struct iproto_connection *con = iproto_connection_new(fd);
+	if (con == NULL)
+		goto error_conn;
 	/*
 	 * Ignore msg allocation failure - the queue size is
 	 * fixed so there is a limited number of msgs in
 	 * use, all stored in just a few blocks of the memory pool.
 	 */
-	struct iproto_msg *msg = iproto_msg_new(con);
+	msg = iproto_msg_new(con);
+	if (msg == NULL)
+		goto error_msg;
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
 	msg->close_connection = false;
 	cpipe_push(&tx_pipe, &msg->base);
+	return;
+error_msg:
+	mempool_free(&iproto_connection_pool, con);
+error_conn:
+	close(fd);
+	return;
 }
 
 static struct evio_service binary; /* iproto binary listener */
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 5b4bc23a3..5da3e2642 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1150,6 +1150,75 @@ cn:close()
 s:drop()
 ---
 ...
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+---
+...
+finished = 0
+---
+...
+continue = false
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+cn = net_box.connect(box.cfg.listen)
+---
+...
+function long_poll() cn:call('long_poll_f') end
+---
+...
+_ = fiber.create(long_poll)
+---
+...
+while started ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+---
+- ok
+...
+_ = fiber.create(long_poll)
+---
+...
+fiber.sleep(0.1)
+---
+...
+started == 1
+---
+- true
+...
+continue = true
+---
+...
+errinj.set("ERRINJ_TESTING", false)
+---
+- ok
+...
+while finished ~= 2 do fiber.sleep(0.01) end
+---
+...
+cn:close()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 ---
 ...
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index e1460d1b6..365d535c2 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -390,6 +390,35 @@ ok, err
 cn:close()
 s:drop()
 
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+finished = 0
+continue = false
+test_run:cmd('setopt delimiter ";"')
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+test_run:cmd('setopt delimiter ""');
+cn = net_box.connect(box.cfg.listen)
+function long_poll() cn:call('long_poll_f') end
+_ = fiber.create(long_poll)
+while started ~= 1 do fiber.sleep(0.01) end
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+_ = fiber.create(long_poll)
+fiber.sleep(0.1)
+started == 1
+continue = true
+errinj.set("ERRINJ_TESTING", false)
+while finished ~= 2 do fiber.sleep(0.01) end
+cn:close()
+
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 
 --
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index dde2016b7..de22bcbb9 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -70,10 +70,13 @@ end;
 ...
 -- Wait until 'active' stops growing - it means, that the input
 -- is blocked.
+-- No more messages.
 function wait_active(value)
 	while value ~= active do
 		fiber.sleep(0.01)
 	end
+	fiber.sleep(0.01)
+	assert(value == active)
 end;
 ---
 ...
@@ -99,11 +102,23 @@ run_workers(conn2)
 wait_active(run_max * 2)
 ---
 ...
-active == run_max * 2 or active
+wait_finished(active)
 ---
-- true
 ...
-wait_finished(active)
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_active(limit + 1)
+---
+...
+wait_finished(run_max)
 ---
 ...
 conn2:close()
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 560e37017..39f8f53f7 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -44,6 +44,9 @@ function wait_active(value)
 	while value ~= active do
 		fiber.sleep(0.01)
 	end
+	fiber.sleep(0.01)
+-- No more messages.
+	assert(value == active)
 end;
 
 function wait_finished(needed)
@@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
 run_workers(conn)
 run_workers(conn2)
 wait_active(run_max * 2)
-active == run_max * 2 or active
 wait_finished(active)
 
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_active(limit + 1)
+wait_finished(run_max)
+
 conn2:close()
 conn:close()
 
-- 
2.15.1 (Apple Git-101)





More information about the Tarantool-patches mailing list