Hi!
 
Thanks for the review.
 
Four answers below. I addressed all your comments in v3, pushed it, and sent it to the next reviewer.
 
Monday, 21 September 2020, 18:52 +03:00 from Leonid Vasiliev <lvasiliev@tarantool.org>:
 
Hi! Thank you for the patch.

LGTM.

All comments below are at your discretion:

On 9/18/20 4:09 PM, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acceptance was to pass the connection to the tx thread
> through cbus. This meant that any connection interaction involved the
> tx thread even before we got to decode what the client wants
> from us. Consequently, a number of problems appear. The main one
> is that we might get a descriptor leak in case of an unresponsive tx
> thread (for example, when building a secondary index). There are some
> other cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept a connection and send the greeting by
> itself. The connection is initialized in the tx thread when the real
> request comes through iproto_msg_decode(). In case the request type was
> not recognized we can also send a reply with an error without using tx.
> It is planned to add more iproto logic to prevent extra interaction with
> the tx thread. This patch already solves the descriptor leakage problem
> to some extent, since the connection is established and stays in the
> fetch_schema state while the tx thread is unresponsive.
> The other user-visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
>
> Part of #3776
>
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now executed only with the first request. The triggers are still
> the first thing to be executed on a new connection, though. While
> establishing a connection without making any requests is quite a
> synthetic case, it is technically possible, and now your triggers won't
> be executed in this case. Some request is required to start their
> execution.
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
>
> @ChangeLog:
> * Make iproto thread more independent from tx (gh-3776).

- The changelog is about changes visible to the user. I would suggest
writing about on_connect triggers here. As I understand it, on_disconnect
triggers also won't run if not a single message has been sent.
  1. Right, I thought it was going to be too long for the ChangeLog. But actually it should be here.
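
To make the user-visible change concrete, here is a minimal Python 3 sketch of the trigger behavior discussed above. It is a toy model, not Tarantool code: the Connection class and its accept(), request() and close() methods are hypothetical names for illustration. Only the session and init_failed fields mirror names from the patch, and the on_disconnect part follows the reviewer's understanding rather than a confirmed guarantee.

```python
class Connection:
    """Toy model of a connection whose session is created lazily."""

    def __init__(self, on_connect, on_disconnect):
        self.session = None        # created on the first request only
        self.init_failed = False   # set once, so init is never retried
        self._on_connect = on_connect
        self._on_disconnect = on_disconnect
        self.log = []

    def accept(self):
        # The greeting is produced without creating a session.
        self.log.append("greeting")

    def request(self, name):
        if self.init_failed:
            return False
        if self.session is None:
            # First request: create the session, then run on_connect.
            self.session = object()
            try:
                for trigger in self._on_connect:
                    trigger(self)
            except Exception as exc:
                self.init_failed = True
                self.log.append("error: %s" % exc)
                return False
        self.log.append("reply: " + name)
        return True

    def close(self):
        # Without a session there is nothing to run on_disconnect for.
        if self.session is not None:
            for trigger in self._on_disconnect:
                trigger(self)
            self.session = None
```

In this model a client that connects and leaves without sending anything triggers neither callback, while a client that sends at least one request triggers on_connect exactly once.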

>
> Changes in v2:
> - docbot request provided
> - ChangeLog provided
> - net_send_msg() used instead of net_send_error() where needed
> - error cases made clear in iproto_msg_decode()
> - replaced net_check_connection() with iproto_connection_fail() predicate
> - improved tx session initialization fail processing to avoid extra tries
> - fixed obuf_dup() fail processing in iproto_process_connect()
> - added some comments
> - some comments style fixed
>
> src/box/iproto.cc | 304 +++++++++++++++++++++-----------
> test/box-py/bad_trigger.result | 1 +
> test/box-py/bad_trigger.test.py | 22 ++-
> 3 files changed, 223 insertions(+), 104 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..67bbd357a3 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
> static void
> iproto_resume(void);
>
> -static void
> +static int
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input);
>
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
> iproto_resume();
> }
>
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
> +
> /**
> * A single global queue for all requests in all connections. All
> * requests from all connections are processed concurrently.
> @@ -280,6 +283,11 @@ static struct cord net_cord;
> * in the tx thread.
> */
> static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
>
> struct rmean *rmean_net;
>
> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
> "REQUESTS",
> };
>
> +static int
> +tx_init_connect(struct iproto_msg *msg);
> +
> static void
> tx_process_destroy(struct cmsg *m);
>
> @@ -463,6 +474,7 @@ struct iproto_connection
> struct ev_io output;
> /** Logical session. */
> struct session *session;
> + bool init_failed;
> ev_loop *loop;
> /**
> * Pre-allocated disconnect msg. Is sent right after
> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
> * other parts of the connection.
> */
> con->state = IPROTO_CONNECTION_DESTROYED;
> - cpipe_push(&tx_pipe, &con->destroy_msg);
> + if (con->session != NULL)
> + cpipe_push(&tx_pipe, &con->destroy_msg);
> + else {
> + /*
> + * In case session was not created we can safely destroy
> + * not involving tx thread. Thus we also need to destroy
> + * obuf, which still belongs to iproto thread.
> + */
> + obuf_destroy(&con->obuf[0]);
> + obuf_destroy(&con->obuf[1]);
> + iproto_connection_delete(con);
> + }
> }
>
> /**
> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con)
> * is done only once.
> */
> con->p_ibuf->wpos -= con->parse_size;
> - cpipe_push(&tx_pipe, &con->disconnect_msg);
> assert(con->state == IPROTO_CONNECTION_ALIVE);
> con->state = IPROTO_CONNECTION_CLOSED;
> + rlist_del(&con->in_stop_list);
> + if (con->session != NULL)
> + cpipe_push(&tx_pipe, &con->disconnect_msg);
> + else
> + /*
> + * In case session was not created we can safely
> + * try to start destroy not involving tx thread.
> + */
> + iproto_connection_try_to_start_destroy(con);
  1. Added braces to make this if statement clearer.
> + return;
> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
> iproto_connection_try_to_start_destroy(con);
> } else {
> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> assert(rlist_empty(&con->in_stop_list));
> int n_requests = 0;
> bool stop_input = false;
> + bool obuf_in_iproto = (con->session == NULL);
> const char *errmsg;
> while (con->parse_size != 0 && !stop_input) {
> if (iproto_check_msg_max()) {
> @@ -853,12 +886,20 @@ err_msgpack:
>
> msg->len = reqend - reqstart; /* total request length */
>
> - iproto_msg_decode(msg, &pos, reqend, &stop_input);
> - /*
> - * This can't throw, but should not be
> - * done in case of exception.
> - */
> - cpipe_push_input(&tx_pipe, &msg->base);
> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
> + if (obuf_in_iproto) {
> + /*
> + * If session was not created yet and obuf is
> + * still in iproto we need to destroy it. New
> + * one will be created in tx thread if needed.
> + */
> + obuf_destroy(&con->obuf[0]);
> + obuf_destroy(&con->obuf[1]);
> + obuf_in_iproto = false;
> + }
> + cpipe_push_input(&tx_pipe, &msg->base);
> + }
> +
> n_requests++;
> /* Request is parsed */
> assert(reqend > reqstart);
> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd)
> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
> con->p_ibuf = &con->ibuf[0];
> con->tx.p_obuf = &con->obuf[0];
> iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd)
> con->parse_size = 0;
> con->long_poll_count = 0;
> con->session = NULL;
> + con->init_failed = false;
> rlist_create(&con->in_stop_list);
> /* It may be very awkward to allocate at close. */
> cmsg_init(&con->destroy_msg, destroy_route);
> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con)
> assert(!evio_has_fd(&con->input));
> assert(con->session == NULL);
> assert(con->state == IPROTO_CONNECTION_DESTROYED);
> - /*
> - * The output buffers must have been deleted
> - * in tx thread.
> - */
> ibuf_destroy(&con->ibuf[0]);
> ibuf_destroy(&con->ibuf[1]);
> assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
> static void
> tx_reply_iproto_error(struct cmsg *m);
>
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
> static void
> net_send_msg(struct cmsg *msg);
>
> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
> { net_send_error, NULL },
> };
>
> -static void
> +static int
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input)
> {
> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> (uint32_t) type);

- You have three possible return values. Maybe add a function
description?
  1. Right, done.
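
For reference, the three return values as I read them from the patch can be modeled with a short Python 3 sketch. This is a simplified illustration, not the real code: the constant names, the KNOWN_TYPES subset, and the msg_decode() and enqueue() helpers are hypothetical. Only the values 0, 1 and -1 and the "push to tx unless -1" rule come from the diff.

```python
from dataclasses import dataclass, field
from typing import List, Optional

DECODE_OK = 0           # decoded, regular route, goes to tx
DECODE_ERR_TX = 1       # failed, session exists: error route goes to tx
DECODE_ERR_IPROTO = -1  # failed, no session: error already sent, skip tx

KNOWN_TYPES = {"ping", "select", "call"}  # illustrative subset only


@dataclass
class Connection:
    session: Optional[object] = None
    sent_without_tx: List[str] = field(default_factory=list)


@dataclass
class Msg:
    connection: Connection
    request_type: str
    route: str = ""


def msg_decode(msg: Msg) -> int:
    """Pick a route for the message and report how it must be delivered."""
    if msg.request_type in KNOWN_TYPES:
        msg.route = "process"
        return DECODE_OK
    if msg.connection.session is not None:
        msg.route = "error_route"
        return DECODE_ERR_TX
    # No session yet: reply with the error right away, bypassing tx.
    msg.connection.sent_without_tx.append("error: unknown request type")
    return DECODE_ERR_IPROTO


def enqueue(msg: Msg, tx_queue: List[Msg]) -> None:
    """Push the message to the tx queue unless it was fully handled."""
    if msg_decode(msg) != DECODE_ERR_IPROTO:
        tx_queue.append(msg)
```

The point of the third value is that the caller must not touch tx at all for a connection that has no session yet.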

> goto error;
> }
> - return;
> + return 0;
> error:
> /** Log and send the error. */
> diag_log();
> diag_create(&msg->diag);
> diag_move(&fiber()->diag, &msg->diag);
> - cmsg_init(&msg->base, error_route);
> + if (msg->connection->session != NULL) {
> + cmsg_init(&msg->base, error_route);
> + return 1;
> + }
> + /*
> + * In case session was not created we can process error path
> + * without tx thread.
> + */
> + tx_accept_wpos(msg->connection, &msg->wpos);
> + struct obuf *out = msg->connection->tx.p_obuf;
> + iproto_reply_error(out, diag_last_error(&msg->diag),
> + msg->header.sync, ::schema_version);
> + iproto_wpos_create(&msg->wpos, out);
> + net_send_msg(&(msg->base));
> + return -1;
> }

- I meant something like:
/** Log and send the error. */
diag_log();
if (msg->connection->session != NULL) {
diag_create(&msg->diag);
diag_move(&fiber()->diag, &msg->diag);
cmsg_init(&msg->base, error_route);
return 1;
}
/*
* In case session was not created we can process error path
* without tx thread.
*/
tx_accept_wpos(msg->connection, &msg->wpos);
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&fiber()->diag),
msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);
net_send_msg(&(msg->base));
return -1;
  1. Right, I see, I made it overcomplicated. We can also replace this block:
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&fiber()->diag),
                   msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);
     with an existing function.

>
> static void
> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> - /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> - */
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> }
> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - tx_accept_wpos(msg->connection, &msg->wpos);
> - tx_fiber_init(msg->connection->session, msg->header.sync);
> - return msg;
> + *msg = (struct iproto_msg *) m;
> + /*
> + * In case connection init failed we don't need to try anymore.
> + */
> + if ((*msg)->connection->init_failed)
> + return -1;
> + /*
> + * In case session was not created we need to init connection in tx and
> + * create it here.
> + */
> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> + (*msg)->connection->init_failed = true;
> + (*msg)->close_connection = true;
> + return -1;
> + }
> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> + return 0;
> }
>
> /**
> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
> static void
> tx_reply_iproto_error(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + /*
> + * We don't need to check tx_accept_msg() return value here
> + * as far as if we might only process iproto error in tx
> + * in case connection session is already created and
> + * thus tx_accept_msg() can't fail.
> + */
> + tx_accept_msg(m, &msg);
> struct obuf *out = msg->connection->tx.p_obuf;
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
> static void
> tx_process1(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1553,17 +1627,20 @@ error:
> static void
> tx_process_select(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + struct request *req;
> struct obuf *out;
> struct obuf_svp svp;
> struct port port;
> int count;
> int rc;
> - struct request *req = &msg->dml;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> tx_inject_delay();
> + req = &msg->dml;
> rc = box_select(req->space_id, req->index_id,
> req->iterator, req->offset, req->limit,
> req->key, req->key_end, &port);
> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
> static void
> tx_process_call(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1686,13 +1765,15 @@ error:
> static void
> tx_process_misc(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> - struct iproto_connection *con = msg->connection;
> - struct obuf *out = con->tx.p_obuf;
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> try {
> + struct iproto_connection *con = msg->connection;
> + struct obuf *out = con->tx.p_obuf;
> struct ballot ballot;
> switch (msg->header.type) {
> case IPROTO_AUTH:
> @@ -1729,7 +1810,7 @@ error:
> static void
> tx_process_sql(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> struct obuf *out;
> struct port port;
> struct sql_bind *bind = NULL;
> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
> uint32_t len;
> bool is_unprepare = false;
>
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
> assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1908,11 @@ error:
> static void
> tx_process_replication(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0) {
> + tx_reply_error(msg);
> + return;
> + }
> struct iproto_connection *con = msg->connection;
> struct ev_io io;
> coio_create(&io, con->input.fd);
> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)
> +{
> + if (!msg->close_connection)
> + return false;
> + struct iproto_connection *con = msg->connection;
> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> + obuf_iovcnt(msg->wpos.obuf));
> + if (nwr > 0) {
> + /* Count statistics. */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> + diag_log();
> + }
> + iproto_connection_close(con);
> + iproto_msg_delete(msg);
> + return true;
> +}
> +
> static void
> net_send_msg(struct cmsg *m)
> {
> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
> }
> con->wend = msg->wpos;
>
> + if (iproto_connection_fail(msg))
> + return;
> +
> if (evio_has_fd(&con->output)) {
> if (! ev_is_active(&con->output))
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (iproto_connection_fail(msg))
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (iproto_connection_fail(msg))
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
> */
> static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> struct obuf *out = msg->connection->tx.p_obuf;
> - try { /* connect. */
> - con->session = session_create(SESSION_TYPE_BINARY);
> - if (con->session == NULL)
> - diag_raise();
> - con->session->meta.connection = con;
> - tx_fiber_init(con->session, 0);
> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> - /* TODO: dirty read from tx thread */
> - struct tt_uuid uuid = INSTANCE_UUID;
> - random_bytes(con->salt, IPROTO_SALT_SIZE);
> - greeting_encode(greeting, tarantool_version_id(), &uuid,
> - con->salt, IPROTO_SALT_SIZE);
> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> - if (! rlist_empty(&session_on_connect)) {
> - if (session_run_on_connect_triggers(con->session) != 0)
> - diag_raise();
> - }
> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> + /* TODO: dirty read from tx thread */
> + struct tt_uuid uuid = INSTANCE_UUID;
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
> + greeting_encode(greeting, tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> + != IPROTO_GREETING_SIZE) {
> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> + "greeting obuf", "dup");
> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
> + msg->header.sync, ::schema_version);
> iproto_wpos_create(&msg->wpos, out);
> - } catch (Exception *e) {
> - tx_reply_error(msg);
> msg->close_connection = true;
> - }
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - struct iproto_connection *con = msg->connection;
> - if (msg->close_connection) {
> - struct obuf *out = msg->wpos.obuf;
> - int64_t nwr = sio_writev(con->output.fd, out->iov,
> - obuf_iovcnt(out));
> -
> - if (nwr > 0) {
> - /* Count statistics. */
> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> - diag_log();
> - }
> - assert(iproto_connection_is_idle(con));
> - iproto_connection_close(con);
> - iproto_msg_delete(msg);
> + iproto_connection_fail(msg);
> return;
> }
> + iproto_wpos_create(&msg->wpos, out);
> con->wend = msg->wpos;
> - /*
> - * Connect is synchronous, so no one could have been
> - * messing up with the connection while it was in
> - * progress.
> - */
> assert(evio_has_fd(&con->output));
> - /* Handshake OK, start reading input. */
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> iproto_msg_delete(msg);
> }
>
> -static const struct cmsg_hop connect_route[] = {
> - { tx_process_connect, &net_pipe },
> - { net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> + con->tx.p_obuf = &con->obuf[0];
> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> + con->session = session_create(SESSION_TYPE_BINARY);
> + if (con->session == NULL)
> + return -1;
> + con->session->meta.connection = con;
> +
> + tx_fiber_init(con->session, 0);
> + if (! rlist_empty(&session_on_connect)) {
> + if (session_run_on_connect_triggers(con->session) != 0)
> + return -1;
> + }
> +
> + return 0;
> +}
>
> /** }}} */
>
> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
> mempool_free(&iproto_connection_pool, con);
> return -1;
> }
> - cmsg_init(&msg->base, connect_route);
> msg->p_ibuf = con->p_ibuf;
> msg->wpos = con->wpos;
> msg->close_connection = false;
> - cpipe_push(&tx_pipe, &msg->base);
> + iproto_process_connect(msg);
> return 0;
> }
>
> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
> static int
> net_cord_f(va_list /* ap */)
> {
> + slab_cache_create(&iproto_slabc, &runtime);
> +
> mempool_create(&iproto_msg_pool, &cord()->slabc,
> sizeof(struct iproto_msg));
> mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
> size_t
> iproto_mem_used(void)
> {
> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> + return slab_cache_used(&net_cord.slabc)
> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
> }
>
> size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
> - function
> ...
> greeting: True
> +Nothing to read yet: Resource temporarily unavailable
> fixheader: True
> error code 32
> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..4739dfe136 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
> from lib.tarantool_connection import TarantoolConnection
> from tarantool import NetworkError
> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> - REQUEST_TYPE_ERROR
> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
> import socket
> import msgpack
>
> @@ -26,9 +26,25 @@ s = conn.socket
> # Read greeting
> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>
> -# Read error packet
> +# Check socket
> IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> + print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> + print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
> unpacker.feed(fixheader)
> packet_len = unpacker.unpack()
>
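
The "check socket" step in the test above relies on a non-blocking recv() failing when the server has sent nothing beyond the greeting. A minimal Python 3 sketch of that technique follows. It uses a local socket pair instead of a Tarantool instance, and the try_recv() and demo() helpers and the payload bytes are hypothetical placeholders. The test itself is Python 2 and catches socket.error, whereas this sketch catches BlockingIOError.

```python
import socket


def try_recv(sock, size):
    """Return up to `size` bytes if data is ready, else None."""
    sock.setblocking(False)
    try:
        return sock.recv(size)
    except BlockingIOError:
        # Nothing to read yet: the peer has not sent anything.
        return None
    finally:
        # Restore blocking mode for the reads that follow.
        sock.setblocking(True)


def demo():
    client, server = socket.socketpair()
    try:
        before = try_recv(client, 5)   # peer is silent so far
        server.sendall(b"12345")       # placeholder payload, not a real header
        after = client.recv(5)         # blocking read now succeeds
        return before, after
    finally:
        client.close()
        server.close()
```

Restoring blocking mode in the finally clause keeps the later blocking reads working whether or not data was available.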
 
 
--
Ilya Kosarev