Hi!

Thanks for the review.

4 answers below. I fixed all your comments in v3, pushed it, and sent it to the next reviewer.

>Monday, 21 September 2020, 18:52 +03:00 from Leonid Vasiliev:
>
>Hi! Thank you for the patch.
>
>LGTM.
>
>All comments below are at your discretion:
>
>On 9/18/20 4:09 PM, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it. The
>> next step after acception was to process connection to tx thread
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive tx thread
>> (for example, when building secondary index). There are some other
>> cases where we might not want to spend precious tx time to process
>> the connection in case iproto can do it all alone.
>> This patch allows iproto to accept connection and send greeting by
>> itself. The connection is initialized in tx thread when the real
>> request comes through iproto_msg_decode(). In case request type was not
>> recognized we can also send reply with an error without using tx. It is
>> planned to add more iproto logic to prevent extra interaction with
>> tx thread. This patch already to some extent solves descriptors leakage
>> problem as far as connection establishes and stays in fetch_schema
>> state while tx thread is unresponsive.
>> The other user visible change is that on_connect triggers won't run on
>> connections that don't provide any input, as reflected in
>> bad_trigger.test.py.
>>
>> Part of #3776
>>
>> @TarantoolBot document
>> Title: iproto: on_connect triggers execution
>> Update the documentation for on_connect triggers to reflect that they
>> are now being executed only with the first request. Though the triggers
>> are still the first thing to be executed on a new connection. While it
>> is quite a synthetic case to establish a connection without making
>> any requests it is technically possible and now your triggers won't be
>> executed in this case. Some request is required to start their
>> execution.
>> ---
>> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
>> Issue: https://github.com/tarantool/tarantool/issues/3776
>>
>> @ChangeLog:
>> * Make iproto thread more independent from tx (gh-3776).
>
>- Changelog about changes visible to the user. I would suggest writing
>about on_connect triggers here. on_disconnect triggers will also not
>work if there was not one message has been sent, as I understand it.

* Right, I thought it would be too long for the ChangeLog, but it does belong there.

>
>>
>> Changes in v2:
>> - docbot request provided
>> - ChangeLog provided
>> - net_send_msg() used instead of net_send_error() where needed
>> - error cases made clear in iproto_msg_decode()
>> - replaced net_check_connection() with iproto_connection_fail() predicate
>> - improved tx session initialization fail processing to avoid extra tries
>> - fixed obuf_dup() fail processing in iproto_process_connect()
>> - added some comments
>> - some comments style fixed
>>
>> src/box/iproto.cc | 304 +++++++++++++++++++++-----------
>> test/box-py/bad_trigger.result | 1 +
>> test/box-py/bad_trigger.test.py | 22 ++-
>> 3 files changed, 223 insertions(+), 104 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5eca..67bbd357a3 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>> static void
>> iproto_resume(void);
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input);
>>
>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>> iproto_resume();
>> }
>>
>> +static inline void
>> +iproto_connection_delete(struct iproto_connection *con);
>> +
>> /**
>> * A single global queue for all requests in all connections. All
>> * requests from all connections are processed concurrently.
>> @@ -280,6 +283,11 @@ static struct cord net_cord;
>> * in the tx thread.
>> */
>> static struct slab_cache net_slabc;
>> +/**
>> + * Slab cache used for allocating memory for output network buffers
>> + * in the iproto thread.
>> + */
>> +static struct slab_cache iproto_slabc;
>>
>> struct rmean *rmean_net;
>>
>> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>> "REQUESTS",
>> };
>>
>> +static int
>> +tx_init_connect(struct iproto_msg *msg);
>> +
>> static void
>> tx_process_destroy(struct cmsg *m);
>>
>> @@ -463,6 +474,7 @@ struct iproto_connection
>> struct ev_io output;
>> /** Logical session. */
>> struct session *session;
>> + bool init_failed;
>> ev_loop *loop;
>> /**
>> * Pre-allocated disconnect msg. Is sent right after
>> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>> * other parts of the connection.
>> */
>> con->state = IPROTO_CONNECTION_DESTROYED;
>> - cpipe_push(&tx_pipe, &con->destroy_msg);
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->destroy_msg);
>> + else {
>> + /*
>> + * In case session was not created we can safely destroy
>> + * not involving tx thread. Thus we also need to destroy
>> + * obuf, which still belongs to iproto thread.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + iproto_connection_delete(con);
>> + }
>> }
>>
>> /**
>> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con)
>> * is done only once.
>> */
>> con->p_ibuf->wpos -= con->parse_size;
>> - cpipe_push(&tx_pipe, &con->disconnect_msg);
>> assert(con->state == IPROTO_CONNECTION_ALIVE);
>> con->state = IPROTO_CONNECTION_CLOSED;
>> + rlist_del(&con->in_stop_list);
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->disconnect_msg);
>> + else
>> + /*
>> + * In case session was not created we can safely
>> + * try to start destroy not involving tx thread.
>> + */
>> + iproto_connection_try_to_start_destroy(con);

* Added braces to make this if statement clearer.

>> + return;
>> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>> iproto_connection_try_to_start_destroy(con);
>> } else {
>> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> assert(rlist_empty(&con->in_stop_list));
>> int n_requests = 0;
>> bool stop_input = false;
>> + bool obuf_in_iproto = (con->session == NULL);
>> const char *errmsg;
>> while (con->parse_size != 0 && !stop_input) {
>> if (iproto_check_msg_max()) {
>> @@ -853,12 +886,20 @@ err_msgpack:
>>
>> msg->len = reqend - reqstart; /* total request length */
>>
>> - iproto_msg_decode(msg, &pos, reqend, &stop_input);
>> - /*
>> - * This can't throw, but should not be
>> - * done in case of exception.
>> - */
>> - cpipe_push_input(&tx_pipe, &msg->base);
>> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
>> + if (obuf_in_iproto) {
>> + /*
>> + * If session was not created yet and obuf is
>> + * still in iproto we need to destroy it. New
>> + * one will be created in tx thread if needed.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + obuf_in_iproto = false;
>> + }
>> + cpipe_push_input(&tx_pipe, &msg->base);
>> + }
>> +
>> n_requests++;
>> /* Request is parsed */
>> assert(reqend > reqstart);
>> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd)
>> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
>> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>> con->p_ibuf = &con->ibuf[0];
>> con->tx.p_obuf = &con->obuf[0];
>> iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd)
>> con->parse_size = 0;
>> con->long_poll_count = 0;
>> con->session = NULL;
>> + con->init_failed = false;
>> rlist_create(&con->in_stop_list);
>> /* It may be very awkward to allocate at close. */
>> cmsg_init(&con->destroy_msg, destroy_route);
>> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con)
>> assert(!evio_has_fd(&con->input));
>> assert(con->session == NULL);
>> assert(con->state == IPROTO_CONNECTION_DESTROYED);
>> - /*
>> - * The output buffers must have been deleted
>> - * in tx thread.
>> - */
>> ibuf_destroy(&con->ibuf[0]);
>> ibuf_destroy(&con->ibuf[1]);
>> assert(con->obuf[0].pos == 0 &&
>> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
>> static void
>> tx_reply_iproto_error(struct cmsg *m);
>>
>> +static void
>> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
>> +
>> static void
>> net_send_msg(struct cmsg *msg);
>>
>> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
>> { net_send_error, NULL },
>> };
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input)
>> {
>> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> (uint32_t) type);
>
>- You have three possible return values. Maybe add a function
>description?

* Right, done.

>
>> goto error;
>> }
>> - return;
>> + return 0;
>> error:
>> /** Log and send the error. */
>> diag_log();
>> diag_create(&msg->diag);
>> diag_move(&fiber()->diag, &msg->diag);
>> - cmsg_init(&msg->base, error_route);
>> + if (msg->connection->session != NULL) {
>> + cmsg_init(&msg->base, error_route);
>> + return 1;
>> + }
>> + /*
>> + * In case session was not created we can process error path
>> + * without tx thread.
>> + */
>> + tx_accept_wpos(msg->connection, &msg->wpos);
>> + struct obuf *out = msg->connection->tx.p_obuf;
>> + iproto_reply_error(out, diag_last_error(&msg->diag),
>> + msg->header.sync, ::schema_version);
>> + iproto_wpos_create(&msg->wpos, out);
>> + net_send_msg(&(msg->base));
>> + return -1;
>> }
>
>- I meant something like:
>/** Log and send the error. */
>diag_log();
>if (msg->connection->session != NULL) {
>diag_create(&msg->diag);
>diag_move(&fiber()->diag, &msg->diag);
>cmsg_init(&msg->base, error_route);
>return 1;
>}
>/*
>* In case session was not created we can process error path
>* without tx thread.
>*/
>tx_accept_wpos(msg->connection, &msg->wpos);
>struct obuf *out = msg->connection->tx.p_obuf;
>iproto_reply_error(out, diag_last_error(&fiber()->diag),
>msg->header.sync, ::schema_version);
>iproto_wpos_create(&msg->wpos, out);
>net_send_msg(&(msg->base));
>return -1;

* Right, I see, I overcomplicated it. We can also replace this block:

struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&fiber()->diag),
                   msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);

with an existing function.

>
>>
>> static void
>> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
>> session_destroy(con->session);
>> con->session = NULL; /* safety */
>> }
>> - /*
>> - * Got to be done in iproto thread since
>> - * that's where the memory is allocated.
>> - */
>> obuf_destroy(&con->obuf[0]);
>> obuf_destroy(&con->obuf[1]);
>> }
>> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>> }
>> }
>>
>> -static inline struct iproto_msg *
>> -tx_accept_msg(struct cmsg *m)
>> +static inline int
>> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - tx_accept_wpos(msg->connection, &msg->wpos);
>> - tx_fiber_init(msg->connection->session, msg->header.sync);
>> - return msg;
>> + *msg = (struct iproto_msg *) m;
>> + /*
>> + * In case connection init failed we don't need to try anymore.
>> + */
>> + if ((*msg)->connection->init_failed)
>> + return -1;
>> + /*
>> + * In case session was not created we need to init connection in tx and
>> + * create it here.
>> + */
>> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
>> + (*msg)->connection->init_failed = true;
>> + (*msg)->close_connection = true;
>> + return -1;
>> + }
>> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
>> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
>> + return 0;
>> }
>>
>> /**
>> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
>> static void
>> tx_reply_iproto_error(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + /*
>> + * We don't need to check tx_accept_msg() return value here
>> + * as far as if we might only process iproto error in tx
>> + * in case connection session is already created and
>> + * thus tx_accept_msg() can't fail.
>> + */
>> + tx_accept_msg(m, &msg);
>> struct obuf *out = msg->connection->tx.p_obuf;
>> iproto_reply_error(out, diag_last_error(&msg->diag),
>> msg->header.sync, ::schema_version);
>> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
>> static void
>> tx_process1(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1553,17 +1627,20 @@ error:
>> static void
>> tx_process_select(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + struct request *req;
>> struct obuf *out;
>> struct obuf_svp svp;
>> struct port port;
>> int count;
>> int rc;
>> - struct request *req = &msg->dml;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> tx_inject_delay();
>> + req = &msg->dml;
>> rc = box_select(req->space_id, req->index_id,
>> req->iterator, req->offset, req->limit,
>> req->key, req->key_end, &port);
>> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>> static void
>> tx_process_call(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1686,13 +1765,15 @@ error:
>> static void
>> tx_process_misc(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> - struct iproto_connection *con = msg->connection;
>> - struct obuf *out = con->tx.p_obuf;
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> try {
>> + struct iproto_connection *con = msg->connection;
>> + struct obuf *out = con->tx.p_obuf;
>> struct ballot ballot;
>> switch (msg->header.type) {
>> case IPROTO_AUTH:
>> @@ -1729,7 +1810,7 @@ error:
>> static void
>> tx_process_sql(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> struct obuf *out;
>> struct port port;
>> struct sql_bind *bind = NULL;
>> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
>> uint32_t len;
>> bool is_unprepare = false;
>>
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>> assert(msg->header.type == IPROTO_EXECUTE ||
>> @@ -1825,7 +1908,11 @@ error:
>> static void
>> tx_process_replication(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0) {
>> + tx_reply_error(msg);
>> + return;
>> + }
>> struct iproto_connection *con = msg->connection;
>> struct ev_io io;
>> coio_create(&io, con->input.fd);
>> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
>> }
>> }
>>
>> +/**
>> + * Check connection health and try to send an error to the client
>> + * in case of internal connection init or on_connect trigger failure.
>> + */
>> +static bool
>> +iproto_connection_fail(struct iproto_msg *msg)
>> +{
>> + if (!msg->close_connection)
>> + return false;
>> + struct iproto_connection *con = msg->connection;
>> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
>> + obuf_iovcnt(msg->wpos.obuf));
>> + if (nwr > 0) {
>> + /* Count statistics. */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> + diag_log();
>> + }
>> + iproto_connection_close(con);
>> + iproto_msg_delete(msg);
>> + return true;
>> +}
>> +
>> static void
>> net_send_msg(struct cmsg *m)
>> {
>> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
>> }
>> con->wend = msg->wpos;
>>
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> if (evio_has_fd(&con->output)) {
>> if (! ev_is_active(&con->output))
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: prepare greeting for it.
>> */
>> static void
>> -tx_process_connect(struct cmsg *m)
>> +iproto_process_connect(struct iproto_msg *msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> struct iproto_connection *con = msg->connection;
>> struct obuf *out = msg->connection->tx.p_obuf;
>> - try { /* connect. */
>> - con->session = session_create(SESSION_TYPE_BINARY);
>> - if (con->session == NULL)
>> - diag_raise();
>> - con->session->meta.connection = con;
>> - tx_fiber_init(con->session, 0);
>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> - /* TODO: dirty read from tx thread */
>> - struct tt_uuid uuid = INSTANCE_UUID;
>> - random_bytes(con->salt, IPROTO_SALT_SIZE);
>> - greeting_encode(greeting, tarantool_version_id(), &uuid,
>> - con->salt, IPROTO_SALT_SIZE);
>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> - if (! rlist_empty(&session_on_connect)) {
>> - if (session_run_on_connect_triggers(con->session) != 0)
>> - diag_raise();
>> - }
>> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> + /* TODO: dirty read from tx thread */
>> + struct tt_uuid uuid = INSTANCE_UUID;
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>> + greeting_encode(greeting, tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
>> + != IPROTO_GREETING_SIZE) {
>> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
>> + "greeting obuf", "dup");
>> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
>> + msg->header.sync, ::schema_version);
>> iproto_wpos_create(&msg->wpos, out);
>> - } catch (Exception *e) {
>> - tx_reply_error(msg);
>> msg->close_connection = true;
>> - }
>> -}
>> -
>> -/**
>> - * Send a response to connect to the client or close the
>> - * connection in case on_connect trigger failed.
>> - */
>> -static void
>> -net_send_greeting(struct cmsg *m)
>> -{
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - struct iproto_connection *con = msg->connection;
>> - if (msg->close_connection) {
>> - struct obuf *out = msg->wpos.obuf;
>> - int64_t nwr = sio_writev(con->output.fd, out->iov,
>> - obuf_iovcnt(out));
>> -
>> - if (nwr > 0) {
>> - /* Count statistics. */
>> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> - diag_log();
>> - }
>> - assert(iproto_connection_is_idle(con));
>> - iproto_connection_close(con);
>> - iproto_msg_delete(msg);
>> + iproto_connection_fail(msg);
>> return;
>> }
>> + iproto_wpos_create(&msg->wpos, out);
>> con->wend = msg->wpos;
>> - /*
>> - * Connect is synchronous, so no one could have been
>> - * messing up with the connection while it was in
>> - * progress.
>> - */
>> assert(evio_has_fd(&con->output));
>> - /* Handshake OK, start reading input. */
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> iproto_msg_delete(msg);
>> }
>>
>> -static const struct cmsg_hop connect_route[] = {
>> - { tx_process_connect, &net_pipe },
>> - { net_send_greeting, NULL },
>> -};
>> +static int
>> +tx_init_connect(struct iproto_msg *msg)
>> +{
>> + struct iproto_connection *con = msg->connection;
>> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + con->tx.p_obuf = &con->obuf[0];
>> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
>> +
>> + con->session = session_create(SESSION_TYPE_BINARY);
>> + if (con->session == NULL)
>> + return -1;
>> + con->session->meta.connection = con;
>> +
>> + tx_fiber_init(con->session, 0);
>> + if (! rlist_empty(&session_on_connect)) {
>> + if (session_run_on_connect_triggers(con->session) != 0)
>> + return -1;
>> + }
>> +
>> + return 0;
>> +}
>>
>> /** }}} */
>>
>> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>> mempool_free(&iproto_connection_pool, con);
>> return -1;
>> }
>> - cmsg_init(&msg->base, connect_route);
>> msg->p_ibuf = con->p_ibuf;
>> msg->wpos = con->wpos;
>> msg->close_connection = false;
>> - cpipe_push(&tx_pipe, &msg->base);
>> + iproto_process_connect(msg);
>> return 0;
>> }
>>
>> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
>> static int
>> net_cord_f(va_list /* ap */)
>> {
>> + slab_cache_create(&iproto_slabc, &runtime);
>> +
>> mempool_create(&iproto_msg_pool, &cord()->slabc,
>> sizeof(struct iproto_msg));
>> mempool_create(&iproto_connection_pool, &cord()->slabc,
>> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
>> size_t
>> iproto_mem_used(void)
>> {
>> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
>> + return slab_cache_used(&net_cord.slabc)
>> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>> }
>>
>> size_t
>> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
>> index 5d064b7648..bfa9c2b759 100644
>> --- a/test/box-py/bad_trigger.result
>> +++ b/test/box-py/bad_trigger.result
>> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>> - function
>> ...
>> greeting: True
>> +Nothing to read yet: Resource temporarily unavailable
>> fixheader: True
>> error code 32
>> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
>> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
>> index 7d200b9218..4739dfe136 100644
>> --- a/test/box-py/bad_trigger.test.py
>> +++ b/test/box-py/bad_trigger.test.py
>> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>> from lib.tarantool_connection import TarantoolConnection
>> from tarantool import NetworkError
>> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
>> - REQUEST_TYPE_ERROR
>> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>> import socket
>> import msgpack
>>
>> @@ -26,9 +26,25 @@ s = conn.socket
>> # Read greeting
>> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>>
>> -# Read error packet
>> +# Check socket
>> IPROTO_FIXHEADER_SIZE = 5
>> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +s.setblocking(False)
>> +fixheader = None
>> +try:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +except socket.error as err:
>> + print 'Nothing to read yet:', str(err).split(']')[1]
>> +else:
>> + print 'Received fixheader'
>> +s.setblocking(True)
>> +
>> +# Send ping
>> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
>> +s.send(msgpack.dumps(len(query)) + query)
>> +
>> +# Read error packet
>> +if not fixheader:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>> unpacker.feed(fixheader)
>> packet_len = unpacker.unpack()
>>

--
Ilya Kosarev
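
P.S. Since the three return values of iproto_msg_decode() came up in the review, here is a rough toy model of the convention as I read it from the diff: 0 on success, 1 when the error goes through tx (session already exists), -1 when the error reply was already sent from iproto (no session yet). The enum and all toy_* names are hypothetical and exist only for this sketch; the patch itself returns plain ints and works on real iproto structures.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical labels for the three return values of
 * iproto_msg_decode() in the patch (which uses bare 0, 1, -1).
 */
enum toy_decode_result {
	TOY_DECODE_OK = 0,             /* decoded, route the request to tx */
	TOY_DECODE_ERR_VIA_TX = 1,     /* failed, session exists: error route via tx */
	TOY_DECODE_ERR_IN_IPROTO = -1, /* failed, no session: reply sent by iproto */
};

/* Toy stand-in for the connection; models con->session != NULL. */
struct toy_connection {
	bool has_session;
};

/* Models only the return-value choice, not the actual decoding. */
static enum toy_decode_result
toy_msg_decode(const struct toy_connection *con, bool request_is_valid)
{
	if (request_is_valid)
		return TOY_DECODE_OK;
	return con->has_session ? TOY_DECODE_ERR_VIA_TX
				: TOY_DECODE_ERR_IN_IPROTO;
}

/*
 * Mirrors the caller's check in iproto_enqueue_batch():
 * everything except -1 is pushed to the tx pipe.
 */
static bool
toy_should_push_to_tx(enum toy_decode_result rc)
{
	return rc != TOY_DECODE_ERR_IN_IPROTO;
}
```

The point of the -1 case is that a connection without a session never touches tx on the error path, which is what lets iproto reject garbage input while tx is busy.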