[tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
Konstantin Osipov
kostja at tarantool.org
Mon May 7 22:54:05 MSK 2018
* Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/05/04 14:58]:
> iproto: fix error with unstoppable batching
>
> IProto connection stops to read input on reached request limit.
> But when multiple requests are in a batch, the IProto does not
> check the limit, so it can be violated.
>
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
> src/box/iproto.cc | 177 +++++++++++++++++++++++++++++-------------
> test/box/errinj.result | 86 ++++++++++++++++++++
> test/box/errinj.test.lua | 39 ++++++++++
> test/box/net_msg_max.result | 21 ++++-
> test/box/net_msg_max.test.lua | 13 +++-
> 5 files changed, 279 insertions(+), 57 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> -/**
> - * Throttle the queue to the tx thread and ensure the fiber pool
> - * in tx thread is not depleted by a flood of incoming requests:
> - * resume a stopped connection only if there is a spare message
> - * object in the message pool.
> - */
> -static void
> -iproto_resume()
> +static struct iproto_msg *
> +iproto_msg_new(struct iproto_connection *con)
> {
Great!
> - /*
> - * Most of the time we have nothing to do here: throttling
> - * is not active.
> - */
> - if (rlist_empty(&stopped_connections))
> - return;
> - if (iproto_check_msg_max())
> - return;
> -
> - struct iproto_connection *con;
> - con = rlist_first_entry(&stopped_connections, struct iproto_connection,
> - in_stop_list);
> - ev_feed_event(con->loop, &con->input, EV_READ);
> + struct iproto_msg *msg =
> + (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> + ERROR_INJECT(ERRINJ_TESTING, {
> + mempool_free(&iproto_msg_pool, msg);
> + msg = NULL;
> + });
> + if (msg == NULL) {
> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> + say_warn("can not allocate new net_msg on connection %s",
> + sio_socketname(con->input.fd));
OutOfMemory is a logged error. Are you sure you want double
logging?
> + return NULL;
> + }
> + msg->connection = con;
> + return msg;
> }
>
> /**
> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
> static inline void
> iproto_connection_stop(struct iproto_connection *con)
> {
> - say_warn("net_msg_max limit reached, stopping input on connection %s",
> + say_warn("stopping input on connection %s",
> sio_socketname(con->input.fd));
Please have 3 functions, but avoid double logging:
iproto_connection_stop
iproto_connection_stop_msg_max_limit
iproto_connection_stop_readahead_limit
> assert(rlist_empty(&con->in_stop_list));
> ev_io_stop(con->loop, &con->input);
Please add a comment here:
/*
* Important to add to tail and fetch from head to ensure
* strict FIFO order (fairness) for stopped connections.
*/
> rlist_add_tail(&stopped_connections, &con->in_stop_list);
> }
>
> +static inline void
> +iproto_connection_stop_by_limit(struct iproto_connection *con)
> +{
> + say_warn("net_msg_max limit reached on connection %s",
> + sio_socketname(con->input.fd));
> + assert(iproto_check_msg_max());
> + iproto_connection_stop(con);
> +}
> +
> /**
> * Initiate a connection shutdown. This method may
> * be invoked many times, and does the internal
> @@ -575,20 +573,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
> return new_ibuf;
> }
>
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a request limit is
> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval 0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
> iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> {
> int n_requests = 0;
> bool stop_input = false;
> - while (con->parse_size && stop_input == false) {
> + while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
> const char *reqstart = in->wpos - con->parse_size;
> const char *pos = reqstart;
> /* Read request length. */
> if (mp_typeof(*pos) != MP_UINT) {
> cpipe_flush_input(&tx_pipe);
> - tnt_raise(ClientError, ER_INVALID_MSGPACK,
> - "packet length");
> + diag_set(ClientError, ER_INVALID_MSGPACK,
> + "packet length");
> + return -1;
> }
> if (mp_check_uint(pos, in->wpos) >= 0)
> break;
> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> if (reqend > in->wpos)
> break;
> struct iproto_msg *msg = iproto_msg_new(con);
> + if (msg == NULL) {
> + /*
> + * Do not tread it as an error - just wait
> + * until some of requests are finished.
tread -> treat
> + */
> + iproto_connection_stop(con);
> + return 0;
> + }
> msg->p_ibuf = con->p_ibuf;
> msg->wpos = con->wpos;
>
> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> * requests, keep reading input, if only to avoid
> * a deadlock on this connection.
> */
> - ev_feed_event(con->loop, &con->input, EV_READ);
> + if (iproto_check_msg_max())
> + iproto_connection_stop_by_limit(con);
> + else
> + ev_feed_event(con->loop, &con->input, EV_READ);
Could you please avoid the double check for msg max here?
> }
> cpipe_flush_input(&tx_pipe);
> + return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely resurrect the
> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> + assert(! iproto_check_msg_max());
> + rlist_del(&con->in_stop_list);
> + /*
> + * enqueue_batch() stops the connection again, if the
> + * limit is reached again.
> + */
> + if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> + struct error *e = box_error_last();
> + iproto_write_error(con->input.fd, e, ::schema_version, 0);
> + error_log(e);
> + iproto_connection_close(con);
> + }
> +}
> +
> +/**
> + * Resume as many connections as possible until a request limit is
> + * reached. Each connection before to start accept new requests
> + * enqueues pending ones. And the input is resumed only if the
> + * limit still is not reached.
> + *
> + * This global connections resuming is needed when one connection
> + * finished a request, and another connection can get the freed
> + * message.
I can't parse this, sorry :(
How about:
Resume as many connections as possible until a request limit is
reached. By design of iproto_enqueue_batch(), a paused connection
almost always has a pending request fully read up, so resuming a
connection will immediately enqueue the request as an iproto
message and exhaust the limit. Thus we aren't really resuming all
connections here: only as many as are necessary to use up the
limit.
> + */
> +static void
> +iproto_resume()
> +{
> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
Please add a comment:
/*
 * Shift from list head to ensure strict FIFO (fairness) for
 * resumed connections.
 */
> + struct iproto_connection *con =
> + rlist_first_entry(&stopped_connections,
> + struct iproto_connection,
> + in_stop_list);
> + iproto_connection_resume(con);
> + }
> }
>
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
More information about the Tarantool-patches
mailing list