[PATCH 2/3] iproto: fix error with input discarding

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Apr 21 01:52:10 MSK 2018


A long-polling request has a feature that allows it to discard
the input buffer in a connection before the main request is
finished.

Before the patch, discard just called iproto_resume(), trying to
notify libev that it can get new data. But it makes no sense,
because iproto_resume() continues only connections stopped due
to reaching the limit of maximum requests in flight, but not
connections whose buffer is overflowed. Such stopped connections
can be continued only after a request is complete.

To continue reading from a connection whose buffer was freed by
a discarding long-poll request, it is necessary to explicitly
feed it an EV_READ event. On this event
iproto_connection_on_input will read the blocked data, if any
exists.
---
 src/box/iproto.cc               |   5 +-
 test/box/request_limit.result   | 114 ++++++++++++++++++++++++++++++++++++++++
 test/box/request_limit.test.lua |  63 ++++++++++++++++++++++
 3 files changed, 180 insertions(+), 2 deletions(-)
 create mode 100644 test/box/request_limit.result
 create mode 100644 test/box/request_limit.test.lua

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index fe502b51b..585e8cc83 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1084,10 +1084,11 @@ net_discard_input(struct cmsg *m)
 {
 	struct iproto_msg *msg = container_of(m, struct iproto_msg,
 					      discard_input);
+	struct iproto_connection *conn = msg->connection;
 	msg->p_ibuf->rpos += msg->len;
 	msg->len = 0;
-	msg->connection->long_poll_requests++;
-	iproto_resume();
+	conn->long_poll_requests++;
+	ev_feed_event(conn->loop, &conn->input, EV_READ);
 }
 
 static void
diff --git a/test/box/request_limit.result b/test/box/request_limit.result
new file mode 100644
index 000000000..464528b41
--- /dev/null
+++ b/test/box/request_limit.result
@@ -0,0 +1,114 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+net_box = require('net.box')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+conn = net_box.connect(box.cfg.listen)
+---
+...
+conn2 = net_box.connect(box.cfg.listen)
+---
+...
+active = 0
+---
+...
+continue = false
+---
+...
+limit = 768
+---
+...
+run_max = (limit - 100) / 2
+---
+...
+old_readahead = box.cfg.readahead
+---
+...
+box.cfg{readahead = 9000}
+---
+...
+long_str = string.rep('a', 1000)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_long_f(...)
+	active = active + 1
+	while not continue do
+		fiber.sleep(0.1)
+	end
+	active = active - 1
+end;
+---
+...
+function do_long(c)
+	c:call('do_long_f', {long_str})
+end;
+---
+...
+function run_workers(c)
+	continue = false
+	for i = 1, run_max do
+		fiber.create(do_long, c)
+	end
+end;
+---
+...
+-- Wait until 'active' stops growing - it means, that the input
+-- is blocked.
+function wait_block()
+	local old_val = -1
+	while old_val ~= active do
+		old_val = active
+		fiber.sleep(0.1)
+	end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Test that message count limit is reachable.
+--
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_block()
+---
+...
+active == run_max * 2 or active
+---
+- true
+...
+continue = true
+---
+...
+while active ~= 0 do fiber.sleep(0.01) end
+---
+...
+conn2:close()
+---
+...
+conn:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+box.cfg{readahead = old_readahead}
+---
+...
diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
new file mode 100644
index 000000000..81298d91d
--- /dev/null
+++ b/test/box/request_limit.test.lua
@@ -0,0 +1,63 @@
+test_run = require('test_run').new()
+
+fiber = require('fiber')
+net_box = require('net.box')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+conn = net_box.connect(box.cfg.listen)
+conn2 = net_box.connect(box.cfg.listen)
+active = 0
+continue = false
+limit = 768
+run_max = (limit - 100) / 2
+
+old_readahead = box.cfg.readahead
+box.cfg{readahead = 9000}
+long_str = string.rep('a', 1000)
+
+test_run:cmd("setopt delimiter ';'")
+function do_long_f(...)
+	active = active + 1
+	while not continue do
+		fiber.sleep(0.1)
+	end
+	active = active - 1
+end;
+
+function do_long(c)
+	c:call('do_long_f', {long_str})
+end;
+
+function run_workers(c)
+	continue = false
+	for i = 1, run_max do
+		fiber.create(do_long, c)
+	end
+end;
+
+-- Wait until 'active' stops growing - it means, that the input
+-- is blocked.
+function wait_block()
+	local old_val = -1
+	while old_val ~= active do
+		old_val = active
+		fiber.sleep(0.1)
+	end
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Test that message count limit is reachable.
+--
+run_workers(conn)
+run_workers(conn2)
+wait_block()
+active == run_max * 2 or active
+continue = true
+while active ~= 0 do fiber.sleep(0.01) end
+
+conn2:close()
+conn:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+box.cfg{readahead = old_readahead}
-- 
2.15.1 (Apple Git-101)




More information about the Tarantool-patches mailing list