[PATCH v2 2/4] iproto: fix error with unstoppable batching

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Mon Apr 23 20:05:02 MSK 2018


IProto connection stops input reading, when active request count
is reached. But when multiple requests are in a batch, the IProto
does not check the limit, so it can be violated.

Lets check the limit during batch parsing after each message too,
not only once before parsing.
---
 src/box/iproto.cc               | 29 +++++++++++++++++++----------
 test/box/request_limit.result   | 20 ++++++++++++++++++++
 test/box/request_limit.test.lua | 10 ++++++++++
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a88226a9f..be3c5a1a6 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -582,7 +582,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 {
 	int n_requests = 0;
 	bool stop_input = false;
-	while (con->parse_size && stop_input == false) {
+	while (con->parse_size != 0 && stop_input == false &&
+	       !iproto_must_stop_input()) {
 		const char *reqstart = in->wpos - con->parse_size;
 		const char *pos = reqstart;
 		/* Read request length. */
@@ -691,18 +692,26 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		int nrd = sio_read(fd, in->wpos, ibuf_unused(in));
 		if (nrd < 0) {                  /* Socket is not ready. */
 			ev_io_start(loop, &con->input);
-			return;
-		}
-		if (nrd == 0) {                 /* EOF */
+			/*
+			 * Socket has no data, but there can be
+			 * non-parsed requests, stopped by
+			 * requests limit. Try to enqueue them, if
+			 * exist.
+			 */
+			if (con->parse_size == 0)
+				return;
+		} else if (nrd == 0) {
+			/* EOF */
 			iproto_connection_close(con);
 			return;
-		}
-		/* Count statistics */
-		rmean_collect(rmean_net, IPROTO_RECEIVED, nrd);
+		} else {
+			/* Count statistics */
+			rmean_collect(rmean_net, IPROTO_RECEIVED, nrd);
 
-		/* Update the read position and connection state. */
-		in->wpos += nrd;
-		con->parse_size += nrd;
+			/* Update the read position and connection state. */
+			in->wpos += nrd;
+			con->parse_size += nrd;
+		}
 		/* Enqueue all requests which are fully read up. */
 		iproto_enqueue_batch(con, in);
 	} catch (Exception *e) {
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
index bef998b91..2691aa329 100644
--- a/test/box/request_limit.result
+++ b/test/box/request_limit.result
@@ -108,6 +108,26 @@ active == run_max * 2 or active
 wait_finished(active)
 ---
 ...
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_block()
+---
+...
+active
+---
+- 769
+...
+wait_finished(run_max)
+---
+...
 conn2:close()
 ---
 ...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
index 2bc35d8fa..bff7b5282 100644
--- a/test/box/request_limit.test.lua
+++ b/test/box/request_limit.test.lua
@@ -63,6 +63,16 @@ wait_block()
 active == run_max * 2 or active
 wait_finished(active)
 
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_block()
+active
+wait_finished(run_max)
+
 conn2:close()
 conn:close()
 
-- 
2.15.1 (Apple Git-101)




More information about the Tarantool-patches mailing list