[tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue May 8 14:07:25 MSK 2018


Hello. Thanks for the review.

>> +	if (msg == NULL) {
>> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>> +		say_warn("can not allocate new net_msg on connection %s",
>> +			 sio_socketname(con->input.fd));
> 
> OutOfMemory is a logged error. Are you sure you want double
> logging?

No, it is not a logged error. Its class hierarchy:

class OutOfMemory: public SystemError ->
-> class SystemError: public Exception ->
-> class Exception: public error ->
-> struct error.

> 
>> +		return NULL;
>> +	}
>> +	msg->connection = con;
>> +	return msg;
>>   }
>>   
>>   /**
>> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>>   static inline void
>>   iproto_connection_stop(struct iproto_connection *con)
>>   {
>> -	say_warn("net_msg_max limit reached, stopping input on connection %s",
>> +	say_warn("stopping input on connection %s",
>>   		 sio_socketname(con->input.fd));
> 
> Please have 3 functions, but avoid double logging:
> iproto_connection_stop
> iproto_connection_stop_msg_max_limit
> iproto_connection_stop_readahead_limit

Done.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3212d9697..0a5eb1f86 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -426,19 +426,33 @@ iproto_connection_is_idle(struct iproto_connection *con)
  }
  
  static inline void
-iproto_connection_stop(struct iproto_connection *con)
+iproto_connection_stop_impl(struct iproto_connection *con)
  {
  	say_warn("stopping input on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(rlist_empty(&con->in_stop_list));
  	ev_io_stop(con->loop, &con->input);
+}
+
+static inline void
+iproto_connection_stop(struct iproto_connection *con)
+{
+	iproto_connection_stop_impl(con);
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }
  
  static inline void
-iproto_connection_stop_by_limit(struct iproto_connection *con)
+iproto_connection_stop_readaheadlimit(struct iproto_connection *con)
+{
+	say_warn("readahead limit is reached on connection %s",
+		 sio_socketname(con->input.fd));
+	iproto_connection_stop_impl(con);
+}
+
+static inline void
+iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
  {
-	say_warn("net_msg_max limit reached on connection %s",
+	say_warn("net_msg_max limit is reached on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(iproto_check_msg_max());
  	iproto_connection_stop(con);
@@ -667,7 +681,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * a deadlock on this connection.
  		 */
  		if (iproto_check_msg_max())
-			iproto_connection_stop_by_limit(con);
+			iproto_connection_stop_msg_max_limit(con);
  		else
  			ev_feed_event(con->loop, &con->input, EV_READ);
  	}
@@ -728,13 +742,14 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  	int fd = con->input.fd;
  	assert(fd >= 0);
  	assert(rlist_empty(&con->in_stop_list));
+	assert(loop == con->loop);
  	/*
  	 * Throttle if there are too many pending requests,
  	 * otherwise we might deplete the fiber pool in tx
  	 * thread and deadlock.
  	 */
  	if (iproto_check_msg_max()) {
-		iproto_connection_stop_by_limit(con);
+		iproto_connection_stop_msg_max_limit(con);
  		return;
  	}
  
@@ -742,9 +757,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  		/* Ensure we have sufficient space for the next round.  */
  		struct ibuf *in = iproto_connection_input_buffer(con);
  		if (in == NULL) {
-			say_warn("readahead limit reached, stopping input on connection %s",
-				 sio_socketname(con->input.fd));
-			ev_io_stop(loop, &con->input);
+			iproto_connection_stop_readaheadlimit(con);
  			return;
  		}
  		/* Read input. */

> 
>>   	assert(rlist_empty(&con->in_stop_list));
>>   	ev_io_stop(con->loop, &con->input);
> 
> Please add a comment here:
> 
> /*
>   * Important to add to tail and fetch from head to ensure
>   * strict FIFO order (fairness) for stopped connections.
>   */

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 0a5eb1f86..251d9fa0d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -438,6 +438,10 @@ static inline void
  iproto_connection_stop(struct iproto_connection *con)
  {
  	iproto_connection_stop_impl(con);
+	/*
+	 * Important to add to tail and fetch from head to ensure
+	 * strict FIFO order (fairness) for stopped connections.
+	 */
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }


>> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		if (reqend > in->wpos)
>>   			break;
>>   		struct iproto_msg *msg = iproto_msg_new(con);
>> +		if (msg == NULL) {
>> +			/*
>> +			 * Do not tread it as an error - just wait
>> +			 * until some of requests are finished.
> 
> tread -> treat

@@ -627,7 +631,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		struct iproto_msg *msg = iproto_msg_new(con);
  		if (msg == NULL) {
  			/*
-			 * Do not tread it as an error - just wait
+			 * Do not treat it as an error - just wait
  			 * until some of requests are finished.
  			 */
  			iproto_connection_stop(con);


>> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		 * requests, keep reading input, if only to avoid
>>   		 * a deadlock on this connection.
>>   		 */
>> -		ev_feed_event(con->loop, &con->input, EV_READ);
>> +		if (iproto_check_msg_max())
>> +			iproto_connection_stop_by_limit(con);
>> +		else
>> +			ev_feed_event(con->loop, &con->input, EV_READ);
> 
> Could you please avoid the double check for msg max here?

There was no double check, as I said before: this check is outside the loop. But OK,
I have moved it into the loop.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9618a04b0..605d38798 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -610,9 +610,15 @@ iproto_connection_input_buffer(struct iproto_connection *con)
  static inline int
  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  {
+	assert(rlist_empty(&con->in_stop_list));
  	int n_requests = 0;
  	bool stop_input = false;
-	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
+	while (con->parse_size != 0 && !stop_input) {
+		if (iproto_check_msg_max()) {
+			iproto_connection_stop_msg_max_limit(con);
+			cpipe_flush_input(&tx_pipe);
+			return 0;
+		}
  		const char *reqstart = in->wpos - con->parse_size;
  		const char *pos = reqstart;
  		/* Read request length. */
@@ -666,7 +672,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		ev_io_stop(con->loop, &con->output);
  		ev_io_stop(con->loop, &con->input);
  	} else if (n_requests != 1 || con->parse_size != 0) {
-		assert(rlist_empty(&con->in_stop_list));
  		/*
  		 * Keep reading input, as long as the socket
  		 * supplies data, but don't waste CPU on an extra
@@ -684,10 +689,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * requests, keep reading input, if only to avoid
  		 * a deadlock on this connection.
  		 */
-		if (iproto_check_msg_max())
-			iproto_connection_stop_msg_max_limit(con);
-		else
-			ev_feed_event(con->loop, &con->input, EV_READ);
+		ev_feed_event(con->loop, &con->input, EV_READ);
  	}
  	cpipe_flush_input(&tx_pipe);
  	return 0;

>> +/**
>> + * Resume as many connections as possible until a request limit is
>> + * reached. Each connection before to start accept new requests
>> + * enqueues pending ones. And the input is resumed only if the
>> + * limit still is not reached.
>> + *
>> + * This global connections resuming is needed when one connection
>> + * finished a request, and another connection can get the freed
>> + * message.
> 
> I can't parse this, sorry :(
> 
> How about:
> 
> Resume as many connections as possible until a request limit is
> reached. By design of iproto_enqueue_batch(), a paused connection almost always has a
> pending request fully read up, so resuming a connection will
> immediately enqueue the request as an iproto message and exhaust
> the limit. Thus we aren't really resuming all connections here:
> only as many as is necessary to use up the limit.

Ok.

@@ -717,13 +719,12 @@ iproto_connection_resume(struct iproto_connection *con)
  
  /**
   * Resume as many connections as possible until a request limit is
- * reached. Each connection before to start accept new requests
- * enqueues pending ones. And the input is resumed only if the
- * limit still is not reached.
- *
- * This global connections resuming is needed when one connection
- * finished a request, and another connection can get the freed
- * message.
+ * reached. By design of iproto_enqueue_batch(), a paused
+ * connection almost always has a pending request fully read up,
+ * so resuming a connection will immediately enqueue the request
+ * as an iproto message and exhaust the limit. Thus we aren't
+ * really resuming all connections here: only as many as is
+ * necessary to use up the limit.
   */
  static void

> 
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
> 
>    Please add a comment:
>    /* Shift from list head to ensure strict FIFO (fairness) for
>     * resumed connections.
>     */

Ok.

@@ -730,6 +730,10 @@ static void
  iproto_resume()
  {
         while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+               /*
+                * Shift from list head to ensure strict FIFO
+                * (fairness) for resumed connections.
+                */
                 struct iproto_connection *con =
                         rlist_first_entry(&stopped_connections,
                                           struct iproto_connection  




More information about the Tarantool-patches mailing list