From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX
Date: Fri, 4 May 2018 00:05:20 +0300
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org
Reply-To: tarantool-patches@freelists.org

IPROTO_MSG_MAX is a constant that limits the number of in-flight
requests. It keeps the TX thread from spawning too many fibers, which
would cause excessive overhead from fiber switching and from storing
their stacks. But some users run Tarantool on hardware powerful enough
that the built-in IPROTO_MSG_MAX limit becomes too restrictive.

The patch exposes the limit as a runtime configuration parameter named
'net_msg_max'. If a user sees that the IProto thread is stuck because
of too many requests, they can raise 'net_msg_max' at runtime, and the
IProto thread immediately resumes processing pending requests.

'net_msg_max' can also be decreased, but obviously that can not stop
requests which are already running, so if the current request count in
the IProto thread is greater than the new 'net_msg_max' value, it takes
some time until enough requests finish. Setting 'net_msg_max' grows the
fiber pool size automatically when needed.

Closes #3320
---
 src/box/box.cc                  | 11 +++-
 src/box/box.h                   | 1 +
 src/box/iproto.cc               | 77 +++++++++++++++++++-------
 src/box/iproto.h                | 3 ++
 src/box/lua/cfg.cc              | 12 +++++
 src/box/lua/load_cfg.lua        | 3 ++
 src/fiber_pool.c                | 14 +++++
 src/fiber_pool.h                | 14 ++++-
 test/app-tap/init_script.result | 55 +++++++++----------
 test/box/admin.result           | 2 +
 test/box/cfg.result             | 27 ++++++++++
 test/box/cfg.test.lua           | 10 ++++
 test/box/net_msg_max.result     | 117 +++++++++++++++++++++++++++++++++++++++-
 test/box/net_msg_max.test.lua   | 59 +++++++++++++++++++-
 14 files changed, 354 insertions(+), 51 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index d2dfc5b5f..0d1f0e6ae 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -759,6 +759,14 @@ box_set_vinyl_timeout(void)
 	vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout"));
 }
 
+void
+box_set_net_msg_max(void)
+{
+	int new_iproto_msg_max = cfg_geti("net_msg_max");
+	iproto_set_msg_max(new_iproto_msg_max);
+	fiber_pool_set_max_size(&tx_fiber_pool, new_iproto_msg_max);
+}
+
 /* }}} configuration bindings */
 
 /**
@@ -1711,7 +1719,7 @@ static inline void
 box_cfg_xc(void)
 {
 	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE,
+	fiber_pool_create(&tx_fiber_pool, "tx", FIBER_POOL_SIZE_DEFAULT,
 			  FIBER_POOL_IDLE_TIMEOUT);
 	/* Add an extra endpoint for WAL wake up/rollback messages.
 	 */
 	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
@@ -1735,6 +1743,7 @@ box_cfg_xc(void)
 	box_check_instance_uuid(&instance_uuid);
 	box_check_replicaset_uuid(&replicaset_uuid);
+	box_set_net_msg_max();
 	box_set_checkpoint_count();
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..7726cb4f3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -181,6 +181,7 @@ void box_set_vinyl_cache(void);
 void box_set_vinyl_timeout(void);
 void box_set_replication_timeout(void);
 void box_set_replication_connect_quorum(void);
+void box_set_net_msg_max(void);
 
 extern "C" {
 #endif /* defined(__cplusplus) */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index dd7f97ecf..3175839cf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,8 +63,10 @@
 #include "applier.h"
 #include "cfg.h"
 
-/* The number of iproto messages in flight */
-enum { IPROTO_MSG_MAX = 768 };
+enum {
+	IPROTO_MSG_MAX_DEFAULT = 768,
+	IPROTO_MSG_MAX_MIN = 2,
+};
 
 /**
  * Network readahead. A signed integer to avoid
@@ -83,6 +85,9 @@ enum { IPROTO_MSG_MAX = 768 };
  */
 unsigned iproto_readahead = 16320;
 
+/* The maximal number of in-flight iproto messages. */
+static int iproto_msg_max = IPROTO_MSG_MAX_DEFAULT;
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -385,7 +390,7 @@ static inline bool
 iproto_check_msg_max()
 {
 	size_t request_count = mempool_count(&iproto_msg_pool);
-	return request_count > IPROTO_MSG_MAX;
+	return request_count > (size_t) iproto_msg_max;
 }
 
 /**
@@ -1666,7 +1671,7 @@ net_cord_f(va_list /* ap */)
 	cbus_endpoint_create(&endpoint, "net", fiber_schedule_cb, fiber());
 	/* Create a pipe to "tx" thread. */
 	cpipe_create(&tx_pipe, "tx");
-	cpipe_set_max_input(&tx_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&tx_pipe, iproto_msg_max / 2);
 	/* Process incomming messages. */
 	cbus_loop(&endpoint);
 
@@ -1694,29 +1699,47 @@ iproto_init()
 	/* Create a pipe to "net" thread. */
 	cpipe_create(&net_pipe, "net");
-	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
 }
 
 /**
  * Since there is no way to "synchronously" change the
- * state of the io thread, to change the listen port
- * we need to bounce a couple of messages to and
- * from this thread.
+ * state of the io thread, to change the listen port or the
+ * max in-flight message count we need to bounce a couple of
+ * messages to and from this thread.
  */
-struct iproto_bind_msg: public cbus_call_msg
+struct iproto_cfg_msg: public cbus_call_msg
 {
+	/** New URI to bind to. */
 	const char *uri;
+	bool need_update_uri;
+
+	/** New IProto max in-flight message count.
+	 */
+	int iproto_msg_max;
+	bool need_update_msg_max;
 };
 
+static struct iproto_cfg_msg cfg_msg;
+
 static int
-iproto_do_bind(struct cbus_call_msg *m)
+iproto_do_cfg(struct cbus_call_msg *m)
 {
-	const char *uri = ((struct iproto_bind_msg *) m)->uri;
+	assert(m == &cfg_msg);
 	try {
-		if (evio_service_is_active(&binary))
-			evio_service_stop(&binary);
-		if (uri != NULL)
-			evio_service_bind(&binary, uri);
+		if (cfg_msg.need_update_uri) {
+			if (evio_service_is_active(&binary))
+				evio_service_stop(&binary);
+			if (cfg_msg.uri != NULL)
+				evio_service_bind(&binary, cfg_msg.uri);
+		}
+		if (cfg_msg.need_update_msg_max) {
+			cpipe_set_max_input(&tx_pipe,
+					    cfg_msg.iproto_msg_max / 2);
+			int old = iproto_msg_max;
+			iproto_msg_max = cfg_msg.iproto_msg_max;
+			if (old < iproto_msg_max)
+				iproto_resume();
+		}
 	} catch (Exception *e) {
 		return -1;
 	}
@@ -1739,9 +1762,10 @@ iproto_do_listen(struct cbus_call_msg *m)
 void
 iproto_bind(const char *uri)
 {
-	static struct iproto_bind_msg m;
-	m.uri = uri;
-	if (cbus_call(&net_pipe, &tx_pipe, &m, iproto_do_bind,
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_uri = true;
+	cfg_msg.uri = uri;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
 		      NULL, TIMEOUT_INFINITY))
 		diag_raise();
 }
@@ -1767,3 +1791,20 @@ iproto_reset_stat(void)
 {
 	rmean_cleanup(rmean_net);
 }
+
+void
+iproto_set_msg_max(int new_iproto_msg_max)
+{
+	if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
+		tnt_raise(ClientError, ER_CFG, "iproto_msg_max",
+			  tt_sprintf("minimal value is %d",
+				     IPROTO_MSG_MAX_MIN));
+	}
+	memset(&cfg_msg, 0, sizeof(cfg_msg));
+	cfg_msg.need_update_msg_max = true;
+	cfg_msg.iproto_msg_max = new_iproto_msg_max;
+	if (cbus_call(&net_pipe, &tx_pipe, &cfg_msg, iproto_do_cfg,
+		      NULL, TIMEOUT_INFINITY))
+		diag_raise();
+	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 0268000da..a1dddc405 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -63,6 +63,9 @@ iproto_bind(const char *uri);
 void
 iproto_listen();
 
+void
+iproto_set_msg_max(int iproto_msg_max);
+
 #endif /* defined(__cplusplus) */
 
 #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca348..629ded626 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -220,6 +220,17 @@ lbox_cfg_set_vinyl_timeout(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_net_msg_max(struct lua_State *L)
+{
+	try {
+		box_set_net_msg_max();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 {
@@ -275,6 +286,7 @@ box_lua_cfg_init(struct lua_State *L)
 	{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
 	{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
+	{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
 	{NULL, NULL}
 	};
 
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 3a5a6d46a..5e3efdb4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -63,6 +63,7 @@ local default_cfg = {
     feedback_enabled = true,
     feedback_host = "https://feedback.tarantool.io",
     feedback_interval = 3600,
+    net_msg_max = 768,
 }
 
 -- types of available options
@@ -123,6 +124,7 @@ local template_cfg = {
     feedback_enabled = 'boolean',
     feedback_host = 'string',
     feedback_interval = 'number',
+    net_msg_max = 'number',
 }
 
 local function normalize_uri(port)
@@ -195,6 +197,7 @@ local dynamic_cfg = {
     replication_timeout = private.cfg_set_replication_timeout,
     replication_connect_quorum =
         private.cfg_set_replication_connect_quorum,
     replication_skip_conflict = function() end,
+    net_msg_max = private.cfg_set_net_msg_max,
 }
 
 local dynamic_cfg_skip_at_load = {
diff --git a/src/fiber_pool.c b/src/fiber_pool.c
index aa8b19510..3b9718ad3 100644
--- a/src/fiber_pool.c
+++ b/src/fiber_pool.c
@@ -136,6 +136,20 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
 	}
 }
 
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size)
+{
+	if (new_max_size < FIBER_POOL_SIZE_DEFAULT)
+		new_max_size = FIBER_POOL_SIZE_DEFAULT;
+
+	if (new_max_size > pool->max_size) {
+		pool->max_size = new_max_size;
+		cbus_process(&pool->endpoint);
+	} else {
+		pool->max_size = new_max_size;
+	}
+}
+
 void
 fiber_pool_create(struct fiber_pool *pool, const char *name,
 		  int max_pool_size, float idle_timeout)
diff --git a/src/fiber_pool.h b/src/fiber_pool.h
index d6a95105b..2a04d6063 100644
--- a/src/fiber_pool.h
+++ b/src/fiber_pool.h
@@ -41,7 +41,10 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
-enum { FIBER_POOL_SIZE = 4096, FIBER_POOL_IDLE_TIMEOUT = 1 };
+enum {
+	FIBER_POOL_SIZE_DEFAULT = 4096,
+	FIBER_POOL_IDLE_TIMEOUT = 1
+};
 
 /**
  * A pool of worker fibers to handle messages,
@@ -83,6 +86,15 @@ void
 fiber_pool_create(struct fiber_pool *pool, const char *name,
 		  int max_pool_size, float idle_timeout);
 
+/**
+ * Set the maximal fiber pool size. If the new size is bigger than
+ * the current one, processing of pending requests is started.
+ * @param pool Fiber pool to set the size for.
+ * @param new_max_size New maximal size.
+ */
+void
+fiber_pool_set_max_size(struct fiber_pool *pool, int new_max_size);
+
 /**
  * Destroy a fiber pool
  */
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 5625f1466..7504a2813 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -21,33 +21,34 @@ box.cfg
 16 memtx_max_tuple_size:1048576
 17 memtx_memory:107374182
 18 memtx_min_tuple_size:16
-19 pid_file:box.pid
-20 read_only:false
-21 readahead:16320
-22 replication_connect_timeout:4
-23 replication_skip_conflict:false
-24 replication_sync_lag:10
-25 replication_timeout:1
-26 rows_per_wal:500000
-27 slab_alloc_factor:1.05
-28 too_long_threshold:0.5
-29 vinyl_bloom_fpr:0.05
-30 vinyl_cache:134217728
-31 vinyl_dir:.
-32 vinyl_max_tuple_size:1048576
-33 vinyl_memory:134217728
-34 vinyl_page_size:8192
-35 vinyl_range_size:1073741824
-36 vinyl_read_threads:1
-37 vinyl_run_count_per_level:2
-38 vinyl_run_size_ratio:3.5
-39 vinyl_timeout:60
-40 vinyl_write_threads:2
-41 wal_dir:.
-42 wal_dir_rescan_delay:2
-43 wal_max_size:268435456
-44 wal_mode:write
-45 worker_pool_threads:4
+19 net_msg_max:768
+20 pid_file:box.pid
+21 read_only:false
+22 readahead:16320
+23 replication_connect_timeout:4
+24 replication_skip_conflict:false
+25 replication_sync_lag:10
+26 replication_timeout:1
+27 rows_per_wal:500000
+28 slab_alloc_factor:1.05
+29 too_long_threshold:0.5
+30 vinyl_bloom_fpr:0.05
+31 vinyl_cache:134217728
+32 vinyl_dir:.
+33 vinyl_max_tuple_size:1048576
+34 vinyl_memory:134217728
+35 vinyl_page_size:8192
+36 vinyl_range_size:1073741824
+37 vinyl_read_threads:1
+38 vinyl_run_count_per_level:2
+39 vinyl_run_size_ratio:3.5
+40 vinyl_timeout:60
+41 vinyl_write_threads:2
+42 wal_dir:.
+43 wal_dir_rescan_delay:2
+44 wal_max_size:268435456
+45 wal_mode:write
+46 worker_pool_threads:4
 --
 -- Test insert from detached fiber
 --
diff --git a/test/box/admin.result b/test/box/admin.result
index 2168c3adb..29c8c592d 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
   - 107374182
 - - memtx_min_tuple_size
   -
+- - net_msg_max
+  - 768
 - - pid_file
   -
 - - read_only
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 28449d9cc..61c5f79af 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -50,6 +50,8 @@ cfg_filter(box.cfg)
   - 107374182
 - - memtx_min_tuple_size
   -
+- - net_msg_max
+  - 768
 - - pid_file
   -
 - - read_only
@@ -147,6 +149,8 @@ cfg_filter(box.cfg)
   - 107374182
 - - memtx_min_tuple_size
   -
+- - net_msg_max
+  - 768
 - - pid_file
   -
 - - read_only
@@ -411,6 +415,29 @@ test_run:cmd("cleanup server cfg_tester")
 ---
 - true
 ...
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+---
+- error: 'Incorrect value for option ''net_msg_max'': should be of type number'
+...
+box.cfg{net_msg_max = 0}
+---
+- error: 'Incorrect value for option ''iproto_msg_max'': minimal value is 2'
+...
+old = box.cfg.net_msg_max
+---
+...
+box.cfg{net_msg_max = 2}
+---
+...
+box.cfg{net_msg_max = old + 1000}
+---
+...
+box.cfg{net_msg_max = old}
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
 ...
diff --git a/test/box/cfg.test.lua b/test/box/cfg.test.lua
index a73ae395b..2d819c94c 100644
--- a/test/box/cfg.test.lua
+++ b/test/box/cfg.test.lua
@@ -81,4 +81,14 @@ test_run:cmd("switch default")
 test_run:cmd("stop server cfg_tester")
 test_run:cmd("cleanup server cfg_tester")
 
+--
+-- gh-3320: box.cfg{net_msg_max}.
+--
+box.cfg{net_msg_max = 'invalid'}
+box.cfg{net_msg_max = 0}
+old = box.cfg.net_msg_max
+box.cfg{net_msg_max = 2}
+box.cfg{net_msg_max = old + 1000}
+box.cfg{net_msg_max = old}
+
 test_run:cmd("clear filter")
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index de22bcbb9..3b515bffd 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -25,7 +25,7 @@ finished = 0
 continue = false
 ---
 ...
-limit = 768
+limit = box.cfg.net_msg_max
 ---
 ...
 run_max = (limit - 100) / 2
 ---
 ...
@@ -121,6 +121,119 @@ wait_active(limit + 1)
 wait_finished(run_max)
 ---
 ...
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+---
+...
+conn:ping()
+---
+- true
+...
+#conn.space._space:select{} > 0
+---
+- true
+...
+run_max = 15
+---
+...
+run_workers(conn)
+---
+...
+wait_active(3)
+---
+...
+active
+---
+- 3
+...
+wait_finished(run_max)
+---
+...
+--
+-- Increase maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+---
+...
+run_max = limit * 2 - 100
+---
+...
+run_workers(conn)
+---
+...
+wait_active(run_max)
+---
+...
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+---
+...
+old_active = active
+---
+...
+for i = 1, 300 do fiber.create(do_long, conn) end
+---
+...
+-- After a while the active count is not changed - the input is blocked.
+wait_active(old_active)
+---
+...
+wait_finished(active + 300)
+---
+...
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(limit + 1)
+---
+...
+box.cfg{net_msg_max = limit * 2}
+---
+...
+wait_active(run_max * 2)
+---
+...
+wait_finished(active)
+---
+...
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+---
+...
+box.cfg{net_msg_max = 5000}
+---
+...
+run_workers(conn)
+---
+...
+run_workers(conn2)
+---
+...
+wait_active(5000)
+---
+...
+wait_finished(run_max * 2)
+---
+...
 conn2:close()
 ---
 ...
 conn:close()
 ---
 ...
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
 ---
 ...
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 39f8f53f7..887d73ddc 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -9,7 +9,7 @@ conn2 = net_box.connect(box.cfg.listen)
 active = 0
 finished = 0
 continue = false
-limit = 768
+limit = box.cfg.net_msg_max
 run_max = (limit - 100) / 2
 
 old_readahead = box.cfg.readahead
@@ -72,8 +72,63 @@ run_workers(conn)
 wait_active(limit + 1)
 wait_finished(run_max)
 
+--
+-- gh-3320: allow to change maximal count of messages.
+--
+
+--
+-- Test minimal iproto msg count.
+--
+box.cfg{net_msg_max = 2}
+conn:ping()
+#conn.space._space:select{} > 0
+run_max = 15
+run_workers(conn)
+wait_active(3)
+active
+wait_finished(run_max)
+
+--
+-- Increase maximal message count when nothing is blocked.
+--
+box.cfg{net_msg_max = limit * 2}
+run_max = limit * 2 - 100
+run_workers(conn)
+wait_active(run_max)
+-- Max can be decreased back even if now the limit is violated.
+-- But a new input is blocked in such a case.
+box.cfg{net_msg_max = limit}
+old_active = active
+for i = 1, 300 do fiber.create(do_long, conn) end
+-- After a while the active count is not changed - the input is blocked.
+wait_active(old_active)
+wait_finished(active + 300)
+
+--
+-- Check that changing net_msg_max can resume stopped
+-- connections.
+--
+run_max = limit / 2 + 100
+run_workers(conn)
+run_workers(conn2)
+wait_active(limit + 1)
+box.cfg{net_msg_max = limit * 2}
+wait_active(run_max * 2)
+wait_finished(active)
+
+--
+-- Test TX fiber pool size limit. It is increased together with
+-- iproto msg max.
+--
+run_max = 2500
+box.cfg{net_msg_max = 5000}
+run_workers(conn)
+run_workers(conn2)
+wait_active(5000)
+wait_finished(run_max * 2)
+
 conn2:close()
 conn:close()
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-box.cfg{readahead = old_readahead}
+box.cfg{readahead = old_readahead, net_msg_max = limit}
-- 
2.15.1 (Apple Git-101)
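
P.S. For reference, a minimal usage sketch of the new option. It only
restates what the patch above implements; the value 1024 below is an
arbitrary example, not a recommendation.

    -- Raise the in-flight request limit at runtime. The default is 768,
    -- and values below 2 are rejected with an ER_CFG error. The TX
    -- fiber pool is grown automatically when needed.
    box.cfg{net_msg_max = 1024}

    -- The option is dynamic, so it can be lowered again later. Requests
    -- that are already being processed are not interrupted; new input
    -- is simply blocked until the count drops below the new limit.
    box.cfg{net_msg_max = 768}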