From: Konstantin Osipov
Date: Fri, 4 May 2018 11:26:07 +0300
Subject: [tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
Message-ID: <20180504082607.GA5623@atlas>
In-Reply-To: <00583e6bbf9ca53c3a44114351b9fdb9e539321d.1525381393.git.v.shpilevoy@tarantool.org>
To: Vladislav Shpilevoy
Cc: tarantool-patches@freelists.org

* Vladislav Shpilevoy [18/05/04 00:07]:
> IProto connection stops input reading, when active request count

Vlad, while the gerund is a valid grammatical form in English, it is
rarely used in colloquial speech. Please try to avoid using it
unless you know what you're doing. Sorry for nitpicking.

Instead of trying to construct a complex sentence, simply try to
split your point into two smaller ones.
English, despite being a member of the Indo-European family along
with Russian, lost its grammatical cases and their endings a long
time ago. Without case endings, complex sentences are extremely hard
to parse, so they fell out of use.

> is reached. But when multiple requests are in a batch, the IProto
> does not check the limit, so it can be violated.
>
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
>  src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>  test/box/errinj.result        |  69 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  29 +++++++++
>  test/box/net_msg_max.result   |  21 ++++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 222 insertions(+), 53 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 9ccaf1dc7..dd7f97ecf 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> +    ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>      struct iproto_msg *msg =
> -        (struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
> -    msg->connection = con;
> +        (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +    if (msg != NULL)
> +        msg->connection = con;
> +    else
> +        diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");

Nitpick: handling the error first and dealing with the rest later
usually makes it easier to understand what's going on. This idiom
would also make the code easier to parse in the future, when more
stuff is added to the "else" branch:

    if (msg == NULL) {
        diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
        return NULL;
    }
    msg->connection = con;
    return msg;

>  }
>
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a requests limit is

request limit; request is an adjective here - when a noun is used
as an adjective, its singular form is used.
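Going back to the error-first nitpick on iproto_msg_new() above,
here is a minimal standalone sketch of the idiom. It is not the
Tarantool code: `Connection`, `Msg`, `pool_alloc`, `simulate_oom`
and `last_error` are hypothetical stand-ins for the connection and
message structs, mempool_alloc(), the error injection and the
diagnostics area respectively.

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical stand-ins for the Tarantool types; not the real API.
struct Connection { int fd; };
struct Msg { Connection *connection; };

// Stand-in for the diagnostics area that diag_set() fills in.
static std::string last_error;

// Stand-in for the error injection: when set, allocation fails.
static bool simulate_oom = false;

// Stand-in allocator: returns nullptr on (simulated) failure.
static void *
pool_alloc(std::size_t size)
{
    return simulate_oom ? nullptr : std::malloc(size);
}

// Error first: bail out as soon as the allocation fails, then
// continue with the success path at the top indentation level.
static Msg *
msg_new(Connection *con)
{
    assert(con != nullptr);
    Msg *msg = static_cast<Msg *>(pool_alloc(sizeof(*msg)));
    if (msg == nullptr) {
        last_error = "out of memory";
        return nullptr;
    }
    msg->connection = con;
    return msg;
}
```

With this shape, anything added to the failure path later stays
inside one short block, and the success path never gains another
level of nesting.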
> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval 0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>      int n_requests = 0;
>      bool stop_input = false;
> -    while (con->parse_size && stop_input == false) {
> +    while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>          const char *reqstart = in->wpos - con->parse_size;
>          const char *pos = reqstart;
>          /* Read request length. */
>          if (mp_typeof(*pos) != MP_UINT) {
>              cpipe_flush_input(&tx_pipe);
> -            tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -                  "packet length");
> +            diag_set(ClientError, ER_INVALID_MSGPACK,
> +                 "packet length");
> +            return -1;
>          }
>          if (mp_check_uint(pos, in->wpos) >= 0)
>              break;
> @@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>          if (reqend > in->wpos)
>              break;
>          struct iproto_msg *msg = iproto_msg_new(con);
> +        if (msg == NULL) {
> +            /*
> +             * Do not tread it as an error - just wait
> +             * until some of requests are finished.
> +             */
> +            iproto_connection_stop(con);
> +            return 0;
> +        }
>          msg->p_ibuf = con->p_ibuf;
>          msg->wpos = con->wpos;
>
> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>           * requests, keep reading input, if only to avoid
>           * a deadlock on this connection.
>           */
> -        ev_feed_event(con->loop, &con->input, EV_READ);
> +        if (iproto_check_msg_max())
> +            iproto_connection_stop(con);
> +        else
> +            ev_feed_event(con->loop, &con->input, EV_READ);

Looks like a double check for msg max in the same loop. Could you
please try to avoid that?

>      }
>      cpipe_flush_input(&tx_pipe);
> +    return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests.
> + * Completely ressurect the

Please turn on spell checking in your editor.

> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +    assert(! iproto_check_msg_max());
> +    rlist_del(&con->in_stop_list);
> +    if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +        struct error *e = box_error_last();
> +        iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +        error_log(e);
> +        iproto_connection_close(con);
> +    }
> +}
> +
> +/**
> + * Throttle the queue to the tx thread and ensure the fiber pool
> + * in tx thread is not depleted by a flood of incoming requests:
> + * resume a stopped connection only if there is a spare message
> + * object in the message pool.
> + */
> +static void
> +iproto_resume()
> +{
> +    while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
> +        struct iproto_connection *con =
> +            rlist_first_entry(&stopped_connections,
> +                      struct iproto_connection,
> +                      in_stop_list);
> +        iproto_connection_resume(con);
> +    }

By the look of it, resume() resumes all stopped connections. This
is not the case, however, as long as iproto_connection_stop() is
called *after* reading input, so most likely with a request already
sitting in the input buffer. The tricky relationship between
iproto_resume() and the timing of calling iproto_connection_stop()
requires an explanation. The comment needs a brush-up as well.

>  static void
> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>          (struct iproto_connection *) watcher->data;
>      int fd = con->input.fd;
>      assert(fd >= 0);
> -    if (! rlist_empty(&con->in_stop_list)) {
> -        /* Resumed stopped connection. */
> -        rlist_del(&con->in_stop_list);
> -        /*
> -         * This connection may have no input, so
> -         * resume one more connection which might have
> -         * input.
> -         */
> -        iproto_resume();
> -    }
> +    assert(rlist_empty(&con->in_stop_list));

Nice.
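To make the earlier point about iproto_resume() concrete, below is
a toy model of the stop list. It is only a sketch under the
assumption stated above, namely that a stopped connection usually
has at least one complete request buffered. `ToyConnection`,
`ToyIproto` and their members are hypothetical names, not the
iproto structures.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model, not the iproto code: all names here are hypothetical.
struct ToyConnection {
    int pending; // complete requests already sitting in the buffer
};

struct ToyIproto {
    std::size_t msg_max;       // limit on in-flight requests
    std::size_t in_flight = 0; // requests currently being processed
    std::deque<ToyConnection *> stopped;

    explicit ToyIproto(std::size_t max) : msg_max(max) {}

    bool limit_reached() const { return in_flight >= msg_max; }

    // Enqueue buffered requests until the limit is hit, then stop
    // the connection if the limit is reached, as the patch does.
    void enqueue_batch(ToyConnection *con) {
        while (con->pending > 0 && !limit_reached()) {
            --con->pending;
            ++in_flight;
        }
        if (limit_reached())
            stopped.push_back(con);
    }

    // Same loop shape as iproto_resume() in the patch. Returns how
    // many connections were actually resumed.
    std::size_t resume() {
        std::size_t resumed = 0;
        while (!limit_reached() && !stopped.empty()) {
            ToyConnection *con = stopped.front();
            stopped.pop_front();
            enqueue_batch(con);
            ++resumed;
        }
        return resumed;
    }

    // One request has been processed and its slot is free again.
    void finish_one() {
        assert(in_flight > 0);
        --in_flight;
    }
};
```

In this model, with a limit of 2 and three stopped connections that
all have buffered requests, freeing one slot and calling resume()
wakes up only the first connection: its buffered request takes the
free slot, the limit is reached again and the loop exits. That is
the behaviour the comment on iproto_resume() should spell out.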
>      /*
>       * Throttle if there are too many pending requests,
>       * otherwise we might deplete the fiber pool in tx
> @@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>          in->wpos += nrd;
>          con->parse_size += nrd;
>          /* Enqueue all requests which are fully read up. */
> -        iproto_enqueue_batch(con, in);
> +        if (iproto_enqueue_batch(con, in) != 0)
> +            diag_raise();
>      } catch (Exception *e) {
>          /* Best effort at sending the error message to the client. */
>          iproto_write_error(fd, e, ::schema_version, 0);
> @@ -801,7 +832,11 @@ static struct iproto_connection *
>  iproto_connection_new(int fd)
>  {
>      struct iproto_connection *con = (struct iproto_connection *)
> -        mempool_alloc_xc(&iproto_connection_pool);
> +        mempool_alloc(&iproto_connection_pool);
> +    if (con == NULL) {
> +        diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
> +        return NULL;
> +    }
>      con->input.data = con->output.data = con;
>      con->loop = loop();
>      ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
> @@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
>       * Enqueue any messages if they are in the readahead
>       * queue. Will simply start input otherwise.
>       */
> -    iproto_enqueue_batch(con, msg->p_ibuf);
> +    if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
> +        iproto_connection_close(con);
>  }
>
>  static void
> @@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>  {
>      (void) addr;
>      (void) addrlen;
> -    struct iproto_connection *con;
> -
> -    con = iproto_connection_new(fd);
> +    struct iproto_msg *msg;
> +    struct iproto_connection *con = iproto_connection_new(fd);
> +    if (con == NULL)
> +        goto error_conn;
>      /*
>       * Ignore msg allocation failure - the queue size is
>       * fixed so there is a limited number of msgs in
>       * use, all stored in just a few blocks of the memory pool.
>       */
> -    struct iproto_msg *msg = iproto_msg_new(con);
> +    msg = iproto_msg_new(con);
> +    if (msg == NULL)
> +        goto error_msg;
>      cmsg_init(&msg->base, connect_route);
>      msg->p_ibuf = con->p_ibuf;
>      msg->wpos = con->wpos;
>      msg->close_connection = false;
>      cpipe_push(&tx_pipe, &msg->base);
> +    return;
> +error_msg:
> +    mempool_free(&iproto_connection_pool, con);
> +error_conn:
> +    close(fd);
> +    return;
>  }
>
>  static struct evio_service binary; /* iproto binary listener */
> diff --git a/test/box/errinj.result b/test/box/errinj.result
> index 5b4bc23a3..5da3e2642 100644
> --- a/test/box/errinj.result
> +++ b/test/box/errinj.result
> @@ -1150,6 +1150,75 @@ cn:close()
>  s:drop()
>  ---
>  ...
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +---
> +...
> +finished = 0
> +---
> +...
> +continue = false
> +---
> +...
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +cn = net_box.connect(box.cfg.listen)
> +---
> +...
> +function long_poll() cn:call('long_poll_f') end
> +---
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +while started ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +---
> +- ok
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +fiber.sleep(0.1)

Please avoid 0.1 sleeps entirely. If you're waiting for anything,
please wait in a loop with the wait timeout set to 0.01 seconds.

> +---
> +...
> +started == 1
> +---
> +- true
> +...
> +continue = true
> +---
> +...
> +errinj.set("ERRINJ_TESTING", false)
> +---
> +- ok
> +...
> +while finished ~= 2 do fiber.sleep(0.01) end

I don't understand how exactly you're testing that the connection
is stopped when the pool is exhausted. It seems you simply wait for
your long-poll fiber to block, then resume it, then wait for it to
finish.

> +---
> +...
> +cn:close()
> +---
> +...
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>  ---
>  ...
> diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
> index e1460d1b6..365d535c2 100644
> --- a/test/box/errinj.test.lua
> +++ b/test/box/errinj.test.lua
> @@ -390,6 +390,35 @@ ok, err
>  cn:close()
>  s:drop()
>
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +finished = 0
> +continue = false
> +test_run:cmd('setopt delimiter ";"')
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +test_run:cmd('setopt delimiter ""');
> +cn = net_box.connect(box.cfg.listen)
> +function long_poll() cn:call('long_poll_f') end
> +_ = fiber.create(long_poll)
> +while started ~= 1 do fiber.sleep(0.01) end
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +_ = fiber.create(long_poll)
> +fiber.sleep(0.1)
> +started == 1
> +continue = true
> +errinj.set("ERRINJ_TESTING", false)
> +while finished ~= 2 do fiber.sleep(0.01) end
> +cn:close()
> +
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>
>  --
> diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
> index dde2016b7..de22bcbb9 100644
> --- a/test/box/net_msg_max.result
> +++ b/test/box/net_msg_max.result
> @@ -70,10 +70,13 @@ end;
>  ...
>  -- Wait until 'active' stops growing - it means, that the input
>  -- is blocked.
> +-- No more messages.
>  function wait_active(value)
>      while value ~= active do
>          fiber.sleep(0.01)
>      end
> +    fiber.sleep(0.01)
> +    assert(value == active)
>  end;
>  ---
>  ...
> @@ -99,11 +102,23 @@ run_workers(conn2)
>  wait_active(run_max * 2)
>  ---
>  ...
> -active == run_max * 2 or active
> +wait_finished(active)
>  ---
> -- true
>  ...
> -wait_finished(active)
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +---
> +...
> +run_workers(conn)
> +---
> +...
> +wait_active(limit + 1)
> +---
> +...
> +wait_finished(run_max)
>  ---
>  ...
>  conn2:close()
> diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
> index 560e37017..39f8f53f7 100644
> --- a/test/box/net_msg_max.test.lua
> +++ b/test/box/net_msg_max.test.lua
> @@ -44,6 +44,9 @@ function wait_active(value)
>      while value ~= active do
>          fiber.sleep(0.01)
>      end
> +    fiber.sleep(0.01)
> +-- No more messages.
> +    assert(value == active)
>  end;
>
>  function wait_finished(needed)
> @@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
>  run_workers(conn)
>  run_workers(conn2)
>  wait_active(run_max * 2)
> -active == run_max * 2 or active
>  wait_finished(active)
>
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +run_workers(conn)
> +wait_active(limit + 1)
> +wait_finished(run_max)
> +
>  conn2:close()
>  conn:close()

Thank you for working on this patch,

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov