[PATCH 3/3] iproto: allow to configure IPROTO_MSG_MAX
Vladimir Davydov
vdavydov.dev at gmail.com
Mon Apr 23 14:34:56 MSK 2018
On Sat, Apr 21, 2018 at 01:52:11AM +0300, Vladislav Shpilevoy wrote:
> IPROTO_MSG_MAX is a constant that restricts the number of requests in
> flight. It keeps the TX thread from creating too many fibers, which
> would lead to excessive overhead on fiber switching and on storing
> their stacks.
>
> But some users have powerful hardware on which Tarantool's
> IPROTO_MSG_MAX constant is too restrictive. The patch exposes it as
> a runtime configuration parameter.
>
> The parameter is named 'iproto_msg_max'. If a user sees that the
> IProto thread is stuck due to too many requests, they can change
> iproto_msg_max at runtime, and the IProto thread immediately starts
> processing pending requests.
>
> 'iproto_msg_max' can be decreased, but obviously this cannot stop
> requests that are already running. So if the current request count
> in the IProto thread exceeds the new 'iproto_msg_max' value, it takes
> some time until enough of those requests finish.
>
> Closes #3320
> ---
> src/box/box.cc | 7 +++
> src/box/box.h | 1 +
> src/box/iproto.cc | 70 +++++++++++++++++------
> src/box/iproto.h | 3 +
> src/box/lua/cfg.cc | 12 ++++
> src/box/lua/load_cfg.lua | 3 +
> test/app-tap/init_script.result | 73 ++++++++++++------------
> test/box/admin.result | 2 +
> test/box/cfg.result | 24 ++++++++
> test/box/cfg.test.lua | 9 +++
> test/box/request_limit.result | 119 +++++++++++++++++++++++++++++++++++++++-
> test/box/request_limit.test.lua | 55 ++++++++++++++++++-
> 12 files changed, 322 insertions(+), 56 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index d2dfc5b5f..80684ad48 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -759,6 +759,12 @@ box_set_vinyl_timeout(void)
> vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout"));
> }
>
> +void
> +box_set_iproto_msg_max(void)
> +{
> + iproto_set_msg_max(cfg_geti("iproto_msg_max"));
> +}
> +
> /* }}} configuration bindings */
>
> /**
> @@ -1735,6 +1741,7 @@ box_cfg_xc(void)
> box_check_instance_uuid(&instance_uuid);
> box_check_replicaset_uuid(&replicaset_uuid);
>
> + box_set_iproto_msg_max();
> box_set_checkpoint_count();
> box_set_too_long_threshold();
> box_set_replication_timeout();
> diff --git a/src/box/box.h b/src/box/box.h
> index c9b5aad01..712e21191 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
> void box_set_vinyl_timeout(void);
> void box_set_replication_timeout(void);
> void box_set_replication_connect_quorum(void);
> +void box_set_iproto_msg_max(void);
>
> extern "C" {
> #endif /* defined(__cplusplus) */
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 585e8cc83..1266eb255 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,8 +63,7 @@
> #include "applier.h"
> #include "cfg.h"
>
> -/* The number of iproto messages in flight */
> -enum { IPROTO_MSG_MAX = 768 };
> +enum { IPROTO_MSG_MAX_MIN = 768 };
Why do you forbid setting iproto_msg_max to, for instance, 1? Why do we
have to allow at least 768 messages in flight? If there's no specific
reason, I'd prefer to remove this artificial lower bound.
>
> /**
> * Network readahead. A signed integer to avoid
> @@ -83,6 +82,9 @@ enum { IPROTO_MSG_MAX = 768 };
> */
> unsigned iproto_readahead = 16320;
>
> +/* The maximal number of iproto messages in flight. */
> +static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
> +
> /**
> * How big is a buffer which needs to be shrunk before
> * it is put back into buffer cache.
> @@ -381,7 +383,7 @@ iproto_must_stop_input()
> {
> size_t connection_count = mempool_count(&iproto_connection_pool);
> size_t request_count = mempool_count(&iproto_msg_pool);
> - return request_count > connection_count + IPROTO_MSG_MAX;
> + return request_count > connection_count + iproto_msg_max;
> }
>
> /**
> @@ -1623,7 +1625,7 @@ net_cord_f(va_list /* ap */)
> cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
> /* Create a pipe to "tx" thread. */
> cpipe_create(&tx_pipe, "tx");
> - cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
> + cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
> /* Process incomming messages. */
> cbus_loop(&endpoint);
>
> @@ -1651,29 +1653,47 @@ iproto_init()
>
> /* Create a pipe to "net" thread. */
> cpipe_create(&net_pipe, "net");
> - cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
> + cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
> }
>
> /**
> * Since there is no way to "synchronously" change the
> - * state of the io thread, to change the listen port
> - * we need to bounce a couple of messages to and
> - * from this thread.
> + * state of the io thread, to change the listen port or max
> + * message count in flight, we need to bounce a couple of
> + * messages to and from this thread.
> */
> struct iproto_cfg_msg: public cbus_call_msg
> {
> + /** New URI to bind to. */
> const char *uri;
> + bool need_update_uri;
> +
> + /** New IProto max message count in flight. */
> + int iproto_msg_max;
> + bool need_update_msg_max;
> };
>
> +static struct iproto_cfg_msg cfg_msg;
> +
> static int
> iproto_do_cfg(struct cbus_call_msg *m)
> {
> - const char *uri = ((struct iproto_cfg_msg *) m)->uri;
> + assert(m == &cfg_msg);
> try {
> - if (evio_service_is_active(&binary))
> - evio_service_stop(&binary);
> - if (uri != NULL)
> - evio_service_bind(&binary, uri);
> + if (cfg_msg.need_update_uri) {
> + if (evio_service_is_active(&binary))
> + evio_service_stop(&binary);
> + if (cfg_msg.uri != NULL)
> + evio_service_bind(&binary, cfg_msg.uri);
> + }
> + if (cfg_msg.need_update_msg_max) {
> + cpipe_set_max_input(&tx_pipe,
> + cfg_msg.iproto_msg_max / 2);
> + int old = iproto_msg_max;
> + iproto_msg_max = cfg_msg.iproto_msg_max;
> + if (old < iproto_msg_max)
> + iproto_resume();
> + }
This is a matter of personal taste, but I'd prefer to not introduce
these extra flags, i.e.
if (cfg_msg.uri != NULL)
/* set uri */
if (cfg_msg.iproto_msg_max > 0)
/* update iproto_msg max */
> } catch (Exception *e) {
> return -1;
> }
> @@ -1696,9 +1716,10 @@ iproto_do_listen(struct cbus_call_msg *m)
> void
> iproto_bind(const char *uri)
> {
> - static struct iproto_cfg_msg m;
> - m.uri = uri;
> - if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_cfg,
> + memset(&cfg_msg, 0, sizeof(cfg_msg));
> + cfg_msg.need_update_uri = true;
> + cfg_msg.uri = uri;
> + if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
> NULL, TIMEOUT_INFINITY))
> diag_raise();
> }
> @@ -1724,3 +1745,20 @@ iproto_reset_stat(void)
> {
> rmean_cleanup(rmean_net);
> }
> +
> +void
> +iproto_set_msg_max(int new_iproto_msg_max)
> +{
> + if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
> + tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
> + tt_sprintf("minimal value is %d",
> + IPROTO_MSG_MAX_MIN));
> + }
> + memset(&cfg_msg, 0, sizeof(cfg_msg));
> + cfg_msg.need_update_msg_max = true;
> + cfg_msg.iproto_msg_max = new_iproto_msg_max;
> + if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
> + NULL, TIMEOUT_INFINITY))
> + diag_raise();
> + cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
> +}
AFAIR IPROTO_MSG_MAX is related to FIBER_POOL_SIZE so if we increase the
former, we should increase the latter as well, no?
> diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua
> index 81298d91d..f002cea04 100644
> --- a/test/box/request_limit.test.lua
> +++ b/test/box/request_limit.test.lua
> @@ -8,7 +8,7 @@ conn = net_box.connect(box.cfg.listen)
> conn2 = net_box.connect(box.cfg.listen)
> active = 0
> continue = false
> -limit = 768
> +limit = box.cfg.iproto_msg_max
> run_max = (limit - 100) / 2
>
> old_readahead = box.cfg.readahead
> @@ -56,8 +56,59 @@ active == run_max * 2 or active
> continue = true
> while active ~= 0 do fiber.sleep(0.01) end
>
> +--
> +-- gh-3320: allow to change maximal count of messages.
> +--
> +
> +--
> +-- Increase maximal message count when nothing is blocked.
> +--
> +box.cfg{iproto_msg_max = limit * 2}
> +run_max = limit * 2 - 100
> +run_workers(conn)
> +wait_block()
> +active == run_max
> +-- Max can be decreased back even if now the limit is violated.
> +-- But a new input is blocked in such a case.
> +box.cfg{iproto_msg_max = limit}
> +old_active = active
> +for i = 1, 300 do fiber.create(do_long, conn) end
> +-- After a while the active count is unchanged - the input is blocked.
> +wait_block()
> +active == old_active
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
> +--
> +-- Check that changing iproto_msg_max can resume stopped
> +-- connections.
> +--
> +run_max = limit / 2 + 100
> +run_workers(conn)
> +run_workers(conn2)
> +wait_block()
> +active >= limit
> +active < run_max * 2
> +box.cfg{iproto_msg_max = limit * 2}
> +wait_block()
> +active == run_max * 2
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
> +--
> +-- Test TX fiber pool size limit.
> +--
> +run_max = 2500
> +box.cfg{iproto_msg_max = 5000}
> +run_workers(conn)
> +run_workers(conn2)
> +wait_block()
> +active
> +continue = true
> +while active ~= 0 do fiber.sleep(0.01) end
> +
> conn2:close()
> conn:close()
>
> box.schema.user.revoke('guest', 'read,write,execute', 'universe')
> -box.cfg{readahead = old_readahead}
> +box.cfg{readahead = old_readahead, iproto_msg_max = limit}
If I run the test several times in a row, it fails. Please fix.
================================================================================
TEST PARAMS RESULT
---------------------------------------------------------------------------
box/request_limit.test.lua [ pass ]
box/request_limit.test.lua [ fail ]
Test failed! Result content mismatch:
--- box/request_limit.result Mon Apr 23 14:19:48 2018
+++ box/request_limit.reject Mon Apr 23 14:32:52 2018
@@ -179,7 +179,7 @@
...
active == run_max * 2
---
-- true
+- false
...
continue = true
---
More information about the Tarantool-patches
mailing list