From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Mon, 23 Apr 2018 14:34:56 +0300 From: Vladimir Davydov Subject: Re: [PATCH 3/3] iproto: allow to configure IPROTO_MSG_MAX Message-ID: <20180423113456.4ui5m6w3f4hur2yh@esperanza> References: <9a34dae0a53841d2d3f977f1096768396bb2835b.1524264646.git.v.shpilevoy@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <9a34dae0a53841d2d3f977f1096768396bb2835b.1524264646.git.v.shpilevoy@tarantool.org> To: Vladislav Shpilevoy Cc: tarantool-patches@freelists.org List-ID: On Sat, Apr 21, 2018 at 01:52:11AM +0300, Vladislav Shpilevoy wrote: > IPROTO_MSG_MAX is a constant that restricts count of requests in > fly. It allows to do not produce too many fibers in TX thread, > that would lead to too big overhead on fibers switching, their > stack storing. > > But some users have powerful metal on which Tarantool > IPROTO_MSG_MAX constant is not serious. The patch exposes it as > a configuration runtime parameter. > > 'iproto_msg_max' is its name. If a user sees that IProto thread > is stuck due to too many requests, it can change iproto_msg_max > in runtime, and IProto thread immediately starts processing > pending requests. > > 'iproto_msg_max' can be decreased, but obviously it can not stop > already runned requests, so if now in IProto thread request count > is > new 'iproto_msg_max' value, then it takes some time until > some requests will be finished. 
> > Closes #3320 > --- > src/box/box.cc | 7 +++ > src/box/box.h | 1 + > src/box/iproto.cc | 70 +++++++++++++++++------ > src/box/iproto.h | 3 + > src/box/lua/cfg.cc | 12 ++++ > src/box/lua/load_cfg.lua | 3 + > test/app-tap/init_script.result | 73 ++++++++++++------------ > test/box/admin.result | 2 + > test/box/cfg.result | 24 ++++++++ > test/box/cfg.test.lua | 9 +++ > test/box/request_limit.result | 119 +++++++++++++++++++++++++++++++++++++++- > test/box/request_limit.test.lua | 55 ++++++++++++++++++- > 12 files changed, 322 insertions(+), 56 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index d2dfc5b5f..80684ad48 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -759,6 +759,12 @@ box_set_vinyl_timeout(void) > vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout")); > } > > +void > +box_set_iproto_msg_max(void) > +{ > + iproto_set_msg_max(cfg_geti("iproto_msg_max")); > +} > + > /* }}} configuration bindings */ > > /** > @@ -1735,6 +1741,7 @@ box_cfg_xc(void) > box_check_instance_uuid(&instance_uuid); > box_check_replicaset_uuid(&replicaset_uuid); > > + box_set_iproto_msg_max(); > box_set_checkpoint_count(); > box_set_too_long_threshold(); > box_set_replication_timeout(); > diff --git a/src/box/box.h b/src/box/box.h > index c9b5aad01..712e21191 100644 > --- a/src/box/box.h > +++ b/src/box/box.h > @@ -181,6 +181,7 @@ void box_set_vinyl_cache(void); > void box_set_vinyl_timeout(void); > void box_set_replication_timeout(void); > void box_set_replication_connect_quorum(void); > +void box_set_iproto_msg_max(void); > > extern "C" { > #endif /* defined(__cplusplus) */ > diff --git a/src/box/iproto.cc b/src/box/iproto.cc > index 585e8cc83..1266eb255 100644 > --- a/src/box/iproto.cc > +++ b/src/box/iproto.cc > @@ -63,8 +63,7 @@ > #include "applier.h" > #include "cfg.h" > > -/* The number of iproto messages in flight */ > -enum { IPROTO_MSG_MAX = 768 }; > +enum { IPROTO_MSG_MAX_MIN = 768 }; Why do you forbid to set iproto_msg_max to 1 for 
instance? Why do we have to allow at least 768 messages in flight? If there's no specific reason, I'd prefer to remove this artificial lower bound. > > /** > * Network readahead. A signed integer to avoid > @@ -83,6 +82,9 @@ enum { IPROTO_MSG_MAX = 768 }; > */ > unsigned iproto_readahead = 16320; > > +/* The maximal number of iproto messages in flight. */ > +static int iproto_msg_max = IPROTO_MSG_MAX_MIN; > + > /** > * How big is a buffer which needs to be shrunk before > * it is put back into buffer cache. > @@ -381,7 +383,7 @@ iproto_must_stop_input() > { > size_t connection_count = mempool_count(&iproto_connection_pool); > size_t request_count = mempool_count(&iproto_msg_pool); > - return request_count > connection_count + IPROTO_MSG_MAX; > + return request_count > connection_count + iproto_msg_max; > } > > /** > @@ -1623,7 +1625,7 @@ net_cord_f(va_list /* ap */) > cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber()); > /* Create a pipe to "tx" thread. */ > cpipe_create(&tx_pipe, "tx"); > - cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2); > + cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2); > /* Process incomming messages. */ > cbus_loop(&endpoint); > > @@ -1651,29 +1653,47 @@ iproto_init() > > /* Create a pipe to "net" thread. */ > cpipe_create(&net_pipe, "net"); > - cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2); > + cpipe_set_max_input(&net_pipe, iproto_msg_max / 2); > } > > /** > * Since there is no way to "synchronously" change the > - * state of the io thread, to change the listen port > - * we need to bounce a couple of messages to and > - * from this thread. > + * state of the io thread, to change the listen port or max > + * message count in fly it is needed to bounce a couple of > + * messages to and from this thread. > */ > struct iproto_cfg_msg: public cbus_call_msg > { > + /** New URI to bind to. */ > const char *uri; > + bool need_update_uri; > + > + /** New IProto max message count in fly. 
*/ > + int iproto_msg_max; > + bool need_update_msg_max; > }; > > +static struct iproto_cfg_msg cfg_msg; > + > static int > iproto_do_cfg(struct cbus_call_msg *m) > { > - const char *uri = ((struct iproto_cfg_msg *) m)->uri; > + assert(m == &cfg_msg); > try { > - if (evio_service_is_active(&binary)) > - evio_service_stop(&binary); > - if (uri != NULL) > - evio_service_bind(&binary, uri); > + if (cfg_msg.need_update_uri) { > + if (evio_service_is_active(&binary)) > + evio_service_stop(&binary); > + if (cfg_msg.uri != NULL) > + evio_service_bind(&binary, cfg_msg.uri); > + } > + if (cfg_msg.need_update_msg_max) { > + cpipe_set_max_input(&tx_pipe, > + cfg_msg.iproto_msg_max / 2); > + int old = iproto_msg_max; > + iproto_msg_max = cfg_msg.iproto_msg_max; > + if (old < iproto_msg_max) > + iproto_resume(); > + } This is a matter of personal taste, but I'd prefer not to introduce these extra flags, i.e. if (cfg_msg.uri != NULL) /* set uri */ if (cfg_msg.iproto_msg_max > 0) /* update iproto_msg_max */ > } catch (Exception *e) { > return -1; > } > @@ -1696,9 +1716,10 @@ iproto_do_listen(struct cbus_call_msg *m) > void > iproto_bind(const char *uri) > { > - static struct iproto_cfg_msg m; > - m.uri = uri; > - if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_cfg, > + memset(&cfg_msg, 0, sizeof(cfg_msg)); > + cfg_msg.need_update_uri = true; > + cfg_msg.uri = uri; > + if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg, > NULL, TIMEOUT_INFINITY)) > diag_raise(); > } > @@ -1724,3 +1745,20 @@ iproto_reset_stat(void) > { > rmean_cleanup(rmean_net); > } > + > +void > +iproto_set_msg_max(int new_iproto_msg_max) > +{ > + if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) { > + tnt_raise(ClientError, ER_CFG, "iproto_msg_max", > + tt_sprintf("minimal value is %d", > + IPROTO_MSG_MAX_MIN)); > + } > + memset(&cfg_msg, 0, sizeof(cfg_msg)); > + cfg_msg.need_update_msg_max = true; > + cfg_msg.iproto_msg_max = new_iproto_msg_max; > + if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, 
iproto_do_cfg, > + NULL, TIMEOUT_INFINITY)) > + diag_raise(); > + cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2); > +} AFAIR IPROTO_MSG_MAX is related to FIBER_POOL_SIZE so if we increase the former, we should increase the latter as well, no? > diff --git a/test/box/request_limit.test.lua b/test/box/request_limit.test.lua > index 81298d91d..f002cea04 100644 > --- a/test/box/request_limit.test.lua > +++ b/test/box/request_limit.test.lua > @@ -8,7 +8,7 @@ conn = net_box.connect(box.cfg.listen) > conn2 = net_box.connect(box.cfg.listen) > active = 0 > continue = false > -limit = 768 > +limit = box.cfg.iproto_msg_max > run_max = (limit - 100) / 2 > > old_readahead = box.cfg.readahead > @@ -56,8 +56,59 @@ active == run_max * 2 or active > continue = true > while active ~= 0 do fiber.sleep(0.01) end > > +-- > +-- gh-3320: allow to change maximal count of messages. > +-- > + > +-- > +-- Increate maximal message count when nothing is blocked. > +-- > +box.cfg{iproto_msg_max = limit * 2} > +run_max = limit * 2 - 100 > +run_workers(conn) > +wait_block() > +active == run_max > +-- Max can be decreased back even if now the limit is violated. > +-- But a new input is blocked in such a case. > +box.cfg{iproto_msg_max = limit} > +old_active = active > +for i = 1, 300 do fiber.create(do_long, conn) end > +-- Afer time active cout is not changed - the input is blocked. > +wait_block() > +active == old_active > +continue = true > +while active ~= 0 do fiber.sleep(0.01) end > + > +-- > +-- Check that changing iproto_msg_max can resume stopped > +-- connections. > +-- > +run_max = limit / 2 + 100 > +run_workers(conn) > +run_workers(conn2) > +wait_block() > +active >= limit > +active < run_max * 2 > +box.cfg{iproto_msg_max = limit * 2} > +wait_block() > +active == run_max * 2 > +continue = true > +while active ~= 0 do fiber.sleep(0.01) end > + > +-- > +-- Test TX fiber pool size limit. 
> +-- > +run_max = 2500 > +box.cfg{iproto_msg_max = 5000} > +run_workers(conn) > +run_workers(conn2) > +wait_block() > +active > +continue = true > +while active ~= 0 do fiber.sleep(0.01) end > + > conn2:close() > conn:close() > > box.schema.user.revoke('guest', 'read,write,execute', 'universe') > -box.cfg{readahead = old_readahead} > +box.cfg{readahead = old_readahead, iproto_msg_max = limit} If I run the test several times in a row, it fails. Please fix. ================================================================================ TEST PARAMS RESULT --------------------------------------------------------------------------- box/request_limit.test.lua [ pass ] box/request_limit.test.lua [ fail ] Test failed! Result content mismatch: --- box/request_limit.result Mon Apr 23 14:19:48 2018 +++ box/request_limit.reject Mon Apr 23 14:32:52 2018 @@ -179,7 +179,7 @@ ... active == run_max * 2 --- -- true +- false ... continue = true ---