Subject: [tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
From: Vladislav Shpilevoy
Date: Fri, 4 May 2018 14:56:21 +0300
To: Konstantin Osipov
Cc: tarantool-patches@freelists.org
In-Reply-To: <20180504082607.GA5623@atlas>

Hello. Thanks for the review!

On 04/05/2018 11:26, Konstantin Osipov wrote:
> * Vladislav Shpilevoy [18/05/04 00:07]:
>> IProto connection stops input reading, when active request count
>
> Vlad, while gerund in English is a valid grammatical form it is
> rarely used in colloquial speech. Please try to avoid using it
> unless you know what you're doing. Sorry for nitpicking.
>
> Instead of trying to construct a complex sentence, simply try
> to split your point in two smaller ones. English, despite being a
> member of Indo-European family along with Russian, had lost
> grammatical cases and respective endings long time ago. Without case
> endings complex sentences are extremely hard to parse,
> so they fell out of use.

New commit message:

iproto: fix error with unstoppable batching

When the request limit is reached, an IProto connection stops
reading input. But when multiple requests arrive in one batch,
IProto does not check the limit, so it can be violated.

Let's check the limit after each message during batch parsing,
not only once before parsing.

>
>> is reached. But when multiple requests are in a batch, the IProto
>> does not check the limit, so it can be violated.
>>
>> Lets check the limit during batch parsing after each message too,
>> not only once before parsing.
>> ---
>>  src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>>  test/box/errinj.result        |  69 ++++++++++++++++++++
>>  test/box/errinj.test.lua      |  29 +++++++++
>>  test/box/net_msg_max.result   |  21 ++++++-
>>  test/box/net_msg_max.test.lua |  13 +++-
>>  5 files changed, 222 insertions(+), 53 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index 9ccaf1dc7..dd7f97ecf 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>>  static struct iproto_msg *
>>  iproto_msg_new(struct iproto_connection *con)
>>  {
>> +	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>>  	struct iproto_msg *msg =
>> -		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
>> -	msg->connection = con;
>> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
>> +	if (msg != NULL)
>> +		msg->connection = con;
>> +	else
>> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>
> Nitpick: usually handling the error first and dealing with the
> rest later makes it easier to understand what's going on.
>
> This idiom would also make the code easier to parse in future, when more
> stuff is added to the "else" branch:
>
> if (msg == NULL) {
>     diag_set()
>     return NULL;
> }
>
> msg->connection = con;
> return msg;

Done.

 	struct iproto_msg *msg =
 		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-	if (msg != NULL)
-		msg->connection = con;
-	else
+	if (msg == NULL) {
 		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+		return NULL;
+	}
+	msg->connection = con;
 	return msg;
 }

>
>
>>  }
>>
>> -/** Enqueue all requests which were read up. */
>> -static inline void
>> +/**
>> + * Enqueue all requests which were read up. If a requests limit is
>
> request limit; request is an adjective here - when a noun is used
> as an adjective, its singular form is used.

Done.

- * Enqueue all requests which were read up. If a requests limit is
+ * Enqueue all requests which were read up. If a request limit is

>> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>  		 * requests, keep reading input, if only to avoid
>>  		 * a deadlock on this connection.
>>  		 */
>> -		ev_feed_event(con->loop, &con->input, EV_READ);
>> +		if (iproto_check_msg_max())
>> +			iproto_connection_stop(con);
>> +		else
>> +			ev_feed_event(con->loop, &con->input, EV_READ);
>
> Looks like a double check for msg max in the same loop. Could you
> please try to avoid that?

This check is outside the loop. After the loop finishes, I check
whether it ended because of the limit.

>
>>  	}
>>  	cpipe_flush_input(&tx_pipe);
>> +	return 0;
>> +}
>> +
>> +/**
>> + * Enqueue connection's pending requests. Completely ressurect the
>
> Please turn on spell checking in your editor.

Done.

- * Enqueue connection's pending requests. Completely ressurect the
+ * Enqueue connection's pending requests. Completely resurrect the

>> +/**
>> + * Throttle the queue to the tx thread and ensure the fiber pool
>> + * in tx thread is not depleted by a flood of incoming requests:
>> + * resume a stopped connection only if there is a spare message
>> + * object in the message pool.
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>> +		struct iproto_connection *con =
>> +			rlist_first_entry(&stopped_connections,
>> +					  struct iproto_connection,
>> +					  in_stop_list);
>> +		iproto_connection_resume(con);
>> +	}
>
> By the look of it, resume() resumes all stopped connections.
>
> This is not the case, however, as long at iproto_connection_stop() is made
> *after* reading input, so most likely with a request already
> sitting in the input buffer. The tricky relationship between
> iproto_resume() and the timing of calling iproto_connection_stop()
> requires an explanation. The comment needs a brush-up as well.

Done.

@@ -662,6 +663,10 @@ iproto_connection_resume(struct iproto_connection *con)
 {
 	assert(! iproto_check_msg_max());
 	rlist_del(&con->in_stop_list);
+	/*
+	 * Enqueue_batch() stops the connection again, if the
+	 * limit is reached again.
+	 */
 	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
@@ -671,10 +676,14 @@ iproto_connection_resume(struct iproto_connection *con)
 }

 /**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
+ * Resume as many connections as possible until a request limit is
+ * reached. Before accepting new requests, each connection first
+ * enqueues its pending ones, and its input is resumed only if the
+ * limit is still not reached.
+ *
+ * This global resumption of connections is needed when one
+ * connection finishes a request, and another connection can get
+ * the freed message.
  */
 static void
 iproto_resume()

>
>>  static void
>> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>>  		(struct iproto_connection *) watcher->data;
>>  	int fd = con->input.fd;
>>  	assert(fd >= 0);
>> -	if (! rlist_empty(&con->in_stop_list)) {
>> -		/* Resumed stopped connection. */
>> -		rlist_del(&con->in_stop_list);
>> -		/*
>> -		 * This connection may have no input, so
>> -		 * resume one more connection which might have
>> -		 * input.
>> -		 */
>> -		iproto_resume();
>> -	}
>> +	assert(rlist_empty(&con->in_stop_list));
>
> Nice.

👍👍

>> diff --git a/test/box/errinj.result b/test/box/errinj.result
>> index 5b4bc23a3..5da3e2642 100644
>> --- a/test/box/errinj.result
>> +++ b/test/box/errinj.result
>> @@ -1150,6 +1150,75 @@ cn:close()
>>  s:drop()
>>  ---
>>  ...
>> +--
>> +-- In messages memory pool is over, stop the connection, until the
>> +-- pool has free memory.
>> +--
>> +started = 0
>> +---
>> +...
>> +finished = 0
>> +---
>> +...
>> +continue = false
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ";"')
>> +---
>> +- true
>> +...
>> +function long_poll_f()
>> +    started = started + 1
>> +    f = fiber.self()
>> +    while not continue do fiber.sleep(0.01) end
>> +    finished = finished + 1
>> +end;
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ""');
>> +---
>> +- true
>> +...
>> +cn = net_box.connect(box.cfg.listen)
>> +---
>> +...
>> +function long_poll() cn:call('long_poll_f') end
>> +---
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +while started ~= 1 do fiber.sleep(0.01) end
>> +---
>> +...
>> +-- Simulate OOM for new requests.
>> +errinj.set("ERRINJ_TESTING", true)
>> +---
>> +- ok
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +fiber.sleep(0.1)
>
> Please avoid 0.1 sleeps, entirely. If you're waiting for anything,
> please wait in a loop with wait timeout set 0.01 seconds.
>> +---
>> +...
>> +started == 1
>> +---
>> +- true
>> +...
>> +continue = true
>> +---
>> +...
>> +errinj.set("ERRINJ_TESTING", false)
>> +---
>> +- ok
>> +...
>> +while finished ~= 2 do fiber.sleep(0.01) end
>
> I don't understand how exactly you're testing that the connection
> is stopped when the pool is exhausted.
>
> It seems you simply wait for your long-poll fiber to block, then
> resume it, then wait for it to finish.

