* [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx
@ 2020-09-18 13:09 Ilya Kosarev
  2020-09-21 15:51 ` Leonid Vasiliev
  0 siblings, 1 reply; 3+ messages in thread
From: Ilya Kosarev @ 2020-09-18 13:09 UTC
To: lvasiliev, alyapunov; +Cc: tarantool-patches

When a client connects, an evio service callback is invoked to accept
the connection. Previously, the next step after accepting was to pass
the connection to the tx thread through cbus. This meant that every
connection involved the tx thread even before we decoded what the
client wanted from us. This causes a number of problems. The main one
is that we might leak descriptors when the tx thread is unresponsive
(for example, while building a secondary index). There are also other
cases where we do not want to spend precious tx time on a connection
that iproto can handle on its own.
This patch allows iproto to accept a connection and send the greeting
by itself. The connection is initialized in the tx thread when a real
request comes through iproto_msg_decode(). If the request type is not
recognized, iproto can also reply with an error without using tx. More
iproto logic is planned to prevent extra interaction with the tx
thread. This patch already partially solves the descriptor leakage
problem, since the connection is established and stays in the
fetch_schema state while the tx thread is unresponsive.
The other user-visible change is that on_connect triggers won't run on
connections that don't provide any input, as reflected in
bad_trigger.test.py.

Part of #3776

@TarantoolBot document
Title: iproto: on_connect triggers execution
Update the documentation for on_connect triggers to reflect that they
are now executed only with the first request. The triggers are still
the first thing to be executed on a new connection.
While it is quite a synthetic case to establish a connection without making any requests it is technically possible and now your triggers won't be executed in this case. Some request is required to start their execution. --- Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto Issue: https://github.com/tarantool/tarantool/issues/3776 @ChangeLog: * Make iproto thread more independent from tx (gh-3776). Changes in v2: - docbot request provided - ChangeLog provided - net_send_msg() used instead of net_send_error() where needed - error cases made clear in iproto_msg_decode() - replaced net_check_connection() with iproto_connection_fail() predicate - improved tx session initialization fail processing to avoid extra tries - fixed obuf_dup() fail processing in iproto_process_connect() - added some comments - some comments style fixed src/box/iproto.cc | 304 +++++++++++++++++++++----------- test/box-py/bad_trigger.result | 1 + test/box-py/bad_trigger.test.py | 22 ++- 3 files changed, 223 insertions(+), 104 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index b8f65e5eca..67bbd357a3 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con); static void iproto_resume(void); -static void +static int iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, bool *stop_input); @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg) iproto_resume(); } +static inline void +iproto_connection_delete(struct iproto_connection *con); + /** * A single global queue for all requests in all connections. All * requests from all connections are processed concurrently. @@ -280,6 +283,11 @@ static struct cord net_cord; * in the tx thread. */ static struct slab_cache net_slabc; +/** + * Slab cache used for allocating memory for output network buffers + * in the iproto thread. 
+ */ +static struct slab_cache iproto_slabc; struct rmean *rmean_net; @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = { "REQUESTS", }; +static int +tx_init_connect(struct iproto_msg *msg); + static void tx_process_destroy(struct cmsg *m); @@ -463,6 +474,7 @@ struct iproto_connection struct ev_io output; /** Logical session. */ struct session *session; + bool init_failed; ev_loop *loop; /** * Pre-allocated disconnect msg. Is sent right after @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con) * other parts of the connection. */ con->state = IPROTO_CONNECTION_DESTROYED; - cpipe_push(&tx_pipe, &con->destroy_msg); + if (con->session != NULL) + cpipe_push(&tx_pipe, &con->destroy_msg); + else { + /* + * In case session was not created we can safely destroy + * not involving tx thread. Thus we also need to destroy + * obuf, which still belongs to iproto thread. + */ + obuf_destroy(&con->obuf[0]); + obuf_destroy(&con->obuf[1]); + iproto_connection_delete(con); + } } /** @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con) * is done only once. */ con->p_ibuf->wpos -= con->parse_size; - cpipe_push(&tx_pipe, &con->disconnect_msg); assert(con->state == IPROTO_CONNECTION_ALIVE); con->state = IPROTO_CONNECTION_CLOSED; + rlist_del(&con->in_stop_list); + if (con->session != NULL) + cpipe_push(&tx_pipe, &con->disconnect_msg); + else + /* + * In case session was not created we can safely + * try to start destroy not involving tx thread. 
+ */ + iproto_connection_try_to_start_destroy(con); + return; } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) { iproto_connection_try_to_start_destroy(con); } else { @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) assert(rlist_empty(&con->in_stop_list)); int n_requests = 0; bool stop_input = false; + bool obuf_in_iproto = (con->session == NULL); const char *errmsg; while (con->parse_size != 0 && !stop_input) { if (iproto_check_msg_max()) { @@ -853,12 +886,20 @@ err_msgpack: msg->len = reqend - reqstart; /* total request length */ - iproto_msg_decode(msg, &pos, reqend, &stop_input); - /* - * This can't throw, but should not be - * done in case of exception. - */ - cpipe_push_input(&tx_pipe, &msg->base); + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) { + if (obuf_in_iproto) { + /* + * If session was not created yet and obuf is + * still in iproto we need to destroy it. New + * one will be created in tx thread if needed. + */ + obuf_destroy(&con->obuf[0]); + obuf_destroy(&con->obuf[1]); + obuf_in_iproto = false; + } + cpipe_push_input(&tx_pipe, &msg->base); + } + n_requests++; /* Request is parsed */ assert(reqend > reqstart); @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd) ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead); + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead); con->p_ibuf = &con->ibuf[0]; con->tx.p_obuf = &con->obuf[0]; iproto_wpos_create(&con->wpos, con->tx.p_obuf); @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd) con->parse_size = 0; con->long_poll_count = 0; con->session = NULL; + con->init_failed = false; rlist_create(&con->in_stop_list); 
/* It may be very awkward to allocate at close. */ cmsg_init(&con->destroy_msg, destroy_route); @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con) assert(!evio_has_fd(&con->input)); assert(con->session == NULL); assert(con->state == IPROTO_CONNECTION_DESTROYED); - /* - * The output buffers must have been deleted - * in tx thread. - */ ibuf_destroy(&con->ibuf[0]); ibuf_destroy(&con->ibuf[1]); assert(con->obuf[0].pos == 0 && @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg); static void tx_reply_iproto_error(struct cmsg *m); +static void +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos); + static void net_send_msg(struct cmsg *msg); @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = { { net_send_error, NULL }, }; -static void +static int iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, bool *stop_input) { @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, (uint32_t) type); goto error; } - return; + return 0; error: /** Log and send the error. */ diag_log(); diag_create(&msg->diag); diag_move(&fiber()->diag, &msg->diag); - cmsg_init(&msg->base, error_route); + if (msg->connection->session != NULL) { + cmsg_init(&msg->base, error_route); + return 1; + } + /* + * In case session was not created we can process error path + * without tx thread. + */ + tx_accept_wpos(msg->connection, &msg->wpos); + struct obuf *out = msg->connection->tx.p_obuf; + iproto_reply_error(out, diag_last_error(&msg->diag), + msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + net_send_msg(&(msg->base)); + return -1; } static void @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m) session_destroy(con->session); con->session = NULL; /* safety */ } - /* - * Got to be done in iproto thread since - * that's where the memory is allocated. 
- */ obuf_destroy(&con->obuf[0]); obuf_destroy(&con->obuf[1]); } @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) } } -static inline struct iproto_msg * -tx_accept_msg(struct cmsg *m) +static inline int +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg) { - struct iproto_msg *msg = (struct iproto_msg *) m; - tx_accept_wpos(msg->connection, &msg->wpos); - tx_fiber_init(msg->connection->session, msg->header.sync); - return msg; + *msg = (struct iproto_msg *) m; + /* + * In case connection init failed we don't need to try anymore. + */ + if ((*msg)->connection->init_failed) + return -1; + /* + * In case session was not created we need to init connection in tx and + * create it here. + */ + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) { + (*msg)->connection->init_failed = true; + (*msg)->close_connection = true; + return -1; + } + tx_accept_wpos((*msg)->connection, &(*msg)->wpos); + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync); + return 0; } /** @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg) static void tx_reply_iproto_error(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; + /* + * We don't need to check tx_accept_msg() return value here + * as far as if we might only process iproto error in tx + * in case connection session is already created and + * thus tx_accept_msg() can't fail. 
+ */ + tx_accept_msg(m, &msg); struct obuf *out = msg->connection->tx.p_obuf; iproto_reply_error(out, diag_last_error(&msg->diag), msg->header.sync, ::schema_version); @@ -1527,7 +1599,9 @@ tx_inject_delay(void) static void tx_process1(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; + if (tx_accept_msg(m, &msg) != 0) + goto error; if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1553,17 +1627,20 @@ error: static void tx_process_select(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; + struct request *req; struct obuf *out; struct obuf_svp svp; struct port port; int count; int rc; - struct request *req = &msg->dml; + if (tx_accept_msg(m, &msg) != 0) + goto error; if (tx_check_schema(msg->header.schema_version)) goto error; tx_inject_delay(); + req = &msg->dml; rc = box_select(req->space_id, req->index_id, req->iterator, req->offset, req->limit, req->key, req->key_end, &port); @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event) static void tx_process_call(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; + if (tx_accept_msg(m, &msg) != 0) + goto error; if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1686,13 +1765,15 @@ error: static void tx_process_misc(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); - struct iproto_connection *con = msg->connection; - struct obuf *out = con->tx.p_obuf; + struct iproto_msg *msg; + if (tx_accept_msg(m, &msg) != 0) + goto error; if (tx_check_schema(msg->header.schema_version)) goto error; try { + struct iproto_connection *con = msg->connection; + struct obuf *out = con->tx.p_obuf; struct ballot ballot; switch (msg->header.type) { case IPROTO_AUTH: @@ -1729,7 +1810,7 @@ error: static void tx_process_sql(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; struct obuf *out; struct port port; struct 
sql_bind *bind = NULL; @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m) uint32_t len; bool is_unprepare = false; + if (tx_accept_msg(m, &msg) != 0) + goto error; if (tx_check_schema(msg->header.schema_version)) goto error; assert(msg->header.type == IPROTO_EXECUTE || @@ -1825,7 +1908,11 @@ error: static void tx_process_replication(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg; + if (tx_accept_msg(m, &msg) != 0) { + tx_reply_error(msg); + return; + } struct iproto_connection *con = msg->connection; struct ev_io io; coio_create(&io, con->input.fd); @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m) } } +/** + * Check connection health and try to send an error to the client + * in case of internal connection init or on_connect trigger failure. + */ +static bool +iproto_connection_fail(struct iproto_msg *msg) +{ + if (!msg->close_connection) + return false; + struct iproto_connection *con = msg->connection; + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov, + obuf_iovcnt(msg->wpos.obuf)); + if (nwr > 0) { + /* Count statistics. */ + rmean_collect(rmean_net, IPROTO_SENT, nwr); + } else if (nwr < 0 && ! sio_wouldblock(errno)) { + diag_log(); + } + iproto_connection_close(con); + iproto_msg_delete(msg); + return true; +} + static void net_send_msg(struct cmsg *m) { @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m) } con->wend = msg->wpos; + if (iproto_connection_fail(msg)) + return; + if (evio_has_fd(&con->output)) { if (! ev_is_active(&con->output)) ev_feed_event(con->loop, &con->output, EV_WRITE); @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m) struct iproto_connection *con = msg->connection; msg->p_ibuf->rpos += msg->len; + + if (iproto_connection_fail(msg)) + return; + iproto_msg_delete(msg); assert(! 
ev_is_active(&con->input)); @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m) struct iproto_connection *con = msg->connection; msg->p_ibuf->rpos += msg->len; + + if (iproto_connection_fail(msg)) + return; + iproto_msg_delete(msg); assert(! ev_is_active(&con->input)); @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m) } /** - * Handshake a connection: invoke the on-connect trigger - * and possibly authenticate. Try to send the client an error - * upon a failure. + * Handshake a connection: prepare greeting for it. */ static void -tx_process_connect(struct cmsg *m) +iproto_process_connect(struct iproto_msg *msg) { - struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; struct obuf *out = msg->connection->tx.p_obuf; - try { /* connect. */ - con->session = session_create(SESSION_TYPE_BINARY); - if (con->session == NULL) - diag_raise(); - con->session->meta.connection = con; - tx_fiber_init(con->session, 0); - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE); - /* TODO: dirty read from tx thread */ - struct tt_uuid uuid = INSTANCE_UUID; - random_bytes(con->salt, IPROTO_SALT_SIZE); - greeting_encode(greeting, tarantool_version_id(), &uuid, - con->salt, IPROTO_SALT_SIZE); - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE); - if (! 
rlist_empty(&session_on_connect)) { - if (session_run_on_connect_triggers(con->session) != 0) - diag_raise(); - } + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE); + /* TODO: dirty read from tx thread */ + struct tt_uuid uuid = INSTANCE_UUID; + random_bytes(con->salt, IPROTO_SALT_SIZE); + greeting_encode(greeting, tarantool_version_id(), &uuid, + con->salt, IPROTO_SALT_SIZE); + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE) + != IPROTO_GREETING_SIZE) { + diag_set(OutOfMemory, IPROTO_GREETING_SIZE, + "greeting obuf", "dup"); + iproto_reply_error(out, diag_last_error(&fiber()->diag), + msg->header.sync, ::schema_version); iproto_wpos_create(&msg->wpos, out); - } catch (Exception *e) { - tx_reply_error(msg); msg->close_connection = true; - } -} - -/** - * Send a response to connect to the client or close the - * connection in case on_connect trigger failed. - */ -static void -net_send_greeting(struct cmsg *m) -{ - struct iproto_msg *msg = (struct iproto_msg *) m; - struct iproto_connection *con = msg->connection; - if (msg->close_connection) { - struct obuf *out = msg->wpos.obuf; - int64_t nwr = sio_writev(con->output.fd, out->iov, - obuf_iovcnt(out)); - - if (nwr > 0) { - /* Count statistics. */ - rmean_collect(rmean_net, IPROTO_SENT, nwr); - } else if (nwr < 0 && ! sio_wouldblock(errno)) { - diag_log(); - } - assert(iproto_connection_is_idle(con)); - iproto_connection_close(con); - iproto_msg_delete(msg); + iproto_connection_fail(msg); return; } + iproto_wpos_create(&msg->wpos, out); con->wend = msg->wpos; - /* - * Connect is synchronous, so no one could have been - * messing up with the connection while it was in - * progress. - */ assert(evio_has_fd(&con->output)); - /* Handshake OK, start reading input. 
*/ ev_feed_event(con->loop, &con->output, EV_WRITE); iproto_msg_delete(msg); } -static const struct cmsg_hop connect_route[] = { - { tx_process_connect, &net_pipe }, - { net_send_greeting, NULL }, -}; +static int +tx_init_connect(struct iproto_msg *msg) +{ + struct iproto_connection *con = msg->connection; + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); + con->tx.p_obuf = &con->obuf[0]; + iproto_wpos_create(&con->wpos, con->tx.p_obuf); + iproto_wpos_create(&con->wend, con->tx.p_obuf); + + con->session = session_create(SESSION_TYPE_BINARY); + if (con->session == NULL) + return -1; + con->session->meta.connection = con; + + tx_fiber_init(con->session, 0); + if (! rlist_empty(&session_on_connect)) { + if (session_run_on_connect_triggers(con->session) != 0) + return -1; + } + + return 0; +} /** }}} */ @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd, mempool_free(&iproto_connection_pool, con); return -1; } - cmsg_init(&msg->base, connect_route); msg->p_ibuf = con->p_ibuf; msg->wpos = con->wpos; msg->close_connection = false; - cpipe_push(&tx_pipe, &msg->base); + iproto_process_connect(msg); return 0; } @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */ static int net_cord_f(va_list /* ap */) { + slab_cache_create(&iproto_slabc, &runtime); + mempool_create(&iproto_msg_pool, &cord()->slabc, sizeof(struct iproto_msg)); mempool_create(&iproto_connection_pool, &cord()->slabc, @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri) size_t iproto_mem_used(void) { - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc); + return slab_cache_used(&net_cord.slabc) + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc); } size_t diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result index 5d064b7648..bfa9c2b759 100644 --- a/test/box-py/bad_trigger.result +++ b/test/box-py/bad_trigger.result @@ 
-14,6 +14,7 @@ type(box.session.on_connect(f1))
 - function
 ...
 greeting: True
+Nothing to read yet: Resource temporarily unavailable
 fixheader: True
 error code 32
 error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
index 7d200b9218..4739dfe136 100644
--- a/test/box-py/bad_trigger.test.py
+++ b/test/box-py/bad_trigger.test.py
@@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
 from lib.tarantool_connection import TarantoolConnection
 from tarantool import NetworkError
 from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
-    REQUEST_TYPE_ERROR
+    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
 import socket
 import msgpack

@@ -26,9 +26,25 @@ s = conn.socket
 # Read greeting
 print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE

-# Read error packet
+# Check socket
 IPROTO_FIXHEADER_SIZE = 5
-fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+s.setblocking(False)
+fixheader = None
+try:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+except socket.error as err:
+    print 'Nothing to read yet:', str(err).split(']')[1]
+else:
+    print 'Received fixheader'
+s.setblocking(True)
+
+# Send ping
+query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
+s.send(msgpack.dumps(len(query)) + query)
+
+# Read error packet
+if not fixheader:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
 print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
 unpacker.feed(fixheader)
 packet_len = unpacker.unpack()
--
2.17.1
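
The client-visible behaviour that the updated bad_trigger test checks can be tried without a Tarantool instance. The following is only a toy model under stated assumptions, not Tarantool code: `lazy_server`, `probe`, `recv_exact`, and the `b"ping"` / `b"reply"` payloads are hypothetical names made up for illustration, and a `socketpair` stands in for a real iproto connection. It shows a peer that sends its greeting right away but runs its "on_connect" step only after the first request arrives, so a non-blocking read right after the greeting finds nothing.

```python
import socket
import threading

GREETING_SIZE = 128  # iproto greeting size; everything else here is illustrative


def recv_exact(sock, size):
    """Read exactly `size` bytes, or fewer if the peer closes early."""
    data = b""
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            break
        data += chunk
    return data


def lazy_server(sock, log):
    """Toy stand-in for the patched server side (hypothetical, not Tarantool)."""
    sock.sendall(b"G" * GREETING_SIZE)  # the greeting goes out immediately
    if sock.recv(16):                   # blocks until the first request
        log.append("on_connect")        # the trigger step runs only now
        sock.sendall(b"reply")
    sock.close()


def probe():
    client, server = socket.socketpair()
    log = []
    worker = threading.Thread(target=lazy_server, args=(server, log))
    worker.start()
    greeting = recv_exact(client, GREETING_SIZE)

    # Same idea as in the test: a non-blocking read must find nothing yet.
    client.setblocking(False)
    try:
        client.recv(5)
        idle = False
    except BlockingIOError:
        idle = True
    client.setblocking(True)
    ran_before_request = list(log)

    client.sendall(b"ping")
    reply = recv_exact(client, len(b"reply"))
    worker.join()
    client.close()
    return len(greeting), idle, ran_before_request, list(log), reply


if __name__ == "__main__":
    print(probe())
```

The model is deterministic because the toy server writes nothing after the greeting until it has read a request, which mirrors the ordering the patch describes: greeting first, session setup and triggers with the first request.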
* Re: [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx
  2020-09-18 13:09 [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx Ilya Kosarev
@ 2020-09-21 15:51 ` Leonid Vasiliev
  2020-09-25 22:57   ` Ilya Kosarev
  0 siblings, 1 reply; 3+ messages in thread
From: Leonid Vasiliev @ 2020-09-21 15:51 UTC
To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

Hi! Thank you for the patch.
LGTM.
All comments below are at your discretion:

On 9/18/20 4:09 PM, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index). There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
>
> Part of #3776
>
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now being executed only with the first request. Though the triggers
> are still the first thing to be executed on a new connection. While it
> is quite a synthetic case to establish a connection without making
> any requests it is technically possible and now your triggers won't be
> executed in this case. Some request is required to start their
> execution.
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
>
> @ChangeLog:
>  * Make iproto thread more independent from tx (gh-3776).

- The changelog should describe changes visible to the user. I would
suggest writing about on_connect triggers here. As I understand it,
on_disconnect triggers will also not run if no message has been sent.

>
> Changes in v2:
>   - docbot request provided
>   - ChangeLog provided
>   - net_send_msg() used instead of net_send_error() where needed
>   - error cases made clear in iproto_msg_decode()
>   - replaced net_check_connection() with iproto_connection_fail() predicate
>   - improved tx session initialization fail processing to avoid extra tries
>   - fixed obuf_dup() fail processing in iproto_process_connect()
>   - added some comments
>   - some comments style fixed
>
>  src/box/iproto.cc               | 304 +++++++++++++++++++++-----------
>  test/box-py/bad_trigger.result  |   1 +
>  test/box-py/bad_trigger.test.py |  22 ++-
>  3 files changed, 223 insertions(+), 104 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..67bbd357a3 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>  static void
>  iproto_resume(void);
>
> -static void
> +static int
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char
*reqend, > bool *stop_input); > > @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg) > iproto_resume(); > } > > +static inline void > +iproto_connection_delete(struct iproto_connection *con); > + > /** > * A single global queue for all requests in all connections. All > * requests from all connections are processed concurrently. > @@ -280,6 +283,11 @@ static struct cord net_cord; > * in the tx thread. > */ > static struct slab_cache net_slabc; > +/** > + * Slab cache used for allocating memory for output network buffers > + * in the iproto thread. > + */ > +static struct slab_cache iproto_slabc; > > struct rmean *rmean_net; > > @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = { > "REQUESTS", > }; > > +static int > +tx_init_connect(struct iproto_msg *msg); > + > static void > tx_process_destroy(struct cmsg *m); > > @@ -463,6 +474,7 @@ struct iproto_connection > struct ev_io output; > /** Logical session. */ > struct session *session; > + bool init_failed; > ev_loop *loop; > /** > * Pre-allocated disconnect msg. Is sent right after > @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con) > * other parts of the connection. > */ > con->state = IPROTO_CONNECTION_DESTROYED; > - cpipe_push(&tx_pipe, &con->destroy_msg); > + if (con->session != NULL) > + cpipe_push(&tx_pipe, &con->destroy_msg); > + else { > + /* > + * In case session was not created we can safely destroy > + * not involving tx thread. Thus we also need to destroy > + * obuf, which still belongs to iproto thread. > + */ > + obuf_destroy(&con->obuf[0]); > + obuf_destroy(&con->obuf[1]); > + iproto_connection_delete(con); > + } > } > > /** > @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con) > * is done only once. 
> */ > con->p_ibuf->wpos -= con->parse_size; > - cpipe_push(&tx_pipe, &con->disconnect_msg); > assert(con->state == IPROTO_CONNECTION_ALIVE); > con->state = IPROTO_CONNECTION_CLOSED; > + rlist_del(&con->in_stop_list); > + if (con->session != NULL) > + cpipe_push(&tx_pipe, &con->disconnect_msg); > + else > + /* > + * In case session was not created we can safely > + * try to start destroy not involving tx thread. > + */ > + iproto_connection_try_to_start_destroy(con); > + return; > } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) { > iproto_connection_try_to_start_destroy(con); > } else { > @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) > assert(rlist_empty(&con->in_stop_list)); > int n_requests = 0; > bool stop_input = false; > + bool obuf_in_iproto = (con->session == NULL); > const char *errmsg; > while (con->parse_size != 0 && !stop_input) { > if (iproto_check_msg_max()) { > @@ -853,12 +886,20 @@ err_msgpack: > > msg->len = reqend - reqstart; /* total request length */ > > - iproto_msg_decode(msg, &pos, reqend, &stop_input); > - /* > - * This can't throw, but should not be > - * done in case of exception. > - */ > - cpipe_push_input(&tx_pipe, &msg->base); > + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) { > + if (obuf_in_iproto) { > + /* > + * If session was not created yet and obuf is > + * still in iproto we need to destroy it. New > + * one will be created in tx thread if needed. 
> + */ > + obuf_destroy(&con->obuf[0]); > + obuf_destroy(&con->obuf[1]); > + obuf_in_iproto = false; > + } > + cpipe_push_input(&tx_pipe, &msg->base); > + } > + > n_requests++; > /* Request is parsed */ > assert(reqend > reqstart); > @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd) > ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); > ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); > ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); > - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); > - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); > + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead); > + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead); > con->p_ibuf = &con->ibuf[0]; > con->tx.p_obuf = &con->obuf[0]; > iproto_wpos_create(&con->wpos, con->tx.p_obuf); > @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd) > con->parse_size = 0; > con->long_poll_count = 0; > con->session = NULL; > + con->init_failed = false; > rlist_create(&con->in_stop_list); > /* It may be very awkward to allocate at close. */ > cmsg_init(&con->destroy_msg, destroy_route); > @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con) > assert(!evio_has_fd(&con->input)); > assert(con->session == NULL); > assert(con->state == IPROTO_CONNECTION_DESTROYED); > - /* > - * The output buffers must have been deleted > - * in tx thread. 
> -	 */
>  	ibuf_destroy(&con->ibuf[0]);
>  	ibuf_destroy(&con->ibuf[1]);
>  	assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
>  static void
>  tx_reply_iproto_error(struct cmsg *m);
>
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
>  static void
>  net_send_msg(struct cmsg *msg);
>
> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
>  	{ net_send_error, NULL },
>  };
>
> -static void
> +static int
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  			 (uint32_t) type);

- You have three possible return values. Maybe add a function description?

>  		goto error;
>  	}
> -	return;
> +	return 0;
>  error:
>  	/** Log and send the error. */
>  	diag_log();
>  	diag_create(&msg->diag);
>  	diag_move(&fiber()->diag, &msg->diag);
> -	cmsg_init(&msg->base, error_route);
> +	if (msg->connection->session != NULL) {
> +		cmsg_init(&msg->base, error_route);
> +		return 1;
> +	}
> +	/*
> +	 * In case session was not created we can process error path
> +	 * without tx thread.
> +	 */
> +	tx_accept_wpos(msg->connection, &msg->wpos);
> +	struct obuf *out = msg->connection->tx.p_obuf;
> +	iproto_reply_error(out, diag_last_error(&msg->diag),
> +			   msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	net_send_msg(&(msg->base));
> +	return -1;
>  }

- I meant something like:

	/** Log and send the error. */
	diag_log();
	if (msg->connection->session != NULL) {
		diag_create(&msg->diag);
		diag_move(&fiber()->diag, &msg->diag);
		cmsg_init(&msg->base, error_route);
		return 1;
	}
	/*
	 * In case session was not created we can process error path
	 * without tx thread.
	 */
	tx_accept_wpos(msg->connection, &msg->wpos);
	struct obuf *out = msg->connection->tx.p_obuf;
	iproto_reply_error(out, diag_last_error(&fiber()->diag),
			   msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	net_send_msg(&(msg->base));
	return -1;

>
>  static void
> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
>  		session_destroy(con->session);
>  		con->session = NULL; /* safety */
>  	}
> -	/*
> -	 * Got to be done in iproto thread since
> -	 * that's where the memory is allocated.
> -	 */
>  	obuf_destroy(&con->obuf[0]);
>  	obuf_destroy(&con->obuf[1]);
>  }
> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>  {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -	return msg;
> +	*msg = (struct iproto_msg *) m;
> +	/*
> +	 * In case connection init failed we don't need to try anymore.
> +	 */
> +	if ((*msg)->connection->init_failed)
> +		return -1;
> +	/*
> +	 * In case session was not created we need to init connection in tx and
> +	 * create it here.
> + */ > + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) { > + (*msg)->connection->init_failed = true; > + (*msg)->close_connection = true; > + return -1; > + } > + tx_accept_wpos((*msg)->connection, &(*msg)->wpos); > + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync); > + return 0; > } > > /** > @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg) > static void > tx_reply_iproto_error(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > + struct iproto_msg *msg; > + /* > + * We don't need to check tx_accept_msg() return value here > + * as far as if we might only process iproto error in tx > + * in case connection session is already created and > + * thus tx_accept_msg() can't fail. > + */ > + tx_accept_msg(m, &msg); > struct obuf *out = msg->connection->tx.p_obuf; > iproto_reply_error(out, diag_last_error(&msg->diag), > msg->header.sync, ::schema_version); > @@ -1527,7 +1599,9 @@ tx_inject_delay(void) > static void > tx_process1(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > + struct iproto_msg *msg; > + if (tx_accept_msg(m, &msg) != 0) > + goto error; > if (tx_check_schema(msg->header.schema_version)) > goto error; > > @@ -1553,17 +1627,20 @@ error: > static void > tx_process_select(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > + struct iproto_msg *msg; > + struct request *req; > struct obuf *out; > struct obuf_svp svp; > struct port port; > int count; > int rc; > - struct request *req = &msg->dml; > + if (tx_accept_msg(m, &msg) != 0) > + goto error; > if (tx_check_schema(msg->header.schema_version)) > goto error; > > tx_inject_delay(); > + req = &msg->dml; > rc = box_select(req->space_id, req->index_id, > req->iterator, req->offset, req->limit, > req->key, req->key_end, &port); > @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event) > static void > tx_process_call(struct cmsg *m) > { > - struct iproto_msg *msg = 
tx_accept_msg(m); > + struct iproto_msg *msg; > + if (tx_accept_msg(m, &msg) != 0) > + goto error; > if (tx_check_schema(msg->header.schema_version)) > goto error; > > @@ -1686,13 +1765,15 @@ error: > static void > tx_process_misc(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > - struct iproto_connection *con = msg->connection; > - struct obuf *out = con->tx.p_obuf; > + struct iproto_msg *msg; > + if (tx_accept_msg(m, &msg) != 0) > + goto error; > if (tx_check_schema(msg->header.schema_version)) > goto error; > > try { > + struct iproto_connection *con = msg->connection; > + struct obuf *out = con->tx.p_obuf; > struct ballot ballot; > switch (msg->header.type) { > case IPROTO_AUTH: > @@ -1729,7 +1810,7 @@ error: > static void > tx_process_sql(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > + struct iproto_msg *msg; > struct obuf *out; > struct port port; > struct sql_bind *bind = NULL; > @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m) > uint32_t len; > bool is_unprepare = false; > > + if (tx_accept_msg(m, &msg) != 0) > + goto error; > if (tx_check_schema(msg->header.schema_version)) > goto error; > assert(msg->header.type == IPROTO_EXECUTE || > @@ -1825,7 +1908,11 @@ error: > static void > tx_process_replication(struct cmsg *m) > { > - struct iproto_msg *msg = tx_accept_msg(m); > + struct iproto_msg *msg; > + if (tx_accept_msg(m, &msg) != 0) { > + tx_reply_error(msg); > + return; > + } > struct iproto_connection *con = msg->connection; > struct ev_io io; > coio_create(&io, con->input.fd); > @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m) > } > } > > +/** > + * Check connection health and try to send an error to the client > + * in case of internal connection init or on_connect trigger failure. 
> + */ > +static bool > +iproto_connection_fail(struct iproto_msg *msg) > +{ > + if (!msg->close_connection) > + return false; > + struct iproto_connection *con = msg->connection; > + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov, > + obuf_iovcnt(msg->wpos.obuf)); > + if (nwr > 0) { > + /* Count statistics. */ > + rmean_collect(rmean_net, IPROTO_SENT, nwr); > + } else if (nwr < 0 && ! sio_wouldblock(errno)) { > + diag_log(); > + } > + iproto_connection_close(con); > + iproto_msg_delete(msg); > + return true; > +} > + > static void > net_send_msg(struct cmsg *m) > { > @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m) > } > con->wend = msg->wpos; > > + if (iproto_connection_fail(msg)) > + return; > + > if (evio_has_fd(&con->output)) { > if (! ev_is_active(&con->output)) > ev_feed_event(con->loop, &con->output, EV_WRITE); > @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m) > struct iproto_connection *con = msg->connection; > > msg->p_ibuf->rpos += msg->len; > + > + if (iproto_connection_fail(msg)) > + return; > + > iproto_msg_delete(msg); > > assert(! ev_is_active(&con->input)); > @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m) > struct iproto_connection *con = msg->connection; > > msg->p_ibuf->rpos += msg->len; > + > + if (iproto_connection_fail(msg)) > + return; > + > iproto_msg_delete(msg); > > assert(! ev_is_active(&con->input)); > @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m) > } > > /** > - * Handshake a connection: invoke the on-connect trigger > - * and possibly authenticate. Try to send the client an error > - * upon a failure. > + * Handshake a connection: prepare greeting for it. > */ > static void > -tx_process_connect(struct cmsg *m) > +iproto_process_connect(struct iproto_msg *msg) > { > - struct iproto_msg *msg = (struct iproto_msg *) m; > struct iproto_connection *con = msg->connection; > struct obuf *out = msg->connection->tx.p_obuf; > - try { /* connect. 
*/ > - con->session = session_create(SESSION_TYPE_BINARY); > - if (con->session == NULL) > - diag_raise(); > - con->session->meta.connection = con; > - tx_fiber_init(con->session, 0); > - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE); > - /* TODO: dirty read from tx thread */ > - struct tt_uuid uuid = INSTANCE_UUID; > - random_bytes(con->salt, IPROTO_SALT_SIZE); > - greeting_encode(greeting, tarantool_version_id(), &uuid, > - con->salt, IPROTO_SALT_SIZE); > - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE); > - if (! rlist_empty(&session_on_connect)) { > - if (session_run_on_connect_triggers(con->session) != 0) > - diag_raise(); > - } > + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE); > + /* TODO: dirty read from tx thread */ > + struct tt_uuid uuid = INSTANCE_UUID; > + random_bytes(con->salt, IPROTO_SALT_SIZE); > + greeting_encode(greeting, tarantool_version_id(), &uuid, > + con->salt, IPROTO_SALT_SIZE); > + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE) > + != IPROTO_GREETING_SIZE) { > + diag_set(OutOfMemory, IPROTO_GREETING_SIZE, > + "greeting obuf", "dup"); > + iproto_reply_error(out, diag_last_error(&fiber()->diag), > + msg->header.sync, ::schema_version); > iproto_wpos_create(&msg->wpos, out); > - } catch (Exception *e) { > - tx_reply_error(msg); > msg->close_connection = true; > - } > -} > - > -/** > - * Send a response to connect to the client or close the > - * connection in case on_connect trigger failed. > - */ > -static void > -net_send_greeting(struct cmsg *m) > -{ > - struct iproto_msg *msg = (struct iproto_msg *) m; > - struct iproto_connection *con = msg->connection; > - if (msg->close_connection) { > - struct obuf *out = msg->wpos.obuf; > - int64_t nwr = sio_writev(con->output.fd, out->iov, > - obuf_iovcnt(out)); > - > - if (nwr > 0) { > - /* Count statistics. */ > - rmean_collect(rmean_net, IPROTO_SENT, nwr); > - } else if (nwr < 0 && ! 
sio_wouldblock(errno)) { > - diag_log(); > - } > - assert(iproto_connection_is_idle(con)); > - iproto_connection_close(con); > - iproto_msg_delete(msg); > + iproto_connection_fail(msg); > return; > } > + iproto_wpos_create(&msg->wpos, out); > con->wend = msg->wpos; > - /* > - * Connect is synchronous, so no one could have been > - * messing up with the connection while it was in > - * progress. > - */ > assert(evio_has_fd(&con->output)); > - /* Handshake OK, start reading input. */ > ev_feed_event(con->loop, &con->output, EV_WRITE); > iproto_msg_delete(msg); > } > > -static const struct cmsg_hop connect_route[] = { > - { tx_process_connect, &net_pipe }, > - { net_send_greeting, NULL }, > -}; > +static int > +tx_init_connect(struct iproto_msg *msg) > +{ > + struct iproto_connection *con = msg->connection; > + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); > + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); > + con->tx.p_obuf = &con->obuf[0]; > + iproto_wpos_create(&con->wpos, con->tx.p_obuf); > + iproto_wpos_create(&con->wend, con->tx.p_obuf); > + > + con->session = session_create(SESSION_TYPE_BINARY); > + if (con->session == NULL) > + return -1; > + con->session->meta.connection = con; > + > + tx_fiber_init(con->session, 0); > + if (! 
rlist_empty(&session_on_connect)) { > + if (session_run_on_connect_triggers(con->session) != 0) > + return -1; > + } > + > + return 0; > +} > > /** }}} */ > > @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd, > mempool_free(&iproto_connection_pool, con); > return -1; > } > - cmsg_init(&msg->base, connect_route); > msg->p_ibuf = con->p_ibuf; > msg->wpos = con->wpos; > msg->close_connection = false; > - cpipe_push(&tx_pipe, &msg->base); > + iproto_process_connect(msg); > return 0; > } > > @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */ > static int > net_cord_f(va_list /* ap */) > { > + slab_cache_create(&iproto_slabc, &runtime); > + > mempool_create(&iproto_msg_pool, &cord()->slabc, > sizeof(struct iproto_msg)); > mempool_create(&iproto_connection_pool, &cord()->slabc, > @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri) > size_t > iproto_mem_used(void) > { > - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc); > + return slab_cache_used(&net_cord.slabc) > + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc); > } > > size_t > diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result > index 5d064b7648..bfa9c2b759 100644 > --- a/test/box-py/bad_trigger.result > +++ b/test/box-py/bad_trigger.result > @@ -14,6 +14,7 @@ type(box.session.on_connect(f1)) > - function > ... 
> greeting: True > +Nothing to read yet: Resource temporarily unavailable > fixheader: True > error code 32 > error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value) > diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py > index 7d200b9218..4739dfe136 100644 > --- a/test/box-py/bad_trigger.test.py > +++ b/test/box-py/bad_trigger.test.py > @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection > from lib.tarantool_connection import TarantoolConnection > from tarantool import NetworkError > from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \ > - REQUEST_TYPE_ERROR > + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING > import socket > import msgpack > > @@ -26,9 +26,25 @@ s = conn.socket > # Read greeting > print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE > > -# Read error packet > +# Check socket > IPROTO_FIXHEADER_SIZE = 5 > -fixheader = s.recv(IPROTO_FIXHEADER_SIZE) > +s.setblocking(False) > +fixheader = None > +try: > + fixheader = s.recv(IPROTO_FIXHEADER_SIZE) > +except socket.error as err: > + print 'Nothing to read yet:', str(err).split(']')[1] > +else: > + print 'Received fixheader' > +s.setblocking(True) > + > +# Send ping > +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING }) > +s.send(msgpack.dumps(len(query)) + query) > + > +# Read error packet > +if not fixheader: > + fixheader = s.recv(IPROTO_FIXHEADER_SIZE) > print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE > unpacker.feed(fixheader) > packet_len = unpacker.unpack() > ^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx 2020-09-21 15:51 ` Leonid Vasiliev @ 2020-09-25 22:57 ` Ilya Kosarev 0 siblings, 0 replies; 3+ messages in thread From: Ilya Kosarev @ 2020-09-25 22:57 UTC (permalink / raw) To: Leonid Vasiliev; +Cc: tarantool-patches [-- Attachment #1: Type: text/plain, Size: 24682 bytes --] Hi! Thanks for the review. 4 answers below. Addressed all your comments in v3, pushed, and sent to the next reviewer. >Monday, 21 September 2020, 18:52 +03:00 from Leonid Vasiliev <lvasiliev@tarantool.org>: > >Hi! Thank you for the patch. > >LGTM. > >All comments below are at your discretion: > >On 9/18/20 4:09 PM, Ilya Kosarev wrote: >> On connection, an evio service callback is invoked to accept it. The >> next step after acception was to process connection to tx thread >> through cbus. This meant that any connection interaction involves >> tx thread even before we get to decode what does the client want >> from us. Consequently, a number of problems appears. The main one >> is that we might get descriptor leak in case of unresponsive tx thread >> (for example, when building secondary index). There are some other >> cases where we might not want to spend precious tx time to process >> the connection in case iproto can do it all alone. >> This patch allows iproto to accept connection and send greeting by >> itself. The connection is initialized in tx thread when the real >> request comes through iproto_msg_decode(). In case request type was not >> recognized we can also send reply with an error without using tx. It is >> planned to add more iproto logic to prevent extra interaction with >> tx thread. This patch already to some extent solves descriptors leakage >> problem as far as connection establishes and stays in fetch_schema >> state while tx thread is unresponsive.
>> The other user visible change is that on_connect triggers won't run on >> connections that don't provide any input, as reflected in >> bad_trigger.test.py. >> >> Part of #3776 >> >> @TarantoolBot document >> Title: iproto: on_connect triggers execution >> Update the documentation for on_connect triggers to reflect that they >> are now being executed only with the first request. Though the triggers >> are still the first thing to be executed on a new connection. While it >> is quite a synthetic case to establish a connection without making >> any requests it is technically possible and now your triggers won't be >> executed in this case. Some request is required to start their >> execution. >> --- >> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto >> Issue: https://github.com/tarantool/tarantool/issues/3776 >> >> @ChangeLog: >> * Make iproto thread more independent from tx (gh-3776). > >- The ChangeLog is about changes visible to the user. I would suggest writing >about on_connect triggers here. As I understand it, on_disconnect triggers will >also not run if not a single message has been sent. * Right, I thought it would be too long for the ChangeLog, but it actually belongs here.
> >> >> Changes in v2: >> - docbot request provided >> - ChangeLog provided >> - net_send_msg() used instead of net_send_error() where needed >> - error cases made clear in iproto_msg_decode() >> - replaced net_check_connection() with iproto_connection_fail() predicate >> - improved tx session initialization fail processing to avoid extra tries >> - fixed obuf_dup() fail processing in iproto_process_connect() >> - added some comments >> - some comments style fixed >> >> src/box/iproto.cc | 304 +++++++++++++++++++++----------- >> test/box-py/bad_trigger.result | 1 + >> test/box-py/bad_trigger.test.py | 22 ++- >> 3 files changed, 223 insertions(+), 104 deletions(-) >> >> diff --git a/src/box/iproto.cc b/src/box/iproto.cc >> index b8f65e5eca..67bbd357a3 100644 >> --- a/src/box/iproto.cc >> +++ b/src/box/iproto.cc >> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con); >> static void >> iproto_resume(void); >> >> -static void >> +static int >> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, >> bool *stop_input); >> >> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg) >> iproto_resume(); >> } >> >> +static inline void >> +iproto_connection_delete(struct iproto_connection *con); >> + >> /** >> * A single global queue for all requests in all connections. All >> * requests from all connections are processed concurrently. >> @@ -280,6 +283,11 @@ static struct cord net_cord; >> * in the tx thread. >> */ >> static struct slab_cache net_slabc; >> +/** >> + * Slab cache used for allocating memory for output network buffers >> + * in the iproto thread. 
>> + */ >> +static struct slab_cache iproto_slabc; >> >> struct rmean *rmean_net; >> >> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = { >> "REQUESTS", >> }; >> >> +static int >> +tx_init_connect(struct iproto_msg *msg); >> + >> static void >> tx_process_destroy(struct cmsg *m); >> >> @@ -463,6 +474,7 @@ struct iproto_connection >> struct ev_io output; >> /** Logical session. */ >> struct session *session; >> + bool init_failed; >> ev_loop *loop; >> /** >> * Pre-allocated disconnect msg. Is sent right after >> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con) >> * other parts of the connection. >> */ >> con->state = IPROTO_CONNECTION_DESTROYED; >> - cpipe_push(&tx_pipe, &con->destroy_msg); >> + if (con->session != NULL) >> + cpipe_push(&tx_pipe, &con->destroy_msg); >> + else { >> + /* >> + * In case session was not created we can safely destroy >> + * not involving tx thread. Thus we also need to destroy >> + * obuf, which still belongs to iproto thread. >> + */ >> + obuf_destroy(&con->obuf[0]); >> + obuf_destroy(&con->obuf[1]); >> + iproto_connection_delete(con); >> + } >> } >> >> /** >> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con) >> * is done only once. >> */ >> con->p_ibuf->wpos -= con->parse_size; >> - cpipe_push(&tx_pipe, &con->disconnect_msg); >> assert(con->state == IPROTO_CONNECTION_ALIVE); >> con->state = IPROTO_CONNECTION_CLOSED; >> + rlist_del(&con->in_stop_list); >> + if (con->session != NULL) >> + cpipe_push(&tx_pipe, &con->disconnect_msg); >> + else >> + /* >> + * In case session was not created we can safely >> + * try to start destroy not involving tx thread. >> + */ >> + iproto_connection_try_to_start_destroy(con); * Added brackets to make this if statement more clear. 
>> + return; >> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) { >> iproto_connection_try_to_start_destroy(con); >> } else { >> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) >> assert(rlist_empty(&con->in_stop_list)); >> int n_requests = 0; >> bool stop_input = false; >> + bool obuf_in_iproto = (con->session == NULL); >> const char *errmsg; >> while (con->parse_size != 0 && !stop_input) { >> if (iproto_check_msg_max()) { >> @@ -853,12 +886,20 @@ err_msgpack: >> >> msg->len = reqend - reqstart; /* total request length */ >> >> - iproto_msg_decode(msg, &pos, reqend, &stop_input); >> - /* >> - * This can't throw, but should not be >> - * done in case of exception. >> - */ >> - cpipe_push_input(&tx_pipe, &msg->base); >> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) { >> + if (obuf_in_iproto) { >> + /* >> + * If session was not created yet and obuf is >> + * still in iproto we need to destroy it. New >> + * one will be created in tx thread if needed. 
>> + */ >> + obuf_destroy(&con->obuf[0]); >> + obuf_destroy(&con->obuf[1]); >> + obuf_in_iproto = false; >> + } >> + cpipe_push_input(&tx_pipe, &msg->base); >> + } >> + >> n_requests++; >> /* Request is parsed */ >> assert(reqend > reqstart); >> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd) >> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); >> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); >> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); >> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead); >> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead); >> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead); >> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead); >> con->p_ibuf = &con->ibuf[0]; >> con->tx.p_obuf = &con->obuf[0]; >> iproto_wpos_create(&con->wpos, con->tx.p_obuf); >> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd) >> con->parse_size = 0; >> con->long_poll_count = 0; >> con->session = NULL; >> + con->init_failed = false; >> rlist_create(&con->in_stop_list); >> /* It may be very awkward to allocate at close. */ >> cmsg_init(&con->destroy_msg, destroy_route); >> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con) >> assert(!evio_has_fd(&con->input)); >> assert(con->session == NULL); >> assert(con->state == IPROTO_CONNECTION_DESTROYED); >> - /* >> - * The output buffers must have been deleted >> - * in tx thread. 
>> - */ >> ibuf_destroy(&con->ibuf[0]); >> ibuf_destroy(&con->ibuf[1]); >> assert(con->obuf[0].pos == 0 && >> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg); >> static void >> tx_reply_iproto_error(struct cmsg *m); >> >> +static void >> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos); >> + >> static void >> net_send_msg(struct cmsg *msg); >> >> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = { >> { net_send_error, NULL }, >> }; >> >> -static void >> +static int >> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, >> bool *stop_input) >> { >> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, >> (uint32_t) type); > >- You have three possible return values. Maybe add a function >description? * Right, done. > >> goto error; >> } >> - return; >> + return 0; >> error: >> /** Log and send the error. */ >> diag_log(); >> diag_create(&msg->diag); >> diag_move(&fiber()->diag, &msg->diag); >> - cmsg_init(&msg->base, error_route); >> + if (msg->connection->session != NULL) { >> + cmsg_init(&msg->base, error_route); >> + return 1; >> + } >> + /* >> + * In case session was not created we can process error path >> + * without tx thread. >> + */ >> + tx_accept_wpos(msg->connection, &msg->wpos); >> + struct obuf *out = msg->connection->tx.p_obuf; >> + iproto_reply_error(out, diag_last_error(&msg->diag), >> + msg->header.sync, ::schema_version); >> + iproto_wpos_create(&msg->wpos, out); >> + net_send_msg(&(msg->base)); >> + return -1; >> } > >- I meant something like: >/** Log and send the error. */ >diag_log(); >if (msg->connection->session != NULL) { >diag_create(&msg->diag); >diag_move(&fiber()->diag, &msg->diag); >cmsg_init(&msg->base, error_route); >return 1; >} >/* >* In case session was not created we can process error path >* without tx thread. 
>*/ >tx_accept_wpos(msg->connection, &msg->wpos); >struct obuf *out = msg->connection->tx.p_obuf; >iproto_reply_error(out, diag_last_error(&fiber()->diag), >msg->header.sync, ::schema_version); >iproto_wpos_create(&msg->wpos, out); >net_send_msg(&(msg->base)); >return -1; * Right, I see, I overcomplicated it. We can also replace this block: struct obuf *out = msg->connection->tx.p_obuf; iproto_reply_error(out, diag_last_error(&fiber()->diag), msg->header.sync, ::schema_version); iproto_wpos_create(&msg->wpos, out); with an existing function. > >> >> static void >> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m) >> session_destroy(con->session); >> con->session = NULL; /* safety */ >> } >> - /* >> - * Got to be done in iproto thread since >> - * that's where the memory is allocated. >> - */ >> obuf_destroy(&con->obuf[0]); >> obuf_destroy(&con->obuf[1]); >> } >> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) >> } >> } >> >> -static inline struct iproto_msg * >> -tx_accept_msg(struct cmsg *m) >> +static inline int >> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg) >> { >> - struct iproto_msg *msg = (struct iproto_msg *) m; >> - tx_accept_wpos(msg->connection, &msg->wpos); >> - tx_fiber_init(msg->connection->session, msg->header.sync); >> - return msg; >> + *msg = (struct iproto_msg *) m; >> + /* >> + * In case connection init failed we don't need to try anymore. >> + */ >> + if ((*msg)->connection->init_failed) >> + return -1; >> + /* >> + * In case session was not created we need to init connection in tx and >> + * create it here.
>> + */ >> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) { >> + (*msg)->connection->init_failed = true; >> + (*msg)->close_connection = true; >> + return -1; >> + } >> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos); >> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync); >> + return 0; >> } >> >> /** >> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg) >> static void >> tx_reply_iproto_error(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> + /* >> + * We don't need to check tx_accept_msg() return value here >> + * as far as if we might only process iproto error in tx >> + * in case connection session is already created and >> + * thus tx_accept_msg() can't fail. >> + */ >> + tx_accept_msg(m, &msg); >> struct obuf *out = msg->connection->tx.p_obuf; >> iproto_reply_error(out, diag_last_error(&msg->diag), >> msg->header.sync, ::schema_version); >> @@ -1527,7 +1599,9 @@ tx_inject_delay(void) >> static void >> tx_process1(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> + if (tx_accept_msg(m, &msg) != 0) >> + goto error; >> if (tx_check_schema(msg->header.schema_version)) >> goto error; >> >> @@ -1553,17 +1627,20 @@ error: >> static void >> tx_process_select(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> + struct request *req; >> struct obuf *out; >> struct obuf_svp svp; >> struct port port; >> int count; >> int rc; >> - struct request *req = &msg->dml; >> + if (tx_accept_msg(m, &msg) != 0) >> + goto error; >> if (tx_check_schema(msg->header.schema_version)) >> goto error; >> >> tx_inject_delay(); >> + req = &msg->dml; >> rc = box_select(req->space_id, req->index_id, >> req->iterator, req->offset, req->limit, >> req->key, req->key_end, &port); >> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event) >> static void >> 
tx_process_call(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> + if (tx_accept_msg(m, &msg) != 0) >> + goto error; >> if (tx_check_schema(msg->header.schema_version)) >> goto error; >> >> @@ -1686,13 +1765,15 @@ error: >> static void >> tx_process_misc(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> - struct iproto_connection *con = msg->connection; >> - struct obuf *out = con->tx.p_obuf; >> + struct iproto_msg *msg; >> + if (tx_accept_msg(m, &msg) != 0) >> + goto error; >> if (tx_check_schema(msg->header.schema_version)) >> goto error; >> >> try { >> + struct iproto_connection *con = msg->connection; >> + struct obuf *out = con->tx.p_obuf; >> struct ballot ballot; >> switch (msg->header.type) { >> case IPROTO_AUTH: >> @@ -1729,7 +1810,7 @@ error: >> static void >> tx_process_sql(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> struct obuf *out; >> struct port port; >> struct sql_bind *bind = NULL; >> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m) >> uint32_t len; >> bool is_unprepare = false; >> >> + if (tx_accept_msg(m, &msg) != 0) >> + goto error; >> if (tx_check_schema(msg->header.schema_version)) >> goto error; >> assert(msg->header.type == IPROTO_EXECUTE || >> @@ -1825,7 +1908,11 @@ error: >> static void >> tx_process_replication(struct cmsg *m) >> { >> - struct iproto_msg *msg = tx_accept_msg(m); >> + struct iproto_msg *msg; >> + if (tx_accept_msg(m, &msg) != 0) { >> + tx_reply_error(msg); >> + return; >> + } >> struct iproto_connection *con = msg->connection; >> struct ev_io io; >> coio_create(&io, con->input.fd); >> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m) >> } >> } >> >> +/** >> + * Check connection health and try to send an error to the client >> + * in case of internal connection init or on_connect trigger failure. 
>> + */
>> +static bool
>> +iproto_connection_fail(struct iproto_msg *msg)
>> +{
>> +	if (!msg->close_connection)
>> +		return false;
>> +	struct iproto_connection *con = msg->connection;
>> +	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
>> +				 obuf_iovcnt(msg->wpos.obuf));
>> +	if (nwr > 0) {
>> +		/* Count statistics. */
>> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> +		diag_log();
>> +	}
>> +	iproto_connection_close(con);
>> +	iproto_msg_delete(msg);
>> +	return true;
>> +}
>> +
>>  static void
>>  net_send_msg(struct cmsg *m)
>>  {
>> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
>>  	}
>>  	con->wend = msg->wpos;
>>
>> +	if (iproto_connection_fail(msg))
>> +		return;
>> +
>>  	if (evio_has_fd(&con->output)) {
>>  		if (! ev_is_active(&con->output))
>>  			ev_feed_event(con->loop, &con->output, EV_WRITE);
>> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
>>  	struct iproto_connection *con = msg->connection;
>>
>>  	msg->p_ibuf->rpos += msg->len;
>> +
>> +	if (iproto_connection_fail(msg))
>> +		return;
>> +
>>  	iproto_msg_delete(msg);
>>
>>  	assert(! ev_is_active(&con->input));
>> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
>>  	struct iproto_connection *con = msg->connection;
>>
>>  	msg->p_ibuf->rpos += msg->len;
>> +
>> +	if (iproto_connection_fail(msg))
>> +		return;
>> +
>>  	iproto_msg_delete(msg);
>>
>>  	assert(! ev_is_active(&con->input));
>> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
>>  }
>>
>>  /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: prepare greeting for it.
>>   */
>>  static void
>> -tx_process_connect(struct cmsg *m)
>> +iproto_process_connect(struct iproto_msg *msg)
>>  {
>> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>>  	struct iproto_connection *con = msg->connection;
>>  	struct obuf *out = msg->connection->tx.p_obuf;
>> -	try { /* connect. */
>> -		con->session = session_create(SESSION_TYPE_BINARY);
>> -		if (con->session == NULL)
>> -			diag_raise();
>> -		con->session->meta.connection = con;
>> -		tx_fiber_init(con->session, 0);
>> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> -		/* TODO: dirty read from tx thread */
>> -		struct tt_uuid uuid = INSTANCE_UUID;
>> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
>> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
>> -				con->salt, IPROTO_SALT_SIZE);
>> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> -		if (! rlist_empty(&session_on_connect)) {
>> -			if (session_run_on_connect_triggers(con->session) != 0)
>> -				diag_raise();
>> -		}
>> +	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> +	/* TODO: dirty read from tx thread */
>> +	struct tt_uuid uuid = INSTANCE_UUID;
>> +	random_bytes(con->salt, IPROTO_SALT_SIZE);
>> +	greeting_encode(greeting, tarantool_version_id(), &uuid,
>> +			con->salt, IPROTO_SALT_SIZE);
>> +	if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
>> +				 != IPROTO_GREETING_SIZE) {
>> +		diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
>> +			 "greeting obuf", "dup");
>> +		iproto_reply_error(out, diag_last_error(&fiber()->diag),
>> +				   msg->header.sync, ::schema_version);
>>  		iproto_wpos_create(&msg->wpos, out);
>> -	} catch (Exception *e) {
>> -		tx_reply_error(msg);
>>  		msg->close_connection = true;
>> -	}
>> -}
>> -
>> -/**
>> - * Send a response to connect to the client or close the
>> - * connection in case on_connect trigger failed.
>> - */
>> -static void
>> -net_send_greeting(struct cmsg *m)
>> -{
>> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>> -	struct iproto_connection *con = msg->connection;
>> -	if (msg->close_connection) {
>> -		struct obuf *out = msg->wpos.obuf;
>> -		int64_t nwr = sio_writev(con->output.fd, out->iov,
>> -					 obuf_iovcnt(out));
>> -
>> -		if (nwr > 0) {
>> -			/* Count statistics. */
>> -			rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> -		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> -			diag_log();
>> -		}
>> -		assert(iproto_connection_is_idle(con));
>> -		iproto_connection_close(con);
>> -		iproto_msg_delete(msg);
>> +		iproto_connection_fail(msg);
>>  		return;
>>  	}
>> +	iproto_wpos_create(&msg->wpos, out);
>>  	con->wend = msg->wpos;
>> -	/*
>> -	 * Connect is synchronous, so no one could have been
>> -	 * messing up with the connection while it was in
>> -	 * progress.
>> -	 */
>>  	assert(evio_has_fd(&con->output));
>> -	/* Handshake OK, start reading input. */
>>  	ev_feed_event(con->loop, &con->output, EV_WRITE);
>>  	iproto_msg_delete(msg);
>>  }
>>
>> -static const struct cmsg_hop connect_route[] = {
>> -	{ tx_process_connect, &net_pipe },
>> -	{ net_send_greeting, NULL },
>> -};
>> +static int
>> +tx_init_connect(struct iproto_msg *msg)
>> +{
>> +	struct iproto_connection *con = msg->connection;
>> +	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> +	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> +	con->tx.p_obuf = &con->obuf[0];
>> +	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> +	iproto_wpos_create(&con->wend, con->tx.p_obuf);
>> +
>> +	con->session = session_create(SESSION_TYPE_BINARY);
>> +	if (con->session == NULL)
>> +		return -1;
>> +	con->session->meta.connection = con;
>> +
>> +	tx_fiber_init(con->session, 0);
>> +	if (! rlist_empty(&session_on_connect)) {
>> +		if (session_run_on_connect_triggers(con->session) != 0)
>> +			return -1;
>> +	}
>> +
>> +	return 0;
>> +}
>>
>>  /** }}} */
>>
>> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>>  		mempool_free(&iproto_connection_pool, con);
>>  		return -1;
>>  	}
>> -	cmsg_init(&msg->base, connect_route);
>>  	msg->p_ibuf = con->p_ibuf;
>>  	msg->wpos = con->wpos;
>>  	msg->close_connection = false;
>> -	cpipe_push(&tx_pipe, &msg->base);
>> +	iproto_process_connect(msg);
>>  	return 0;
>>  }
>>
>> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
>>  static int
>>  net_cord_f(va_list /* ap */)
>>  {
>> +	slab_cache_create(&iproto_slabc, &runtime);
>> +
>>  	mempool_create(&iproto_msg_pool, &cord()->slabc,
>>  		       sizeof(struct iproto_msg));
>>  	mempool_create(&iproto_connection_pool, &cord()->slabc,
>> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
>>  size_t
>>  iproto_mem_used(void)
>>  {
>> -	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
>> +	return slab_cache_used(&net_cord.slabc)
>> +	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>>  }
>>
>>  size_t
>> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
>> index 5d064b7648..bfa9c2b759 100644
>> --- a/test/box-py/bad_trigger.result
>> +++ b/test/box-py/bad_trigger.result
>> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>>  - function
>>  ...
>>  greeting: True
>> +Nothing to read yet: Resource temporarily unavailable
>>  fixheader: True
>>  error code 32
>>  error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
>> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
>> index 7d200b9218..4739dfe136 100644
>> --- a/test/box-py/bad_trigger.test.py
>> +++ b/test/box-py/bad_trigger.test.py
>> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>>  from lib.tarantool_connection import TarantoolConnection
>>  from tarantool import NetworkError
>>  from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
>> -    REQUEST_TYPE_ERROR
>> +    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>>  import socket
>>  import msgpack
>>
>> @@ -26,9 +26,25 @@ s = conn.socket
>>  # Read greeting
>>  print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>>
>> -# Read error packet
>> +# Check socket
>>  IPROTO_FIXHEADER_SIZE = 5
>> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +s.setblocking(False)
>> +fixheader = None
>> +try:
>> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +except socket.error as err:
>> +    print 'Nothing to read yet:', str(err).split(']')[1]
>> +else:
>> +    print 'Received fixheader'
>> +s.setblocking(True)
>> +
>> +# Send ping
>> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
>> +s.send(msgpack.dumps(len(query)) + query)
>> +
>> +# Read error packet
>> +if not fixheader:
>> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>>  print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>>  unpacker.feed(fixheader)
>>  packet_len = unpacker.unpack()
>> -- Ilya Kosarev
end of thread (newest message: 2020-09-25 22:57 UTC)

Thread overview: 3+ messages
2020-09-18 13:09 [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx Ilya Kosarev
2020-09-21 15:51 ` Leonid Vasiliev
2020-09-25 22:57   ` Ilya Kosarev