From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v2 2/4] iproto: fix error with unstoppable batching Date: Mon, 23 Apr 2018 20:05:02 +0300 Message-Id: <23176a3824d858233ee557528659694d60c0de17.1524502856.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: An IProto connection stops reading input when the active request limit is reached. But when multiple requests arrive in a batch, IProto does not check the limit, so it can be violated. Let's check the limit during batch parsing after each message too, not only once before parsing. --- src/box/iproto.cc | 29 +++++++++++++++++++---------- test/box/request_limit.result | 20 ++++++++++++++++++++ test/box/request_limit.test.lua | 10 ++++++++++ 3 files changed, 49 insertions(+), 10 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index a88226a9f..be3c5a1a6 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -582,7 +582,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) { int n_requests = 0; bool stop_input = false; - while (con->parse_size && stop_input == false) { + while (con->parse_size != 0 && stop_input == false && + !iproto_must_stop_input()) { const char *reqstart = in->wpos - con->parse_size; const char *pos = reqstart; /* Read request length. */ @@ -691,18 +692,26 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, int nrd = sio_read(fd, in->wpos, ibuf_unused(in)); if (nrd < 0) { /* Socket is not ready. */ ev_io_start(loop, &con->input); - return; - } - if (nrd == 0) { /* EOF */ + /* + * Socket has no data, but there can be + * non-parsed requests, stopped by + * requests limit. Try to enqueue them, if + * exist. 
+ */ + if (con->parse_size == 0) + return; + } else if (nrd == 0) { + /* EOF */ iproto_connection_close(con); return; - } - /* Count statistics */ - rmean_collect(rmean_net, IPROTO_RECEIVED, nrd); + } else { + /* Count statistics */ + rmean_collect(rmean_net, IPROTO_RECEIVED, nrd); - /* Update the read position and connection state. */ - in->wpos += nrd; - con->parse_size += nrd; + /* Update the read position and connection state. */ + in->wpos += nrd; + con->parse_size += nrd; + } /* Enqueue all requests which are fully read up. */ iproto_enqueue_batch(con, in); } catch (Exception *e) { diff --git a/test/box/request_limit.result b/test/box/request_limit.result index bef998b91..2691aa329 100644 --- a/test/box/request_limit.result +++ b/test/box/request_limit.result @@ -108,6 +108,26 @@ active == run_max * 2 or active wait_finished(active) --- ... +-- +-- Test that each message in a batch is checked. When a limit is +-- reached, other messages must be processed later. +-- +run_max = limit * 5 +--- +... +run_workers(conn) +--- +... +wait_block() +--- +... +active +--- +- 769 +... +wait_finished(run_max) +--- +... conn2:close() --- ... diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua index 2bc35d8fa..bff7b5282 100644 --- a/test/box/request_limit.test.lua +++ b/test/box/request_limit.test.lua @@ -63,6 +63,16 @@ wait_block() active == run_max * 2 or active wait_finished(active) +-- +-- Test that each message in a batch is checked. When a limit is +-- reached, other messages must be processed later. +-- +run_max = limit * 5 +run_workers(conn) +wait_block() +active +wait_finished(run_max) + conn2:close() conn:close() -- 2.15.1 (Apple Git-101)