From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id D35162116D for ; Tue, 8 May 2018 07:07:28 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 9NZy-JACK0sr for ; Tue, 8 May 2018 07:07:28 -0400 (EDT) Received: from smtp44.i.mail.ru (smtp44.i.mail.ru [94.100.177.104]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6E57A21127 for ; Tue, 8 May 2018 07:07:28 -0400 (EDT) Subject: [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching References: <152543542862.15530.3321867952732897038@localhost> <1525434981.906931598.16256142066578016@mxpdd8.i.mail.ru> <20180507195405.GA30274@atlas> From: Vladislav Shpilevoy Message-ID: Date: Tue, 8 May 2018 14:07:25 +0300 MIME-Version: 1.0 In-Reply-To: <20180507195405.GA30274@atlas> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: Konstantin Osipov , tarantool-patches@freelists.org Hello. Thanks for the review. >> + if (msg == NULL) { >> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg"); >> + say_warn("can not allocate new net_msg on connection %s", >> + sio_socketname(con->input.fd)); > > OutOfMemory is a logged error. Are you sure you want double > logging? No, it is not a logged error. 
class OutOfMemory: public SystemError -> -> class SystemError: public Exception -> -> class Exception: public error -> -> struct error. > >> + return NULL; >> + } >> + msg->connection = con; >> + return msg; >> } >> >> /** >> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con) >> static inline void >> iproto_connection_stop(struct iproto_connection *con) >> { >> - say_warn("net_msg_max limit reached, stopping input on connection %s", >> + say_warn("stopping input on connection %s", >> sio_socketname(con->input.fd)); > > Please have 3 functions, but avoid double logging: > iproto_connection_stop > iproto_connectoin_stop_msg_max_limit > iproto_connection_stop_readahead_limit Done. diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 3212d9697..0a5eb1f86 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -426,19 +426,33 @@ iproto_connection_is_idle(struct iproto_connection *con) } static inline void -iproto_connection_stop(struct iproto_connection *con) +iproto_connection_stop_impl(struct iproto_connection *con) { say_warn("stopping input on connection %s", sio_socketname(con->input.fd)); assert(rlist_empty(&con->in_stop_list)); ev_io_stop(con->loop, &con->input); +} + +static inline void +iproto_connection_stop(struct iproto_connection *con) +{ + iproto_connection_stop_impl(con); rlist_add_tail(&stopped_connections, &con->in_stop_list); } static inline void -iproto_connection_stop_by_limit(struct iproto_connection *con) +iproto_connection_stop_readaheadlimit(struct iproto_connection *con) +{ + say_warn("readahead limit is reached on connection %s", + sio_socketname(con->input.fd)); + iproto_connection_stop_impl(con); +} + +static inline void +iproto_connection_stop_msg_max_limit(struct iproto_connection *con) { - say_warn("net_msg_max limit reached on connection %s", + say_warn("net_msg_max limit is reached on connection %s", sio_socketname(con->input.fd)); assert(iproto_check_msg_max()); iproto_connection_stop(con); @@ -667,7 
+681,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * a deadlock on this connection. */ if (iproto_check_msg_max()) - iproto_connection_stop_by_limit(con); + iproto_connection_stop_msg_max_limit(con); else ev_feed_event(con->loop, &con->input, EV_READ); } @@ -728,13 +742,14 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, int fd = con->input.fd; assert(fd >= 0); assert(rlist_empty(&con->in_stop_list)); + assert(loop == con->loop); /* * Throttle if there are too many pending requests, * otherwise we might deplete the fiber pool in tx * thread and deadlock. */ if (iproto_check_msg_max()) { - iproto_connection_stop_by_limit(con); + iproto_connection_stop_msg_max_limit(con); return; } @@ -742,9 +757,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, /* Ensure we have sufficient space for the next round. */ struct ibuf *in = iproto_connection_input_buffer(con); if (in == NULL) { - say_warn("readahead limit reached, stopping input on connection %s", - sio_socketname(con->input.fd)); - ev_io_stop(loop, &con->input); + iproto_connection_stop_readaheadlimit(con); return; } /* Read input. */ > >> assert(rlist_empty(&con->in_stop_list)); >> ev_io_stop(con->loop, &con->input); > > Please add a comment here: > > /* > * Important to add to tail and fetch from head to ensure > * strict lifo order (fairness) for stopped connections. > */ diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 0a5eb1f86..251d9fa0d 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -438,6 +438,10 @@ static inline void iproto_connection_stop(struct iproto_connection *con) { iproto_connection_stop_impl(con); + /* + * Important to add to tail and fetch from head to ensure + * strict lifo order (fairness) for stopped connections. 
+ */ rlist_add_tail(&stopped_connections, &con->in_stop_list); } >> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) >> if (reqend > in->wpos) >> break; >> struct iproto_msg *msg = iproto_msg_new(con); >> + if (msg == NULL) { >> + /* >> + * Do not tread it as an error - just wait >> + * until some of requests are finished. > > tread -> treat @@ -627,7 +631,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) struct iproto_msg *msg = iproto_msg_new(con); if (msg == NULL) { /* - * Do not tread it as an error - just wait + * Do not treat it as an error - just wait * until some of requests are finished. */ iproto_connection_stop(con); >> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) >> * requests, keep reading input, if only to avoid >> * a deadlock on this connection. >> */ >> - ev_feed_event(con->loop, &con->input, EV_READ); >> + if (iproto_check_msg_max()) >> + iproto_connection_stop_by_limit(con); >> + else >> + ev_feed_event(con->loop, &con->input, EV_READ); > > Could you please avoid the double check for msg max here? There was no double check, as I said before. This check is outside the loop. But OK, I have moved it inside the loop. diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 9618a04b0..605d38798 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -610,9 +610,15 @@ iproto_connection_input_buffer(struct iproto_connection *con) static inline int iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) { + assert(rlist_empty(&con->in_stop_list)); int n_requests = 0; bool stop_input = false; - while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) { + while (con->parse_size != 0 && !stop_input) { + if (iproto_check_msg_max()) { + iproto_connection_stop_msg_max_limit(con); + cpipe_flush_input(&tx_pipe); + return 0; + } const char *reqstart = in->wpos - con->parse_size; const char *pos = reqstart; /* Read request length. 
*/ @@ -666,7 +672,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) ev_io_stop(con->loop, &con->output); ev_io_stop(con->loop, &con->input); } else if (n_requests != 1 || con->parse_size != 0) { - assert(rlist_empty(&con->in_stop_list)); /* * Keep reading input, as long as the socket * supplies data, but don't waste CPU on an extra @@ -684,10 +689,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * requests, keep reading input, if only to avoid * a deadlock on this connection. */ - if (iproto_check_msg_max()) - iproto_connection_stop_msg_max_limit(con); - else - ev_feed_event(con->loop, &con->input, EV_READ); + ev_feed_event(con->loop, &con->input, EV_READ); } cpipe_flush_input(&tx_pipe); return 0; >> +/** >> + * Resume as many connections as possible until a request limit is >> + * reached. Each connection before to start accept new requests >> + * enqueues pending ones. And the input is resumed only if the >> + * limit still is not reached. >> + * >> + * This global connections resuming is needed when one connection >> + * finished a request, and another connection can get the freed >> + * message. > > I can't parse this, sorry :( > > How about: > > Resume as many connections as possible until a request limit is > reached. By design of iproto_enqueue_batch(), a paused connection almost always has a > pending request fully read up, so resuming a connection will > immediately enqueue the request as an iproto message and exhaust > the limit. Thus we aren't really resuming all connections here: > only as many as is necessary to use up the limit. Ok. @@ -717,13 +719,12 @@ iproto_connection_resume(struct iproto_connection *con) /** * Resume as many connections as possible until a request limit is - * reached. Each connection before to start accept new requests - * enqueues pending ones. And the input is resumed only if the - * limit still is not reached. 
- * - * This global connections resuming is needed when one connection - * finished a request, and another connection can get the freed - * message. + * reached. By design of iproto_enqueue_batch(), a paused + * connection almost always has a pending request fully read up, + * so resuming a connection will immediately enqueue the request + * as an iproto message and exhaust the limit. Thus we aren't + * really resuming all connections here: only as many as is + * necessary to use up the limit. */ static void > >> + */ >> +static void >> +iproto_resume() >> +{ >> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) { > > Please add a comment: > /* Shift from list head to ensure strict FIFO (fairness) for > * resumed connections. > */ Ok. @@ -730,6 +730,10 @@ static void iproto_resume() { while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) { + /* + * Shift from list head to ensure strict FIFO + * (fairness) for resumed connections. + */ struct iproto_connection *con = rlist_first_entry(&stopped_connections, struct iproto_connection