[tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching

Konstantin Osipov kostja at tarantool.org
Mon May 7 22:54:05 MSK 2018


* Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/05/04 14:58]:
>     iproto: fix error with unstoppable batching
>     
>     IProto connection stops to read input on reached request limit.
>     But when multiple requests are in a batch, the IProto does not
>     check the limit, so it can be violated.
>     
>     Lets check the limit during batch parsing after each message too,
>     not only once before parsing.
> ---
>  src/box/iproto.cc             | 177 +++++++++++++++++++++++++++++-------------
>  test/box/errinj.result        |  86 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  39 ++++++++++
>  test/box/net_msg_max.result   |  21 ++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 279 insertions(+), 57 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> -/**
> - * Throttle the queue to the tx thread and ensure the fiber pool
> - * in tx thread is not depleted by a flood of incoming requests:
> - * resume a stopped connection only if there is a spare message
> - * object in the message pool.
> - */
> -static void
> -iproto_resume()
> +static struct iproto_msg *
> +iproto_msg_new(struct iproto_connection *con)
>  {

Great!

> -	/*
> -	 * Most of the time we have nothing to do here: throttling
> -	 * is not active.
> -	 */
> -	if (rlist_empty(&stopped_connections))
> -		return;
> -	if (iproto_check_msg_max())
> -		return;
> -
> -	struct iproto_connection *con;
> -	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
> -				in_stop_list);
> -	ev_feed_event(con->loop, &con->input, EV_READ);
> +	struct iproto_msg *msg =
> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +	ERROR_INJECT(ERRINJ_TESTING, {
> +		mempool_free(&iproto_msg_pool, msg);
> +		msg = NULL;
> +	});
> +	if (msg == NULL) {
> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> +		say_warn("can not allocate new net_msg on connection %s",
> +			 sio_socketname(con->input.fd));

OutOfMemory is a logged error. Are you sure you want double
logging?

> +		return NULL;
> +	}
> +	msg->connection = con;
> +	return msg;
>  }
>  
>  /**
> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>  static inline void
>  iproto_connection_stop(struct iproto_connection *con)
>  {
> -	say_warn("net_msg_max limit reached, stopping input on connection %s",
> +	say_warn("stopping input on connection %s",
>  		 sio_socketname(con->input.fd));

Please have 3 functions, but avoid double logging:
iproto_connection_stop
iproto_connection_stop_msg_max_limit
iproto_connection_stop_readahead_limit 

>  	assert(rlist_empty(&con->in_stop_list));
>  	ev_io_stop(con->loop, &con->input);

Please add a comment here:

/*
 * Important to add to tail and fetch from head to ensure
 * strict FIFO order (fairness) for stopped connections.
 */

>  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
>  }
>  
> +static inline void
> +iproto_connection_stop_by_limit(struct iproto_connection *con)
> +{
> +	say_warn("net_msg_max limit reached on connection %s",
> +		 sio_socketname(con->input.fd));
> +	assert(iproto_check_msg_max());
> +	iproto_connection_stop(con);
> +}
> +
>  /**
>   * Initiate a connection shutdown. This method may
>   * be invoked many times, and does the internal
> @@ -575,20 +573,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>  
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a request limit is
> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval  0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>  	int n_requests = 0;
>  	bool stop_input = false;
> -	while (con->parse_size && stop_input == false) {
> +	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>  		const char *reqstart = in->wpos - con->parse_size;
>  		const char *pos = reqstart;
>  		/* Read request length. */
>  		if (mp_typeof(*pos) != MP_UINT) {
>  			cpipe_flush_input(&tx_pipe);
> -			tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -				  "packet length");
> +			diag_set(ClientError, ER_INVALID_MSGPACK,
> +				 "packet length");
> +			return -1;
>  		}
>  		if (mp_check_uint(pos, in->wpos) >= 0)
>  			break;
> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		if (reqend > in->wpos)
>  			break;
>  		struct iproto_msg *msg = iproto_msg_new(con);
> +		if (msg == NULL) {
> +			/*
> +			 * Do not tread it as an error - just wait
> +			 * until some of requests are finished.

tread -> treat

> +			 */
> +			iproto_connection_stop(con);
> +			return 0;
> +		}
>  		msg->p_ibuf = con->p_ibuf;
>  		msg->wpos = con->wpos;
>  
> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		 * requests, keep reading input, if only to avoid
>  		 * a deadlock on this connection.
>  		 */
> -		ev_feed_event(con->loop, &con->input, EV_READ);
> +		if (iproto_check_msg_max())
> +			iproto_connection_stop_by_limit(con);
> +		else
> +			ev_feed_event(con->loop, &con->input, EV_READ);

Could you please avoid the double check for msg max here?

>  	}
>  	cpipe_flush_input(&tx_pipe);
> +	return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely resurrect the
> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +	assert(! iproto_check_msg_max());
> +	rlist_del(&con->in_stop_list);
> +	/*
> +	 * enqueue_batch() stops the connection again, if the
> +	 * limit is reached again.
> +	 */
> +	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +		struct error *e = box_error_last();
> +		iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +		error_log(e);
> +		iproto_connection_close(con);
> +	}
> +}
> +
> +/**
> + * Resume as many connections as possible until a request limit is
> + * reached. Each connection before to start accept new requests
> + * enqueues pending ones. And the input is resumed only if the
> + * limit still is not reached.
> + *
> + * This global connections resuming is needed when one connection
> + * finished a request, and another connection can get the freed
> + * message.

I can't parse this, sorry :(

How about:

Resume as many connections as possible until a request limit is
reached. By design of iproto_enqueue_batch(), a paused connection
almost always has a pending request fully read up, so resuming a
connection will immediately enqueue the request as an iproto
message and exhaust the limit. Thus we aren't really resuming all
connections here: only as many as are necessary to use up the
limit.

> + */
> +static void
> +iproto_resume()
> +{
> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {

  Please add a comment: 
  /* Shift from list head to ensure strict FIFO (fairness) for
   * resumed connections.
   */
> +		struct iproto_connection *con =
> +			rlist_first_entry(&stopped_connections,
> +					  struct iproto_connection,
> +					  in_stop_list);
> +		iproto_connection_resume(con);
> +	}
>  }
>  

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov




More information about the Tarantool-patches mailing list