[tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri May 4 14:56:21 MSK 2018


Hello. Thanks for the review!

On 04/05/2018 11:26, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/05/04 00:07]:
>> IProto connection stops input reading, when active request count
> 
> Vlad, while gerund in English is a valid grammatical form it is
> rarely used in colloquial speech. Please try to avoid using it
> unless you know what you're doing. Sorry for nitpicking.
> 
> Instead of trying to construct a complex sentence, simply try
> to split your point into two smaller ones. English, despite being a
> member of the Indo-European family along with Russian, lost its
> grammatical cases and their endings a long time ago. Without case
> endings complex sentences are extremely hard to parse,
> so they fell out of use.

New commit message:

     iproto: fix error with unstoppable batching
     
     An IProto connection stops reading input when the request limit is
     reached. But when multiple requests arrive in a batch, IProto does
     not check the limit, so it can be violated.
     
     Let's check the limit during batch parsing after each message too,
     not only once before parsing.

> 
>> is reached. But when multiple requests are in a batch, the IProto
>> does not check the limit, so it can be violated.
>>
>> Lets check the limit during batch parsing after each message too,
>> not only once before parsing.
>> ---
>>   src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>>   test/box/errinj.result        |  69 ++++++++++++++++++++
>>   test/box/errinj.test.lua      |  29 +++++++++
>>   test/box/net_msg_max.result   |  21 ++++++-
>>   test/box/net_msg_max.test.lua |  13 +++-
>>   5 files changed, 222 insertions(+), 53 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index 9ccaf1dc7..dd7f97ecf 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>>   static struct iproto_msg *
>>   iproto_msg_new(struct iproto_connection *con)
>>   {
>> +	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>>   	struct iproto_msg *msg =
>> -		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
>> -	msg->connection = con;
>> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
>> +	if (msg != NULL)
>> +		msg->connection = con;
>> +	else
>> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> 
> Nitpick: usually handling the error first and dealing with the
> rest later makes it easier to understand what's going on.
> 
> This idiom would also make the code easier to parse in future, when more
> stuff is added to the "else" branch:
> 
> if (msg == NULL) {
>      diag_set()
>      return NULL;
> }
> 
> msg->connection = con;
> return msg;

Done.

         struct iproto_msg *msg =
                 (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-       if (msg != NULL)
-               msg->connection = con;
-       else
+       if (msg == NULL) {
                 diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+               return NULL;
+       }
+       msg->connection = con;
         return msg;
  }

> 
> 
>>   }
>>   
>> -/** Enqueue all requests which were read up. */
>> -static inline void
>> +/**
>> + * Enqueue all requests which were read up. If a requests limit is
> 
> request limit; request is an adjective here - when a noun is used
> as an adjective, its singular form is used.

Done.

- * Enqueue all requests which were read up. If a requests limit is
+ * Enqueue all requests which were read up. If a request limit is

>> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		 * requests, keep reading input, if only to avoid
>>   		 * a deadlock on this connection.
>>   		 */
>> -		ev_feed_event(con->loop, &con->input, EV_READ);
>> +		if (iproto_check_msg_max())
>> +			iproto_connection_stop(con);
>> +		else
>> +			ev_feed_event(con->loop, &con->input, EV_READ);
> 
> Looks like a double check for msg max in the same loop. Could you
> please try to avoid that?

This check is outside the loop. After the loop finishes, I check whether
it stopped because of the limit.
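
A minimal standalone sketch of that control flow, not the real iproto.cc
code. The names Conn, enqueue_batch, check_msg_max, MSG_MAX and
active_requests are made up for the example, and the limit predicate is
simplified. Only the idea comes from the patch: check the limit after each
message, then check once more after the loop to decide whether to stop the
connection.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-ins for the real iproto state.
static const std::size_t MSG_MAX = 4;
static std::size_t active_requests = 0;

struct Conn {
    std::deque<int> pending;  // requests read from the socket, not yet enqueued
    bool stopped = false;     // true when input on this connection is stopped
};

// Simplified version of the limit predicate.
static bool check_msg_max() { return active_requests >= MSG_MAX; }

// Enqueue pending requests one by one, re-checking the limit after each
// message. Returns the number of requests enqueued by this call.
static std::size_t enqueue_batch(Conn &con) {
    std::size_t enqueued = 0;
    while (!con.pending.empty() && !check_msg_max()) {
        con.pending.pop_front();
        ++active_requests;
        ++enqueued;
    }
    // The loop is over. This single check outside of it only answers one
    // question: did the loop end because the limit was reached?
    con.stopped = check_msg_max();
    return enqueued;
}
```

With six pending requests and a limit of four, the sketch enqueues four
requests, leaves two in the buffer and marks the connection as stopped.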

> 
>>   	}
>>   	cpipe_flush_input(&tx_pipe);
>> +	return 0;
>> +}
>> +
>> +/**
>> + * Enqueue connection's pending requests. Completely ressurect the
> 
> Please turn on spell checking in your editor.

Done.

- * Enqueue connection's pending requests. Completely ressurect the
+ * Enqueue connection's pending requests. Completely resurrect the

>> +/**
>> + * Throttle the queue to the tx thread and ensure the fiber pool
>> + * in tx thread is not depleted by a flood of incoming requests:
>> + * resume a stopped connection only if there is a spare message
>> + * object in the message pool.
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>> +		struct iproto_connection *con =
>> +			rlist_first_entry(&stopped_connections,
>> +					  struct iproto_connection,
>> +					  in_stop_list);
>> +		iproto_connection_resume(con);
>> +	}
> 
> By the look of it, resume() resumes all stopped connections.
> 
> This is not the case, however, as long as iproto_connection_stop() is made
> *after* reading input, so most likely with a request already
> sitting in the input buffer. The tricky relationship between
> iproto_resume() and the timing of calling iproto_connection_stop()
> requires an explanation. The comment needs a brush-up as well.

Done.

@@ -662,6 +663,10 @@ iproto_connection_resume(struct iproto_connection *con)
  {
         assert(! iproto_check_msg_max());
         rlist_del(&con->in_stop_list);
+       /*
+        * Enqueue_batch() stops the connection again, if the
+        * limit is reached again.
+        */
         if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
@@ -671,10 +676,14 @@ iproto_connection_resume(struct iproto_connection *con)
  }
  
  /**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
+ * Resume as many connections as possible until a request limit is
+ * reached. Each connection before to start accept new requests
+ * enqueues pending ones. And the input is resumed only if the
+ * limit still is not reached.
+ *
+ * This global connections resuming is needed when one connection
+ * finished a request, and another connection can get the freed
+ * message.
   */
  static void
  iproto_resume()
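
To make the relationship between resuming and stopping easier to follow,
here is a standalone sketch under the same kind of assumptions: Conn,
connection_stop, connection_resume, resume_all, MSG_MAX and
active_requests are hypothetical names, and std::list stands in for the
rlist of stopped connections. It is not the code from the patch. It only
shows why the resume loop terminates: a resumed connection first enqueues
its pending requests and goes back to the stop list if that hits the limit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <list>

static const std::size_t MSG_MAX = 4;    // hypothetical request limit
static std::size_t active_requests = 0;  // hypothetical in-flight counter

struct Conn {
    std::deque<int> pending;  // requests already sitting in the input buffer
    bool stopped = false;
};

static std::list<Conn *> stopped_connections;

static bool check_msg_max() { return active_requests >= MSG_MAX; }

static void connection_stop(Conn *con) {
    con->stopped = true;
    stopped_connections.push_back(con);
}

// Take the connection off the stop list and enqueue what it already has.
// If that reaches the limit again, the connection is stopped again.
static void connection_resume(Conn *con) {
    assert(!check_msg_max());
    stopped_connections.remove(con);
    con->stopped = false;
    while (!con->pending.empty() && !check_msg_max()) {
        con->pending.pop_front();
        ++active_requests;
    }
    if (check_msg_max())
        connection_stop(con);
}

// Resume connections until the limit is reached or none are left stopped.
static void resume_all() {
    while (!check_msg_max() && !stopped_connections.empty())
        connection_resume(stopped_connections.front());
}
```

Each pass of the loop either shrinks the stop list or reaches the limit, so
the loop cannot spin forever.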

> 
>>   static void
>> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>>   		(struct iproto_connection *) watcher->data;
>>   	int fd = con->input.fd;
>>   	assert(fd >= 0);
>> -	if (! rlist_empty(&con->in_stop_list)) {
>> -		/* Resumed stopped connection. */
>> -		rlist_del(&con->in_stop_list);
>> -		/*
>> -		 * This connection may have no input, so
>> -		 * resume one more connection which might have
>> -		 * input.
>> -		 */
>> -		iproto_resume();
>> -	}
>> +	assert(rlist_empty(&con->in_stop_list));
> 
> Nice.

👍👍

>> diff --git a/test/box/errinj.result b/test/box/errinj.result
>> index 5b4bc23a3..5da3e2642 100644
>> --- a/test/box/errinj.result
>> +++ b/test/box/errinj.result
>> @@ -1150,6 +1150,75 @@ cn:close()
>>   s:drop()
>>   ---
>>   ...
>> +--
>> +-- In messages memory pool is over, stop the connection, until the
>> +-- pool has free memory.
>> +--
>> +started = 0
>> +---
>> +...
>> +finished = 0
>> +---
>> +...
>> +continue = false
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ";"')
>> +---
>> +- true
>> +...
>> +function long_poll_f()
>> +    started = started + 1
>> +    f = fiber.self()
>> +    while not continue do fiber.sleep(0.01) end
>> +    finished = finished + 1
>> +end;
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ""');
>> +---
>> +- true
>> +...
>> +cn = net_box.connect(box.cfg.listen)
>> +---
>> +...
>> +function long_poll() cn:call('long_poll_f') end
>> +---
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +while started ~= 1 do fiber.sleep(0.01) end
>> +---
>> +...
>> +-- Simulate OOM for new requests.
>> +errinj.set("ERRINJ_TESTING", true)
>> +---
>> +- ok
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +fiber.sleep(0.1)
> 
> Please avoid 0.1 sleeps, entirely. If you're waiting for anything,
> please wait in a loop with wait timeout set 0.01 seconds.
>> +---
>> +...
>> +started == 1
>> +---
>> +- true
>> +...
>> +continue = true
>> +---
>> +...
>> +errinj.set("ERRINJ_TESTING", false)
>> +---
>> +- ok
>> +...
>> +while finished ~= 2 do fiber.sleep(0.01) end
> 
> I don't understand how exactly you're testing that the connection
> is stopped when the pool is exhausted.
> 
> It seems you simply wait for your long-poll fiber to block, then
> resume it, then wait for it to finish.

Yes, you are right. I refactored the patch to make it clearer.


diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 1e4a88ecc..baa6bb660 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -190,17 +190,7 @@ struct iproto_msg
  static struct mempool iproto_msg_pool;
  
  static struct iproto_msg *
-iproto_msg_new(struct iproto_connection *con)
-{
-	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
-	struct iproto_msg *msg =
-		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
-	if (msg != NULL)
-		msg->connection = con;
-	else
-		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
-	return msg;
-}
+iproto_msg_new(struct iproto_connection *con);
  
  /**
   * Resume stopped connections, if any.
@@ -388,6 +378,25 @@ iproto_check_msg_max()
  	return request_count > IPROTO_MSG_MAX;
  }
  
+static struct iproto_msg *
+iproto_msg_new(struct iproto_connection *con)
+{
+	struct iproto_msg *msg =
+		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+	ERROR_INJECT(ERRINJ_TESTING, {
+		mempool_free(&iproto_msg_pool, msg);
+		msg = NULL;
+	});
+	if (msg == NULL) {
+		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+		say_warn("can not allocate new net_msg on connection %s",
+			 sio_socketname(con->input.fd));
+		return NULL;
+	}
+	msg->connection = con;
+	return msg;
+}
+
  /**
   * A connection is idle when the client is gone
   * and there are no outstanding msgs in the msg queue.
@@ -414,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
  static inline void
  iproto_connection_stop(struct iproto_connection *con)
  {
-	say_warn("net_msg_max limit reached, stopping input on connection %s",
+	say_warn("stopping input on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(rlist_empty(&con->in_stop_list));
  	ev_io_stop(con->loop, &con->input);
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }
  
+static inline void
+iproto_connection_stop_by_limit(struct iproto_connection *con)
+{
+	say_warn("net_msg_max limit reached on connection %s",
+		 sio_socketname(con->input.fd));
+	assert(iproto_check_msg_max());
+	iproto_connection_stop(con);
+}
+
  /**
   * Initiate a connection shutdown. This method may
   * be invoked many times, and does the internal
@@ -644,7 +662,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * a deadlock on this connection.
  		 */
  		if (iproto_check_msg_max())
-			iproto_connection_stop(con);
+			iproto_connection_stop_by_limit(con);
  		else
  			ev_feed_event(con->loop, &con->input, EV_READ);
  	}
@@ -711,7 +729,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  	 * thread and deadlock.
  	 */
  	if (iproto_check_msg_max()) {
-		iproto_connection_stop(con);
+		iproto_connection_stop_by_limit(con);
  		return;
  	}

diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index 365d535c2..8f284f6ab 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -411,11 +411,21 @@ _ = fiber.create(long_poll)
  while started ~= 1 do fiber.sleep(0.01) end
  -- Simulate OOM for new requests.
  errinj.set("ERRINJ_TESTING", true)
+-- This request tries to allocate a memory for request data, and
+-- fails with this. This stops the connection until an existing
+-- request is finished.
+log = require('log')
+-- Fill log with garbage to avoid accidentaly reading previous
+-- test cases result.
+log.info(string.rep('a', 1000))
  _ = fiber.create(long_poll)
-fiber.sleep(0.1)
+while not test_run:grep_log('default', 'can not allocate new net_msg on connection', 1000) do fiber.sleep(0.01) end
+test_run:grep_log('default', 'stopping input on connection', 1000) ~= nil
  started == 1
  continue = true
  errinj.set("ERRINJ_TESTING", false)
+-- Ensure, that when memory is available again, the pending
+-- request is executed.
  while finished ~= 2 do fiber.sleep(0.01) end
  cn:close()