Yes, you are right. I refactored the patch to make it clearer.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 1e4a88ecc..baa6bb660 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -190,17 +190,7 @@ struct iproto_msg
 static struct mempool iproto_msg_pool;

 static struct iproto_msg *
-iproto_msg_new(struct iproto_connection *con)
-{
-	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
-	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-	if (msg != NULL)
-		msg->connection = con;
-	else
-		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
-	return msg;
-}
+iproto_msg_new(struct iproto_connection *con);

 /**
  * Resume stopped connections, if any.
@@ -388,6 +378,25 @@ iproto_check_msg_max()
 	return request_count > IPROTO_MSG_MAX;
 }

+static struct iproto_msg *
+iproto_msg_new(struct iproto_connection *con)
+{
+	struct iproto_msg *msg =
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	ERROR_INJECT(ERRINJ_TESTING, {
+		mempool_free(&iproto_msg_pool, msg);
+		msg = NULL;
+	});
+	if (msg == NULL) {
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+		say_warn("can not allocate new net_msg on connection %s",
+			 sio_socketname(con->input.fd));
+		return NULL;
+	}
+	msg->connection = con;
+	return msg;
+}
+
 /**
  * A connection is idle when the client is gone
  * and there are no outstanding msgs in the msg queue.
@@ -414,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
 static inline void
 iproto_connection_stop(struct iproto_connection *con)
 {
-	say_warn("net_msg_max limit reached, stopping input on connection %s",
+	say_warn("stopping input on connection %s",
 		 sio_socketname(con->input.fd));
 	assert(rlist_empty(&con->in_stop_list));
 	ev_io_stop(con->loop, &con->input);
 	rlist_add_tail(&stopped_connections, &con->in_stop_list);
 }

+static inline void
+iproto_connection_stop_by_limit(struct iproto_connection *con)
+{
+	say_warn("net_msg_max limit reached on connection %s",
+		 sio_socketname(con->input.fd));
+	assert(iproto_check_msg_max());
+	iproto_connection_stop(con);
+}
+
 /**
  * Initiate a connection shutdown. This method may
  * be invoked many times, and does the internal
@@ -644,7 +662,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		 * a deadlock on this connection.
 		 */
 		if (iproto_check_msg_max())
-			iproto_connection_stop(con);
+			iproto_connection_stop_by_limit(con);
 		else
 			ev_feed_event(con->loop, &con->input, EV_READ);
 	}
@@ -711,7 +729,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 	 * thread and deadlock.
 	 */
 	if (iproto_check_msg_max()) {
-		iproto_connection_stop(con);
+		iproto_connection_stop_by_limit(con);
 		return;
 	}
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index 365d535c2..8f284f6ab 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -411,11 +411,21 @@ _ = fiber.create(long_poll)
 while started ~= 1 do fiber.sleep(0.01) end
 -- Simulate OOM for new requests.
 errinj.set("ERRINJ_TESTING", true)
+-- This request tries to allocate memory for the request data and
+-- fails. This stops the connection until an existing
+-- request is finished.
+log = require('log')
+-- Fill log with garbage to avoid accidentally reading previous
+-- test cases' results.
+log.info(string.rep('a', 1000))
 _ = fiber.create(long_poll)
-fiber.sleep(0.1)
+while not test_run:grep_log('default', 'can not allocate new net_msg on connection', 1000) do fiber.sleep(0.01) end
+test_run:grep_log('default', 'stopping input on connection', 1000) ~= nil
 started == 1
 continue = true
 errinj.set("ERRINJ_TESTING", false)
+-- Ensure that when memory is available again, the pending
+-- request is executed.
 while finished ~= 2 do fiber.sleep(0.01) end

 cn:close()