>
> static void
> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> - /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> - */
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> }
> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - tx_accept_wpos(msg->connection, &msg->wpos);
> - tx_fiber_init(msg->connection->session, msg->header.sync);
> - return msg;
> + *msg = (struct iproto_msg *) m;
> + /*
> + * If connection init has already failed, there is no point in retrying.
> + */
> + if ((*msg)->connection->init_failed)
> + return -1;
> + /*
> + * If the session has not been created yet, initialize the
> + * connection in tx and create the session here.
> + */
> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> + (*msg)->connection->init_failed = true;
> + (*msg)->close_connection = true;
> + return -1;
> + }
> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> + return 0;
> }
>
> /**
> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
> static void
> tx_reply_iproto_error(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + /*
> + * There is no need to check the tx_accept_msg() return value here:
> + * an iproto error is only processed in tx when the connection
> + * session has already been created, so tx_accept_msg() can't
> + * fail.
> + */
> + tx_accept_msg(m, &msg);
> struct obuf *out = msg->connection->tx.p_obuf;
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
> static void
> tx_process1(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1553,17 +1627,20 @@ error:
> static void
> tx_process_select(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + struct request *req;
> struct obuf *out;
> struct obuf_svp svp;
> struct port port;
> int count;
> int rc;
> - struct request *req = &msg->dml;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> tx_inject_delay();
> + req = &msg->dml;
> rc = box_select(req->space_id, req->index_id,
> req->iterator, req->offset, req->limit,
> req->key, req->key_end, &port);
> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
> static void
> tx_process_call(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1686,13 +1765,15 @@ error:
> static void
> tx_process_misc(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> - struct iproto_connection *con = msg->connection;
> - struct obuf *out = con->tx.p_obuf;
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> try {
> + struct iproto_connection *con = msg->connection;
> + struct obuf *out = con->tx.p_obuf;
> struct ballot ballot;
> switch (msg->header.type) {
> case IPROTO_AUTH:
> @@ -1729,7 +1810,7 @@ error:
> static void
> tx_process_sql(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> struct obuf *out;
> struct port port;
> struct sql_bind *bind = NULL;
> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
> uint32_t len;
> bool is_unprepare = false;
>
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
> assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1908,11 @@ error:
> static void
> tx_process_replication(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0) {
> + tx_reply_error(msg);
> + return;
> + }
> struct iproto_connection *con = msg->connection;
> struct ev_io io;
> coio_create(&io, con->input.fd);
> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)
> +{
> + if (!msg->close_connection)
> + return false;
> + struct iproto_connection *con = msg->connection;
> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> + obuf_iovcnt(msg->wpos.obuf));
> + if (nwr > 0) {
> + /* Count statistics. */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> + diag_log();
> + }
> + iproto_connection_close(con);
> + iproto_msg_delete(msg);
> + return true;
> +}
> +
> static void
> net_send_msg(struct cmsg *m)
> {
> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
> }
> con->wend = msg->wpos;
>
> + if (iproto_connection_fail(msg))
> + return;
> +
> if (evio_has_fd(&con->output)) {
> if (! ev_is_active(&con->output))
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (iproto_connection_fail(msg))
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (iproto_connection_fail(msg))
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare a greeting for it.
> */
> static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> struct obuf *out = msg->connection->tx.p_obuf;
> - try { /* connect. */
> - con->session = session_create(SESSION_TYPE_BINARY);
> - if (con->session == NULL)
> - diag_raise();
> - con->session->meta.connection = con;
> - tx_fiber_init(con->session, 0);
> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> - /* TODO: dirty read from tx thread */
> - struct tt_uuid uuid = INSTANCE_UUID;
> - random_bytes(con->salt, IPROTO_SALT_SIZE);
> - greeting_encode(greeting, tarantool_version_id(), &uuid,
> - con->salt, IPROTO_SALT_SIZE);
> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> - if (! rlist_empty(&session_on_connect)) {
> - if (session_run_on_connect_triggers(con->session) != 0)
> - diag_raise();
> - }
> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> + /* TODO: dirty read from tx thread */
> + struct tt_uuid uuid = INSTANCE_UUID;
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
> + greeting_encode(greeting, tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> + != IPROTO_GREETING_SIZE) {
> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> + "greeting obuf", "dup");
> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
> + msg->header.sync, ::schema_version);
> iproto_wpos_create(&msg->wpos, out);
> - } catch (Exception *e) {
> - tx_reply_error(msg);
> msg->close_connection = true;
> - }
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - struct iproto_connection *con = msg->connection;
> - if (msg->close_connection) {
> - struct obuf *out = msg->wpos.obuf;
> - int64_t nwr = sio_writev(con->output.fd, out->iov,
> - obuf_iovcnt(out));
> -
> - if (nwr > 0) {
> - /* Count statistics. */
> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> - diag_log();
> - }
> - assert(iproto_connection_is_idle(con));
> - iproto_connection_close(con);
> - iproto_msg_delete(msg);
> + iproto_connection_fail(msg);
> return;
> }
> + iproto_wpos_create(&msg->wpos, out);
> con->wend = msg->wpos;
> - /*
> - * Connect is synchronous, so no one could have been
> - * messing up with the connection while it was in
> - * progress.
> - */
> assert(evio_has_fd(&con->output));
> - /* Handshake OK, start reading input. */
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> iproto_msg_delete(msg);
> }
>
> -static const struct cmsg_hop connect_route[] = {
> - { tx_process_connect, &net_pipe },
> - { net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> + con->tx.p_obuf = &con->obuf[0];
> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> + con->session = session_create(SESSION_TYPE_BINARY);
> + if (con->session == NULL)
> + return -1;
> + con->session->meta.connection = con;
> +
> + tx_fiber_init(con->session, 0);
> + if (! rlist_empty(&session_on_connect)) {
> + if (session_run_on_connect_triggers(con->session) != 0)
> + return -1;
> + }
> +
> + return 0;
> +}
>
> /** }}} */
>
> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
> mempool_free(&iproto_connection_pool, con);
> return -1;
> }
> - cmsg_init(&msg->base, connect_route);
> msg->p_ibuf = con->p_ibuf;
> msg->wpos = con->wpos;
> msg->close_connection = false;
> - cpipe_push(&tx_pipe, &msg->base);
> + iproto_process_connect(msg);
> return 0;
> }
>
> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
> static int
> net_cord_f(va_list /* ap */)
> {
> + slab_cache_create(&iproto_slabc, &runtime);
> +
> mempool_create(&iproto_msg_pool, &cord()->slabc,
> sizeof(struct iproto_msg));
> mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
> size_t
> iproto_mem_used(void)
> {
> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> + return slab_cache_used(&net_cord.slabc)
> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
> }
>
> size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
> - function
> ...
> greeting: True
> +Nothing to read yet: Resource temporarily unavailable
> fixheader: True
> error code 32
> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..4739dfe136 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
> from lib.tarantool_connection import TarantoolConnection
> from tarantool import NetworkError
> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> - REQUEST_TYPE_ERROR
> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
> import socket
> import msgpack
>
> @@ -26,9 +26,25 @@ s = conn.socket
> # Read greeting
> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>
> -# Read error packet
> +# Check socket
> IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> + print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> + print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
> unpacker.feed(fixheader)
> packet_len = unpacker.unpack()
>