[PATCH 3/3] iproto: allow to configure IPROTO_MSG_MAX

Vladimir Davydov vdavydov.dev at gmail.com
Mon Apr 23 14:34:56 MSK 2018


On Sat, Apr 21, 2018 at 01:52:11AM +0300, Vladislav Shpilevoy wrote:
> IPROTO_MSG_MAX is a constant that limits the number of requests in
> flight. It prevents creating too many fibers in the TX thread,
> which would lead to excessive overhead on fiber switching and on
> storing their stacks.
> 
> But some users have powerful hardware on which the
> IPROTO_MSG_MAX limit is too restrictive. The patch exposes it as
> a runtime configuration parameter.
> 
> The parameter is named 'iproto_msg_max'. If a user sees that the
> IProto thread is stuck due to too many requests, they can change
> iproto_msg_max at runtime, and the IProto thread immediately starts
> processing pending requests.
> 
> 'iproto_msg_max' can be decreased, but obviously that cannot stop
> requests that are already running. So if the current request count
> in the IProto thread exceeds the new 'iproto_msg_max' value, it
> takes some time until enough requests are finished.
> 
> Closes #3320
> ---
>  src/box/box.cc                  |   7 +++
>  src/box/box.h                   |   1 +
>  src/box/iproto.cc               |  70 +++++++++++++++++------
>  src/box/iproto.h                |   3 +
>  src/box/lua/cfg.cc              |  12 ++++
>  src/box/lua/load_cfg.lua        |   3 +
>  test/app-tap/init_script.result |  73 ++++++++++++------------
>  test/box/admin.result           |   2 +
>  test/box/cfg.result             |  24 ++++++++
>  test/box/cfg.test.lua           |   9 +++
>  test/box/request_limit.result   | 119 +++++++++++++++++++++++++++++++++++++++-
>  test/box/request_limit.test.lua |  55 ++++++++++++++++++-
>  12 files changed, 322 insertions(+), 56 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index d2dfc5b5f..80684ad48 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -759,6 +759,12 @@ box_set_vinyl_timeout(void)
>  	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
>  }
>  
> +void
> +box_set_iproto_msg_max(void)
> +{
> +	iproto_set_msg_max(cfg_geti("iproto_msg_max"));
> +}
> +
>  /* }}} configuration bindings */
>  
>  /**
> @@ -1735,6 +1741,7 @@ box_cfg_xc(void)
>  	box_check_instance_uuid(&instance_uuid);
>  	box_check_replicaset_uuid(&replicaset_uuid);
>  
> +	box_set_iproto_msg_max();
>  	box_set_checkpoint_count();
>  	box_set_too_long_threshold();
>  	box_set_replication_timeout();
> diff --git a/src/box/box.h b/src/box/box.h
> index c9b5aad01..712e21191 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
>  void box_set_vinyl_timeout(void);
>  void box_set_replication_timeout(void);
>  void box_set_replication_connect_quorum(void);
> +void box_set_iproto_msg_max(void);
>  
>  extern "C" {
>  #endif /* defined(__cplusplus) */
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 585e8cc83..1266eb255 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,8 +63,7 @@
>  #include "applier.h"
>  #include "cfg.h"
>  
> -/* The number of iproto messages in flight */
> -enum { IPROTO_MSG_MAX = 768 };
> +enum { IPROTO_MSG_MAX_MIN = 768 };

Why do you forbid setting iproto_msg_max to, say, 1? Why do we
have to allow at least 768 messages in flight? If there's no specific
reason, I'd prefer to remove this artificial lower bound.

