[tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue May 8 14:07:25 MSK 2018
Hello. Thanks for review.
>> + if (msg == NULL) {
>> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>> + say_warn("can not allocate new net_msg on connection %s",
>> + sio_socketname(con->input.fd));
>
> OutOfMemory is a logged error. Are you sure you want double
> logging?
No, it is not a logged error.
class OutOfMemory: public SystemError ->
-> class SystemError: public Exception ->
-> class Exception: public error ->
-> struct error.
>
>> + return NULL;
>> + }
>> + msg->connection = con;
>> + return msg;
>> }
>>
>> /**
>> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>> static inline void
>> iproto_connection_stop(struct iproto_connection *con)
>> {
>> - say_warn("net_msg_max limit reached, stopping input on connection %s",
>> + say_warn("stopping input on connection %s",
>> sio_socketname(con->input.fd));
>
> Please have 3 functions, but avoid double logging:
> iproto_connection_stop
> iproto_connection_stop_msg_max_limit
> iproto_connection_stop_readahead_limit
Done.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3212d9697..0a5eb1f86 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -426,19 +426,33 @@ iproto_connection_is_idle(struct iproto_connection *con)
}
static inline void
-iproto_connection_stop(struct iproto_connection *con)
+iproto_connection_stop_impl(struct iproto_connection *con)
{
say_warn("stopping input on connection %s",
sio_socketname(con->input.fd));
assert(rlist_empty(&con->in_stop_list));
ev_io_stop(con->loop, &con->input);
+}
+
+static inline void
+iproto_connection_stop(struct iproto_connection *con)
+{
+ iproto_connection_stop_impl(con);
rlist_add_tail(&stopped_connections, &con->in_stop_list);
}
static inline void
-iproto_connection_stop_by_limit(struct iproto_connection *con)
+iproto_connection_stop_readaheadlimit(struct iproto_connection *con)
+{
+ say_warn("readahead limit is reached on connection %s",
+ sio_socketname(con->input.fd));
+ iproto_connection_stop_impl(con);
+}
+
+static inline void
+iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
{
- say_warn("net_msg_max limit reached on connection %s",
+ say_warn("net_msg_max limit is reached on connection %s",
sio_socketname(con->input.fd));
assert(iproto_check_msg_max());
iproto_connection_stop(con);
@@ -667,7 +681,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* a deadlock on this connection.
*/
if (iproto_check_msg_max())
- iproto_connection_stop_by_limit(con);
+ iproto_connection_stop_msg_max_limit(con);
else
ev_feed_event(con->loop, &con->input, EV_READ);
}
@@ -728,13 +742,14 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
int fd = con->input.fd;
assert(fd >= 0);
assert(rlist_empty(&con->in_stop_list));
+ assert(loop == con->loop);
/*
* Throttle if there are too many pending requests,
* otherwise we might deplete the fiber pool in tx
* thread and deadlock.
*/
if (iproto_check_msg_max()) {
- iproto_connection_stop_by_limit(con);
+ iproto_connection_stop_msg_max_limit(con);
return;
}
@@ -742,9 +757,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
/* Ensure we have sufficient space for the next round. */
struct ibuf *in = iproto_connection_input_buffer(con);
if (in == NULL) {
- say_warn("readahead limit reached, stopping input on connection %s",
- sio_socketname(con->input.fd));
- ev_io_stop(loop, &con->input);
+ iproto_connection_stop_readaheadlimit(con);
return;
}
/* Read input. */
>
>> assert(rlist_empty(&con->in_stop_list));
>> ev_io_stop(con->loop, &con->input);
>
> Please add a comment here:
>
> /*
> * Important to add to tail and fetch from head to ensure
> * strict FIFO order (fairness) for stopped connections.
> */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 0a5eb1f86..251d9fa0d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -438,6 +438,10 @@ static inline void
iproto_connection_stop(struct iproto_connection *con)
{
iproto_connection_stop_impl(con);
+ /*
+ * Important to add to tail and fetch from head to ensure
+ * strict FIFO order (fairness) for stopped connections.
+ */
rlist_add_tail(&stopped_connections, &con->in_stop_list);
}
>> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> if (reqend > in->wpos)
>> break;
>> struct iproto_msg *msg = iproto_msg_new(con);
>> + if (msg == NULL) {
>> + /*
>> + * Do not tread it as an error - just wait
>> + * until some of requests are finished.
>
> tread -> treat
@@ -627,7 +631,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
struct iproto_msg *msg = iproto_msg_new(con);
if (msg == NULL) {
/*
- * Do not tread it as an error - just wait
+ * Do not treat it as an error - just wait
* until some of requests are finished.
*/
iproto_connection_stop(con);
>> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> * requests, keep reading input, if only to avoid
>> * a deadlock on this connection.
>> */
>> - ev_feed_event(con->loop, &con->input, EV_READ);
>> + if (iproto_check_msg_max())
>> + iproto_connection_stop_by_limit(con);
>> + else
>> + ev_feed_event(con->loop, &con->input, EV_READ);
>
> Could you please avoid the double check for msg max here?
There was no double check, as I said before: this check is outside the loop.
But OK, I have moved it into the loop.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9618a04b0..605d38798 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -610,9 +610,15 @@ iproto_connection_input_buffer(struct iproto_connection *con)
static inline int
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
{
+ assert(rlist_empty(&con->in_stop_list));
int n_requests = 0;
bool stop_input = false;
- while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
+ while (con->parse_size != 0 && !stop_input) {
+ if (iproto_check_msg_max()) {
+ iproto_connection_stop_msg_max_limit(con);
+ cpipe_flush_input(&tx_pipe);
+ return 0;
+ }
const char *reqstart = in->wpos - con->parse_size;
const char *pos = reqstart;
/* Read request length. */
@@ -666,7 +672,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
ev_io_stop(con->loop, &con->output);
ev_io_stop(con->loop, &con->input);
} else if (n_requests != 1 || con->parse_size != 0) {
- assert(rlist_empty(&con->in_stop_list));
/*
* Keep reading input, as long as the socket
* supplies data, but don't waste CPU on an extra
@@ -684,10 +689,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* requests, keep reading input, if only to avoid
* a deadlock on this connection.
*/
- if (iproto_check_msg_max())
- iproto_connection_stop_msg_max_limit(con);
- else
- ev_feed_event(con->loop, &con->input, EV_READ);
+ ev_feed_event(con->loop, &con->input, EV_READ);
}
cpipe_flush_input(&tx_pipe);
return 0;
>> +/**
>> + * Resume as many connections as possible until a request limit is
>> + * reached. Each connection before to start accept new requests
>> + * enqueues pending ones. And the input is resumed only if the
>> + * limit still is not reached.
>> + *
>> + * This global connections resuming is needed when one connection
>> + * finished a request, and another connection can get the freed
>> + * message.
>
> I can't parse this, sorry :(
>
> How about:
>
> Resume as many connections as possible until a request limit is
> reached. By design of iproto_enqueue_batch(), a paused connection almost always has a
> pending request fully read up, so resuming a connection will
> immediately enqueue the request as an iproto message and exhaust
> the limit. Thus we aren't really resuming all connections here:
> only as many as is necessary to use up the limit.
Ok.
@@ -717,13 +719,12 @@ iproto_connection_resume(struct iproto_connection *con)
/**
* Resume as many connections as possible until a request limit is
- * reached. Each connection before to start accept new requests
- * enqueues pending ones. And the input is resumed only if the
- * limit still is not reached.
- *
- * This global connections resuming is needed when one connection
- * finished a request, and another connection can get the freed
- * message.
+ * reached. By design of iproto_enqueue_batch(), a paused
+ * connection almost always has a pending request fully read up,
+ * so resuming a connection will immediately enqueue the request
+ * as an iproto message and exhaust the limit. Thus we aren't
+ * really resuming all connections here: only as many as is
+ * necessary to use up the limit.
*/
static void
>
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>
> Please add a comment:
> /* Shift from list head to ensure strict FIFO (fairness) for
> * resumed connections.
> */
Ok.
@@ -730,6 +730,10 @@ static void
iproto_resume()
{
while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+ /*
+ * Shift from list head to ensure strict FIFO
+ * (fairness) for resumed connections.
+ */
struct iproto_connection *con =
rlist_first_entry(&stopped_connections,
struct iproto_connection
More information about the Tarantool-patches
mailing list