From: Vladislav Shpilevoy
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org
Subject: [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching
Date: Fri, 4 May 2018 00:05:19 +0300
Message-Id: <00583e6bbf9ca53c3a44114351b9fdb9e539321d.1525381393.git.v.shpilevoy@tarantool.org>
List-Id: tarantool-patches

An IProto connection stops reading input when the count of active
requests reaches the limit. But when multiple requests arrive in a
single batch, IProto does not re-check the limit while parsing the
batch, so the limit can be violated. Let's check the limit during
batch parsing after each message too, not only once before parsing.
---
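The idea of the fix can be reduced to the standalone sketch below. It is
only an illustration, not the real iproto code: Connection, MSG_LIMIT,
request_count and the helpers are simplified stand-ins for the structures
and routines in src/box/iproto.cc.

/*
 * Standalone sketch: the request limit is re-checked before every
 * message of a batch, and stopped connections are drained again
 * while spare capacity remains.
 */
#include <cstddef>
#include <cassert>
#include <list>

static const size_t MSG_LIMIT = 4;	/* cf. IPROTO_MSG_MAX */
static size_t request_count = 0;	/* requests handed to tx */

struct Connection {
	size_t unparsed_requests = 0;	/* stand-in for the input buffer */
	bool is_stopped = false;
};

static std::list<Connection *> stopped_connections;

static bool
msg_max_reached()
{
	return request_count >= MSG_LIMIT;
}

/* Stop reading input until some of the requests are finished. */
static void
connection_stop(Connection *con)
{
	con->is_stopped = true;
	stopped_connections.push_back(con);
}

/*
 * Enqueue parsed requests one by one. Unlike the old code, the
 * limit is checked before every message, not once per batch.
 */
static void
enqueue_batch(Connection *con)
{
	while (con->unparsed_requests > 0) {
		if (msg_max_reached()) {
			connection_stop(con);
			return;
		}
		--con->unparsed_requests;
		++request_count;
	}
}

/* A request finished: resume connections while capacity lasts. */
static void
request_finish()
{
	assert(request_count > 0);
	--request_count;
	while (!msg_max_reached() && !stopped_connections.empty()) {
		Connection *con = stopped_connections.front();
		stopped_connections.pop_front();
		con->is_stopped = false;
		enqueue_batch(con);
	}
}

int
main()
{
	Connection con;
	con.unparsed_requests = 10;	/* a batch bigger than the limit */
	enqueue_batch(&con);
	assert(con.is_stopped && request_count == MSG_LIMIT);
	while (con.unparsed_requests > 0 || request_count > 0)
		request_finish();	/* draining resumes the connection */
	assert(!con.is_stopped);
	return 0;
}

The same per-message decision also covers iproto_msg_new() returning NULL
when the message pool is exhausted: the connection is stopped and resumed
later instead of reporting an error to the client.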
 src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
 test/box/errinj.result        |  69 ++++++++++++++++++++
 test/box/errinj.test.lua      |  29 +++++++++
 test/box/net_msg_max.result   |  21 ++++++-
 test/box/net_msg_max.test.lua |  13 +++-
 5 files changed, 222 insertions(+), 53 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9ccaf1dc7..dd7f97ecf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
+	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
 	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
-	msg->connection = con;
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	if (msg != NULL)
+		msg->connection = con;
+	else
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
 	return msg;
 }
 
@@ -384,30 +388,6 @@ iproto_check_msg_max()
 	return request_count > IPROTO_MSG_MAX;
 }
 
-/**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
- */
-static void
-iproto_resume()
-{
-	/*
-	 * Most of the time we have nothing to do here: throttling
-	 * is not active.
-	 */
-	if (rlist_empty(&stopped_connections))
-		return;
-	if (iproto_check_msg_max())
-		return;
-
-	struct iproto_connection *con;
-	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
-				in_stop_list);
-	ev_feed_event(con->loop, &con->input, EV_READ);
-}
-
 /**
  * A connection is idle when the client is gone
  * and there are no outstanding msgs in the msg queue.
@@ -575,20 +555,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
-/** Enqueue all requests which were read up. */
-static inline void
+/**
+ * Enqueue all requests which were read up. If the request limit
+ * is reached, stop the connection input even if the whole batch
+ * is not enqueued yet. Otherwise try to read more by feeding a
+ * read event to the event loop.
+ * @param con Connection to enqueue in.
+ * @param in Buffer to parse.
+ *
+ * @retval 0 Success.
+ * @retval -1 Invalid MessagePack error.
+ */
+static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 {
 	int n_requests = 0;
 	bool stop_input = false;
-	while (con->parse_size && stop_input == false) {
+	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
 		const char *reqstart = in->wpos - con->parse_size;
 		const char *pos = reqstart;
 		/* Read request length. */
 		if (mp_typeof(*pos) != MP_UINT) {
 			cpipe_flush_input(&tx_pipe);
-			tnt_raise(ClientError, ER_INVALID_MSGPACK,
-				  "packet length");
+			diag_set(ClientError, ER_INVALID_MSGPACK,
+				 "packet length");
+			return -1;
 		}
 		if (mp_check_uint(pos, in->wpos) >= 0)
 			break;
@@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		if (reqend > in->wpos)
 			break;
 		struct iproto_msg *msg = iproto_msg_new(con);
+		if (msg == NULL) {
+			/*
+			 * Do not treat it as an error - just wait
+			 * until some of the requests are finished.
+			 */
+			iproto_connection_stop(con);
+			return 0;
+		}
 		msg->p_ibuf = con->p_ibuf;
 		msg->wpos = con->wpos;
 
@@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 * requests, keep reading input, if only to avoid
 		 * a deadlock on this connection.
 		 */
-		ev_feed_event(con->loop, &con->input, EV_READ);
+		if (iproto_check_msg_max())
+			iproto_connection_stop(con);
+		else
+			ev_feed_event(con->loop, &con->input, EV_READ);
 	}
 	cpipe_flush_input(&tx_pipe);
+	return 0;
+}
+
+/**
+ * Enqueue connection's pending requests. Completely resurrect the
+ * connection, if it has no more requests, and the limit still is
+ * not reached.
+ */
+static void
+iproto_connection_resume(struct iproto_connection *con)
+{
+	assert(! iproto_check_msg_max());
+	rlist_del(&con->in_stop_list);
+	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
+		struct error *e = box_error_last();
+		iproto_write_error(con->input.fd, e, ::schema_version, 0);
+		error_log(e);
+		iproto_connection_close(con);
+	}
+}
+
+/**
+ * Throttle the queue to the tx thread and ensure the fiber pool
+ * in tx thread is not depleted by a flood of incoming requests:
+ * resume a stopped connection only if there is a spare message
+ * object in the message pool.
+ */
+static void
+iproto_resume()
+{
+	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+		struct iproto_connection *con =
+			rlist_first_entry(&stopped_connections,
+					  struct iproto_connection,
+					  in_stop_list);
+		iproto_connection_resume(con);
+	}
 }
 
 static void
@@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		(struct iproto_connection *) watcher->data;
 	int fd = con->input.fd;
 	assert(fd >= 0);
-	if (! rlist_empty(&con->in_stop_list)) {
-		/* Resumed stopped connection. */
-		rlist_del(&con->in_stop_list);
-		/*
-		 * This connection may have no input, so
-		 * resume one more connection which might have
-		 * input.
-		 */
-		iproto_resume();
-	}
+	assert(rlist_empty(&con->in_stop_list));
 	/*
 	 * Throttle if there are too many pending requests,
 	 * otherwise we might deplete the fiber pool in tx
@@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		in->wpos += nrd;
 		con->parse_size += nrd;
 		/* Enqueue all requests which are fully read up. */
-		iproto_enqueue_batch(con, in);
+		if (iproto_enqueue_batch(con, in) != 0)
+			diag_raise();
 	} catch (Exception *e) {
 		/* Best effort at sending the error message to the client. */
 		iproto_write_error(fd, e, ::schema_version, 0);
@@ -801,7 +832,11 @@ static struct iproto_connection *
 iproto_connection_new(int fd)
 {
 	struct iproto_connection *con = (struct iproto_connection *)
-		mempool_alloc_xc(&iproto_connection_pool);
+		mempool_alloc(&iproto_connection_pool);
+	if (con == NULL) {
+		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
+		return NULL;
+	}
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
 	 * Enqueue any messages if they are in the readahead
 	 * queue. Will simply start input otherwise.
 	 */
-	iproto_enqueue_batch(con, msg->p_ibuf);
+	if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
+		iproto_connection_close(con);
 }
 
 static void
@@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 {
 	(void) addr;
 	(void) addrlen;
-	struct iproto_connection *con;
-
-	con = iproto_connection_new(fd);
+	struct iproto_msg *msg;
+	struct iproto_connection *con = iproto_connection_new(fd);
+	if (con == NULL)
+		goto error_conn;
 	/*
 	 * Ignore msg allocation failure - the queue size is
 	 * fixed so there is a limited number of msgs in
 	 * use, all stored in just a few blocks of the memory pool.
 	 */
-	struct iproto_msg *msg = iproto_msg_new(con);
+	msg = iproto_msg_new(con);
+	if (msg == NULL)
+		goto error_msg;
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
 	msg->close_connection = false;
 	cpipe_push(&tx_pipe, &msg->base);
+	return;
+error_msg:
+	mempool_free(&iproto_connection_pool, con);
+error_conn:
+	close(fd);
+	return;
 }
 
 static struct evio_service binary; /* iproto binary listener */
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 5b4bc23a3..5da3e2642 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1150,6 +1150,75 @@ cn:close()
 s:drop()
 ---
 ...
+--
+-- When the iproto message pool is exhausted, stop the connection
+-- until the pool has free messages again.
+--
+started = 0
+---
+...
+finished = 0
+---
+...
+continue = false
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+cn = net_box.connect(box.cfg.listen)
+---
+...
+function long_poll() cn:call('long_poll_f') end
+---
+...
+_ = fiber.create(long_poll)
+---
+...
+while started ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+---
+- ok
+...
+_ = fiber.create(long_poll)
+---
+...
+fiber.sleep(0.1)
+---
+...
+started == 1
+---
+- true
+...
+continue = true
+---
+...
+errinj.set("ERRINJ_TESTING", false)
+---
+- ok
+...
+while finished ~= 2 do fiber.sleep(0.01) end
+---
+...
+cn:close()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 ---
 ...
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index e1460d1b6..365d535c2 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -390,6 +390,35 @@ ok, err
 cn:close()
 s:drop()
 
+--
+-- When the iproto message pool is exhausted, stop the connection
+-- until the pool has free messages again.
+--
+started = 0
+finished = 0
+continue = false
+test_run:cmd('setopt delimiter ";"')
+function long_poll_f()
+    started = started + 1
+    f = fiber.self()
+    while not continue do fiber.sleep(0.01) end
+    finished = finished + 1
+end;
+test_run:cmd('setopt delimiter ""');
+cn = net_box.connect(box.cfg.listen)
+function long_poll() cn:call('long_poll_f') end
+_ = fiber.create(long_poll)
+while started ~= 1 do fiber.sleep(0.01) end
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+_ = fiber.create(long_poll)
+fiber.sleep(0.1)
+started == 1
+continue = true
+errinj.set("ERRINJ_TESTING", false)
+while finished ~= 2 do fiber.sleep(0.01) end
+cn:close()
+
 box.schema.user.revoke('guest', 'read,write,execute','universe')
 
 --
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index dde2016b7..de22bcbb9 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -70,10 +70,13 @@ end;
 ...
 -- Wait until 'active' stops growing - it means, that the input
 -- is blocked.
+-- No more messages.
 function wait_active(value)
     while value ~= active do fiber.sleep(0.01) end
+    fiber.sleep(0.01)
+    assert(value == active)
 end;
 ---
 ...
@@ -99,11 +102,23 @@ run_workers(conn2)
 wait_active(run_max * 2)
 ---
 ...
-active == run_max * 2 or active
+wait_finished(active)
 ---
-- true
 ...
-wait_finished(active)
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_active(limit + 1)
+---
+...
+wait_finished(run_max)
 ---
 ...
 conn2:close()
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 560e37017..39f8f53f7 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -44,6 +44,9 @@ function wait_active(value)
 	while value ~= active do fiber.sleep(0.01) end
+	fiber.sleep(0.01)
+-- No more messages.
+	assert(value == active)
 end;
 
 function wait_finished(needed)
@@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
 run_workers(conn)
 run_workers(conn2)
 wait_active(run_max * 2)
-active == run_max * 2 or active
 wait_finished(active)
 
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_active(limit + 1)
+wait_finished(run_max)
+
 conn2:close()
 conn:close()
-- 
2.15.1 (Apple Git-101)
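For completeness, the mempool_alloc_xc() to mempool_alloc() conversion in
iproto_msg_new() and iproto_connection_new() boils down to the pattern
sketched below. Again, this is a simplified standalone illustration under
assumed names: the pool, the diagnostics structure and msg_new() are
stand-ins, not the real Tarantool API.

/*
 * Sketch of checked pool allocation: failure is reported through a
 * NULL return plus a saved diagnostic instead of an exception, so
 * the caller decides what to do with the exhausted pool.
 */
#include <cstdio>
#include <cstddef>

struct Diag { const char *what; size_t size; };
static Diag last_error;

static void
diag_set_oom(size_t size, const char *object)
{
	last_error.size = size;
	last_error.what = object;
}

struct Msg { int payload; };

/* A fixed-size pool: returns NULL when exhausted, like mempool_alloc(). */
static Msg pool[4];
static size_t pool_used = 0;

static Msg *
msg_new()
{
	if (pool_used == sizeof(pool) / sizeof(pool[0])) {
		diag_set_oom(sizeof(Msg), "msg");
		return NULL;
	}
	return &pool[pool_used++];
}

int
main()
{
	for (int i = 0; ; ++i) {
		Msg *msg = msg_new();
		if (msg == NULL) {
			/* Not an error: wait until some requests finish. */
			printf("pool exhausted after %d messages "
			       "(%zu bytes needed for %s)\n",
			       i, last_error.size, last_error.what);
			break;
		}
		msg->payload = i;
	}
	return 0;
}

The point of the pattern is that running out of messages is not a
client-visible error: the connection is merely stopped until
iproto_resume() finds a spare message again, while a failed connection
allocation refuses the new client outright.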