From: Ilya Kosarev <i.kosarev@tarantool.org> To: v.shpilevoy@tarantool.org, alyapunov@tarantool.org Cc: tarantool-patches@dev.tarantool.org Subject: [Tarantool-patches] [PATCH v6 3/3] iproto: move greeting from tx thread to iproto Date: Sun, 13 Dec 2020 02:38:47 +0300 [thread overview] Message-ID: <9781cebc8d5e7b2fbf7a38b692b78047169e08b6.1607815050.git.i.kosarev@tarantool.org> (raw) In-Reply-To: <cover.1607815050.git.i.kosarev@tarantool.org> In-Reply-To: <cover.1607815050.git.i.kosarev@tarantool.org> On connection, an evio service callback is invoked to accept it. The next step after acceptance was to pass the connection to the tx thread through cbus. This meant that any connection interaction involved the tx thread even before the greeting was sent to the client. Consequently, a client might reach the instance with an enormous number of connections, leading to exhaustion of the file descriptor limit in case of an unresponsive tx thread (for example, when building a secondary index) and to extra tx_process_connect calls afterwards, even if the instance doesn't fail with a "too many open files" error. This patch allows iproto to accept the connection and send the greeting by itself. Thus the connection is established and stays in the fetch_schema state while the tx thread is unresponsive. Closes #3776 --- src/box/iproto.cc | 118 +++++++++++++----- test/app/gh-4787-netbox-empty-errmsg.result | 18 --- test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -- 3 files changed, 88 insertions(+), 56 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index f7330af21d..a7da115925 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -453,6 +453,8 @@ struct iproto_connection * meaningless. */ size_t parse_size; + /** Iproto buffer used to send greeting.
*/ + char greeting_buf[IPROTO_GREETING_SIZE]; /** * Nubmer of active long polling requests that have already * discarded their arguments in order not to stall other @@ -1091,6 +1093,46 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, } } +static int +iproto_buf_flush(struct iproto_connection *con) +{ + struct iovec greeting; + greeting.iov_base = &con->greeting_buf; + greeting.iov_len = IPROTO_GREETING_SIZE; + ssize_t nwr = sio_writev(con->output.fd, &greeting, 1); + + if (nwr > 0) { + /* Count statistics */ + rmean_collect(rmean_net, IPROTO_SENT, nwr); + return 1; + } else if (nwr < 0 && !sio_wouldblock(errno)) { + return -1; + } + + return 0; +} + +static void +iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher, + int /* revents */) +{ + struct iproto_connection *con = + (struct iproto_connection *)watcher->data; + int rc = iproto_buf_flush(con); + if (rc <= 0) { + if (rc == 0) { + ev_io_start(loop, &con->output); + } else { + diag_log(); + con->state = IPROTO_CONNECTION_CLOSED; + } + return; + } + ev_io_stop(con->loop, &con->output); + ev_io_init(&con->output, iproto_connection_on_output, + con->output.fd, EV_WRITE); +} + static struct iproto_connection * iproto_connection_new(int fd) { @@ -1103,7 +1145,7 @@ iproto_connection_new(int fd) con->input.data = con->output.data = con; con->loop = loop(); ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); + ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE); ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); @@ -1938,50 +1980,64 @@ net_end_subscribe(struct cmsg *m) } /** - * Handshake a connection: invoke the on-connect trigger - * and possibly authenticate. Try to send the client an error - * upon a failure. 
+ * Handshake a connection: send greeting for it. + */ +static void +iproto_process_connect(struct iproto_msg *msg) +{ + struct iproto_connection *con = msg->connection; + /* + * INSTANCE_UUID is guaranteed to be inited before this moment. + * We start listening either in local_recovery() or bootstrap(). + * The INSTANCE_UUID is ensured to be inited in the beginning of + * both methods. In case of local_recovery() it is verified that + * INSTANCE_UUID was read from the snapshot in memtx_engine_new(). + * In bootstrap() INSTANCE_UUID is either taken from the + * instance_uuid box.cfg{} param or created on the spot. + */ + random_bytes(con->salt, IPROTO_SALT_SIZE); + greeting_encode(con->greeting_buf, tarantool_version_id(), + &INSTANCE_UUID, con->salt, IPROTO_SALT_SIZE); + assert(evio_has_fd(&con->output)); + ev_feed_event(con->loop, &con->output, EV_WRITE); +} + +/** + * Create the session and invoke the on_connect triggers. */ static void tx_process_connect(struct cmsg *m) { struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; - struct obuf *out = msg->connection->tx.p_obuf; - try { /* connect. */ - con->session = session_create(SESSION_TYPE_BINARY); - if (con->session == NULL) - diag_raise(); - con->session->meta.connection = con; - tx_fiber_init(con->session, 0); - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE); - /* TODO: dirty read from tx thread */ - struct tt_uuid uuid = INSTANCE_UUID; - random_bytes(con->salt, IPROTO_SALT_SIZE); - greeting_encode(greeting, tarantool_version_id(), &uuid, - con->salt, IPROTO_SALT_SIZE); - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE); - if (! 
rlist_empty(&session_on_connect)) { - if (session_run_on_connect_triggers(con->session) != 0) - diag_raise(); - } - iproto_wpos_create(&msg->wpos, out); - } catch (Exception *e) { + + con->session = session_create(SESSION_TYPE_BINARY); + if (con->session == NULL) { tx_reply_error(msg); msg->close_connection = true; + return; + } + con->session->meta.connection = con; + + tx_fiber_init(con->session, 0); + if (! rlist_empty(&session_on_connect)) { + if (session_run_on_connect_triggers(con->session) != 0) { + tx_reply_error(msg); + msg->close_connection = true; + } } } /** - * Send a response to connect to the client or close the - * connection in case on_connect trigger failed. + * Try to send the client an error upon a failure. Start reading + * input in case the connection is inited and all good. */ static void -net_send_greeting(struct cmsg *m) +net_finish_connect(struct cmsg *m) { struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; - if (msg->close_connection) { + if (msg->close_connection || con->state == IPROTO_CONNECTION_CLOSED) { struct obuf *out = msg->wpos.obuf; int64_t nwr = sio_writev(con->output.fd, out->iov, obuf_iovcnt(out)); @@ -1993,6 +2049,7 @@ net_send_greeting(struct cmsg *m) diag_log(); } assert(iproto_connection_is_idle(con)); + con->state = IPROTO_CONNECTION_ALIVE; iproto_connection_close(con); iproto_msg_delete(msg); return; @@ -2005,13 +2062,13 @@ net_send_greeting(struct cmsg *m) */ assert(evio_has_fd(&con->output)); /* Handshake OK, start reading input. 
*/ - ev_feed_event(con->loop, &con->output, EV_WRITE); + ev_feed_event(con->loop, &con->input, EV_READ); iproto_msg_delete(msg); } static const struct cmsg_hop connect_route[] = { { tx_process_connect, &net_pipe }, - { net_send_greeting, NULL }, + { net_finish_connect, NULL }, }; /** }}} */ @@ -2042,6 +2099,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd, cmsg_init(&msg->base, connect_route); msg->p_ibuf = con->p_ibuf; msg->wpos = con->wpos; + iproto_process_connect(msg); cpipe_push(&tx_pipe, &msg->base); return 0; } diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result index d30337a050..6389b27bc8 100644 --- a/test/app/gh-4787-netbox-empty-errmsg.result +++ b/test/app/gh-4787-netbox-empty-errmsg.result @@ -38,24 +38,6 @@ req_during_auth() | - Connection is not established, state is "auth" | ... --- Check the same for 'initial' state. -ok, err = nil - | --- - | ... -do \ - c = netbox.connect(box.cfg.listen, {wait_connected = false}) \ - ok, err = pcall(c.call, c, 'echo', {}, {is_async = true}) \ -end - | --- - | ... -ok, err - | --- - | - false - | - Connection is not established, state is "initial" - | ... -c:close() - | --- - | ... box.schema.user.drop('test') | --- | ... diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua index 0eecaa1bf0..55ea43f26f 100755 --- a/test/app/gh-4787-netbox-empty-errmsg.test.lua +++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua @@ -21,12 +21,4 @@ end req_during_auth() --- Check the same for 'initial' state. -ok, err = nil -do \ - c = netbox.connect(box.cfg.listen, {wait_connected = false}) \ - ok, err = pcall(c.call, c, 'echo', {}, {is_async = true}) \ -end -ok, err -c:close() box.schema.user.drop('test') -- 2.17.1
next prev parent reply other threads:[~2020-12-12 23:38 UTC|newest] Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-12-12 23:38 [Tarantool-patches] [PATCH v6 0/3] iproto: greeting enhancement Ilya Kosarev 2020-12-12 23:38 ` [Tarantool-patches] [PATCH v6 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev 2020-12-12 23:38 ` [Tarantool-patches] [PATCH v6 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev 2020-12-12 23:38 ` Ilya Kosarev [this message] 2020-12-15 23:04 ` [Tarantool-patches] [PATCH v6 3/3] iproto: move greeting from tx thread to iproto Vladislav Shpilevoy 2020-12-19 17:41 ` Ilya Kosarev
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=9781cebc8d5e7b2fbf7a38b692b78047169e08b6.1607815050.git.i.kosarev@tarantool.org \ --to=i.kosarev@tarantool.org \ --cc=alyapunov@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v6 3/3] iproto: move greeting from tx thread to iproto' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox