[Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx

Ilya Kosarev i.kosarev at tarantool.org
Sat Sep 26 01:57:47 MSK 2020


Hi!
 
Thanks for the review.
 
4 answers below. Fixed all your comments in v3, pushed & sent to the next reviewer.
  
>Понедельник, 21 сентября 2020, 18:52 +03:00 от Leonid Vasiliev <lvasiliev at tarantool.org>:
> 
>Hi! Thank you for the patch.
>
>LGTM.
>
>All comments below are at your discretion:
>
>On 9/18/20 4:09 PM, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it. The
>> next step after acception was to process connection to tx thread
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive tx thread
>> (for example, when building secondary index). There are some other
>> cases where we might not want to spend precious tx time to process
>> the connection in case iproto can do it all alone.
>> This patch allows iproto to accept connection and send greeting by
>> itself. The connection is initialized in tx thread when the real
>> request comes through iproto_msg_decode(). In case request type was not
>> recognized we can also send reply with an error without using tx. It is
>> planned to add more iproto logic to prevent extra interaction with
>> tx thread. This patch already to some extent solves descriptors leakage
>> problem as far as connection establishes and stays in fetch_schema
>> state while tx thread is unresponsive.
>> The other user visible change is that on_connect triggers won't run on
>> connections that don't provide any input, as reflected in
>> bad_trigger.test.py.
>>
>> Part of #3776
>>
>> @TarantoolBot document
>> Title: iproto: on_connect triggers execution
>> Update the documentation for on_connect triggers to reflect that they
>> are now being executed only with the first request. Though the triggers
>> are still the first thing to be executed on a new connection. While it
>> is quite a synthetic case to establish a connection without making
>> any requests it is technically possible and now your triggers won't be
>> executed in this case. Some request is required to start their
>> execution.
>> ---
>> Branch:  https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
>> Issue:  https://github.com/tarantool/tarantool/issues/3776
>>
>> @ChangeLog:
>> * Make iproto thread more independent from tx (gh-3776).
>
>- Changelog about changes visible to the user. I would suggest writing
>about on_connect triggers here. on_disconnect triggers will also not
>work if there was not one message has been sent, as I understand it.
*  
Right, i thought it is going to be too long for the ChangeLog. But actually it should be here.
>
>>
>> Changes in v2:
>> - docbot request provided
>> - ChangeLog provided
>> - net_send_msg() used instead of net_send_error() where needed
>> - error cases made clear in iproto_msg_decode()
>> - replaced net_check_connection() with iproto_connection_fail() predicate
>> - improved tx session initialization fail processing to avoid extra tries
>> - fixed obuf_dup() fail processing in iproto_process_connect()
>> - added some comments
>> - some comments style fixed
>>
>> src/box/iproto.cc | 304 +++++++++++++++++++++-----------
>> test/box-py/bad_trigger.result | 1 +
>> test/box-py/bad_trigger.test.py | 22 ++-
>> 3 files changed, 223 insertions(+), 104 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5eca..67bbd357a3 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>> static void
>> iproto_resume(void);
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input);
>>
>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>> iproto_resume();
>> }
>>
>> +static inline void
>> +iproto_connection_delete(struct iproto_connection *con);
>> +
>> /**
>> * A single global queue for all requests in all connections. All
>> * requests from all connections are processed concurrently.
>> @@ -280,6 +283,11 @@ static struct cord net_cord;
>> * in the tx thread.
>> */
>> static struct slab_cache net_slabc;
>> +/**
>> + * Slab cache used for allocating memory for output network buffers
>> + * in the iproto thread.
>> + */
>> +static struct slab_cache iproto_slabc;
>>
>> struct rmean *rmean_net;
>>
>> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>> "REQUESTS",
>> };
>>
>> +static int
>> +tx_init_connect(struct iproto_msg *msg);
>> +
>> static void
>> tx_process_destroy(struct cmsg *m);
>>
>> @@ -463,6 +474,7 @@ struct iproto_connection
>> struct ev_io output;
>> /** Logical session. */
>> struct session *session;
>> + bool init_failed;
>> ev_loop *loop;
>> /**
>> * Pre-allocated disconnect msg. Is sent right after
>> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>> * other parts of the connection.
>> */
>> con->state = IPROTO_CONNECTION_DESTROYED;
>> - cpipe_push(&tx_pipe, &con->destroy_msg);
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->destroy_msg);
>> + else {
>> + /*
>> + * In case session was not created we can safely destroy
>> + * not involving tx thread. Thus we also need to destroy
>> + * obuf, which still belongs to iproto thread.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + iproto_connection_delete(con);
>> + }
>> }
>>
>> /**
>> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con)
>> * is done only once.
>> */
>> con->p_ibuf->wpos -= con->parse_size;
>> - cpipe_push(&tx_pipe, &con->disconnect_msg);
>> assert(con->state == IPROTO_CONNECTION_ALIVE);
>> con->state = IPROTO_CONNECTION_CLOSED;
>> + rlist_del(&con->in_stop_list);
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->disconnect_msg);
>> + else
>> + /*
>> + * In case session was not created we can safely
>> + * try to start destroy not involving tx thread.
>> + */
>> + iproto_connection_try_to_start_destroy(con);
*  
Added brackets to make this if statement more clear.
>> + return;
>> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>> iproto_connection_try_to_start_destroy(con);
>> } else {
>> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> assert(rlist_empty(&con->in_stop_list));
>> int n_requests = 0;
>> bool stop_input = false;
>> + bool obuf_in_iproto = (con->session == NULL);
>> const char *errmsg;
>> while (con->parse_size != 0 && !stop_input) {
>> if (iproto_check_msg_max()) {
>> @@ -853,12 +886,20 @@ err_msgpack:
>>
>> msg->len = reqend - reqstart; /* total request length */
>>
>> - iproto_msg_decode(msg, &pos, reqend, &stop_input);
>> - /*
>> - * This can't throw, but should not be
>> - * done in case of exception.
>> - */
>> - cpipe_push_input(&tx_pipe, &msg->base);
>> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
>> + if (obuf_in_iproto) {
>> + /*
>> + * If session was not created yet and obuf is
>> + * still in iproto we need to destroy it. New
>> + * one will be created in tx thread if needed.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + obuf_in_iproto = false;
>> + }
>> + cpipe_push_input(&tx_pipe, &msg->base);
>> + }
>> +
>> n_requests++;
>> /* Request is parsed */
>> assert(reqend > reqstart);
>> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd)
>> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
>> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>> con->p_ibuf = &con->ibuf[0];
>> con->tx.p_obuf = &con->obuf[0];
>> iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd)
>> con->parse_size = 0;
>> con->long_poll_count = 0;
>> con->session = NULL;
>> + con->init_failed = false;
>> rlist_create(&con->in_stop_list);
>> /* It may be very awkward to allocate at close. */
>> cmsg_init(&con->destroy_msg, destroy_route);
>> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con)
>> assert(!evio_has_fd(&con->input));
>> assert(con->session == NULL);
>> assert(con->state == IPROTO_CONNECTION_DESTROYED);
>> - /*
>> - * The output buffers must have been deleted
>> - * in tx thread.
>> - */
>> ibuf_destroy(&con->ibuf[0]);
>> ibuf_destroy(&con->ibuf[1]);
>> assert(con->obuf[0].pos == 0 &&
>> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
>> static void
>> tx_reply_iproto_error(struct cmsg *m);
>>
>> +static void
>> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
>> +
>> static void
>> net_send_msg(struct cmsg *msg);
>>
>> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
>> { net_send_error, NULL },
>> };
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input)
>> {
>> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> (uint32_t) type);
>
>- You have three possible return values. Maybe add a function
>description?
*  
Right, done.
>
>> goto error;
>> }
>> - return;
>> + return 0;
>> error:
>> /** Log and send the error. */
>> diag_log();
>> diag_create(&msg->diag);
>> diag_move(&fiber()->diag, &msg->diag);
>> - cmsg_init(&msg->base, error_route);
>> + if (msg->connection->session != NULL) {
>> + cmsg_init(&msg->base, error_route);
>> + return 1;
>> + }
>> + /*
>> + * In case session was not created we can process error path
>> + * without tx thread.
>> + */
>> + tx_accept_wpos(msg->connection, &msg->wpos);
>> + struct obuf *out = msg->connection->tx.p_obuf;
>> + iproto_reply_error(out, diag_last_error(&msg->diag),
>> + msg->header.sync, ::schema_version);
>> + iproto_wpos_create(&msg->wpos, out);
>> + net_send_msg(&(msg->base));
>> + return -1;
>> }
>
>- I meant something like:
>/** Log and send the error. */
>diag_log();
>if (msg->connection->session != NULL) {
>diag_create(&msg->diag);
>diag_move(&fiber()->diag, &msg->diag);
>cmsg_init(&msg->base, error_route);
>return 1;
>}
>/*
>* In case session was not created we can process error path
>* without tx thread.
>*/
>tx_accept_wpos(msg->connection, &msg->wpos);
>struct obuf *out = msg->connection->tx.p_obuf;
>iproto_reply_error(out, diag_last_error(&fiber()->diag),
>msg->header.sync, ::schema_version);
>iproto_wpos_create(&msg->wpos, out);
>net_send_msg(&(msg->base));
>return -1;
*  
Right, i see, i made it overcomplicated. We can also replace this block:
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&fiber()->diag),
                   msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out) ;
          with an existent function.
>
>>
>> static void
>> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
>> session_destroy(con->session);
>> con->session = NULL; /* safety */
>> }
>> - /*
>> - * Got to be done in iproto thread since
>> - * that's where the memory is allocated.
>> - */
>> obuf_destroy(&con->obuf[0]);
>> obuf_destroy(&con->obuf[1]);
>> }
>> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>> }
>> }
>>
>> -static inline struct iproto_msg *
>> -tx_accept_msg(struct cmsg *m)
>> +static inline int
>> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - tx_accept_wpos(msg->connection, &msg->wpos);
>> - tx_fiber_init(msg->connection->session, msg->header.sync);
>> - return msg;
>> + *msg = (struct iproto_msg *) m;
>> + /*
>> + * In case connection init failed we don't need to try anymore.
>> + */
>> + if ((*msg)->connection->init_failed)
>> + return -1;
>> + /*
>> + * In case session was not created we need to init connection in tx and
>> + * create it here.
>> + */
>> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
>> + (*msg)->connection->init_failed = true;
>> + (*msg)->close_connection = true;
>> + return -1;
>> + }
>> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
>> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
>> + return 0;
>> }
>>
>> /**
>> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
>> static void
>> tx_reply_iproto_error(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + /*
>> + * We don't need to check tx_accept_msg() return value here
>> + * as far as if we might only process iproto error in tx
>> + * in case connection session is already created and
>> + * thus tx_accept_msg() can't fail.
>> + */
>> + tx_accept_msg(m, &msg);
>> struct obuf *out = msg->connection->tx.p_obuf;
>> iproto_reply_error(out, diag_last_error(&msg->diag),
>> msg->header.sync, ::schema_version);
>> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
>> static void
>> tx_process1(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1553,17 +1627,20 @@ error:
>> static void
>> tx_process_select(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + struct request *req;
>> struct obuf *out;
>> struct obuf_svp svp;
>> struct port port;
>> int count;
>> int rc;
>> - struct request *req = &msg->dml;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> tx_inject_delay();
>> + req = &msg->dml;
>> rc = box_select(req->space_id, req->index_id,
>> req->iterator, req->offset, req->limit,
>> req->key, req->key_end, &port);
>> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>> static void
>> tx_process_call(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1686,13 +1765,15 @@ error:
>> static void
>> tx_process_misc(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> - struct iproto_connection *con = msg->connection;
>> - struct obuf *out = con->tx.p_obuf;
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> try {
>> + struct iproto_connection *con = msg->connection;
>> + struct obuf *out = con->tx.p_obuf;
>> struct ballot ballot;
>> switch (msg->header.type) {
>> case IPROTO_AUTH:
>> @@ -1729,7 +1810,7 @@ error:
>> static void
>> tx_process_sql(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> struct obuf *out;
>> struct port port;
>> struct sql_bind *bind = NULL;
>> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
>> uint32_t len;
>> bool is_unprepare = false;
>>
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>> assert(msg->header.type == IPROTO_EXECUTE ||
>> @@ -1825,7 +1908,11 @@ error:
>> static void
>> tx_process_replication(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0) {
>> + tx_reply_error(msg);
>> + return;
>> + }
>> struct iproto_connection *con = msg->connection;
>> struct ev_io io;
>> coio_create(&io, con->input.fd);
>> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
>> }
>> }
>>
>> +/**
>> + * Check connection health and try to send an error to the client
>> + * in case of internal connection init or on_connect trigger failure.
>> + */
>> +static bool
>> +iproto_connection_fail(struct iproto_msg *msg)
>> +{
>> + if (!msg->close_connection)
>> + return false;
>> + struct iproto_connection *con = msg->connection;
>> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
>> + obuf_iovcnt(msg->wpos.obuf));
>> + if (nwr > 0) {
>> + /* Count statistics. */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> + diag_log();
>> + }
>> + iproto_connection_close(con);
>> + iproto_msg_delete(msg);
>> + return true;
>> +}
>> +
>> static void
>> net_send_msg(struct cmsg *m)
>> {
>> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
>> }
>> con->wend = msg->wpos;
>>
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> if (evio_has_fd(&con->output)) {
>> if (! ev_is_active(&con->output))
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (iproto_connection_fail(msg))
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: prepare greeting for it.
>> */
>> static void
>> -tx_process_connect(struct cmsg *m)
>> +iproto_process_connect(struct iproto_msg *msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> struct iproto_connection *con = msg->connection;
>> struct obuf *out = msg->connection->tx.p_obuf;
>> - try { /* connect. */
>> - con->session = session_create(SESSION_TYPE_BINARY);
>> - if (con->session == NULL)
>> - diag_raise();
>> - con->session->meta.connection = con;
>> - tx_fiber_init(con->session, 0);
>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> - /* TODO: dirty read from tx thread */
>> - struct tt_uuid uuid = INSTANCE_UUID;
>> - random_bytes(con->salt, IPROTO_SALT_SIZE);
>> - greeting_encode(greeting, tarantool_version_id(), &uuid,
>> - con->salt, IPROTO_SALT_SIZE);
>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> - if (! rlist_empty(&session_on_connect)) {
>> - if (session_run_on_connect_triggers(con->session) != 0)
>> - diag_raise();
>> - }
>> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> + /* TODO: dirty read from tx thread */
>> + struct tt_uuid uuid = INSTANCE_UUID;
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>> + greeting_encode(greeting, tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
>> + != IPROTO_GREETING_SIZE) {
>> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
>> + "greeting obuf", "dup");
>> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
>> + msg->header.sync, ::schema_version);
>> iproto_wpos_create(&msg->wpos, out);
>> - } catch (Exception *e) {
>> - tx_reply_error(msg);
>> msg->close_connection = true;
>> - }
>> -}
>> -
>> -/**
>> - * Send a response to connect to the client or close the
>> - * connection in case on_connect trigger failed.
>> - */
>> -static void
>> -net_send_greeting(struct cmsg *m)
>> -{
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - struct iproto_connection *con = msg->connection;
>> - if (msg->close_connection) {
>> - struct obuf *out = msg->wpos.obuf;
>> - int64_t nwr = sio_writev(con->output.fd, out->iov,
>> - obuf_iovcnt(out));
>> -
>> - if (nwr > 0) {
>> - /* Count statistics. */
>> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> - diag_log();
>> - }
>> - assert(iproto_connection_is_idle(con));
>> - iproto_connection_close(con);
>> - iproto_msg_delete(msg);
>> + iproto_connection_fail(msg);
>> return;
>> }
>> + iproto_wpos_create(&msg->wpos, out);
>> con->wend = msg->wpos;
>> - /*
>> - * Connect is synchronous, so no one could have been
>> - * messing up with the connection while it was in
>> - * progress.
>> - */
>> assert(evio_has_fd(&con->output));
>> - /* Handshake OK, start reading input. */
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> iproto_msg_delete(msg);
>> }
>>
>> -static const struct cmsg_hop connect_route[] = {
>> - { tx_process_connect, &net_pipe },
>> - { net_send_greeting, NULL },
>> -};
>> +static int
>> +tx_init_connect(struct iproto_msg *msg)
>> +{
>> + struct iproto_connection *con = msg->connection;
>> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + con->tx.p_obuf = &con->obuf[0];
>> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
>> +
>> + con->session = session_create(SESSION_TYPE_BINARY);
>> + if (con->session == NULL)
>> + return -1;
>> + con->session->meta.connection = con;
>> +
>> + tx_fiber_init(con->session, 0);
>> + if (! rlist_empty(&session_on_connect)) {
>> + if (session_run_on_connect_triggers(con->session) != 0)
>> + return -1;
>> + }
>> +
>> + return 0;
>> +}
>>
>> /** }}} */
>>
>> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>> mempool_free(&iproto_connection_pool, con);
>> return -1;
>> }
>> - cmsg_init(&msg->base, connect_route);
>> msg->p_ibuf = con->p_ibuf;
>> msg->wpos = con->wpos;
>> msg->close_connection = false;
>> - cpipe_push(&tx_pipe, &msg->base);
>> + iproto_process_connect(msg);
>> return 0;
>> }
>>
>> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
>> static int
>> net_cord_f(va_list /* ap */)
>> {
>> + slab_cache_create(&iproto_slabc, &runtime);
>> +
>> mempool_create(&iproto_msg_pool, &cord()->slabc,
>> sizeof(struct iproto_msg));
>> mempool_create(&iproto_connection_pool, &cord()->slabc,
>> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
>> size_t
>> iproto_mem_used(void)
>> {
>> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
>> + return slab_cache_used(&net_cord.slabc)
>> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>> }
>>
>> size_t
>> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
>> index 5d064b7648..bfa9c2b759 100644
>> --- a/test/box-py/bad_trigger.result
>> +++ b/test/box-py/bad_trigger.result
>> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>> - function
>> ...
>> greeting: True
>> +Nothing to read yet: Resource temporarily unavailable
>> fixheader: True
>> error code 32
>> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
>> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
>> index 7d200b9218..4739dfe136 100644
>> --- a/test/box-py/bad_trigger.test.py
>> +++ b/test/box-py/bad_trigger.test.py
>> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>> from lib.tarantool_connection import TarantoolConnection
>> from tarantool import NetworkError
>> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
>> - REQUEST_TYPE_ERROR
>> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>> import socket
>> import msgpack
>>
>> @@ -26,9 +26,25 @@ s = conn.socket
>> # Read greeting
>> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>>
>> -# Read error packet
>> +# Check socket
>> IPROTO_FIXHEADER_SIZE = 5
>> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +s.setblocking(False)
>> +fixheader = None
>> +try:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +except socket.error as err:
>> + print 'Nothing to read yet:', str(err).split(']')[1]
>> +else:
>> + print 'Received fixheader'
>> +s.setblocking(True)
>> +
>> +# Send ping
>> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
>> +s.send(msgpack.dumps(len(query)) + query)
>> +
>> +# Read error packet
>> +if not fixheader:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>> unpacker.feed(fixheader)
>> packet_len = unpacker.unpack()
>> 
 
 
--
Ilya Kosarev
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200926/c8e2c0c4/attachment.html>


More information about the Tarantool-patches mailing list