Tarantool development patches archive
From: Leonid Vasiliev <lvasiliev@tarantool.org>
To: Ilya Kosarev <i.kosarev@tarantool.org>, alyapunov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx
Date: Mon, 21 Sep 2020 18:51:29 +0300
Message-ID: <280d8634-36ce-611c-faab-c8e887c5f74c@tarantool.org>
In-Reply-To: <20200918130942.16546-1-i.kosarev@tarantool.org>

Hi! Thank you for the patch.

LGTM.

All comments below are at your discretion:

On 9/18/20 4:09 PM, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index). There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
> 
> Part of #3776
> 
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now being executed only with the first request. Though the triggers
> are still the first thing to be executed on a new connection. While it
> is quite a synthetic case to establish a connection without making
> any requests it is technically possible and now your triggers won't be
> executed in this case. Some request is required to start their
> execution.
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
> 
> @ChangeLog:
>   * Make iproto thread more independent from tx (gh-3776).

- The changelog should describe changes visible to the user. I would
suggest mentioning on_connect triggers here. As I understand it,
on_disconnect triggers also won't run if no message has been sent.

> 
> Changes in v2:
> - docbot request provided
> - ChangeLog provided
> - net_send_msg() used instead of net_send_error() where needed
> - error cases made clear in iproto_msg_decode()
> - replaced net_check_connection() with iproto_connection_fail() predicate
> - improved tx session initialization fail processing to avoid extra tries
> - fixed obuf_dup() fail processing in iproto_process_connect()
> - added some comments
> - some comments style fixed
> 
>   src/box/iproto.cc               | 304 +++++++++++++++++++++-----------
>   test/box-py/bad_trigger.result  |   1 +
>   test/box-py/bad_trigger.test.py |  22 ++-
>   3 files changed, 223 insertions(+), 104 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..67bbd357a3 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>   static void
>   iproto_resume(void);
>   
> -static void
> +static int
>   iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   		  bool *stop_input);
>   
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>   	iproto_resume();
>   }
>   
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
> +
>   /**
>    * A single global queue for all requests in all connections. All
>    * requests from all connections are processed concurrently.
> @@ -280,6 +283,11 @@ static struct cord net_cord;
>    * in the tx thread.
>    */
>   static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
>   
>   struct rmean *rmean_net;
>   
> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>   	"REQUESTS",
>   };
>   
> +static int
> +tx_init_connect(struct iproto_msg *msg);
> +
>   static void
>   tx_process_destroy(struct cmsg *m);
>   
> @@ -463,6 +474,7 @@ struct iproto_connection
>   	struct ev_io output;
>   	/** Logical session. */
>   	struct session *session;
> +	bool init_failed;
>   	ev_loop *loop;
>   	/**
>   	 * Pre-allocated disconnect msg. Is sent right after
> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>   	 * other parts of the connection.
>   	 */
>   	con->state = IPROTO_CONNECTION_DESTROYED;
> -	cpipe_push(&tx_pipe, &con->destroy_msg);
> +	if (con->session != NULL)
> +		cpipe_push(&tx_pipe, &con->destroy_msg);
> +	else {
> +		/*
> +		 * In case session was not created we can safely destroy
> +		 * not involving tx thread. Thus we also need to destroy
> +		 * obuf, which still belongs to iproto thread.
> +		 */
> +		obuf_destroy(&con->obuf[0]);
> +		obuf_destroy(&con->obuf[1]);
> +		iproto_connection_delete(con);
> +	}
>   }
>   
>   /**
> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con)
>   		 * is done only once.
>   		 */
>   		con->p_ibuf->wpos -= con->parse_size;
> -		cpipe_push(&tx_pipe, &con->disconnect_msg);
>   		assert(con->state == IPROTO_CONNECTION_ALIVE);
>   		con->state = IPROTO_CONNECTION_CLOSED;
> +		rlist_del(&con->in_stop_list);
> +		if (con->session != NULL)
> +			cpipe_push(&tx_pipe, &con->disconnect_msg);
> +		else
> +			/*
> +			 * In case session was not created we can safely
> +			 * try to start destroy not involving tx thread.
> +			 */
> +			iproto_connection_try_to_start_destroy(con);
> +		return;
>   	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>   		iproto_connection_try_to_start_destroy(con);
>   	} else {
> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>   	assert(rlist_empty(&con->in_stop_list));
>   	int n_requests = 0;
>   	bool stop_input = false;
> +	bool obuf_in_iproto = (con->session == NULL);
>   	const char *errmsg;
>   	while (con->parse_size != 0 && !stop_input) {
>   		if (iproto_check_msg_max()) {
> @@ -853,12 +886,20 @@ err_msgpack:
>   
>   		msg->len = reqend - reqstart; /* total request length */
>   
> -		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> -		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> -		 */
> -		cpipe_push_input(&tx_pipe, &msg->base);
> +		if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
> +			if (obuf_in_iproto) {
> +				/*
> +				 * If session was not created yet and obuf is
> +				 * still in iproto we need to destroy it. New
> +				 * one will be created in tx thread if needed.
> +				 */
> +				obuf_destroy(&con->obuf[0]);
> +				obuf_destroy(&con->obuf[1]);
> +				obuf_in_iproto = false;
> +			}
> +			cpipe_push_input(&tx_pipe, &msg->base);
> +		}
> +
>   		n_requests++;
>   		/* Request is parsed */
>   		assert(reqend > reqstart);
> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd)
>   	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>   	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>   	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> -	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> -	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>   	con->p_ibuf = &con->ibuf[0];
>   	con->tx.p_obuf = &con->obuf[0];
>   	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd)
>   	con->parse_size = 0;
>   	con->long_poll_count = 0;
>   	con->session = NULL;
> +	con->init_failed = false;
>   	rlist_create(&con->in_stop_list);
>   	/* It may be very awkward to allocate at close. */
>   	cmsg_init(&con->destroy_msg, destroy_route);
> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con)
>   	assert(!evio_has_fd(&con->input));
>   	assert(con->session == NULL);
>   	assert(con->state == IPROTO_CONNECTION_DESTROYED);
> -	/*
> -	 * The output buffers must have been deleted
> -	 * in tx thread.
> -	 */
>   	ibuf_destroy(&con->ibuf[0]);
>   	ibuf_destroy(&con->ibuf[1]);
>   	assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
>   static void
>   tx_reply_iproto_error(struct cmsg *m);
>   
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
>   static void
>   net_send_msg(struct cmsg *msg);
>   
> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
>   	{ net_send_error, NULL },
>   };
>   
> -static void
> +static int
>   iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   		  bool *stop_input)
>   {
> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   			 (uint32_t) type);

- iproto_msg_decode() now has three possible return values. Maybe add
a function description?
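
For example, the description could spell out the three outcomes roughly
as in the sketch below. The enum and helper names are hypothetical (the
patch returns bare ints), and the meanings are only my reading of the
diff, so treat it as an illustration rather than a proposed API:

```cpp
#include <cassert>

/*
 * Hypothetical names, for illustration only; the patch itself
 * returns plain -1 / 0 / 1 from iproto_msg_decode().
 */
enum decode_result {
	/**
	 * Decoding failed and no session exists yet: the error reply
	 * was already sent from the iproto thread, nothing goes to tx.
	 */
	DECODE_REPLIED_IN_IPROTO = -1,
	/** Request decoded: msg is routed to tx for processing. */
	DECODE_OK = 0,
	/**
	 * Decoding failed but a session exists: msg is initialized
	 * with error_route and tx sends the error reply.
	 */
	DECODE_ERROR_VIA_TX = 1,
};

/**
 * Mirrors the check in iproto_enqueue_batch(): the message is
 * pushed to tx_pipe for every result except -1.
 */
static constexpr bool
decode_result_needs_tx(int rc)
{
	return rc != DECODE_REPLIED_IN_IPROTO;
}

static_assert(decode_result_needs_tx(DECODE_OK), "0 goes to tx");
static_assert(decode_result_needs_tx(DECODE_ERROR_VIA_TX), "1 goes to tx");
static_assert(!decode_result_needs_tx(DECODE_REPLIED_IN_IPROTO),
	      "-1 is handled in iproto");
```

A plain doc comment above the function listing the same three cases
would be enough; the enum is just a compact way to show them.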

>   		goto error;
>   	}
> -	return;
> +	return 0;
>   error:
>   	/** Log and send the error. */
>   	diag_log();
>   	diag_create(&msg->diag);
>   	diag_move(&fiber()->diag, &msg->diag);
> -	cmsg_init(&msg->base, error_route);
> +	if (msg->connection->session != NULL) {
> +		cmsg_init(&msg->base, error_route);
> +		return 1;
> +	}
> +	/*
> +	 * In case session was not created we can process error path
> +	 * without tx thread.
> +	 */
> +	tx_accept_wpos(msg->connection, &msg->wpos);
> +	struct obuf *out = msg->connection->tx.p_obuf;
> +	iproto_reply_error(out, diag_last_error(&msg->diag),
> +			   msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	net_send_msg(&(msg->base));
> +	return -1;
>   }

- I meant something like this (move the diag into msg only when the
error is handed over to tx):
	/** Log and send the error. */
	diag_log();
	if (msg->connection->session != NULL) {
		diag_create(&msg->diag);
		diag_move(&fiber()->diag, &msg->diag);
		cmsg_init(&msg->base, error_route);
		return 1;
	}
	/*
	 * In case session was not created we can process error path
	 * without tx thread.
	 */
	tx_accept_wpos(msg->connection, &msg->wpos);
	struct obuf *out = msg->connection->tx.p_obuf;
	iproto_reply_error(out, diag_last_error(&fiber()->diag),
			   msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	net_send_msg(&(msg->base));
	return -1;

>   
>   static void
> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
>   		session_destroy(con->session);
>   		con->session = NULL; /* safety */
>   	}
> -	/*
> -	 * Got to be done in iproto thread since
> -	 * that's where the memory is allocated.
> -	 */
>   	obuf_destroy(&con->obuf[0]);
>   	obuf_destroy(&con->obuf[1]);
>   }
> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>   	}
>   }
>   
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>   {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -	return msg;
> +	*msg = (struct iproto_msg *) m;
> +	/*
> +	 * In case connection init failed we don't need to try anymore.
> +	 */
> +	if ((*msg)->connection->init_failed)
> +		return -1;
> +	/*
> +	 * In case session was not created we need to init connection in tx and
> +	 * create it here.
> +	 */
> +	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> +		(*msg)->connection->init_failed = true;
> +		(*msg)->close_connection = true;
> +		return -1;
> +	}
> +	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> +	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> +	return 0;
>   }
>   
>   /**
> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
>   static void
>   tx_reply_iproto_error(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	/*
> +	 * We don't need to check tx_accept_msg() return value here
> +	 * as far as if we might only process iproto error in tx
> +	 * in case connection session is already created and
> +	 * thus tx_accept_msg() can't fail.
> +	 */
> +	tx_accept_msg(m, &msg);
>   	struct obuf *out = msg->connection->tx.p_obuf;
>   	iproto_reply_error(out, diag_last_error(&msg->diag),
>   			   msg->header.sync, ::schema_version);
> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
>   static void
>   tx_process1(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
> @@ -1553,17 +1627,20 @@ error:
>   static void
>   tx_process_select(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	struct request *req;
>   	struct obuf *out;
>   	struct obuf_svp svp;
>   	struct port port;
>   	int count;
>   	int rc;
> -	struct request *req = &msg->dml;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
>   	tx_inject_delay();
> +	req = &msg->dml;
>   	rc = box_select(req->space_id, req->index_id,
>   			req->iterator, req->offset, req->limit,
>   			req->key, req->key_end, &port);
> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>   static void
>   tx_process_call(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
> @@ -1686,13 +1765,15 @@ error:
>   static void
>   tx_process_misc(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> -	struct iproto_connection *con = msg->connection;
> -	struct obuf *out = con->tx.p_obuf;
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
>   	try {
> +		struct iproto_connection *con = msg->connection;
> +		struct obuf *out = con->tx.p_obuf;
>   		struct ballot ballot;
>   		switch (msg->header.type) {
>   		case IPROTO_AUTH:
> @@ -1729,7 +1810,7 @@ error:
>   static void
>   tx_process_sql(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
>   	struct obuf *out;
>   	struct port port;
>   	struct sql_bind *bind = NULL;
> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
>   	uint32_t len;
>   	bool is_unprepare = false;
>   
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   	assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1908,11 @@ error:
>   static void
>   tx_process_replication(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0) {
> +		tx_reply_error(msg);
> +		return;
> +	}
>   	struct iproto_connection *con = msg->connection;
>   	struct ev_io io;
>   	coio_create(&io, con->input.fd);
> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
>   	}
>   }
>   
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)
> +{
> +	if (!msg->close_connection)
> +		return false;
> +	struct iproto_connection *con = msg->connection;
> +	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> +				 obuf_iovcnt(msg->wpos.obuf));
> +	if (nwr > 0) {
> +		/* Count statistics. */
> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> +		diag_log();
> +	}
> +	iproto_connection_close(con);
> +	iproto_msg_delete(msg);
> +	return true;
> +}
> +
>   static void
>   net_send_msg(struct cmsg *m)
>   {
> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
>   	}
>   	con->wend = msg->wpos;
>   
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>   	if (evio_has_fd(&con->output)) {
>   		if (! ev_is_active(&con->output))
>   			ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
>   	struct iproto_connection *con = msg->connection;
>   
>   	msg->p_ibuf->rpos += msg->len;
> +
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>   	iproto_msg_delete(msg);
>   
>   	assert(! ev_is_active(&con->input));
> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
>   	struct iproto_connection *con = msg->connection;
>   
>   	msg->p_ibuf->rpos += msg->len;
> +
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>   	iproto_msg_delete(msg);
>   
>   	assert(! ev_is_active(&con->input));
> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
>   }
>   
>   /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
>    */
>   static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
>   {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>   	struct iproto_connection *con = msg->connection;
>   	struct obuf *out = msg->connection->tx.p_obuf;
> -	try {              /* connect. */
> -		con->session = session_create(SESSION_TYPE_BINARY);
> -		if (con->session == NULL)
> -			diag_raise();
> -		con->session->meta.connection = con;
> -		tx_fiber_init(con->session, 0);
> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> -		/* TODO: dirty read from tx thread */
> -		struct tt_uuid uuid = INSTANCE_UUID;
> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
> -				con->salt, IPROTO_SALT_SIZE);
> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> -		if (! rlist_empty(&session_on_connect)) {
> -			if (session_run_on_connect_triggers(con->session) != 0)
> -				diag_raise();
> -		}
> +	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> +	/* TODO: dirty read from tx thread */
> +	struct tt_uuid uuid = INSTANCE_UUID;
> +	random_bytes(con->salt, IPROTO_SALT_SIZE);
> +	greeting_encode(greeting, tarantool_version_id(), &uuid,
> +			con->salt, IPROTO_SALT_SIZE);
> +	if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> +				 != IPROTO_GREETING_SIZE) {
> +		diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> +			 "greeting obuf", "dup");
> +		iproto_reply_error(out, diag_last_error(&fiber()->diag),
> +				   msg->header.sync, ::schema_version);
>   		iproto_wpos_create(&msg->wpos, out);
> -	} catch (Exception *e) {
> -		tx_reply_error(msg);
>   		msg->close_connection = true;
> -	}
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	struct iproto_connection *con = msg->connection;
> -	if (msg->close_connection) {
> -		struct obuf *out = msg->wpos.obuf;
> -		int64_t nwr = sio_writev(con->output.fd, out->iov,
> -					 obuf_iovcnt(out));
> -
> -		if (nwr > 0) {
> -			/* Count statistics. */
> -			rmean_collect(rmean_net, IPROTO_SENT, nwr);
> -		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> -			diag_log();
> -		}
> -		assert(iproto_connection_is_idle(con));
> -		iproto_connection_close(con);
> -		iproto_msg_delete(msg);
> +		iproto_connection_fail(msg);
>   		return;
>   	}
> +	iproto_wpos_create(&msg->wpos, out);
>   	con->wend = msg->wpos;
> -	/*
> -	 * Connect is synchronous, so no one could have been
> -	 * messing up with the connection while it was in
> -	 * progress.
> -	 */
>   	assert(evio_has_fd(&con->output));
> -	/* Handshake OK, start reading input. */
>   	ev_feed_event(con->loop, &con->output, EV_WRITE);
>   	iproto_msg_delete(msg);
>   }
>   
> -static const struct cmsg_hop connect_route[] = {
> -	{ tx_process_connect, &net_pipe },
> -	{ net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> +	struct iproto_connection *con = msg->connection;
> +	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	con->tx.p_obuf = &con->obuf[0];
> +	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> +	iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> +	con->session = session_create(SESSION_TYPE_BINARY);
> +	if (con->session == NULL)
> +		return -1;
> +	con->session->meta.connection = con;
> +
> +	tx_fiber_init(con->session, 0);
> +	if (! rlist_empty(&session_on_connect)) {
> +		if (session_run_on_connect_triggers(con->session) != 0)
> +			return -1;
> +	}
> +
> +	return 0;
> +}
>   
>   /** }}} */
>   
> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>   		mempool_free(&iproto_connection_pool, con);
>   		return -1;
>   	}
> -	cmsg_init(&msg->base, connect_route);
>   	msg->p_ibuf = con->p_ibuf;
>   	msg->wpos = con->wpos;
>   	msg->close_connection = false;
> -	cpipe_push(&tx_pipe, &msg->base);
> +	iproto_process_connect(msg);
>   	return 0;
>   }
>   
> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
>   static int
>   net_cord_f(va_list /* ap */)
>   {
> +	slab_cache_create(&iproto_slabc, &runtime);
> +
>   	mempool_create(&iproto_msg_pool, &cord()->slabc,
>   		       sizeof(struct iproto_msg));
>   	mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
>   size_t
>   iproto_mem_used(void)
>   {
> -	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> +	return slab_cache_used(&net_cord.slabc)
> +	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>   }
>   
>   size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>   - function
>   ...
>   greeting:  True
> +Nothing to read yet:  Resource temporarily unavailable
>   fixheader:  True
>   error code 32
>   error message:  [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..4739dfe136 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>   from lib.tarantool_connection import TarantoolConnection
>   from tarantool import NetworkError
>   from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> -    REQUEST_TYPE_ERROR
> +    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>   import socket
>   import msgpack
>   
> @@ -26,9 +26,25 @@ s = conn.socket
>   # Read greeting
>   print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>   
> -# Read error packet
> +# Check socket
>   IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> +    print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> +    print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>   print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>   unpacker.feed(fixheader)
>   packet_len = unpacker.unpack()
> 


Thread overview: 3+ messages
2020-09-18 13:09 Ilya Kosarev
2020-09-21 15:51 ` Leonid Vasiliev [this message]
2020-09-25 22:57   ` Ilya Kosarev
