Tarantool development patches archive
* [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
       [not found] ` <1525434981.906931598.16256142066578016@mxpdd8.i.mail.ru>
@ 2018-05-07 19:54   ` Konstantin Osipov
  2018-05-08 11:07     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 3+ messages in thread
From: Konstantin Osipov @ 2018-05-07 19:54 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 14:58]:
>     iproto: fix error with unstoppable batching
>     
>     An IProto connection stops reading input when the request limit
>     is reached. But when multiple requests arrive in one batch,
>     IProto does not re-check the limit, so it can be violated.
>     
>     Let's check the limit during batch parsing after each message too,
>     not only once before parsing.
> ---
>  src/box/iproto.cc             | 177 +++++++++++++++++++++++++++++-------------
>  test/box/errinj.result        |  86 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  39 ++++++++++
>  test/box/net_msg_max.result   |  21 ++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 279 insertions(+), 57 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> -/**
> - * Throttle the queue to the tx thread and ensure the fiber pool
> - * in tx thread is not depleted by a flood of incoming requests:
> - * resume a stopped connection only if there is a spare message
> - * object in the message pool.
> - */
> -static void
> -iproto_resume()
> +static struct iproto_msg *
> +iproto_msg_new(struct iproto_connection *con)
>  {

Great!

> -	/*
> -	 * Most of the time we have nothing to do here: throttling
> -	 * is not active.
> -	 */
> -	if (rlist_empty(&stopped_connections))
> -		return;
> -	if (iproto_check_msg_max())
> -		return;
> -
> -	struct iproto_connection *con;
> -	con = rlist_first_entry(&stopped_connections, struct iproto_connection,
> -				in_stop_list);
> -	ev_feed_event(con->loop, &con->input, EV_READ);
> +	struct iproto_msg *msg =
> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +	ERROR_INJECT(ERRINJ_TESTING, {
> +		mempool_free(&iproto_msg_pool, msg);
> +		msg = NULL;
> +	});
> +	if (msg == NULL) {
> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
> +		say_warn("can not allocate new net_msg on connection %s",
> +			 sio_socketname(con->input.fd));

OutOfMemory is a logged error. Are you sure you want double
logging?

> +		return NULL;
> +	}
> +	msg->connection = con;
> +	return msg;
>  }
>  
>  /**
> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>  static inline void
>  iproto_connection_stop(struct iproto_connection *con)
>  {
> -	say_warn("net_msg_max limit reached, stopping input on connection %s",
> +	say_warn("stopping input on connection %s",
>  		 sio_socketname(con->input.fd));

Please have 3 functions, but avoid double logging:
iproto_connection_stop
iproto_connection_stop_msg_max_limit
iproto_connection_stop_readahead_limit 

>  	assert(rlist_empty(&con->in_stop_list));
>  	ev_io_stop(con->loop, &con->input);

Please add a comment here:

/*
 * Important to add to tail and fetch from head to ensure
 * strict FIFO order (fairness) for stopped connections.
 */

>  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
>  }
>  
> +static inline void
> +iproto_connection_stop_by_limit(struct iproto_connection *con)
> +{
> +	say_warn("net_msg_max limit reached on connection %s",
> +		 sio_socketname(con->input.fd));
> +	assert(iproto_check_msg_max());
> +	iproto_connection_stop(con);
> +}
> +
>  /**
>   * Initiate a connection shutdown. This method may
>   * be invoked many times, and does the internal
> @@ -575,20 +573,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>  
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If the request limit
> + * is reached, stop the connection input even if not the whole
> + * batch is enqueued. Else try to read more by feeding a read
> + * event to the event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval  0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>  	int n_requests = 0;
>  	bool stop_input = false;
> -	while (con->parse_size && stop_input == false) {
> +	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>  		const char *reqstart = in->wpos - con->parse_size;
>  		const char *pos = reqstart;
>  		/* Read request length. */
>  		if (mp_typeof(*pos) != MP_UINT) {
>  			cpipe_flush_input(&tx_pipe);
> -			tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -				  "packet length");
> +			diag_set(ClientError, ER_INVALID_MSGPACK,
> +				 "packet length");
> +			return -1;
>  		}
>  		if (mp_check_uint(pos, in->wpos) >= 0)
>  			break;
> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		if (reqend > in->wpos)
>  			break;
>  		struct iproto_msg *msg = iproto_msg_new(con);
> +		if (msg == NULL) {
> +			/*
> +			 * Do not tread it as an error - just wait
> +			 * until some of requests are finished.

tread -> treat

> +			 */
> +			iproto_connection_stop(con);
> +			return 0;
> +		}
>  		msg->p_ibuf = con->p_ibuf;
>  		msg->wpos = con->wpos;
>  
> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		 * requests, keep reading input, if only to avoid
>  		 * a deadlock on this connection.
>  		 */
> -		ev_feed_event(con->loop, &con->input, EV_READ);
> +		if (iproto_check_msg_max())
> +			iproto_connection_stop_by_limit(con);
> +		else
> +			ev_feed_event(con->loop, &con->input, EV_READ);

Could you please avoid the double check for msg max here?

>  	}
>  	cpipe_flush_input(&tx_pipe);
> +	return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely resurrect
> + * the connection if it has no more pending requests and the
> + * limit is still not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +	assert(! iproto_check_msg_max());
> +	rlist_del(&con->in_stop_list);
> +	/*
> +	 * enqueue_batch() stops the connection again, if the
> +	 * limit is reached again.
> +	 */
> +	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +		struct error *e = box_error_last();
> +		iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +		error_log(e);
> +		iproto_connection_close(con);
> +	}
> +}
> +
> +/**
> + * Resume as many connections as possible until a request limit is
> + * reached. Each connection before to start accept new requests
> + * enqueues pending ones. And the input is resumed only if the
> + * limit still is not reached.
> + *
> + * This global connections resuming is needed when one connection
> + * finished a request, and another connection can get the freed
> + * message.

I can't parse this, sorry :(

How about:

Resume as many connections as possible until a request limit is
reached. By design of iproto_enqueue_batch(), a paused connection
almost always has a pending request fully read up, so resuming a
connection will immediately enqueue the request as an iproto
message and exhaust the limit. Thus we aren't really resuming all
connections here: only as many as is necessary to use up the
limit.

> + */
> +static void
> +iproto_resume()
> +{
> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {

  Please add a comment: 
  /* Shift from list head to ensure strict FIFO (fairness) for
   * resumed connections.
   */
> +		struct iproto_connection *con =
> +			rlist_first_entry(&stopped_connections,
> +					  struct iproto_connection,
> +					  in_stop_list);
> +		iproto_connection_resume(con);
> +	}
>  }
>  

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching
  2018-05-07 19:54   ` [tarantool-patches] Re: [commits] [tarantool] 01/02: iproto: fix error with unstoppable batching Konstantin Osipov
@ 2018-05-08 11:07     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 3+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 11:07 UTC (permalink / raw)
  To: Konstantin Osipov, tarantool-patches

Hello. Thanks for the review.

>> +	if (msg == NULL) {
>> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
>> +		say_warn("can not allocate new net_msg on connection %s",
>> +			 sio_socketname(con->input.fd));
> 
> OutOfMemory is a logged error. Are you sure you want double
> logging?

No, it is not a logged error.

class OutOfMemory: public SystemError ->
-> class SystemError: public Exception ->
-> class Exception: public error ->
-> struct error.
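
Here is a minimal compilable sketch of that chain, just to illustrate
the point (member lists are omitted; only the inheritance relations
are taken from the code, the comments are an illustration):

struct error {
	/* Base error object; constructing it does not log anything. */
};

class Exception: public error {
	/* C++ wrapper around struct error. */
};

class SystemError: public Exception {
	/* errno-style system errors. */
};

class OutOfMemory: public SystemError {
	/*
	 * Created via diag_set(OutOfMemory, ...); it reaches the log
	 * only when the caller explicitly logs it, so the say_warn()
	 * in iproto_msg_new() is not double logging.
	 */
};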

> 
>> +		return NULL;
>> +	}
>> +	msg->connection = con;
>> +	return msg;
>>   }
>>   
>>   /**
>> @@ -434,13 +423,22 @@ iproto_connection_is_idle(struct iproto_connection *con)
>>   static inline void
>>   iproto_connection_stop(struct iproto_connection *con)
>>   {
>> -	say_warn("net_msg_max limit reached, stopping input on connection %s",
>> +	say_warn("stopping input on connection %s",
>>   		 sio_socketname(con->input.fd));
> 
> Please have 3 functions, but avoid double logging:
> iproto_connection_stop
> iproto_connection_stop_msg_max_limit
> iproto_connection_stop_readahead_limit

Done.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3212d9697..0a5eb1f86 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -426,19 +426,33 @@ iproto_connection_is_idle(struct iproto_connection *con)
  }
  
  static inline void
-iproto_connection_stop(struct iproto_connection *con)
+iproto_connection_stop_impl(struct iproto_connection *con)
  {
  	say_warn("stopping input on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(rlist_empty(&con->in_stop_list));
  	ev_io_stop(con->loop, &con->input);
+}
+
+static inline void
+iproto_connection_stop(struct iproto_connection *con)
+{
+	iproto_connection_stop_impl(con);
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }
  
  static inline void
-iproto_connection_stop_by_limit(struct iproto_connection *con)
+iproto_connection_stop_readaheadlimit(struct iproto_connection *con)
+{
+	say_warn("readahead limit is reached on connection %s",
+		 sio_socketname(con->input.fd));
+	iproto_connection_stop_impl(con);
+}
+
+static inline void
+iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
  {
-	say_warn("net_msg_max limit reached on connection %s",
+	say_warn("net_msg_max limit is reached on connection %s",
  		 sio_socketname(con->input.fd));
  	assert(iproto_check_msg_max());
  	iproto_connection_stop(con);
@@ -667,7 +681,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * a deadlock on this connection.
  		 */
  		if (iproto_check_msg_max())
-			iproto_connection_stop_by_limit(con);
+			iproto_connection_stop_msg_max_limit(con);
  		else
  			ev_feed_event(con->loop, &con->input, EV_READ);
  	}
@@ -728,13 +742,14 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  	int fd = con->input.fd;
  	assert(fd >= 0);
  	assert(rlist_empty(&con->in_stop_list));
+	assert(loop == con->loop);
  	/*
  	 * Throttle if there are too many pending requests,
  	 * otherwise we might deplete the fiber pool in tx
  	 * thread and deadlock.
  	 */
  	if (iproto_check_msg_max()) {
-		iproto_connection_stop_by_limit(con);
+		iproto_connection_stop_msg_max_limit(con);
  		return;
  	}
  
@@ -742,9 +757,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
  		/* Ensure we have sufficient space for the next round.  */
  		struct ibuf *in = iproto_connection_input_buffer(con);
  		if (in == NULL) {
-			say_warn("readahead limit reached, stopping input on connection %s",
-				 sio_socketname(con->input.fd));
-			ev_io_stop(loop, &con->input);
+			iproto_connection_stop_readaheadlimit(con);
  			return;
  		}
  		/* Read input. */

> 
>>   	assert(rlist_empty(&con->in_stop_list));
>>   	ev_io_stop(con->loop, &con->input);
> 
> Please add a comment here:
> 
> /*
>   * Important to add to tail and fetch from head to ensure
>   * strict FIFO order (fairness) for stopped connections.
>   */

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 0a5eb1f86..251d9fa0d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -438,6 +438,10 @@ static inline void
  iproto_connection_stop(struct iproto_connection *con)
  {
  	iproto_connection_stop_impl(con);
+	/*
+	 * Important to add to tail and fetch from head to ensure
+	 * strict FIFO order (fairness) for stopped connections.
+	 */
  	rlist_add_tail(&stopped_connections, &con->in_stop_list);
  }


>> @@ -597,6 +606,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		if (reqend > in->wpos)
>>   			break;
>>   		struct iproto_msg *msg = iproto_msg_new(con);
>> +		if (msg == NULL) {
>> +			/*
>> +			 * Do not tread it as an error - just wait
>> +			 * until some of requests are finished.
> 
> tread -> treat

@@ -627,7 +631,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		struct iproto_msg *msg = iproto_msg_new(con);
  		if (msg == NULL) {
  			/*
-			 * Do not tread it as an error - just wait
+			 * Do not treat it as an error - just wait
  			 * until some of requests are finished.
  			 */
  			iproto_connection_stop(con);


>> @@ -644,9 +661,57 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>>   		 * requests, keep reading input, if only to avoid
>>   		 * a deadlock on this connection.
>>   		 */
>> -		ev_feed_event(con->loop, &con->input, EV_READ);
>> +		if (iproto_check_msg_max())
>> +			iproto_connection_stop_by_limit(con);
>> +		else
>> +			ev_feed_event(con->loop, &con->input, EV_READ);
> 
> Could you please avoid the double check for msg max here?

There was no double check, as I said before: this check was outside
the loop. But ok, I have moved it inside.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9618a04b0..605d38798 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -610,9 +610,15 @@ iproto_connection_input_buffer(struct iproto_connection *con)
  static inline int
  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  {
+	assert(rlist_empty(&con->in_stop_list));
  	int n_requests = 0;
  	bool stop_input = false;
-	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
+	while (con->parse_size != 0 && !stop_input) {
+		if (iproto_check_msg_max()) {
+			iproto_connection_stop_msg_max_limit(con);
+			cpipe_flush_input(&tx_pipe);
+			return 0;
+		}
  		const char *reqstart = in->wpos - con->parse_size;
  		const char *pos = reqstart;
  		/* Read request length. */
@@ -666,7 +672,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		ev_io_stop(con->loop, &con->output);
  		ev_io_stop(con->loop, &con->input);
  	} else if (n_requests != 1 || con->parse_size != 0) {
-		assert(rlist_empty(&con->in_stop_list));
  		/*
  		 * Keep reading input, as long as the socket
  		 * supplies data, but don't waste CPU on an extra
@@ -684,10 +689,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
  		 * requests, keep reading input, if only to avoid
  		 * a deadlock on this connection.
  		 */
-		if (iproto_check_msg_max())
-			iproto_connection_stop_msg_max_limit(con);
-		else
-			ev_feed_event(con->loop, &con->input, EV_READ);
+		ev_feed_event(con->loop, &con->input, EV_READ);
  	}
  	cpipe_flush_input(&tx_pipe);
  	return 0;

>> +/**
>> + * Resume as many connections as possible until a request limit is
>> + * reached. Each connection before to start accept new requests
>> + * enqueues pending ones. And the input is resumed only if the
>> + * limit still is not reached.
>> + *
>> + * This global connections resuming is needed when one connection
>> + * finished a request, and another connection can get the freed
>> + * message.
> 
> I can't parse this, sorry :(
> 
> How about:
> 
> Resume as many connections as possible until a request limit is
> reached. By design of iproto_enqueue_batch(), a paused connection almost always has a
> pending request fully read up, so resuming a connection will
> immediately enqueue the request as an iproto message and exhaust
> the limit. Thus we aren't really resuming all connections here:
> only as many as is necessary to use up the limit.

Ok.

@@ -717,13 +719,12 @@ iproto_connection_resume(struct iproto_connection *con)
  
  /**
   * Resume as many connections as possible until a request limit is
- * reached. Each connection before to start accept new requests
- * enqueues pending ones. And the input is resumed only if the
- * limit still is not reached.
- *
- * This global connections resuming is needed when one connection
- * finished a request, and another connection can get the freed
- * message.
+ * reached. By design of iproto_enqueue_batch(), a paused
+ * connection almost always has a pending request fully read up,
+ * so resuming a connection will immediately enqueue the request
+ * as an iproto message and exhaust the limit. Thus we aren't
+ * really resuming all connections here: only as many as is
+ * necessary to use up the limit.
   */
  static void

> 
>> + */
>> +static void
>> +iproto_resume()
>> +{
>> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
> 
>    Please add a comment:
>    /* Shift from list head to ensure strict FIFO (fairness) for
>     * resumed connections.
>     */

Ok.

@@ -730,6 +730,10 @@ static void
  iproto_resume()
  {
         while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+               /*
+                * Shift from list head to ensure strict FIFO
+                * (fairness) for resumed connections.
+                */
                 struct iproto_connection *con =
                         rlist_first_entry(&stopped_connections,
                                           struct iproto_connection,

* [tarantool-patches] Re: [commits] [tarantool] 02/02: iproto: allow to configure IPROTO_MSG_MAX
       [not found]   ` <20180507195948.GB30274@atlas>
@ 2018-05-08 11:07     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 3+ messages in thread
From: Vladislav Shpilevoy @ 2018-05-08 11:07 UTC (permalink / raw)
  To: Konstantin Osipov, tarantool-patches


>> -static int
>> -iproto_do_listen(struct cbus_call_msg *m)
>> +static inline int
>> +iproto_do_cfg(struct iproto_cfg_msg *msg)
>>   {
>> -	(void) m;
>> -	try {
>> -		if (evio_service_is_active(&binary))
>> -			evio_service_listen(&binary);
>> -	} catch (Exception *e) {
>> +	const struct cmsg_hop cfg_route[] = {
>> +		{ iproto_do_cfg_f, NULL },
>> +	};
>> +	cmsg_init(msg, cfg_route);
>> +	cpipe_push(&net_pipe, msg);
>> +	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
>> +		fiber_sleep(0.001);
> 
> Holy cow.
> 
> Oh now I understand the comment. Please rewrite this, why
> would you ever need to write such a thing...
> Simply route the message back and wakeup a cond in the return
> function.

Done. In the same diff you can see the new fiber pool size
behavior - it is net_msg_max * FIBER_POOL_SIZE_FACTOR (5).
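For example, with the default net_msg_max of 768 (IPROTO_MSG_MAX_DEFAULT)
the tx pool maximum becomes 768 * 5 = 3840 fibers, close to the old
fixed FIBER_POOL_SIZE_DEFAULT of 4096.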

diff --git a/src/box/box.cc b/src/box/box.cc
index 0d1f0e6ae..412a140ce 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -764,7 +764,8 @@ box_set_net_msg_max(void)
  {
  	int new_iproto_msg_max = cfg_geti("net_msg_max");
  	iproto_set_msg_max(new_iproto_msg_max);
-	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool,
+				new_iproto_msg_max * FIBER_POOL_SIZE_FACTOR);
  }
  
  /* }}} configuration bindings */
@@ -1719,7 +1720,8 @@ static inline void
  box_cfg_xc(void)
  {
  	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
+	fiber_pool_create(&tx_fiber_pool, "tx",
+			  IPROTO_MSG_MAX_DEFAULT * FIBER_POOL_SIZE_FACTOR,
  			  FIBER_POOL_IDLE_TIMEOUT);
  	/* Add an extra endpoint for WAL wake up/rollback messages. */
  	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index edce6169c..0c975f7b4 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,11 +63,6 @@
  #include "applier.h"
  #include "cfg.h"
  
-enum {
-	IPROTO_MSG_MAX_DEFAULT = 768,
-	IPROTO_MSG_MAX_MIN = 2,
-};
-
  /**
   * Network readahead. A signed integer to avoid
   * automatic type coercion to an unsigned type.
@@ -1750,13 +1745,6 @@ iproto_init()
  	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  }
  
-/** IProto configuration change result codes. */
-enum {
-	IPROTO_CFG_OK,
-	IPROTO_CFG_ERROR,
-	IPROTO_CFG_NOT_FINISHED,
-};
-
  /** Available IProto configuration changes. */
  enum iproto_cfg_op {
  	IPROTO_CFG_BIND,
@@ -1770,7 +1758,7 @@ enum iproto_cfg_op {
   * message count in fly it is needed to send a special message to
   * IProto thread.
   */
-struct iproto_cfg_msg: public cmsg
+struct iproto_cfg_msg: public cbus_call_msg
  {
  	/** Operation to execute in IProto thread. */
  	enum iproto_cfg_op op;
@@ -1781,29 +1769,17 @@ struct iproto_cfg_msg: public cmsg
  		/** New IProto max message count in fly. */
  		int iproto_msg_max;
  	};
-	/**
-	 * Cfg result code that can be read atomically by
-	 * different threads - TX and IProto, because it consists
-	 * of single byte.
-	 */
-	int8_t rc;
-	/**
-	 * Diag can be read by TX thread only when rc becomes
-	 * not IPROTO_CFG_NOT_FINISHED.
-	 */
-	struct diag diag;
  };
  
  static inline void
  iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
  {
  	memset(msg, 0, sizeof(*msg));
-	msg->rc = IPROTO_CFG_NOT_FINISHED;
  	msg->op = op;
  }
  
-static void
-iproto_do_cfg_f(struct cmsg *m)
+static int
+iproto_do_cfg_f(struct cbus_call_msg *m)
  {
  	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
  	int old;
@@ -1830,29 +1806,18 @@ iproto_do_cfg_f(struct cmsg *m)
  		default:
  			unreachable();
  		}
-		cfg_msg->rc = IPROTO_CFG_OK;
  	} catch (Exception *e) {
-		diag_move(diag_get(), &cfg_msg->diag);
-		cfg_msg->rc = IPROTO_CFG_ERROR;
+		return -1;
  	}
+	return 0;
  }
  
-static inline int
+static inline void
  iproto_do_cfg(struct iproto_cfg_msg *msg)
  {
-	const struct cmsg_hop cfg_route[] = {
-		{ iproto_do_cfg_f, NULL },
-	};
-	cmsg_init(msg, cfg_route);
-	cpipe_push(&net_pipe, msg);
-	while (msg->rc == IPROTO_CFG_NOT_FINISHED)
-		fiber_sleep(0.001);
-	if (msg->rc == IPROTO_CFG_ERROR) {
-		diag_move(&msg->diag, diag_get());
-		return -1;
-	}
-	assert(msg->rc == IPROTO_CFG_OK);
-	return 0;
+	if (cbus_call(&net_pipe, &tx_pipe, msg, iproto_do_cfg_f,
+		      NULL, TIMEOUT_INFINITY) != 0)
+		diag_raise();
  }
  
  void
@@ -1861,8 +1826,7 @@ iproto_bind(const char *uri)
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
  	cfg_msg.uri = uri;
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  }
  
  void
@@ -1870,8 +1834,7 @@ iproto_listen()
  {
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  }
  
  size_t
@@ -1897,7 +1860,6 @@ iproto_set_msg_max(int new_iproto_msg_max)
  	struct iproto_cfg_msg cfg_msg;
  	iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
  	cfg_msg.iproto_msg_max = new_iproto_msg_max;
-	if (iproto_do_cfg(&cfg_msg) != 0)
-		diag_raise();
+	iproto_do_cfg(&cfg_msg);
  	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
  }
diff --git a/src/box/iproto.h b/src/box/iproto.h
index a1dddc405..2ebe03c13 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -37,6 +37,11 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
+
  extern unsigned iproto_readahead;
  
  /**
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index f60b6f0ff..b93b85a7c 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -139,8 +139,6 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
  void
  fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
  {
-	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
-		new_max_size = FIBER_POOL_SIZE_DEFAULT;
  	pool->max_size = new_max_size;
  }
  
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index 2a04d6063..5f7b345af 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -42,7 +42,7 @@ extern "C" {
  #endif /* defined(__cplusplus) */
  
  enum {
-	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_SIZE_FACTOR = 5,
  	FIBER_POOL_IDLE_TIMEOUT = 1
  };
  
@@ -87,8 +87,7 @@ fiber_pool_create(struct fiber_pool *pool, const char *name, int max_pool_size,
  		  float idle_timeout);
  
  /**
- * Set maximal fiber pool size. If a new size is bigger than the
- * current, then pending requests processing is started.
+ * Set maximal fiber pool size.
   * @param pool Fiber pool to set size.
   * @param new_max_size New maximal size.
   */
