* [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
[not found] ` <1525434981.906931598.16256142066578016@mxpdd8.i.mail.ru>
@ 2018-05-07 19:54 ` Konstantin Osipov
2018-05-08 11:07 ` Vladislav Shpilevoy
0 siblings, 1 reply; 3+ messages in thread
From: Konstantin Osipov @ 2018-05-07 19:54 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 14:58]:
> iproto: fix error with unstoppable batching
>
> IProto connection stops to read input on reached request limit.
> But when multiple requests are in a batch, the IProto does not
> check the limit, so it can be violated.
>
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
> src/box/iproto.cc | 177 +++++++++++++++++++++++++++++-------------
> test/box/errinj.result | 86 ++++++++++++++++++++
> test/box/errinj.test.lua | 39 ++++++++++
> test/box/net_msg_max.result | 21 ++++-
> test/box/net_msg_max.test.lua | 13 +++-
> 5 files changed, 279 insertions(+), 57 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> -/**
> - * Throttle the queue to the tx thread and ensure the fiber pool
> - * in tx thread is not depleted by a flood of incoming requests:
> - * resume a stopped connection only if there is a spare message
> - * object in the message pool.
> - */
> -static void
> -iproto_resume()
> +static struct iproto_msg *
> +iproto_msg_new(struct iproto_connection *con)
> {
Great!
> - /*
> - * Most of the time we have nothing to do here: throttling
> - * is not active.
> - */
> - if (rlist_empty(&stopped_connections))
> - return;
> - if (iproto_check_msg_max())
> - return;
> -
> - struct iproto_connection *con;
> - con = rlist_first_entry(&stopped_connections, struct iproto_connection,
> - in_stop_list);
> - ev_feed_event(con->loop, &con->input, EV_READ);
> + struct iproto_msg *msg =
> + (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> + ERROR_INJECT(ERRINJ_TESTING, {
> + mempool_free(&iproto_msg_pool, msg);
> + msg = NULL;
> + });
> + if (msg == NULL) {
> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> + say_warn("can not allocate new net_msg on connection %s",
> + sio_socketname(con->input.fd));
OutOfMemory is a logged error. Are you sure you want double
logging?
> + return NULL;
> + }
> + msg->connection = con;
> + return msg;
> }
>
> /**
> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
> static inline void
> iproto_connection_stop(struct iproto_connection *con)
> {
> - say_warn("net_msg_max limit reached, stopping input on connection %s",
> + say_warn("stopping input on connection %s",
> sio_socketname(con->input.fd));
Please have 3 functions, but avoid double logging:
iproto_connection_stop
iproto_connectoin_stop_msg_max_limit
iproto_connection_stop_readahead_limit
> assert(rlist_empty(&con->in_stop_list));
> ev_io_stop(con->loop, &con->input);
Please add a comment here:
/*
* Important to add to tail and fetch from head to ensure
* strict lifo order (fairness) for stopped connections.
*/
> rlist_add_tail(&stopped_connections, &con->in_stop_list);
> }
>
> +static inline void
> +iproto_connection_stop_by_limit(struct iproto_connection *con)
> +{
> + say_warn("net_msg_max limit reached on connection %s",
> + sio_socketname(con->input.fd));
> + assert(iproto_check_msg_max());
> + iproto_connection_stop(con);
> +}
> +
> /**
> * Initiate a connection shutdown. This method may
> * be invoked many times, and does the internal
> @@ -575,20 +573,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
> return new_ibuf;
> }
>
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a request limit is
> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval 0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
> iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> {
> int n_requests = 0;
> bool stop_input = false;
> - while (con->parse_size && stop_input == false) {
> + while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
> const char *reqstart = in->wpos - con->parse_size;
> const char *pos = reqstart;
> /* Read request length. */
> if (mp_typeof(*pos) != MP_UINT) {
> cpipe_flush_input(&tx_pipe);
> - tnt_raise(ClientError, ER_INVALID_MSGPACK,
> - "packet length");
> + diag_set(ClientError, ER_INVALID_MSGPACK,
> + "packet length");
> + return -1;
> }
> if (mp_check_uint(pos, in->wpos) >= 0)
> break;
> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> if (reqend > in->wpos)
> break;
> struct iproto_msg *msg = iproto_msg_new(con);
> + if (msg == NULL) {
> + /*
> + * Do not tread it as an error - just wait
> + * until some of requests are finished.
tread -> treat
> + */
> + iproto_connection_stop(con);
> + return 0;
> + }
> msg->p_ibuf = con->p_ibuf;
> msg->wpos = con->wpos;
>
> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> * requests, keep reading input, if only to avoid
> * a deadlock on this connection.
> */
> - ev_feed_event(con->loop, &con->input, EV_READ);
> + if (iproto_check_msg_max())
> + iproto_connection_stop_by_limit(con);
> + else
> + ev_feed_event(con->loop, &con->input, EV_READ);
Could you please avoid the double check for msg max here?
> }
> cpipe_flush_input(&tx_pipe);
> + return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely resurrect the
> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> + assert(! iproto_check_msg_max());
> + rlist_del(&con->in_stop_list);
> + /*
> + * enqueue_batch() stops the connection again, if the
> + * limit is reached again.
> + */
> + if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> + struct error *e = box_error_last();
> + iproto_write_error(con->input.fd, e, ::schema_version, 0);
> + error_log(e);
> + iproto_connection_close(con);
> + }
> +}
> +
> +/**
> + * Resume as many connections as possible until a request limit is
> + * reached. Each connection before to start accept new requests
> + * enqueues pending ones. And the input is resumed only if the
> + * limit still is not reached.
> + *
> + * This global connections resuming is needed when one connection
> + * finished a request, and another connection can get the freed
> + * message.
I can't parse this, sorry :(
How about:
Resume as many connections as possible until a request limit is
reached. By design of iproto_enqueue_batch(), a paused connection almost always has a
pending request fully read up, so resuming a connection will
immediately enqueue the request as an iproto message and exhaust
the limit. Thus we aren't really resuming all connections here:
only as many as is necessary to use up the limit.
> + */
> +static void
> +iproto_resume()
> +{
> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
Please add a comment:
/* Shift from list head to ensure strict FIFO (fairness) for
* resumed connections.
*/
> + struct iproto_connection *con =
> + rlist_first_entry(&stopped_connections,
> + struct iproto_connection,
> + in_stop_list);
> + iproto_connection_resume(con);
> + }
> }
>
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 3+ messages in thread
* [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
2018-05-07 19:54 ` [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching Konstantin Osipov
@ 2018-05-08 11:07 ` Vladislav Shpilevoy
0 siblings, 0 replies; 3+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 11:07 UTC (permalink / raw)
To: Konstantin Osipov, tarantool-patches
Hello. Thanks for review.
>> + if (msg == NULL) {
>> + diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>> + say_warn("can not allocate new net_msg on connection %s",
>> + sio_socketname(con->input.fd));
>
> OutOfMemory is a logged error. Are you sure you want double
> logging?
No, it is not logged error.
class OutOfMemory: public SystemError ->
-> class SystemError: public Exception ->
-> class Exception: public error ->
-> struct error.
>
>> + return NULL;
>> + }
>> + msg->connection = con;
>> + return msg;
>> }
>>
>> /**
>> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>> static inline void
>> iproto_connection_stop(struct iproto_connection *con)
>> {
>> - say_warn("net_msg_max limit reached, stopping input on connection %s",
>> + say_warn("stopping input on connection %s",
>> sio_socketname(con->input.fd));
>
> Please have 3 functions, but avoid double logging:
> iproto_connection_stop
> iproto_connectoin_stop_msg_max_limit
> iproto_connection_stop_readahead_limit
Done.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3212d9697..0a5eb1f86 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -426,19 +426,33 @@ iproto_connection_is_idle(struct iproto_connection *con)
}
static inline void
-iproto_connection_stop(struct iproto_connection *con)
+iproto_connection_stop_impl(struct iproto_connection *con)
{
say_warn("stopping input on connection %s",
sio_socketname(con->input.fd));
assert(rlist_empty(&con->in_stop_list));
ev_io_stop(con->loop, &con->input);
+}
+
+static inline void
+iproto_connection_stop(struct iproto_connection *con)
+{
+ iproto_connection_stop_impl(con);
rlist_add_tail(&stopped_connections, &con->in_stop_list);
}
static inline void
-iproto_connection_stop_by_limit(struct iproto_connection *con)
+iproto_connection_stop_readaheadlimit(struct iproto_connection *con)
+{
+ say_warn("readahead limit is reached on connection %s",
+ sio_socketname(con->input.fd));
+ iproto_connection_stop_impl(con);
+}
+
+static inline void
+iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
{
- say_warn("net_msg_max limit reached on connection %s",
+ say_warn("net_msg_max limit is reached on connection %s",
sio_socketname(con->input.fd));
assert(iproto_check_msg_max());
iproto_connection_stop(con);
@@ -667,7 +681,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* a deadlock on this connection.
*/
if (iproto_check_msg_max())
- iproto_connection_stop_by_limit(con);
+ iproto_connection_stop_msg_max_limit(con);
else
ev_feed_event(con->loop, &con->input, EV_READ);
}
@@ -728,13 +742,14 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
int fd = con->input.fd;
assert(fd >= 0);
assert(rlist_empty(&con->in_stop_list));
+ assert(loop == con->loop);
/*
* Throttle if there are too many pending requests,
* otherwise we might deplete the fiber pool in tx
* thread and deadlock.
*/
if (iproto_check_msg_max()) {
- iproto_connection_stop_by_limit(con);
+ iproto_connection_stop_msg_max_limit(con);
return;
}
@@ -742,9 +757,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
/* Ensure we have sufficient space for the next round. */
struct ibuf *in = iproto_connection_input_buffer(con);
if (in == NULL) {
- say_warn("readahead limit reached, stopping input on connection %s",
- sio_socketname(con->input.fd));
- ev_io_stop(loop, &con->input);
+ iproto_connection_stop_readaheadlimit(con);
return;
}
/* Read input. */
>
>> assert(rlist_empty(&con->in_stop_list));
>> ev_io_stop(con->loop, &con->input);
>
> Please add a comment here:
>
> /*
> * Important to add to tail and fetch from head to ensure
> * strict lifo order (fairness) for stopped connections.
> */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 0a5eb1f86..251d9fa0d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -438,6 +438,10 @@ static inline void
iproto_connection_stop(struct iproto_connection *con)
{
iproto_connection_stop_impl(con);
+ /*
+ * Important to add to tail and fetch from head to ensure
+ * strict lifo order (fairness) for stopped connections.
+ */
rlist_add_tail(&stopped_connections, &con->in_stop_list);
}
>> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> if (reqend > in->wpos)
>> break;
>> struct iproto_msg *msg = iproto_msg_new(con);
>> + if (msg == NULL) {
>> + /*
>> + * Do not tread it as an error - just wait
>> + * until some of requests are finished.
>
> tread -> treat
@@ -627,7 +631,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
struct iproto_msg *msg = iproto_msg_new(con);
if (msg == NULL) {
/*
- * Do not tread it as an error - just wait
+ * Do not treat it as an error - just wait
* until some of requests are finished.
*/
iproto_connection_stop(con);
>> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> * requests, keep reading input, if only to avoid
>> * a deadlock on this connection.
>> */
>> - ev_feed_event(con->loop, &con->input, EV_READ);
>> + if (iproto_check_msg_max())
>> + iproto_connection_stop_by_limit(con);
>> + else
>> + ev_feed_event(con->loop, &con->input, EV_READ);
>
> Could you please avoid the double check for msg max here?
There was no double check, as I said before. This check is out of cycle. But ok,
I move this into.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9618a04b0..605d38798 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -610,9 +610,15 @@ iproto_connection_input_buffer(struct iproto_connection *con)
static inline int
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
{
+ assert(rlist_empty(&con->in_stop_list));
int n_requests = 0;
bool stop_input = false;
- while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
+ while (con->parse_size != 0 && !stop_input) {
+ if (iproto_check_msg_max()) {
+ iproto_connection_stop_msg_max_limit(con);
+ cpipe_flush_input(&tx_pipe);
+ return 0;
+ }
const char *reqstart = in->wpos - con->parse_size;
const char *pos = reqstart;
/* Read request length. */
@@ -666,7 +672,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
ev_io_stop(con->loop, &con->output);
ev_io_stop(con->loop, &con->input);
} else if (n_requests != 1 || con->parse_size != 0) {
- assert(rlist_empty(&con->in_stop_list));
/*
* Keep reading input, as long as the socket
* supplies data, but don't waste CPU on an extra
@@ -684,10 +689,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* requests, keep reading input, if only to avoid
* a deadlock on this connection.
*/
- if (iproto_check_msg_max())
- iproto_connection_stop_msg_max_limit(con);
- else
- ev_feed_event(con->loop, &con->input, EV_READ);
+ ev_feed_event(con->loop, &con->input, EV_READ);
}
cpipe_flush_input(&tx_pipe);
return 0;
>> +/**
>> + * Resume as many connections as possible until a request limit is
>> + * reached. Each connection before to start accept new requests
>> + * enqueues pending ones. And the input is resumed only if the
>> + * limit still is not reached.
>> + *
>> + * This global connections resuming is needed when one connection
>> + * finished a request, and another connection can get the freed
>> + * message.
>
> I can't parse this, sorry :(
>
> How about:
>
> Resume as many connections as possible until a request limit is
> reached. By design of iproto_enqueue_batch(), a paused connection almost always has a
> pending request fully read up, so resuming a connection will
> immediately enqueue the request as an iproto message and exhaust
> the limit. Thus we aren't really resuming all connections here:
> only as many as is necessary to use up the limit.
Ok.
@@ -717,13 +719,12 @@ iproto_connection_resume(struct iproto_connection *con)
/**
* Resume as many connections as possible until a request limit is
- * reached. Each connection before to start accept new requests
- * enqueues pending ones. And the input is resumed only if the
- * limit still is not reached.
- *
- * This global connections resuming is needed when one connection
- * finished a request, and another connection can get the freed
- * message.
+ * reached. By design of iproto_enqueue_batch(), a paused
+ * connection almost always has a pending request fully read up,
+ * so resuming a connection will immediately enqueue the request
+ * as an iproto message and exhaust the limit. Thus we aren't
+ * really resuming all connections here: only as many as is
+ * necessary to use up the limit.
*/
static void
>
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> + while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
>
> Please add a comment:
> /* Shift from list head to ensure strict FIFO (fairness) for
> * resumed connections.
> */
Ok.
@@ -730,6 +730,10 @@ static void
iproto_resume()
{
while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+ /*
+ * Shift from list head to ensure strict FIFO
+ * (fairness) for resumed connections.
+ */
struct iproto_connection *con =
rlist_first_entry(&stopped_connections,
struct iproto_connection
^ permalink raw reply [flat|nested] 3+ messages in thread
* [tarantool-patches] Re: [commits] [tarantool] 02/02: iproto: allow to configure IPROTO_MSG_MAX
[not found] ` <20180507195948.GB30274@atlas>
@ 2018-05-08 11:07 ` Vladislav Shpilevoy
0 siblings, 0 replies; 3+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 11:07 UTC (permalink / raw)
To: Konstantin Osipov, tarantool-patches
>> -static int
>> -iproto_do_listen(struct cbus_call_msg *m)
>> +static inline int
>> +iproto_do_cfg(struct iproto_cfg_msg *msg)
>> {
>> - (void) m;
>> - try {
>> - if (evio_service_is_active(&binary))
>> - evio_service_listen(&binary);
>> - } catch (Exception *e) {
>> + const struct cmsg_hop cfg_route[] = {
>> + { iproto_do_cfg_f, NULL },
>> + };
>> + cmsg_init(msg, cfg_route);
>> + cpipe_push(&net_pipe, msg);
>> + while (msg->rc == IPROTO_CFG_NOT_FINISHED)
>> + fiber_sleep(0.001);
>
> Holy cow.
>
> Oh now I understand the comment. Please rewrite this, why
> would you ever need to write such a thing...
> Simply route the message back and wakeup a cond in the return
> function.
Done. In the same diff you can see the new fiber pool size
behavior - it is net_msg_max * FIBER_POOL_SIZE_FACTOR (5).
diff --git a/src/box/box.cc b/src/box/box.cc
index 0d1f0e6ae..412a140ce 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -764,7 +764,8 @@ box_set_net_msg_max(void)
{
int new_iproto_msg_max = cfg_geti("net_msg_max");
iproto_set_msg_max(new_iproto_msg_max);
- fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+ fiber_pool_set_max_size(&tx_fiber_pool,
+ new_iproto_msg_max * FIBER_POOL_SIZE_FACTOR);
}
/* }}} configuration bindings */
@@ -1719,7 +1720,8 @@ static inline void
box_cfg_xc(void)
{
/* Join the cord interconnect as "tx" endpoint. */
- fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
+ fiber_pool_create(&tx_fiber_pool, "tx",
+ IPROTO_MSG_MAX_DEFAULT * FIBER_POOL_SIZE_FACTOR,
FIBER_POOL_IDLE_TIMEOUT);
/* Add an extra endpoint for WAL wake up/rollback messages. */
cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index edce6169c..0c975f7b4 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,11 +63,6 @@
#include "applier.h"
#include "cfg.h"
-enum {
- IPROTO_MSG_MAX_DEFAULT = 768,
- IPROTO_MSG_MAX_MIN = 2,
-};
-
/**
* Network readahead. A signed integer to avoid
* automatic type coercion to an unsigned type.
@@ -1750,13 +1745,6 @@ iproto_init()
cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
}
-/** IProto configuration change result codes. */
-enum {
- IPROTO_CFG_OK,
- IPROTO_CFG_ERROR,
- IPROTO_CFG_NOT_FINISHED,
-};
-
/** Available IProto configuration changes. */
enum iproto_cfg_op {
IPROTO_CFG_BIND,
@@ -1770,7 +1758,7 @@ enum iproto_cfg_op {
* message count in fly it is needed to send a special message to
* IProto thread.
*/
-struct iproto_cfg_msg: public cmsg
+struct iproto_cfg_msg: public cbus_call_msg
{
/** Operation to execute in IProto thread. */
enum iproto_cfg_op op;
@@ -1781,29 +1769,17 @@ struct iproto_cfg_msg: public cmsg
/** New IProto max message count in fly. */
int iproto_msg_max;
};
- /**
- * Cfg result code that can be read atomically by
- * different threads - TX and IProto, because it consists
- * of single byte.
- */
- int8_t rc;
- /**
- * Diag can be read by TX thread only when rc becomes
- * not IPROTO_CFG_NOT_FINISHED.
- */
- struct diag diag;
};
static inline void
iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
{
memset(msg, 0, sizeof(*msg));
- msg->rc = IPROTO_CFG_NOT_FINISHED;
msg->op = op;
}
-static void
-iproto_do_cfg_f(struct cmsg *m)
+static int
+iproto_do_cfg_f(struct cbus_call_msg *m)
{
struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
int old;
@@ -1830,29 +1806,18 @@ iproto_do_cfg_f(struct cmsg *m)
default:
unreachable();
}
- cfg_msg->rc = IPROTO_CFG_OK;
} catch (Exception *e) {
- diag_move(diag_get(), &cfg_msg->diag);
- cfg_msg->rc = IPROTO_CFG_ERROR;
+ return -1;
}
+ return 0;
}
-static inline int
+static inline void
iproto_do_cfg(struct iproto_cfg_msg *msg)
{
- const struct cmsg_hop cfg_route[] = {
- { iproto_do_cfg_f, NULL },
- };
- cmsg_init(msg, cfg_route);
- cpipe_push(&net_pipe, msg);
- while (msg->rc == IPROTO_CFG_NOT_FINISHED)
- fiber_sleep(0.001);
- if (msg->rc == IPROTO_CFG_ERROR) {
- diag_move(&msg->diag, diag_get());
- return -1;
- }
- assert(msg->rc == IPROTO_CFG_OK);
- return 0;
+ if (cbus_call(&net_pipe, &tx_pipe, msg, iproto_do_cfg_f,
+ NULL, TIMEOUT_INFINITY) != 0)
+ diag_raise();
}
void
@@ -1861,8 +1826,7 @@ iproto_bind(const char *uri)
struct iproto_cfg_msg cfg_msg;
iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
cfg_msg.uri = uri;
- if (iproto_do_cfg(&cfg_msg) != 0)
- diag_raise();
+ iproto_do_cfg(&cfg_msg);
}
void
@@ -1870,8 +1834,7 @@ iproto_listen()
{
struct iproto_cfg_msg cfg_msg;
iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
- if (iproto_do_cfg(&cfg_msg) != 0)
- diag_raise();
+ iproto_do_cfg(&cfg_msg);
}
size_t
@@ -1897,7 +1860,6 @@ iproto_set_msg_max(int new_iproto_msg_max)
struct iproto_cfg_msg cfg_msg;
iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
cfg_msg.iproto_msg_max = new_iproto_msg_max;
- if (iproto_do_cfg(&cfg_msg) != 0)
- diag_raise();
+ iproto_do_cfg(&cfg_msg);
cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index a1dddc405..2ebe03c13 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -37,6 +37,11 @@
extern "C" {
#endif /* defined(__cplusplus) */
+enum {
+ IPROTO_MSG_MAX_DEFAULT = 768,
+ IPROTO_MSG_MAX_MIN = 2,
+};
+
extern unsigned iproto_readahead;
/**
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index f60b6f0ff..b93b85a7c 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -139,8 +139,6 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
void
fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
{
- if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
- new_max_size = FIBER_POOL_SIZE_DEFAULT;
pool->max_size = new_max_size;
}
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index 2a04d6063..5f7b345af 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -42,7 +42,7 @@ extern "C" {
#endif /* defined(__cplusplus) */
enum {
- FIBER_POOL_SIZE_DEFAULT = 4096,
+ FIBER_POOL_SIZE_FACTOR = 5,
FIBER_POOL_IDLE_TIMEOUT = 1
};
@@ -87,8 +87,7 @@ fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
float idle_timeout);
/**
- * Set maximal fiber pool size. If a new size is bigger than the
- * current, then pending requests processing is started.
+ * Set maximal fiber pool size.
* @param pool Fiber pool to set size.
* @param new_max_size New maximal size.
*/
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2018-05-08 11:07 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
[not found] <152543542862.15530.3321867952732897038@localhost>
[not found] ` <1525434981.906931598.16256142066578016@mxpdd8.i.mail.ru>
2018-05-07 19:54 ` [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching Konstantin Osipov
2018-05-08 11:07 ` Vladislav Shpilevoy
[not found] ` <1525434982.906931598.14094229686469248@mxpdd2.i.mail.ru>
[not found] ` <20180507195948.GB30274@atlas>
2018-05-08 11:07 ` [tarantool-patches] Re: [commits] [tarantool] 02/02: iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox