Date: Mon, 7 May 2018 22:54:05 +0300
From: Konstantin Osipov
To: Vladislav Shpilevoy , tarantool-patches@freelists.org
Subject: [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
Message-ID: <20180507195405.GA30274@atlas>
In-Reply-To: <1525434981.906931598.16256142066578016@mxpdd8.i.mail.ru>

* Vladislav Shpilevoy [18/05/04 14:58]:
> iproto: fix error with unstoppable batching
>
> IProto connection stops to read input on reached request limit.
> But when multiple requests are in a batch, the IProto does not
> check the limit, so it can be violated.
>
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
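The following is a minimal standalone sketch of the problem the commit message describes. It is not code from iproto.cc. MSG_MAX, msgs_in_flight, check_msg_max() and the two enqueue functions are hypothetical stand-ins for net_msg_max, the in-flight message count, iproto_check_msg_max() and iproto_enqueue_batch(). The first function checks the limit only once, before the batch. The second re-checks it before every message, as the patch does.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins: MSG_MAX plays the role of net_msg_max and
 * msgs_in_flight counts messages handed over to the tx thread.
 */
enum { MSG_MAX = 3 };

static int msgs_in_flight = 0;

/* True when no more messages may be enqueued. */
static bool check_msg_max(void)
{
	return msgs_in_flight >= MSG_MAX;
}

/* Old behaviour: the limit is checked once, before parsing the batch. */
static int enqueue_batch_check_once(int pending)
{
	int enqueued = 0;
	if (check_msg_max())
		return 0;
	for (; pending > 0; pending--) {
		msgs_in_flight++;
		enqueued++;
	}
	return enqueued;
}

/* Fixed behaviour: the limit is re-checked before every message. */
static int enqueue_batch_check_each(int pending)
{
	int enqueued = 0;
	for (; pending > 0 && !check_msg_max(); pending--) {
		msgs_in_flight++;
		enqueued++;
	}
	/* Either nothing was enqueued or the limit still holds. */
	assert(enqueued == 0 || msgs_in_flight <= MSG_MAX);
	return enqueued;
}
```

Suppose five requests are read up in one batch and nothing is in flight yet. The check-once variant enqueues all five, leaving five messages in flight against a limit of three. The per-message variant stops after three. In iproto, the remaining requests would stay unparsed in the input buffer until the connection is resumed.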
> ---
>  src/box/iproto.cc             | 177 +++++++++++++++++++++++++++++-------------
>  test/box/errinj.result        |  86 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  39 ++++++++++
>  test/box/net_msg_max.result   |  21 ++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 279 insertions(+), 57 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> -/**
> - * Throttle the queue to the tx thread and ensure the fiber pool
> - * in tx thread is not depleted by a flood of incoming requests:
> - * resume a stopped connection only if there is a spare message
> - * object in the message pool.
> - */
> -static void
> -iproto_resume()
> +static struct iproto_msg *
> +iproto_msg_new(struct iproto_connection *con)
>  {

Great!

> -	/*
> -	 * Most of the time we have nothing to do here: throttling
> -	 * is not active.
> -	 */
> -	if (rlist_empty(&stopped_connections))
> -		return;
> -	if (iproto_check_msg_max())
> -		return;
> -
> -	struct iproto_connection *con;
> -	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
> -				in_stop_list);
> -	ev_feed_event(con->loop, &con->input, EV_READ);
> +	struct iproto_msg *msg =
> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +	ERROR_INJECT(ERRINJ_TESTING, {
> +		mempool_free(&iproto_msg_pool, msg);
> +		msg = NULL;
> +	});
> +	if (msg == NULL) {
> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> +		say_warn("can not allocate new net_msg on connection %s",
> +			 sio_socketname(con->input.fd));

OutOfMemory is a logged error. Are you sure you want double logging?
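The following is a minimal standalone sketch of the single-logging pattern the question above points at. It assumes, as the review states, that raising OutOfMemory already writes a log line. The fixed-size pool, diag_set_oom(), msg_new(), msg_free(), last_error and the log_lines counter are hypothetical stand-ins, not Tarantool's mempool or diag API. On failure the allocator only records the diagnostic, so an exhausted pool produces exactly one log line.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical fixed-size pool standing in for iproto_msg_pool. */
enum { POOL_SIZE = 2 };

struct msg {
	int in_use;
};

static struct msg pool[POOL_SIZE];
/* Counts log lines, so that double logging would be visible. */
static int log_lines = 0;
/* Last diagnostic, a stand-in for the diagnostics area. */
static const char *last_error = NULL;

/* Stand-in for a "logged" error: setting it writes one log line. */
static void diag_set_oom(const char *what)
{
	last_error = what;
	log_lines++;
	fprintf(stderr, "OutOfMemory: failed to allocate %s\n", what);
}

/*
 * Returns a free message or NULL. On failure it only sets the
 * diagnostic and adds no say_warn()-style line of its own.
 */
static struct msg *msg_new(void)
{
	for (size_t i = 0; i < POOL_SIZE; i++) {
		if (!pool[i].in_use) {
			pool[i].in_use = 1;
			return &pool[i];
		}
	}
	diag_set_oom("msg");
	return NULL;
}

/* Returns a message to the pool. */
static void msg_free(struct msg *m)
{
	assert(m->in_use);
	m->in_use = 0;
}
```

Once one message is freed, the next msg_new() call reuses its slot without logging anything.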
> +		return NULL;
> +	}
> +	msg->connection = con;
> +	return msg;
>  }
>
>  /**
> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>  static inline void
>  iproto_connection_stop(struct iproto_connection *con)
>  {
> -	say_warn("net_msg_max limit reached, stopping input on connection %s",
> +	say_warn("stopping input on connection %s",
>  		 sio_socketname(con->input.fd));

Please have 3 functions, but avoid double logging:

iproto_connection_stop
iproto_connection_stop_msg_max_limit
iproto_connection_stop_readahead_limit

>  	assert(rlist_empty(&con->in_stop_list));
>  	ev_io_stop(con->loop, &con->input);

Please add a comment here:

/*
 * Important to add to tail and fetch from head to ensure
 * strict FIFO order (fairness) for stopped connections.
 */

>  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
>  }
>
> +static inline void
> +iproto_connection_stop_by_limit(struct iproto_connection *con)
> +{
> +	say_warn("net_msg_max limit reached on connection %s",
> +		 sio_socketname(con->input.fd));
> +	assert(iproto_check_msg_max());
> +	iproto_connection_stop(con);
> +}
> +
>  /**
>   * Initiate a connection shutdown. This method may
>   * be invoked many times, and does the internal
> @@ -575,20 +573,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a request limit is
> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval 0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>  	int n_requests = 0;
>  	bool stop_input = false;
> -	while (con->parse_size && stop_input == false) {
> +	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>  		const char *reqstart = in->wpos - con->parse_size;
>  		const char *pos = reqstart;
>  		/* Read request length. */
>  		if (mp_typeof(*pos) != MP_UINT) {
>  			cpipe_flush_input(&tx_pipe);
> -			tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -				  "packet length");
> +			diag_set(ClientError, ER_INVALID_MSGPACK,
> +				 "packet length");
> +			return -1;
>  		}
>  		if (mp_check_uint(pos, in->wpos) >= 0)
>  			break;
> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		if (reqend > in->wpos)
>  			break;
>  		struct iproto_msg *msg = iproto_msg_new(con);
> +		if (msg == NULL) {
> +			/*
> +			 * Do not tread it as an error - just wait
> +			 * until some of requests are finished.

tread -> treat

> +			 */
> +			iproto_connection_stop(con);
> +			return 0;
> +		}
>  		msg->p_ibuf = con->p_ibuf;
>  		msg->wpos = con->wpos;
>
> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		 * requests, keep reading input, if only to avoid
>  		 * a deadlock on this connection.
>  		 */
> -		ev_feed_event(con->loop, &con->input, EV_READ);
> +		if (iproto_check_msg_max())
> +			iproto_connection_stop_by_limit(con);
> +		else
> +			ev_feed_event(con->loop, &con->input, EV_READ);

Could you please avoid the double check for msg max here?

>  	}
>  	cpipe_flush_input(&tx_pipe);
> +	return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely resurrect the
> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +	assert(! iproto_check_msg_max());
> +	rlist_del(&con->in_stop_list);
> +	/*
> +	 * enqueue_batch() stops the connection again, if the
> +	 * limit is reached again.
> +	 */
> +	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +		struct error *e = box_error_last();
> +		iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +		error_log(e);
> +		iproto_connection_close(con);
> +	}
> +}
> +
> +/**
> + * Resume as many connections as possible until a request limit is
> + * reached. Each connection before to start accept new requests
> + * enqueues pending ones. And the input is resumed only if the
> + * limit still is not reached.
> + *
> + * This global connections resuming is needed when one connection
> + * finished a request, and another connection can get the freed
> + * message.

I can't parse this, sorry :( How about:

Resume as many connections as possible until a request limit is
reached. By design of iproto_enqueue_batch(), a paused connection
almost always has a pending request fully read up, so resuming a
connection will immediately enqueue the request as an iproto
message and exhaust the limit. Thus we aren't really resuming all
connections here: only as many as are necessary to use up the
limit.

> + */
> +static void
> +iproto_resume()
> +{
> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {

Please add a comment:

/*
 * Shift from list head to ensure strict FIFO (fairness) for
 * resumed connections.
 */

> +		struct iproto_connection *con =
> +			rlist_first_entry(&stopped_connections,
> +					  struct iproto_connection,
> +					  in_stop_list);
> +		iproto_connection_resume(con);
> +	}
>  }

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov