<HTML><BODY><div class="js-helper js-readmsg-msg"><style type="text/css"></style><div><div id="style_16003750360967136677_BODY"><div class="cl_393548"><div><div>Hi,</div><div> </div><div>Thanks for your review!</div><div> </div><div>See 12 answers below. Sent v2 of the patch considering your comments.</div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;">Понедельник, 14 сентября 2020, 19:17 +03:00 от Leonid Vasiliev <<a href="/compose?To=lvasiliev@tarantool.org">lvasiliev@tarantool.org</a>>:<br> <div id=""><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><style type="text/css"></style><div><div id="style_16001002780063496734_BODY_mr_css_attr"><br>Hi! Thank you for the patch.<br>Look some comments/questions below:<br><br>- Please add @ChangeLog</div></div></div></div></blockquote></div><ol><li>Right, will be fixed in v2.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div>- Look like "@TarantoolBot document" needs to be added to update<br><a href="https://www.tarantool.io/en/doc/2.5/reference/reference_lua/net_box/#net-box-on-connect" rel="noopener noreferrer" target="_blank">https://www.tarantool.io/en/doc/2.5/reference/reference_lua/net_box/#net-box-on-connect</a></div></div></div></div></blockquote></div><ol start="2"><li>Right, doc bot request will be included in v2 of the patch.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>- How do you check the problem is gone? Any stress test?</div></div></div></div></blockquote></div><ol start="3"><li>Yes, here is the test i adapted from Yaroslav’s version:<br><a href="https://github.com/tarantool/tarantool/issues/3776#issuecomment-617736729" rel="noopener noreferrer" target="_blank">https://github.com/tarantool/tarantool/issues/3776#issuecomment-617736729</a><br>I didn’t find the way to implement it under test-run.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div>On 14.08.2020 13:47, Ilya Kosarev wrote:<br>> On connection, an evio service callback is invoked to accept it. The<br>> next step after acception was to process connection to tx thread<br>> through cbus. This meant that any connection interaction involves<br>> tx thread even before we get to decode what does the client want<br>> from us. Consequently, a number of problems appears. The main one<br>> is that we might get descriptor leak in case of unresponsive tx thread<br>> (for example, when building secondary index). There are some other<br>> cases where we might not want to spend precious tx time to process<br>> the connection in case iproto can do it all alone.<br>> This patch allows iproto to accept connection and send greeting by<br>> itself. The connection is initialized in tx thread when the real<br>> request comes through iproto_msg_decode(). In case request type was not<br>> recognized we can also send reply with an error without using tx. It is<br>> planned to add more iproto logic to prevent extra interaction with<br>> tx thread. This patch already to some extent solves descriptors leakage<br>> problem as far as connection establishes and stays in fetch_schema<br>> state while tx thread is unresponsive.<br>> The other user visible change is that on_connect triggers won't run on<br>> connections that don't provide any input, as reflected in<br>> bad_trigger.test.py.<br><br>- Is it ok from exploitation point of view? Did you have a conversation<br>with Mons?</div></div></div></div></blockquote></div><ol start="4"><li>Well, it was mentioned in conversation with Mons. I think i will clarify<br>it once more as soon as possible. Though i am quite sure it is fine as far<br>as any real application provides some input immediately on connection<br>thus сausing trigger execution.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>><br>> Part of #3776<br>> ---<br>> Branch: <a href="https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto" rel="noopener noreferrer" target="_blank">https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto</a><br>> Issue: <a href="https://github.com/tarantool/tarantool/issues/3776" rel="noopener noreferrer" target="_blank">https://github.com/tarantool/tarantool/issues/3776</a><br>><br>> src/box/iproto.cc | 287 ++++++++++++++++++++------------<br>> test/box-py/bad_trigger.result | 1 +<br>> test/box-py/bad_trigger.test.py | 22 ++-<br>> 3 files changed, 202 insertions(+), 108 deletions(-)<br>><br>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc<br>> index b8f65e5eca..a027d15c1d 100644<br>> --- a/src/box/iproto.cc<br>> +++ b/src/box/iproto.cc<br>- "on_connect trigger must be processed before any other request on this<br>connection." (from comment) - It looks like it's worth clarifying this<br>guarantee.</div></div></div></div></blockquote></div><ol start="5"><li>Right. This guarantee is kept. Triggers are being processed in<div><div>tx_init_connect() from tx_accept_msg() before any other actions.</div></div></li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);<br>> static void<br>> iproto_resume(void);<br>><br>> -static void<br>> +static int<br>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,<br>> bool *stop_input);<br>><br>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)<br>> iproto_resume();<br>> }<br>><br>> +static inline void<br>> +iproto_connection_delete(struct iproto_connection *con);<br>> +<br>> /**<br>> * A single global queue for all requests in all connections. All<br>> * requests from all connections are processed concurrently.<br>> @@ -280,6 +283,11 @@ static struct cord net_cord;<br>> * in the tx thread.<br>> */<br>> static struct slab_cache net_slabc;<br>> +/**<br>> + * Slab cache used for allocating memory for output network buffers<br>> + * in the iproto thread.<br>> + */<br>> +static struct slab_cache iproto_slabc;<br>><br>> struct rmean *rmean_net;<br>><br>> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {<br>> "REQUESTS",<br>> };<br>><br>> +static int<br>> +tx_init_connect(struct iproto_msg *msg);<br>> +<br>> static void<br>> tx_process_destroy(struct cmsg *m);<br>><br>> @@ -650,7 +661,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)<br>> * other parts of the connection.<br>> */<br>> con->state = IPROTO_CONNECTION_DESTROYED;<br>> - cpipe_push(&tx_pipe, &con->destroy_msg);<br>> + if (con->session != NULL)<br>> + cpipe_push(&tx_pipe, &con->destroy_msg);<br>> + else {<br>> + /*<br>> + * In case session was not created we can safely destroy<br>> + * not involving tx thread. Thus we also need to destroy<br>> + * obuf, which still belongs to iproto thread.<br>> + */<br>> + obuf_destroy(&con->obuf[0]);<br>> + obuf_destroy(&con->obuf[1]);<br>> + iproto_connection_delete(con);<br>> + }<br>> }<br>><br>> /**<br>> @@ -677,9 +699,18 @@ iproto_connection_close(struct iproto_connection *con)<br>> * is done only once.<br>> */<br>> con->p_ibuf->wpos -= con->parse_size;<br>> - cpipe_push(&tx_pipe, &con->disconnect_msg);<br>> assert(con->state == IPROTO_CONNECTION_ALIVE);<br>> con->state = IPROTO_CONNECTION_CLOSED;<br>> + rlist_del(&con->in_stop_list);<br><br>- Is done below(rlist_del()). Or is there some kind of trick?</div></div></div></div></blockquote></div><ol start="6"><li>Well, this one is fine. rlist_del() was called in the end of<br>iproto_connection_close() as far the next disconnect actions<br>were done later anyway. Now we might call iproto_connection_try_to_start_destroy()<br>directly from this conditional branch. Thus we need rlist_del() to be done already.<br>This conditional branch returns at the end and the second rlist_del() is not called. In<br>other cases the second one is still used.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> + if (con->session != NULL)<br>> + cpipe_push(&tx_pipe, &con->disconnect_msg);<br>> + else<br>> + /*<br>> + * In case session was not created we can safely<br>> + * try to start destroy not involving tx thread.<br>> + */<br>> + iproto_connection_try_to_start_destroy(con);<br>> + return;<br>> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {<br>> iproto_connection_try_to_start_destroy(con);<br>> } else {<br>> @@ -809,6 +840,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)<br>> assert(rlist_empty(&con->in_stop_list));<br>> int n_requests = 0;<br>> bool stop_input = false;<br>> + bool obuf_in_iproto = (con->session == NULL);<br>> const char *errmsg;<br>> while (con->parse_size != 0 && !stop_input) {<br>> if (iproto_check_msg_max()) {<br>> @@ -853,12 +885,20 @@ err_msgpack:<br>><br>> msg->len = reqend - reqstart; /* total request length */<br>><br>> - iproto_msg_decode(msg, &pos, reqend, &stop_input);<br>> - /*<br>> - * This can't throw, but should not be<br>> - * done in case of exception.<br>> - */<br>> - cpipe_push_input(&tx_pipe, &msg->base);<br>> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) == 0) {<br>> + if (obuf_in_iproto) {<br>> + /*<br>> + * If session was not created yet and obuf is<br>> + * still in iproto we need to destroy it. New<br>> + * one will be created in tx thread if needed.<br>> + */<br>> + obuf_destroy(&con->obuf[0]);<br>> + obuf_destroy(&con->obuf[1]);<br>> + obuf_in_iproto = false; > + }<br>> + cpipe_push_input(&tx_pipe, &msg->base);<br>> + }<br>> +<br>> n_requests++;<br>> /* Request is parsed */<br>> assert(reqend > reqstart);<br>> @@ -1105,8 +1145,8 @@ iproto_connection_new(int fd)<br>> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);<br>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);<br>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);<br>> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);<br>> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);<br>> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);<br>> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);<br>> con->p_ibuf = &con->ibuf[0];<br>> con->tx.p_obuf = &con->obuf[0];<br>> iproto_wpos_create(&con->wpos, con->tx.p_obuf);<br>> @@ -1134,10 +1174,6 @@ iproto_connection_delete(struct iproto_connection *con)<br>> assert(!evio_has_fd(&con->input));<br>> assert(con->session == NULL);<br>> assert(con->state == IPROTO_CONNECTION_DESTROYED);<br>> - /*<br>> - * The output buffers must have been deleted<br>> - * in tx thread.<br>> - */<br>> ibuf_destroy(&con->ibuf[0]);<br>> ibuf_destroy(&con->ibuf[1]);<br>> assert(con->obuf[0].pos == 0 &&<br>> @@ -1172,6 +1208,9 @@ tx_reply_error(struct iproto_msg *msg);<br>> static void<br>> tx_reply_iproto_error(struct cmsg *m);<br>><br>> +static void<br>> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);<br>> +<br>> static void<br>> net_send_msg(struct cmsg *msg);<br>><br>> @@ -1244,7 +1283,7 @@ static const struct cmsg_hop error_route[] = {<br>> { net_send_error, NULL },<br>> };<br>><br>> -static void<br>> +static int<br>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,<br>> bool *stop_input)<br>> {<br>> @@ -1314,13 +1353,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,<br>> (uint32_t) type);<br>> goto error;<br>> }<br>> - return;<br>> + return 0;<br>> error:<br>> /** Log and send the error. */<br>> diag_log();<br>> diag_create(&msg->diag);<br>> diag_move(&fiber()->diag, &msg->diag);<br><br>- Unneeded in case session == NULL. Use net_send_msg() instead of<br>net_send_error().</div></div></div></div></blockquote></div><ol start="7"><li>Right, will be fixed in v2.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> - cmsg_init(&msg->base, error_route);<br>> + if (msg->connection->session != NULL) {<br>> + cmsg_init(&msg->base, error_route);<br>> + return 0;<br><br>- May be return -1 (decode fail). What does the return value mean<br>otherwise?</div></div></div></div></blockquote></div><ol start="8"><li>Well, yes, i see, this is confusing. Basically -1 means we have nothing to do with<br>tx any more. I think it is better to introduce 2 error codes and return 1 here to separate<br>this case from the pure iproto case.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> + }<br>> + /*<br>> + * In case session was not created we can process error path<br>> + * without tx thread.<br>> + */<br>> + tx_accept_wpos(msg->connection, &msg->wpos);<br>> + struct obuf *out = msg->connection->tx.p_obuf;<br>> + iproto_reply_error(out, diag_last_error(&msg->diag),<br>> + msg->header.sync, ::schema_version);<br>> + iproto_wpos_create(&msg->wpos, out);<br>> + net_send_error(&(msg->base));<br>> + return -1;<br>> }<br>><br>> static void<br>> @@ -1382,10 +1435,6 @@ tx_process_destroy(struct cmsg *m)<br>> session_destroy(con->session);<br>> con->session = NULL; /* safety */<br>> }<br>> - /*<br>> - * Got to be done in iproto thread since<br>> - * that's where the memory is allocated.<br>> - */<br>> obuf_destroy(&con->obuf[0]);<br>> obuf_destroy(&con->obuf[1]);<br>> }<br>> @@ -1478,13 +1527,21 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)<br>> }<br>> }<br>><br>> -static inline struct iproto_msg *<br>> -tx_accept_msg(struct cmsg *m)<br>> +static inline int<br>> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)<br>> {<br>> - struct iproto_msg *msg = (struct iproto_msg *) m;<br>> - tx_accept_wpos(msg->connection, &msg->wpos);<br>> - tx_fiber_init(msg->connection->session, msg->header.sync);<br>> - return msg;<br>> + *msg = (struct iproto_msg *) m;<br>> + /*<br>> + * In case session was not created we need to init connection in tx and<br>> + * create it here.<br>> + */<br>> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {<br>> + (*msg)->close_connection = true;<br>> + return -1;<br>> + }<br>> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);<br>> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);<br>> + return 0;<br>> }<br>><br>> /**<br>> @@ -1507,7 +1564,8 @@ tx_reply_error(struct iproto_msg *msg)<br>> static void<br>> tx_reply_iproto_error(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> + tx_accept_msg(m, &msg);<br><br>- Please add a comment why the return value check is unnecessary.</div></div></div></div></blockquote></div><ol start="9"><li>Right, will be fixed in v2.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> struct obuf *out = msg->connection->tx.p_obuf;<br>> iproto_reply_error(out, diag_last_error(&msg->diag),<br>> msg->header.sync, ::schema_version);<br>> @@ -1527,7 +1585,9 @@ tx_inject_delay(void)<br>> static void<br>> tx_process1(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> + if (tx_accept_msg(m, &msg) != 0)<br>> + goto error;<br>> if (tx_check_schema(msg->header.schema_version))<br>> goto error;<br>><br>> @@ -1553,17 +1613,20 @@ error:<br>> static void<br>> tx_process_select(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> + struct request *req;<br><br>- Unneeded change.</div></div></div></div></blockquote></div><ol start="10"><li>This change is needed as far as variables <br>have to be declared before goto.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> struct obuf *out;<br>> struct obuf_svp svp;<br>> struct port port;<br>> int count;<br>> int rc;<br>> - struct request *req = &msg->dml;<br>> + if (tx_accept_msg(m, &msg) != 0)<br>> + goto error;<br>> if (tx_check_schema(msg->header.schema_version))<br>> goto error;<br>><br>> tx_inject_delay();<br>> + req = &msg->dml;<br>> rc = box_select(req->space_id, req->index_id,<br>> req->iterator, req->offset, req->limit,<br>> req->key, req->key_end, &port);<br>> @@ -1607,7 +1670,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)<br>> static void<br>> tx_process_call(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> + if (tx_accept_msg(m, &msg) != 0)<br>> + goto error;<br>> if (tx_check_schema(msg->header.schema_version))<br>> goto error;<br>><br>> @@ -1686,13 +1751,15 @@ error:<br>> static void<br>> tx_process_misc(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> - struct iproto_connection *con = msg->connection;<br>> - struct obuf *out = con->tx.p_obuf;<br>> + struct iproto_msg *msg;<br>> + if (tx_accept_msg(m, &msg) != 0)<br>> + goto error;<br>> if (tx_check_schema(msg->header.schema_version))<br>> goto error;<br>><br>> try {<br>> + struct iproto_connection *con = msg->connection;<br>> + struct obuf *out = con->tx.p_obuf;<br>> struct ballot ballot;<br>> switch (msg->header.type) {<br>> case IPROTO_AUTH:<br>> @@ -1729,7 +1796,7 @@ error:<br>> static void<br>> tx_process_sql(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> struct obuf *out;<br>> struct port port;<br>> struct sql_bind *bind = NULL;<br>> @@ -1738,6 +1805,8 @@ tx_process_sql(struct cmsg *m)<br>> uint32_t len;<br>> bool is_unprepare = false;<br>><br>> + if (tx_accept_msg(m, &msg) != 0)<br>> + goto error;<br>> if (tx_check_schema(msg->header.schema_version))<br>> goto error;<br>> assert(msg->header.type == IPROTO_EXECUTE ||<br>> @@ -1825,7 +1894,11 @@ error:<br>> static void<br>> tx_process_replication(struct cmsg *m)<br>> {<br>> - struct iproto_msg *msg = tx_accept_msg(m);<br>> + struct iproto_msg *msg;<br>> + if (tx_accept_msg(m, &msg) != 0) {<br>> + tx_reply_error(msg);<br>> + return;<br>> + }<br>> struct iproto_connection *con = msg->connection;<br>> struct ev_io io;<br>> coio_create(&io, con->input.fd);<br>> @@ -1865,6 +1938,29 @@ tx_process_replication(struct cmsg *m)<br>> }<br>> }<br>><br>> +static int<br>> +net_check_connection(struct iproto_msg *msg)<br><br>- Seems like the "bool" type should be used for net_check_connection<br>(<a href="https://www.tarantool.io/en/doc/1.10/dev_guide/c_style_guide/#chapter-16-function-return-values-and-names" rel="noopener noreferrer" target="_blank">https://www.tarantool.io/en/doc/1.10/dev_guide/c_style_guide/#chapter-16-function-return-values-and-names</a>).<br>In the function, you not only check a connection status, but also close<br>it if necessary. I think it would be nice to add a description of the<br>function.</div></div></div></div></blockquote></div><ol start="11"><li>Right. I see, it is confusing now. Will be fixed in v2.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> +{<br>> + if (!msg->close_connection)<br>> + return 0;<br>> +<br>> + struct iproto_connection *con = msg->connection;<br>> + struct obuf *out = msg->wpos.obuf;<br>> + int64_t nwr = sio_writev(con->output.fd, out->iov,<br>> + obuf_iovcnt(out));<br>> +<br>> + if (nwr > 0) {<br>> + /* Count statistics. */<br>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);<br>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {<br>> + diag_log();<br>> + }<br>> + assert(iproto_connection_is_idle(con));<br>> + iproto_connection_close(con);<br>> + iproto_msg_delete(msg);<br>> + return -1;<br>> +}<br>> +<br>> static void<br>> net_send_msg(struct cmsg *m)<br>> {<br>> @@ -1881,6 +1977,9 @@ net_send_msg(struct cmsg *m)<br>> }<br>> con->wend = msg->wpos;<br>><br>> + if (net_check_connection(msg) != 0)<br>> + return;<br>> +<br>> if (evio_has_fd(&con->output)) {<br>> if (! ev_is_active(&con->output))<br>> ev_feed_event(con->loop, &con->output, EV_WRITE);<br>> @@ -1910,6 +2009,10 @@ net_end_join(struct cmsg *m)<br>> struct iproto_connection *con = msg->connection;<br>><br>> msg->p_ibuf->rpos += msg->len;<br>> +<br>> + if (net_check_connection(msg) != 0)<br>> + return;<br>> +<br>> iproto_msg_delete(msg);<br>><br>> assert(! ev_is_active(&con->input));<br>> @@ -1928,6 +2031,10 @@ net_end_subscribe(struct cmsg *m)<br>> struct iproto_connection *con = msg->connection;<br>><br>> msg->p_ibuf->rpos += msg->len;<br>> +<br>> + if (net_check_connection(msg) != 0)<br>> + return;<br>> +<br>> iproto_msg_delete(msg);<br>><br>> assert(! ev_is_active(&con->input));<br>> @@ -1936,81 +2043,49 @@ net_end_subscribe(struct cmsg *m)<br>> }<br>><br>> /**<br>> - * Handshake a connection: invoke the on-connect trigger<br>> - * and possibly authenticate. Try to send the client an error<br>> - * upon a failure.<br>> + * Handshake a connection: prepare greeting for it.<br>> */<br>> static void<br>> -tx_process_connect(struct cmsg *m)<br>> +iproto_process_connect(struct iproto_msg *msg)<br>> {<br>> - struct iproto_msg *msg = (struct iproto_msg *) m;<br>> struct iproto_connection *con = msg->connection;<br>> struct obuf *out = msg->connection->tx.p_obuf;<br>> - try { /* connect. */<br><br>- Why "try - catch" has been deleted?</div></div></div></div></blockquote></div><ol start="12"><li>Hmm, right, I forgot about the obuf_dup_xc() function. Other potential<br>throwers were moved. Fail on obuf_dup() is now processed correctly.</li></ol><div><blockquote style="border-left:1px solid #0857A6;margin:10px;padding:0 0 0 10px;"><div><div class="js-helper_mr_css_attr js-readmsg-msg_mr_css_attr"><div><div><br>> - con->session = session_create(SESSION_TYPE_BINARY);<br>> - if (con->session == NULL)<br>> - diag_raise();<br>> - con->session->meta.connection = con;<br>> - tx_fiber_init(con->session, 0);<br>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);<br>> - /* TODO: dirty read from tx thread */<br>> - struct tt_uuid uuid = INSTANCE_UUID;<br>> - random_bytes(con->salt, IPROTO_SALT_SIZE);<br>> - greeting_encode(greeting, tarantool_version_id(), &uuid,<br>> - con->salt, IPROTO_SALT_SIZE);<br>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);<br>> - if (! rlist_empty(&session_on_connect)) {<br>> - if (session_run_on_connect_triggers(con->session) != 0)<br>> - diag_raise();<br>> - }<br>> - iproto_wpos_create(&msg->wpos, out);<br>> - } catch (Exception *e) {<br>> - tx_reply_error(msg);<br>> - msg->close_connection = true;<br>> - }<br>> -}<br>> -<br>> -/**<br>> - * Send a response to connect to the client or close the<br>> - * connection in case on_connect trigger failed.<br>> - */<br>> -static void<br>> -net_send_greeting(struct cmsg *m)<br>> -{<br>> - struct iproto_msg *msg = (struct iproto_msg *) m;<br>> - struct iproto_connection *con = msg->connection;<br>> - if (msg->close_connection) {<br>> - struct obuf *out = msg->wpos.obuf;<br>> - int64_t nwr = sio_writev(con->output.fd, out->iov,<br>> - obuf_iovcnt(out));<br>> -<br>> - if (nwr > 0) {<br>> - /* Count statistics. */<br>> - rmean_collect(rmean_net, IPROTO_SENT, nwr);<br>> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {<br>> - diag_log();<br>> - }<br>> - assert(iproto_connection_is_idle(con));<br>> - iproto_connection_close(con);<br>> - iproto_msg_delete(msg);<br>> - return;<br>> - }<br>> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);<br>> + struct tt_uuid uuid = INSTANCE_UUID;<br>> + random_bytes(con->salt, IPROTO_SALT_SIZE);<br>> + greeting_encode(greeting, tarantool_version_id(), &uuid,<br>> + con->salt, IPROTO_SALT_SIZE);<br>> + obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);<br>> + iproto_wpos_create(&msg->wpos, out);<br>> con->wend = msg->wpos;<br>> - /*<br>> - * Connect is synchronous, so no one could have been<br>> - * messing up with the connection while it was in<br>> - * progress.<br>> - */<br>> assert(evio_has_fd(&con->output));<br>> - /* Handshake OK, start reading input. */<br>> ev_feed_event(con->loop, &con->output, EV_WRITE);<br>> iproto_msg_delete(msg);<br>> }<br>><br>> -static const struct cmsg_hop connect_route[] = {<br>> - { tx_process_connect, &net_pipe },<br>> - { net_send_greeting, NULL },<br>> -};<br>> +static int<br>> +tx_init_connect(struct iproto_msg *msg)<br>> +{<br>> + struct iproto_connection *con = msg->connection;<br>> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);<br>> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);<br>> + con->tx.p_obuf = &con->obuf[0];<br>> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);<br>> + iproto_wpos_create(&con->wend, con->tx.p_obuf);<br>> +<br>> + con->session = session_create(SESSION_TYPE_BINARY);<br>> + if (con->session == NULL)<br>> + return -1;<br>> + con->session->meta.connection = con;<br>> +<br>> + tx_fiber_init(con->session, 0);<br>> + if (! rlist_empty(&session_on_connect)) {<br>> + if (session_run_on_connect_triggers(con->session) != 0)<br>> + return -1;<br>> + }<br>> +<br>> + return 0;<br>> +}<br>><br>> /** }}} */<br>><br>> @@ -2037,11 +2112,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,<br>> mempool_free(&iproto_connection_pool, con);<br>> return -1;<br>> }<br>> - cmsg_init(&msg->base, connect_route);<br>> msg->p_ibuf = con->p_ibuf;<br>> msg->wpos = con->wpos;<br>> msg->close_connection = false;<br>> - cpipe_push(&tx_pipe, &msg->base);<br>> + iproto_process_connect(msg);<br>> return 0;<br>> }<br>><br>> @@ -2054,6 +2128,8 @@ static struct evio_service binary; /* iproto binary listener */<br>> static int<br>> net_cord_f(va_list /* ap */)<br>> {<br>> + slab_cache_create(&iproto_slabc, &runtime);<br>> +<br>> mempool_create(&iproto_msg_pool, &cord()->slabc,<br>> sizeof(struct iproto_msg));<br>> mempool_create(&iproto_connection_pool, &cord()->slabc,<br>> @@ -2297,7 +2373,8 @@ iproto_listen(const char *uri)<br>> size_t<br>> iproto_mem_used(void)<br>> {<br>> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);<br>> + return slab_cache_used(&net_cord.slabc)<br>> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);<br>> }<br>><br>> size_t<br>> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result<br>> index 5d064b7648..bfa9c2b759 100644<br>> --- a/test/box-py/bad_trigger.result<br>> +++ b/test/box-py/bad_trigger.result<br>> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))<br>> - function<br>> ...<br>> greeting: True<br>> +Nothing to read yet: Resource temporarily unavailable<br>> fixheader: True<br>> error code 32<br>> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)<br>> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py<br>> index 7d200b9218..7f45f5e713 100644<br>> --- a/test/box-py/bad_trigger.test.py<br>> +++ b/test/box-py/bad_trigger.test.py<br>> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection<br>> from lib.tarantool_connection import TarantoolConnection<br>> from tarantool import NetworkError<br>> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \<br>> - REQUEST_TYPE_ERROR<br>> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING<br>> import socket<br>> import msgpack<br>><br>> @@ -26,9 +26,25 @@ s = conn.socket<br>> # Read greeting<br>> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE<br>><br>> -# Read error packet<br>> +# Check soscket<br><br>- Typo "socket"<br>> IPROTO_FIXHEADER_SIZE = 5<br>> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)<br>> +s.setblocking(False)<br>> +fixheader = None<br>> +try:<br>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)<br>> +except socket.error as err:<br>> + print 'Nothing to read yet:', str(err).split(']')[1]<br>> +else:<br>> + print 'Received fixheader'<br>> +s.setblocking(True)<br>> +<br>> +# Send ping<br>> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })<br>> +s.send(msgpack.dumps(len(query)) + query)<br>> +<br>> +# Read error packet<br>> +if not fixheader:<br>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)<br>> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE<br>> unpacker.feed(fixheader)<br>> packet_len = unpacker.unpack()<br>></div></div></div></div></blockquote></div><div> <div> </div><div data-signature-widget="container"><div data-signature-widget="content"><div>--<br>Ilya Kosarev</div></div></div><div> </div></div></div></div></div></div><div> </div></BODY></HTML>