>  
>  /**
>   * Network readahead. A signed integer to avoid
> @@ -83,6 +82,9 @@ enum { IPROTO_MSG_MAX = 768 };
>   */
>  unsigned iproto_readahead = 16320;
>  
> +/* The maximal number of iproto messages in flight. */
> +static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
> +
>  /**
>   * How big is a buffer which needs to be shrunk before
>   * it is put back into buffer cache.
> @@ -381,7 +383,7 @@ iproto_must_stop_input()
>  {
>  	size_t connection_count = mempool_count(&iproto_connection_pool);
>  	size_t request_count = mempool_count(&iproto_msg_pool);
> -	return request_count > connection_count + IPROTO_MSG_MAX;
> +	return request_count > connection_count + iproto_msg_max;
>  }
>  
>  /**
> @@ -1623,7 +1625,7 @@ net_cord_f(va_list /* ap */)
>  	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
>  	/* Create a pipe to "tx" thread. */
>  	cpipe_create(&tx_pipe, "tx");
> -	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
> +	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
>  	/* Process incomming messages. */
>  	cbus_loop(&endpoint);
>  
> @@ -1651,29 +1653,47 @@ iproto_init()
>  
>  	/* Create a pipe to "net" thread. */
>  	cpipe_create(&net_pipe, "net");
> -	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
> +	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
>  }
>  
>  /**
>   * Since there is no way to "synchronously" change the
> - * state of the io thread, to change the listen port
> - * we need to bounce a couple of messages to and
> - * from this thread.
> + * state of the io thread, to change the listen port or max
> + * message count in fly it is needed to bounce a couple of
> + * messages to and from this thread.
>   */
>  struct iproto_cfg_msg: public cbus_call_msg
>  {
> +	/** New URI to bind to. */
>  	const char *uri;
> +	bool need_update_uri;
> +
> +	/** New IProto max message count in fly. */
> +	int iproto_msg_max;
> +	bool need_update_msg_max;
>  };
>  
> +static struct iproto_cfg_msg cfg_msg;
> +
>  static int
>  iproto_do_cfg(struct cbus_call_msg *m)
>  {
> -	const char *uri  = ((struct iproto_cfg_msg *) m)->uri;
> +	assert(m == &cfg_msg);
>  	try {
> -		if (evio_service_is_active(&binary))
> -			evio_service_stop(&binary);
> -		if (uri != NULL)
> -			evio_service_bind(&binary, uri);
> +		if (cfg_msg.need_update_uri) {
> +			if (evio_service_is_active(&binary))
> +				evio_service_stop(&binary);
> +			if (cfg_msg.uri != NULL)
> +				evio_service_bind(&binary, cfg_msg.uri);
> +		}
> +		if (cfg_msg.need_update_msg_max) {
> +			cpipe_set_max_input(&tx_pipe,
> +					    cfg_msg.iproto_msg_max / 2);
> +			int old = iproto_msg_max;
> +			iproto_msg_max = cfg_msg.iproto_msg_max;
> +			if (old < iproto_msg_max)
> +				iproto_resume();
> +		}

This is a matter of personal taste, but I'd prefer not to introduce
these extra flags and to check the new values themselves instead, i.e.

	if (cfg_msg.uri != NULL)
		/* set uri */

	if (cfg_msg.iproto_msg_max > 0)
		/* update iproto_msg max */

>  	} catch (Exception *e) {
>  		return -1;
>  	}
> @@ -1696,9 +1716,10 @@ iproto_do_listen(struct cbus_call_msg *m)
>  void
>  iproto_bind(const char *uri)
>  {
> -	static struct iproto_cfg_msg m;
> -	m.uri = uri;
> -	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_cfg,
> +	memset(&cfg_msg, 0, sizeof(cfg_msg));
> +	cfg_msg.need_update_uri = true;
> +	cfg_msg.uri = uri;
> +	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
>  		      NULL, TIMEOUT_INFINITY))
>  		diag_raise();
>  }
> @@ -1724,3 +1745,20 @@ iproto_reset_stat(void)
>  {
>  	rmean_cleanup(rmean_net);
>  }
> +
> +void
> +iproto_set_msg_max(int new_iproto_msg_max)
> +{
> +	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
> +		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
> +			  tt_sprintf("minimal value is %d",
> +				     IPROTO_MSG_MAX_MIN));
> +	}
> +	memset(&cfg_msg, 0, sizeof(cfg_msg));
> +	cfg_msg.need_update_msg_max = true;
> +	cfg_msg.iproto_msg_max = new_iproto_msg_max;
> +	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
> +		      NULL, TIMEOUT_INFINITY))
> +		diag_raise();
> +	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
> +}

AFAIR, IPROTO_MSG_MAX is related to FIBER_POOL_SIZE, so if we increase
the former, shouldn't we increase the latter as well?

> diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
> index 81298d91d..f002cea04 100644
> --- a/test/box/request_limit.test.lua
> +++ b/test/box/request_limit.test.lua
> @@ -8,7 +8,7 @@ conn = net_box.connect(box.cfg.listen)
>  conn2 = net_box.connect(box.cfg.listen)
>  active = 0
>  continue = false
> -limit = 768
> +limit = box.cfg.iproto_msg_max
>  run_max = (limit - 100) / 2
>  
>  old_readahead = box.cfg.readahead
> @@ -56,8 +56,59 @@ active == run_max * 2 or active
>  continue = true
>  while active ~= 0 do fiber.sleep(0.01) end
>  
> +--
> +-- gh-3320: allow to change maximal count of messages.
> +--
> +
> +--
> +-- Increase maximal message count when nothing is blocked.
> +--
> +box.cfg{iproto_msg_max = limit * 2}
> +run_max = limit * 2 - 100
> +run_workers(conn)
> +wait_block()
> +active == run_max
> +-- Max can be decreased back even if now the limit is violated.
> +-- But a new input is blocked in such a case.
> +box.cfg{iproto_msg_max = limit}
> +old_active = active
> +for i = 1, 300 do fiber.create(do_long, conn) end
> +-- After a while the active count is unchanged - the input is blocked.
> +wait_block()
> +active == old_active
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
> +--
> +-- Check that changing iproto_msg_max can resume stopped
> +-- connections.
> +--
> +run_max = limit / 2 + 100
> +run_workers(conn)
> +run_workers(conn2)
> +wait_block()
> +active >= limit
> +active < run_max * 2
> +box.cfg{iproto_msg_max = limit * 2}
> +wait_block()
> +active == run_max * 2
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
> +--
> +-- Test TX fiber pool size limit.
> +--
> +run_max = 2500
> +box.cfg{iproto_msg_max = 5000}
> +run_workers(conn)
> +run_workers(conn2)
> +wait_block()
> +active
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
>  conn2:close()
>  conn:close()
>  
>  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
> -box.cfg{readahead = old_readahead}
> +box.cfg{readahead = old_readahead, iproto_msg_max = limit}

If I run the test several times in a row, it fails. Please fix.

================================================================================
TEST                                            PARAMS          RESULT
---------------------------------------------------------------------------
box/request_limit.test.lua                                      [ pass ]
box/request_limit.test.lua                                      [ fail ]

Test failed! Result content mismatch:
--- box/request_limit.result    Mon Apr 23 14:19:48 2018
+++ box/request_limit.reject    Mon Apr 23 14:32:52 2018
@@ -179,7 +179,7 @@
 ...
 active == run_max * 2
 ---
-- true
+- false
 ...
 continue = true
 ---



More information about the Tarantool-patches mailing list