[tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri May 4 14:56:21 MSK 2018
Hello. Thanks for review!
On 04/05/2018 11:26, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/05/04 00:07]:
>> IProto connection stops input reading, when active request count
>
> Vlad, while gerund in English is a valid grammatical form it is
> rarely used in colloquial speech. Please try to avoid using it
> unless you know what you're doing. Sorry for nitpicking.
>
> Instead of trying to construct a complex sentence, simply try
> to split your point in two smaller ones. English, despite being a
> member of the Indo-European family along with Russian, lost its
> grammatical cases and their respective endings a long time ago. Without case
> endings complex sentences are extremely hard to parse,
> so they fell out of use.
New commit message:
iproto: fix error with unstoppable batching
An IProto connection stops reading input when the request limit is reached.
But when multiple requests are in a batch, the IProto does not
check the limit, so it can be violated.
Let's check the limit during batch parsing after each message too,
not only once before parsing.
>
>> is reached. But when multiple requests are in a batch, the IProto
>> does not check the limit, so it can be violated.
>>
>> Lets check the limit during batch parsing after each message too,
>> not only once before parsing.
>> ---
>> src/box/iproto.cc | 143 +++++++++++++++++++++++++++---------------
>> test/box/errinj.result | 69 ++++++++++++++++++++
>> test/box/errinj.test.lua | 29 +++++++++
>> test/box/net_msg_max.result | 21 ++++++-
>> test/box/net_msg_max.test.lua | 13 +++-
>> 5 files changed, 222 insertions(+), 53 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index 9ccaf1dc7..dd7f97ecf 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>> static struct iproto_msg *
>> iproto_msg_new(struct iproto_connection *con)
>> {
>> + ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>> struct iproto_msg *msg =
>> - (struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
>> - msg->connection = con;
>> + (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
>> + if (msg != NULL)
>> + msg->connection = con;
>> + else
>> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>
> Nitpick: usually handling the error first and dealing with the
> rest later makes it easier to understand what's going on.
>
> This idiom would also make the code easier to parse in future, when more
> stuff is added to the "else" branch:
>
> if (msg == NULL) {
> diag_set()
> return NULL;
> }
>
> msg->connection = con;
> return msg;
Done.
struct iproto_msg *msg =
(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
- if (msg != NULL)
- msg->connection = con;
- else
+ if (msg == NULL) {
diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+ return NULL;
+ }
+ msg->connection = con;
return msg;
}
>
>
>> }
>>
>> -/** Enqueue all requests which were read up. */
>> -static inline void
>> +/**
>> + * Enqueue all requests which were read up. If a requests limit is
>
> request limit; request is an adjective here - when a noun is used
> as an adjective, its singular form is used.
Done.
- * Enqueue all requests which were read up. If a requests limit is
+ * Enqueue all requests which were read up. If a request limit is
>> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> * requests, keep reading input, if only to avoid
>> * a deadlock on this connection.
>> */
>> - ev_feed_event(con->loop, &con->input, EV_READ);
>> + if (iproto_check_msg_max())
>> + iproto_connection_stop(con);
>> + else
>> + ev_feed_event(con->loop, &con->input, EV_READ);
>
> Looks like a double check for msg max in the same loop. Could you
> please try to avoid that?
This check is outside the loop. After the loop has finished, I check whether
it stopped because the limit was reached.
>
>> }
>> cpipe_flush_input(&tx_pipe);
>> + return 0;
>> +}
>> +
>> +/**
>> + * Enqueue connection's pending requests. Completely ressurect the
>
> Please turn on spell checking in your editor.
Done.
- * Enqueue connection's pending requests. Completely ressurect the
+ * Enqueue connection's pending requests. Completely resurrect the
>> +/**
>> + * Throttle the queue to the tx thread and ensure the fiber pool
>> + * in tx thread is not depleted by a flood of incoming requests:
>> + * resume a stopped connection only if there is a spare message
>> + * object in the message pool.
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>> + struct iproto_connection *con =
>> + rlist_first_entry(&stopped_connections,
>> + struct iproto_connection,
>> + in_stop_list);
>> + iproto_connection_resume(con);
>> + }
>
> By the look of it, resume() resumes all stopped connections.
>
> This is not the case, however, as long as iproto_connection_stop() is called
> *after* reading input, so most likely with a request already
> sitting in the input buffer. The tricky relationship between
> iproto_resume() and the timing of calling iproto_connection_stop()
> requires an explanation. The comment needs a brush-up as well.
Done.
@@ -662,6 +663,10 @@ iproto_connection_resume(struct iproto_connection *con)
{
assert(! iproto_check_msg_max());
rlist_del(&con->in_stop_list);
+ /*
+ * Enqueue_batch() stops the connection again, if the
+ * limit is reached again.
+ */
if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
@@ -671,10 +676,14 @@ iproto_connection_resume(struct iproto_connection *con)
}
/**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
+ * Resume as many connections as possible until the request limit
+ * is reached. Before a connection starts to accept new requests,
+ * it enqueues its pending ones. The input is resumed only if the
+ * limit is still not reached.
+ *
+ * This global connections resuming is needed when one connection
+ * finished a request, and another connection can get the freed
+ * message.
*/
static void
iproto_resume()
>
>> static void
>> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>> (struct iproto_connection *) watcher->data;
>> int fd = con->input.fd;
>> assert(fd >= 0);
>> - if (! rlist_empty(&con->in_stop_list)) {
>> - /* Resumed stopped connection. */
>> - rlist_del(&con->in_stop_list);
>> - /*
>> - * This connection may have no input, so
>> - * resume one more connection which might have
>> - * input.
>> - */
>> - iproto_resume();
>> - }
>> + assert(rlist_empty(&con->in_stop_list));
>
> Nice.
👍👍
>> diff --git a/test/box/errinj.result b/test/box/errinj.result
>> index 5b4bc23a3..5da3e2642 100644
>> --- a/test/box/errinj.result
>> +++ b/test/box/errinj.result
>> @@ -1150,6 +1150,75 @@ cn:close()
>> s:drop()
>> ---
>> ...
>> +--
>> +-- In messages memory pool is over, stop the connection, until the
>> +-- pool has free memory.
>> +--
>> +started = 0
>> +---
>> +...
>> +finished = 0
>> +---
>> +...
>> +continue = false
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ";"')
>> +---
>> +- true
>> +...
>> +function long_poll_f()
>> + started = started + 1
>> + f = fiber.self()
>> + while not continue do fiber.sleep(0.01) end
>> + finished = finished + 1
>> +end;
>> +---
>> +...
>> +test_run:cmd('setopt delimiter ""');
>> +---
>> +- true
>> +...
>> +cn = net_box.connect(box.cfg.listen)
>> +---
>> +...
>> +function long_poll() cn:call('long_poll_f') end
>> +---
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +while started ~= 1 do fiber.sleep(0.01) end
>> +---
>> +...
>> +-- Simulate OOM for new requests.
>> +errinj.set("ERRINJ_TESTING", true)
>> +---
>> +- ok
>> +...
>> +_ = fiber.create(long_poll)
>> +---
>> +...
>> +fiber.sleep(0.1)
>
> Please avoid 0.1 sleeps, entirely. If you're waiting for anything,
> please wait in a loop with wait timeout set 0.01 seconds.
>> +---
>> +...
>> +started == 1
>> +---
>> +- true
>> +...
>> +continue = true
>> +---
>> +...
>> +errinj.set("ERRINJ_TESTING", false)
>> +---
>> +- ok
>> +...
>> +while finished ~= 2 do fiber.sleep(0.01) end
>
> I don't understand how exactly you're testing that the connection
> is stopped when the pool is exhausted.
>
> It seems you simply wait for your long-poll fiber to block, then
> resume it, then wait for it to finish.
Yes, you are right. I refactored the patch to make it more clear.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 1e4a88ecc..baa6bb660 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -190,17 +190,7 @@ struct iproto_msg
static struct mempool iproto_msg_pool;
static struct iproto_msg *
-iproto_msg_new(struct iproto_connection *con)
-{
- ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
- struct iproto_msg *msg =
- (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
- if (msg != NULL)
- msg->connection = con;
- else
- diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
- return msg;
-}
+iproto_msg_new(struct iproto_connection *con);
/**
* Resume stopped connections, if any.
@@ -388,6 +378,25 @@ iproto_check_msg_max()
return request_count > IPROTO_MSG_MAX;
}
+static struct iproto_msg *
+iproto_msg_new(struct iproto_connection *con)
+{
+ struct iproto_msg *msg =
+ (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+ ERROR_INJECT(ERRINJ_TESTING, {
+ mempool_free(&iproto_msg_pool, msg);
+ msg = NULL;
+ });
+ if (msg == NULL) {
+ diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
+ say_warn("can not allocate new net_msg on connection %s",
+ sio_socketname(con->input.fd));
+ return NULL;
+ }
+ msg->connection = con;
+ return msg;
+}
+
/**
* A connection is idle when the client is gone
* and there are no outstanding msgs in the msg queue.
@@ -414,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
static inline void
iproto_connection_stop(struct iproto_connection *con)
{
- say_warn("net_msg_max limit reached, stopping input on connection %s",
+ say_warn("stopping input on connection %s",
sio_socketname(con->input.fd));
assert(rlist_empty(&con->in_stop_list));
ev_io_stop(con->loop, &con->input);
rlist_add_tail(&stopped_connections, &con->in_stop_list);
}
+static inline void
+iproto_connection_stop_by_limit(struct iproto_connection *con)
+{
+ say_warn("net_msg_max limit reached on connection %s",
+ sio_socketname(con->input.fd));
+ assert(iproto_check_msg_max());
+ iproto_connection_stop(con);
+}
+
/**
* Initiate a connection shutdown. This method may
* be invoked many times, and does the internal
@@ -644,7 +662,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* a deadlock on this connection.
*/
if (iproto_check_msg_max())
- iproto_connection_stop(con);
+ iproto_connection_stop_by_limit(con);
else
ev_feed_event(con->loop, &con->input, EV_READ);
}
@@ -711,7 +729,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
* thread and deadlock.
*/
if (iproto_check_msg_max()) {
- iproto_connection_stop(con);
+ iproto_connection_stop_by_limit(con);
return;
}
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index 365d535c2..8f284f6ab 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -411,11 +411,21 @@ _ = fiber.create(long_poll)
while started ~= 1 do fiber.sleep(0.01) end
-- Simulate OOM for new requests.
errinj.set("ERRINJ_TESTING", true)
+-- This request tries to allocate a memory for request data, and
+-- fails with this. This stops the connection until an existing
+-- request is finished.
+log = require('log')
+-- Fill the log with garbage to avoid accidentally reading the
+-- results of previous test cases.
+log.info(string.rep('a', 1000))
_ = fiber.create(long_poll)
-fiber.sleep(0.1)
+while not test_run:grep_log('default', 'can not allocate new net_msg on connection', 1000) do fiber.sleep(0.01) end
+test_run:grep_log('default', 'stopping input on connection', 1000) ~= nil
started == 1
continue = true
errinj.set("ERRINJ_TESTING", false)
+-- Ensure, that when memory is available again, the pending
+-- request is executed.
while finished ~= 2 do fiber.sleep(0.01) end
cn:close()
More information about the Tarantool-patches
mailing list