Tarantool development patches archive
* [Tarantool-patches] [PATCH v7 0/3] iproto: greeting enhancement
@ 2020-12-19 17:39 Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
                   ` (2 more replies)
  0 siblings, 3 replies; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-19 17:39 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

These patches move greeting from tx thread to iproto and introduce
minor improvements into iproto logic.

Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776

Changes in v2:
 - docbot request provided
 - ChangeLog provided
 - net_send_msg() used instead of net_send_error() where needed
 - error cases made clear in iproto_msg_decode()
 - replaced net_check_connection() with iproto_connection_fail() predicate
 - improved tx session initialization fail processing to avoid extra tries
 - fixed obuf_dup() fail processing in iproto_process_connect()
 - added some comments
 - some comments style fixed

Changes in v3:
 - added braces to the confusing one-line if statement
 - updated ChangeLog
 - added description for iproto_msg_decode()
 - simplified error processing in iproto_msg_decode()

Changes in v4:
 - fixed msg->close_connection initialization bug
 - fixed false-sharing problem in iproto_connection struct
 - added needed assertion
 - added needed comments
 - names refactoring
 - simplified patch a bit: removed extra return value, extra 
 
Changes in v5:
 - reworked to avoid lazy initialization and extra changes

Changes in v6:
 - some changes are picked out into separate commits
 - main commit message rewritten to become more comprehensive
 - removed some extra changes
 - style fixes
 - greeting allocation fixed

Changes in v7:
 - fixed functions signatures & added comments
 - partial greeting write taken into account
 - added needed state
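
For the partial-write item in v7: the patch reports a short greeting write as an error. One possible alternative, shown here only as a sketch and not as code from the branch, is to remember how many bytes went out and resume on the next writable event. `greeting_writer` and `write_fn` are hypothetical names; `write_fn` stands in for the socket write and is assumed to return 0 when the socket would block and a negative value on a hard error.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <sys/types.h>

// Hypothetical helper, not part of iproto: keeps the unsent tail of a
// fixed-size greeting between writable events.
struct greeting_writer {
	std::string buf;  // full greeting bytes
	size_t sent = 0;  // bytes already written to the socket

	// Returns 1 when the greeting is fully sent, 0 when the caller
	// should wait for the next writable event, -1 on a hard error.
	int
	flush(const std::function<ssize_t(const char *, size_t)> &write_fn)
	{
		assert(sent <= buf.size());
		while (sent < buf.size()) {
			ssize_t nwr = write_fn(buf.data() + sent,
					       buf.size() - sent);
			if (nwr < 0)
				return -1;
			if (nwr == 0)
				return 0;
			// Short write: advance and try the remainder.
			sent += (size_t) nwr;
		}
		return 1;
	}
};
```

The return values mirror the 1 / 0 / -1 convention of iproto_connection_greeting_flush() in patch 3/3, so a caller could keep the same branching while no longer failing on a short write.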

Ilya Kosarev (3):
  iproto: move msg fields initialization to iproto_msg_new()
  iproto: fix comment and add assert on destruction
  iproto: move greeting from tx thread to iproto

 src/box/iproto.cc                             | 141 +++++++++++++-----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 ---
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 107 insertions(+), 60 deletions(-)

-- 
2.17.1

* [Tarantool-patches] [PATCH v7 1/3] iproto: move msg fields initialization to iproto_msg_new()
  2020-12-19 17:39 [Tarantool-patches] [PATCH v7 0/3] iproto: greeting enhancement Ilya Kosarev
@ 2020-12-19 17:39 ` Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
  2 siblings, 0 replies; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-19 17:39 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

msg->close_connection flag was only initialized in iproto_on_accept()
while other struct iproto_msg fields are being initialized in
iproto_msg_new(). It is potentially dangerous for new logic involving
msg->close_connection flag, so it is now moved to iproto_msg_new().

Part of #3776
---
 src/box/iproto.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5eca..6a1e509225 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -565,6 +565,7 @@ iproto_msg_new(struct iproto_connection *con)
 			 "connection %s", sio_socketname(con->input.fd));
 		return NULL;
 	}
+	msg->close_connection = false;
 	msg->connection = con;
 	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
 	return msg;
@@ -2040,7 +2041,6 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
-	msg->close_connection = false;
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
-- 
2.17.1

* [Tarantool-patches] [PATCH v7 2/3] iproto: fix comment and add assert on destruction
  2020-12-19 17:39 [Tarantool-patches] [PATCH v7 0/3] iproto: greeting enhancement Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
@ 2020-12-19 17:39 ` Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
  2 siblings, 0 replies; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-19 17:39 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

The comment in tx_process_destroy() about obuf destruction was wrong.
Memory for them is actually allocated from tx-belonging slab cache and
tx_process_destroy() obviously happens in tx, so the comment is fixed
to reflect the reality.
It is also implied that connection is in IPROTO_CONNECTION_DESTROYED
state in tx_process_destroy(). Now it is verified with assert().

Part of #3776
---
 src/box/iproto.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 6a1e509225..f7330af21d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1379,13 +1379,14 @@ tx_process_destroy(struct cmsg *m)
 {
 	struct iproto_connection *con =
 		container_of(m, struct iproto_connection, destroy_msg);
+	assert(con->state == IPROTO_CONNECTION_DESTROYED);
 	if (con->session) {
 		session_destroy(con->session);
 		con->session = NULL; /* safety */
 	}
 	/*
-	 * Got to be done in iproto thread since
-	 * that's where the memory is allocated.
+	 * obuf is being destroyed in tx thread cause it is where
+	 * it was allocated.
 	 */
 	obuf_destroy(&con->obuf[0]);
 	obuf_destroy(&con->obuf[1]);
-- 
2.17.1

* [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-19 17:39 [Tarantool-patches] [PATCH v7 0/3] iproto: greeting enhancement Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev
@ 2020-12-19 17:39 ` Ilya Kosarev
  2020-12-21 16:20   ` Vladislav Shpilevoy
  2 siblings, 1 reply; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-19 17:39 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to pass the connection to the tx thread
through cbus. This meant that any connection interaction involved the tx
thread even before we got to send the greeting to the client.
Consequently, a client might hit the instance with an enormous number
of connections, leading to file descriptor limit exhaustion in
case of an unresponsive tx thread (for example, when building a secondary
index) and extra tx_process_connect() calls afterwards, even if the
instance doesn't fail with a "too many open files" error.
This patch allows iproto to accept the connection and send the greeting
by itself. Thus the connection is established and stays in the
fetch_schema state while the tx thread is unresponsive.

Closes #3776
---
 src/box/iproto.cc                             | 134 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 ---
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 --
 3 files changed, 103 insertions(+), 57 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index f7330af21d..93bf869299 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -356,6 +356,11 @@ enum iproto_connection_state {
 	 * takes an already active socket in a constructor.
 	 */
 	IPROTO_CONNECTION_ALIVE,
+	/**
+	 * Connection is failed in iproto, but tx doesn't know
+	 * about it yet. Used for unsuccessful greeting.
+	 */
+	IPROTO_CONNECTION_FAILED,
 	/**
 	 * Socket was closed, a notification is sent to the TX
 	 * thread to close the session.
@@ -453,6 +458,8 @@ struct iproto_connection
 	 * meaningless.
 	 */
 	size_t parse_size;
+	/** Iproto buffer used to send greeting. */
+	char greeting_buf[IPROTO_GREETING_SIZE];
 	/**
 	 * Nubmer of active long polling requests that have already
 	 * discarded their arguments in order not to stall other
@@ -679,7 +686,8 @@ iproto_connection_close(struct iproto_connection *con)
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
 		cpipe_push(&tx_pipe, &con->disconnect_msg);
-		assert(con->state == IPROTO_CONNECTION_ALIVE);
+		assert(con->state == IPROTO_CONNECTION_ALIVE ||
+		       con->state == IPROTO_CONNECTION_FAILED);
 		con->state = IPROTO_CONNECTION_CLOSED;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
@@ -1091,6 +1099,56 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+/**
+ * Flush greeting from its buffer. It requires separate flush function
+ * because greeting is being sent from iproto thread which can't use obuf
+ * and thus we can send greeting without involving tx.
+ */
+static int
+iproto_connection_greeting_flush(struct iproto_connection *con)
+{
+	struct iovec greeting;
+	greeting.iov_base = &con->greeting_buf;
+	greeting.iov_len = IPROTO_GREETING_SIZE;
+	ssize_t nwr = sio_writev(con->output.fd, &greeting, 1);
+
+	if (nwr > 0) {
+		/* Count statistics */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		if (nwr < IPROTO_GREETING_SIZE) {
+			diag_set(SocketError, sio_socketname(con->output.fd),
+				 "partial write %zd", nwr);
+			return -1;
+		}
+		return 1;
+	} else if (nwr < 0 && !sio_wouldblock(errno)) {
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+			      int /* revents */)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *)watcher->data;
+	int rc = iproto_connection_greeting_flush(con);
+	if (rc <= 0) {
+		if (rc == 0) {
+			ev_io_start(loop, &con->output);
+		} else {
+			diag_log();
+			con->state = IPROTO_CONNECTION_FAILED;
+		}
+		return;
+	}
+	ev_io_stop(con->loop, &con->output);
+	ev_io_init(&con->output, iproto_connection_on_output,
+		   con->output.fd, EV_WRITE);
+}
+
 static struct iproto_connection *
 iproto_connection_new(int fd)
 {
@@ -1103,7 +1161,7 @@ iproto_connection_new(int fd)
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
-	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+	ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
 	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1938,50 +1996,63 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_connection *con)
+{
+	/*
+	 * INSTANCE_UUID is guaranteed to be inited before this moment.
+	 * We start listening either in local_recovery() or bootstrap().
+	 * The INSTANCE_UUID is ensured to be inited in the beginning of
+	 * both methods. In case of local_recovery() it is verified that
+	 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+	 * In bootstrap() INSTANCE_UUID is either taken from the
+	 * instance_uuid box.cfg{} param or created on the spot.
+	 */
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(con->greeting_buf, tarantool_version_id(),
+			&INSTANCE_UUID, con->salt, IPROTO_SALT_SIZE);
+	assert(evio_has_fd(&con->output));
+	ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
  */
 static void
 tx_process_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL) {
 		tx_reply_error(msg);
 		msg->close_connection = true;
+		return;
+	}
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (! rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0) {
+			tx_reply_error(msg);
+			msg->close_connection = true;
+		}
 	}
 }
 
 /**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
  */
 static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
+	if (msg->close_connection || con->state == IPROTO_CONNECTION_FAILED) {
 		struct obuf *out = msg->wpos.obuf;
 		int64_t nwr = sio_writev(con->output.fd, out->iov,
 					 obuf_iovcnt(out));
@@ -2005,13 +2076,13 @@ net_send_greeting(struct cmsg *m)
 	 */
 	assert(evio_has_fd(&con->output));
 	/* Handshake OK, start reading input. */
-	ev_feed_event(con->loop, &con->output, EV_WRITE);
+	ev_feed_event(con->loop, &con->input, EV_READ);
 	iproto_msg_delete(msg);
 }
 
 static const struct cmsg_hop connect_route[] = {
 	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
+	{ net_finish_connect, NULL },
 };
 
 /** }}} */
@@ -2042,6 +2113,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
+	iproto_process_connect(con);
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a050..6389b27bc8 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
  | - Connection is not established, state is "auth"
  | ...
 
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
 box.schema.user.drop('test')
  | ---
  | ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf0..55ea43f26f 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
 
 req_during_auth()
 
--- Check the same for 'initial' state.
-ok, err = nil
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
-ok, err
-c:close()
 box.schema.user.drop('test')
-- 
2.17.1

* Re: [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-19 17:39 ` [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
@ 2020-12-21 16:20   ` Vladislav Shpilevoy
  2020-12-22  0:50     ` Ilya Kosarev
  0 siblings, 1 reply; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-21 16:20 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

Hi! Thanks for the fixes!

Unfortunately, it seems the patch does not help. Probably it did earlier,
but now the descriptors are not closed.

The example you use in the ticket is not really hard enough - you
use only one connection, and you treat 'fetch_schema' as success.
Also you assume the connection won't be re-created repeatedly by the
external code.

To test it more thoroughly I used these scripts:

Instance 1:

	box.cfg({listen = 3301})
	box.schema.user.grant('guest', 'super')
	while true do os.execute("sleep 1000") end

I used a big sleep so that the instance wouldn't have a chance to handle
the pending connection requests.

Instance 2:

	fiber = require('fiber')
	netbox = require('net.box')
	host = 'localhost:3301'
	count = 0

	function many_connect()
	    while true do
	        local c = netbox.connect(host, {wait_connected = false})
	        while c.state ~= 'fetch_schema' do fiber.yield() end
	        count = count + 1
	        c:close()
	    end
	end

	f1 = fiber.new(many_connect)
	f2 = fiber.new(many_connect)
	f3 = fiber.new(many_connect)
	f4 = fiber.new(many_connect)

I create 4 fibers to reach the descriptor limit faster. You
can try more or less, but with big enough sleep() timeout the
result will be the same. On my machine with 4 fibers I hit the
descriptor limit (2560 in my case) almost immediately.

Each fiber does a totally normal thing - creates a connection and
closes it. Let's assume they also tried to execute some request,
it failed with a timeout, and they decided to re-create the connection.

If the descriptors were closed on Instance 1, these fibers
would work fine, and 'count' would grow until Instance 1 stops
the sleep(). Or until it hits OOM due to too many heavy
struct iproto_connection objects, which is not addressed here, but
we agreed to do it separately, so it is ok for now maybe.

But what happens is that it prints

	SystemError accept, called on fd 16, aka [::]:3301: Too many open files

And no new connections can be accepted. I killed Instance 2 and
started it again - it starts failing immediately. Which means the
descriptors are not closed.
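
The descriptor exhaustion itself can be reproduced outside Tarantool. Below is a minimal POSIX sketch, assuming a Unix-like system; `open_pipes_under_limit` is a hypothetical helper, and pipes merely stand in for accepted sockets. It temporarily lowers the soft RLIMIT_NOFILE limit, then opens pipes either keeping every descriptor open or closing each one right away.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/resource.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper, not Tarantool code. Opens up to `attempts` pipes
// under a temporarily lowered soft descriptor limit. With close_each ==
// false the descriptors stay open until the end, mimicking a server that
// never closes accepted sockets. Returns the errno of the first failed
// call, or 0 if nothing failed.
inline int
open_pipes_under_limit(int attempts, bool close_each, rlim_t soft_limit = 64)
{
	assert(attempts >= 0);
	struct rlimit saved;
	if (getrlimit(RLIMIT_NOFILE, &saved) != 0)
		return errno;
	struct rlimit lowered = saved;
	if (soft_limit < lowered.rlim_cur)
		lowered.rlim_cur = soft_limit;
	if (setrlimit(RLIMIT_NOFILE, &lowered) != 0)
		return errno;

	std::vector<int> kept;
	int first_error = 0;
	for (int i = 0; i < attempts; i++) {
		int fds[2];
		if (pipe(fds) != 0) {
			first_error = errno;
			break;
		}
		if (close_each) {
			close(fds[0]);
			close(fds[1]);
		} else {
			kept.push_back(fds[0]);
			kept.push_back(fds[1]);
		}
	}
	// Clean up and restore the original limit.
	for (int fd : kept)
		close(fd);
	setrlimit(RLIMIT_NOFILE, &saved);
	return first_error;
}
```

With 100 attempts and the default limit of 64, the leaking variant is expected to stop with EMFILE ("Too many open files"), while the closing variant should complete, provided the process does not already hold close to 64 descriptors.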

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index f7330af21d..93bf869299 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1938,50 +1996,63 @@ net_end_subscribe(struct cmsg *m)
> +
> +/**
> + * Create the session and invoke the on_connect triggers.
>   */
>  static void
>  tx_process_connect(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
> -	struct obuf *out = msg->connection->tx.p_obuf;
> -	try {              /* connect. */
> -		con->session = session_create(SESSION_TYPE_BINARY);
> -		if (con->session == NULL)
> -			diag_raise();
> -		con->session->meta.connection = con;
> -		tx_fiber_init(con->session, 0);
> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> -		/* TODO: dirty read from tx thread */
> -		struct tt_uuid uuid = INSTANCE_UUID;
> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
> -				con->salt, IPROTO_SALT_SIZE);
> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> -		if (! rlist_empty(&session_on_connect)) {
> -			if (session_run_on_connect_triggers(con->session) != 0)
> -				diag_raise();
> -		}
> -		iproto_wpos_create(&msg->wpos, out);
> -	} catch (Exception *e) {
> +
> +	con->session = session_create(SESSION_TYPE_BINARY);
> +	if (con->session == NULL) {
>  		tx_reply_error(msg);
>  		msg->close_connection = true;
> +		return;
> +	}
> +	con->session->meta.connection = con;
> +
> +	tx_fiber_init(con->session, 0);
> +	if (! rlist_empty(&session_on_connect)) {

In the new code we don't use ' ' after unary operators.

> +		if (session_run_on_connect_triggers(con->session) != 0) {
> +			tx_reply_error(msg);
> +			msg->close_connection = true;
> +		}
>  	}
>  }

* Re: [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-21 16:20   ` Vladislav Shpilevoy
@ 2020-12-22  0:50     ` Ilya Kosarev
  0 siblings, 0 replies; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-22  0:50 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hi!

Sent v8 of the patch. It includes an improvement that fixes the problem.
>Monday, 21 December 2020, 19:20 +03:00 from Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
> 
>Hi! Thanks for the fixes!
>
>Unfortunately, it seems the patch does not help. Probably it did earlier,
>but now the descriptors are not closed.
>
>The example you use in the ticket is not really hard enough - you
>use only one connection, and you treat 'fetch_schema' as success.
>Also you assume the connection won't be re-created repeatedly by the
>external code.
>
>To test it more thoroughly I used these scripts:
>
>Instance 1:
>
>box.cfg({listen = 3301})
>box.schema.user.grant('guest', 'super')
>while true do os.execute("sleep 1000") end
>
>I used a big sleep so that the instance wouldn't have a chance to handle
>the pending connection requests.
>
>Instance 2:
>
>fiber = require('fiber')
>netbox = require('net.box')
>host = 'localhost:3301'
>count = 0
>
>function many_connect()
>while true do
>local c = netbox.connect(host, {wait_connected = false})
>while c.state ~= 'fetch_schema' do fiber.yield() end
>count = count + 1
>c:close()
>end
>end
>
>f1 = fiber.new(many_connect)
>f2 = fiber.new(many_connect)
>f3 = fiber.new(many_connect)
>f4 = fiber.new(many_connect)
>
>I create 4 fibers to reach the descriptor limit faster. You
>can try more or less, but with big enough sleep() timeout the
>result will be the same. On my machine with 4 fibers I hit the
>descriptor limit (2560 in my case) almost immediately.
>
>Each fiber does a totally normal thing - creates a connection and
>closes it. Let's assume they also tried to execute some request,
>it failed with a timeout, and they decided to re-create the connection.
>
>If the descriptors were closed on Instance 1, these fibers
>would work fine, and 'count' would grow until Instance 1 stops
>the sleep(). Or until it hits OOM due to too many heavy
>struct iproto_connection objects, which is not addressed here, but
>we agreed to do it separately, so it is ok for now maybe.
>
>But what happens is that it prints
>
>SystemError accept, called on fd 16, aka [::]:3301: Too many open files
>
>And no new connections can be accepted. I killed Instance 2 and
>started it again - it starts failing immediately. Which means the
>descriptors are not closed.
Yes, all true. I actually forgot that iproto has to be able to
close the socket by itself. Well, now it is fixed. The idea
is that we actually need to start reading immediately when
the greeting is sent. Then we only need to take care not to
process extra requests on an already closed connection.
This is achieved by patching tx_accept_msg() and
its callees. So in case the connection closes,
iproto_connection_close() is invoked and it closes the socket.
When everything finally moves to tx, all requests are
ignored depending on the connection state. Fixed in v8 of the patch.
>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index f7330af21d..93bf869299 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -1938,50 +1996,63 @@ net_end_subscribe(struct cmsg *m)
>> +
>> +/**
>> + * Create the session and invoke the on_connect triggers.
>> */
>> static void
>> tx_process_connect(struct cmsg *m)
>> {
>> struct iproto_msg *msg = (struct iproto_msg *) m;
>> struct iproto_connection *con = msg->connection;
>> - struct obuf *out = msg->connection->tx.p_obuf;
>> - try { /* connect. */
>> - con->session = session_create(SESSION_TYPE_BINARY);
>> - if (con->session == NULL)
>> - diag_raise();
>> - con->session->meta.connection = con;
>> - tx_fiber_init(con->session, 0);
>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> - /* TODO: dirty read from tx thread */
>> - struct tt_uuid uuid = INSTANCE_UUID;
>> - random_bytes(con->salt, IPROTO_SALT_SIZE);
>> - greeting_encode(greeting, tarantool_version_id(), &uuid,
>> - con->salt, IPROTO_SALT_SIZE);
>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> - if (! rlist_empty(&session_on_connect)) {
>> - if (session_run_on_connect_triggers(con->session) != 0)
>> - diag_raise();
>> - }
>> - iproto_wpos_create(&msg->wpos, out);
>> - } catch (Exception *e) {
>> +
>> + con->session = session_create(SESSION_TYPE_BINARY);
>> + if (con->session == NULL) {
>> tx_reply_error(msg);
>> msg->close_connection = true;
>> + return;
>> + }
>> + con->session->meta.connection = con;
>> +
>> + tx_fiber_init(con->session, 0);
>> + if (! rlist_empty(&session_on_connect)) {
>
>In the new code we don't use ' ' after unary operators.
Fixed in v8.
>
>> + if (session_run_on_connect_triggers(con->session) != 0) {
>> + tx_reply_error(msg);
>> + msg->close_connection = true;
>> + }
>> }
>> } 
 
 
--
Ilya Kosarev
 

* Re: [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-24 20:13     ` Ilya Kosarev
@ 2020-12-26 13:15       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-26 13:15 UTC (permalink / raw)
  To: Ilya Kosarev; +Cc: tarantool-patches

On 24.12.2020 23:13, Ilya Kosarev wrote:
>  
> Hi!
>  
> Sent v9 with fixes. Answers below.
> 
>     Tuesday, 22 December 2020, 17:21 +03:00 from Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
>      
>     Thanks for the patch!
> 
>     Did you even test it?
> 
>     I used exactly the same test as in my last email and I still get
>     "too many open files".
> 
> Yes, it does print this. But it also actually closes sockets
> independently from tx.

It does not. I couldn't connect after this message started being printed.
If it worked, the variable `count` in my test would grow infinitely.

On the latest version of the branch it seems to be working though. Maybe.
On the same test. I didn't look at the code yet. Maybe it could break via
another test.

> The issue is in input stop through
> iproto_connection_stop_msg_max_limit() with
> iproto_check_msg_max() condition, which is not really
> applicable while tx is not involved, so i changed the
> condition for limitation in v9.
> 
> 
>     See 2 comments below.
> 
>     > diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>     > index f7330af21d..b48a774c92 100644
>     > --- a/src/box/iproto.cc
>     > +++ b/src/box/iproto.cc
>     > @@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
>     > tx_accept_msg(struct cmsg *m)
>     > {
>     > struct iproto_msg *msg = (struct iproto_msg *) m;
>     > - tx_accept_wpos(msg->connection, &msg->wpos);
>     > - tx_fiber_init(msg->connection->session, msg->header.sync);
>     > + struct iproto_connection *con = msg->connection;
>     > + if (con->state != IPROTO_CONNECTION_ALIVE) {
> 
>     1. Connection state can only be changed and read by iproto thread.
>     The variable is not protected anyhow, so you can't simply read/write
>     it in 2 threads.
> 
> Why is it not fine?

Do I really need to explain why it is not fine to read and write a
variable from multiple threads without any protection? The most
obvious reason - you can end up reading garbage if something
is not right with the alignment.

Non-obvious reason - the state can be changed to a bad state
right after you checked that it was in a good state. That makes
the check basically useless.

Another non-obvious reason - memory read/write reorderings, but I
don't know if they can hit anything in iproto code.
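
A minimal sketch of the second point, not iproto code: `conn_state`, `connection_sketch` and the helpers are hypothetical. Making the variable atomic removes torn reads, yet a separate check is still stale by the time it is acted upon; only an operation that checks and changes the state in one step closes that window.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the connection states discussed above.
enum class conn_state { ALIVE, FAILED, CLOSED };

struct connection_sketch {
	std::atomic<conn_state> state{conn_state::ALIVE};
};

// Safe to call from any thread, but the answer may already be outdated
// when the caller acts on it (check-then-act race).
inline bool
conn_is_alive(const connection_sketch &con)
{
	return con.state.load(std::memory_order_acquire) ==
	       conn_state::ALIVE;
}

// Checks and changes the state in a single atomic step: exactly one
// caller can move the connection from ALIVE to CLOSED.
inline bool
conn_try_close(connection_sketch &con)
{
	conn_state expected = conn_state::ALIVE;
	return con.state.compare_exchange_strong(expected,
						 conn_state::CLOSED,
						 std::memory_order_acq_rel,
						 std::memory_order_acquire);
}

// Single-threaded walk through the intended behaviour.
inline void
conn_sketch_demo()
{
	connection_sketch con;
	assert(conn_is_alive(con));
	assert(conn_try_close(con));   // first closer wins
	assert(!conn_try_close(con));  // second attempt sees CLOSED
	assert(!conn_is_alive(con));
}
```

Whether an atomic is the right tool for iproto is a separate question; keeping the state owned by a single thread and passing messages through cbus avoids the problem altogether.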

Please, stop doing that. You probably did already, but I didn't look
at the code, as I said. Because I am on a kind of 'vacation'.

* Re: [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-22 14:21   ` Vladislav Shpilevoy
@ 2020-12-24 20:13     ` Ilya Kosarev
  2020-12-26 13:15       ` Vladislav Shpilevoy
  0 siblings, 1 reply; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-24 20:13 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hi!
 
Sent v9 with fixes. Answers below.
>Tuesday, 22 December 2020, 17:21 +03:00 from Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
> 
>Thanks for the patch!
>
>Did you even test it?
>
>I used exactly the same test as in my last email and I still get
>"too many open files".
Yes, it does print this. But it also actually closes sockets
independently from tx. The issue is that input gets stopped by
iproto_connection_stop_msg_max_limit() under the
iproto_check_msg_max() condition, which is not really
applicable while tx is not involved, so I changed the
limiting condition in v9.
>
>See 2 comments below.
>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index f7330af21d..b48a774c92 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
>> tx_accept_msg(struct cmsg *m)
>> {
>> struct iproto_msg *msg = (struct iproto_msg *) m;
>> - tx_accept_wpos(msg->connection, &msg->wpos);
>> - tx_fiber_init(msg->connection->session, msg->header.sync);
>> + struct iproto_connection *con = msg->connection;
>> + if (con->state != IPROTO_CONNECTION_ALIVE) {
>
>1. Connection state can only be changed and read by iproto thread.
>The variable is not protected anyhow, so you can't simply read/write
>it in 2 threads.
Why is it not fine? I think it may lead to a false sharing problem, but
as far as I can see it is not going to affect the iproto thread, as long
as we are not writing the connection state in tx, only reading it.
>
>> + /*
>> + * Connection might be closed from iproto already.
>> + * No action required in this case.
>> + */
>> + return msg;
>
>2. Why do you return a message instead of NULL here, if the
>connection is already dead?
Ok, this can be simplified, fixed in v9.
>
>I ask you to spend more time on finishing the patch. If you
>rush to "fix" it without thinking it through, the review will
>never end, and it will waste both your time and mine.
 
 
--
Ilya Kosarev
 
 

* Re: [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-22  0:50 ` [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
@ 2020-12-22 14:21   ` Vladislav Shpilevoy
  2020-12-24 20:13     ` Ilya Kosarev
  0 siblings, 1 reply; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-22 14:21 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

Thanks for the patch!

Did you even test it?

I used exactly the same test as in my last email and I still get
"too many open files".

See 2 comments below.

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index f7330af21d..b48a774c92 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
>  tx_accept_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> +	struct iproto_connection *con = msg->connection;
> +	if (con->state != IPROTO_CONNECTION_ALIVE) {

1. Connection state can only be changed and read by the iproto thread.
The variable is not protected in any way, so you can't simply read/write
it in 2 threads.
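
To illustrate comment 1, here is a minimal sketch of what a state flag
that is safe to read from a second thread could look like. It is not
what the patch or Tarantool does: `conn_state`, `connection_sketch`,
`net_mark_closed()` and `tx_connection_is_alive()` are hypothetical
names, and keeping the state private to the iproto thread may well be
the preferable fix.

```cpp
#include <atomic>
#include <cassert>

/* Hypothetical stand-in for enum iproto_connection_state. */
enum class conn_state : int { alive, failed, closed };

/* Hypothetical stand-in for struct iproto_connection. */
struct connection_sketch {
	/* Written by the network thread only, read by the tx thread. */
	std::atomic<conn_state> state{conn_state::alive};
};

/* Network thread: publish the new state. */
inline void
net_mark_closed(connection_sketch &con)
{
	con.state.store(conn_state::closed, std::memory_order_release);
}

/* Tx thread: take a snapshot of the state. */
inline bool
tx_connection_is_alive(const connection_sketch &con)
{
	return con.state.load(std::memory_order_acquire) ==
	       conn_state::alive;
}
```

Even with an atomic, the tx thread only sees a snapshot: the state may
change right after the load, so any decision based on it still has to
tolerate a connection that is closed concurrently.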

> +		/*
> +		 * Connection might be closed from iproto already.
> +		 * No action required in this case.
> +		 */
> +		return msg;

2. Why do you return a message instead of NULL here, if the
connection is already dead?
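
A sketch of the shape comment 2 hints at, using hypothetical types
(`conn_sketch`, `msg_sketch`) rather than the real iproto structures:
if the accept helper returns NULL for a dead connection, each caller
needs a single NULL check instead of re-reading the connection state.

```cpp
#include <cassert>
#include <cstddef>

/* Hypothetical, simplified stand-ins for the iproto structures. */
struct conn_sketch { bool alive; };
struct msg_sketch { conn_sketch *connection; bool accepted; };

/* Return NULL if the connection is already dead, the message otherwise. */
static inline msg_sketch *
accept_msg_sketch(msg_sketch *msg)
{
	if (!msg->connection->alive)
		return NULL;
	msg->accepted = true;
	return msg;
}

/* A request handler: one check covers the dead-connection case. */
static inline bool
process_sketch(msg_sketch *m)
{
	msg_sketch *msg = accept_msg_sketch(m);
	if (msg == NULL)
		return false;
	/* ... handle the request here ... */
	return true;
}
```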

I ask you to spend more time on finishing the patch. If you
rush to "fix" it without thinking it through, the review will
never end, and it will waste both your time and mine.


* [Tarantool-patches] [PATCH v7 3/3] iproto: move greeting from tx thread to iproto
  2020-12-22  0:49 [Tarantool-patches] [PATCH v7 0/3] iproto: greeting enhancement Ilya Kosarev
@ 2020-12-22  0:50 ` Ilya Kosarev
  2020-12-22 14:21   ` Vladislav Shpilevoy
  0 siblings, 1 reply; 10+ messages in thread
From: Ilya Kosarev @ 2020-12-22  0:50 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to pass the connection to the tx thread
through cbus. This meant that any connection interaction involved the
tx thread even before the greeting was sent to the client.
Consequently, a client could hit the instance with an enormous number
of connections, exhausting the file descriptor limit when the tx
thread is unresponsive (for example, while building a secondary
index), and causing extra tx_process_connect() calls afterwards even
if the instance does not fail with a "too many open files" error.
This patch allows iproto to accept the connection and send the
greeting by itself. Thus the connection is established and stays in
the fetch_schema state while the tx thread is unresponsive. iproto
also starts reading input and can proceed with closing the connection
if needed (starting with closing the descriptor).

Closes #3776
---
 src/box/iproto.cc                             | 185 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 --
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 145 insertions(+), 66 deletions(-)
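
For readers of the thread, the three-way result used by the greeting
flush in this patch (1: fully written, 0: socket not ready, retry on
the next write event, -1: error) can be sketched in isolation as
follows. This is a simplified illustration, not the patch code: it
uses plain write() instead of sio_writev(), `greeting_flush_sketch()`
and `GREETING_SIZE_SKETCH` are hypothetical names, the 128-byte size
is an assumption about IPROTO_GREETING_SIZE, and the set of errno
values treated as "would block" is an assumption about
sio_wouldblock().

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>

/* Assumed greeting size; stands in for IPROTO_GREETING_SIZE. */
enum { GREETING_SIZE_SKETCH = 128 };

/*
 * Try to write the whole greeting in one call.
 * Returns 1 when it was fully written, 0 when the descriptor is not
 * ready and the caller should wait for the next write event, and -1
 * on an error. A partial write is treated as an error, as in the patch.
 */
static int
greeting_flush_sketch(int fd, const char *buf)
{
	ssize_t nwr = write(fd, buf, GREETING_SIZE_SKETCH);
	if (nwr > 0)
		return nwr == GREETING_SIZE_SKETCH ? 1 : -1;
	if (nwr < 0 && errno != EAGAIN && errno != EWOULDBLOCK &&
	    errno != EINTR)
		return -1;
	return 0;
}
```

The event callback then only has to dispatch on the result: re-arm the
write watcher on 0, mark the connection as failed on -1, and switch to
the regular output handler on 1.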

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index f7330af21d..b48a774c92 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -356,6 +356,11 @@ enum iproto_connection_state {
 	 * takes an already active socket in a constructor.
 	 */
 	IPROTO_CONNECTION_ALIVE,
+	/**
+	 * Connection is failed in iproto, but tx doesn't know
+	 * about it yet. Used for unsuccessful greeting.
+	 */
+	IPROTO_CONNECTION_FAILED,
 	/**
 	 * Socket was closed, a notification is sent to the TX
 	 * thread to close the session.
@@ -453,6 +458,8 @@ struct iproto_connection
 	 * meaningless.
 	 */
 	size_t parse_size;
+	/** Iproto buffer used to send greeting. */
+	char greeting_buf[IPROTO_GREETING_SIZE];
 	/**
 	 * Nubmer of active long polling requests that have already
 	 * discarded their arguments in order not to stall other
@@ -679,7 +686,8 @@ iproto_connection_close(struct iproto_connection *con)
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
 		cpipe_push(&tx_pipe, &con->disconnect_msg);
-		assert(con->state == IPROTO_CONNECTION_ALIVE);
+		assert(con->state == IPROTO_CONNECTION_ALIVE ||
+		       con->state == IPROTO_CONNECTION_FAILED);
 		con->state = IPROTO_CONNECTION_CLOSED;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
@@ -1091,6 +1099,58 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+/**
+ * Flush the greeting from its buffer. A separate flush function is
+ * needed because the greeting is sent from the iproto thread, which
+ * can't use obuf; this lets us send the greeting without involving tx.
+ */
+static int
+iproto_connection_greeting_flush(struct iproto_connection *con)
+{
+	struct iovec greeting;
+	greeting.iov_base = &con->greeting_buf;
+	greeting.iov_len = IPROTO_GREETING_SIZE;
+	ssize_t nwr = sio_writev(con->output.fd, &greeting, 1);
+
+	if (nwr > 0) {
+		/* Count statistics */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		if (nwr < IPROTO_GREETING_SIZE) {
+			diag_set(SocketError, sio_socketname(con->output.fd),
+				 "partial greeting write %zd", nwr);
+			return -1;
+		}
+		return 1;
+	} else if (nwr < 0 && !sio_wouldblock(errno)) {
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+			      int /* revents */)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *)watcher->data;
+	int rc = iproto_connection_greeting_flush(con);
+	if (rc <= 0) {
+		if (rc == 0) {
+			ev_io_start(loop, &con->output);
+		} else {
+			diag_log();
+			con->state = IPROTO_CONNECTION_FAILED;
+		}
+		return;
+	}
+	ev_io_stop(con->loop, &con->output);
+	ev_io_init(&con->output, iproto_connection_on_output,
+		   con->output.fd, EV_WRITE);
+	/* Start reading input. */
+	ev_feed_event(con->loop, &con->input, EV_READ);
+}
+
 static struct iproto_connection *
 iproto_connection_new(int fd)
 {
@@ -1103,7 +1163,7 @@ iproto_connection_new(int fd)
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
-	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+	ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
 	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
 tx_accept_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
-	tx_accept_wpos(msg->connection, &msg->wpos);
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	struct iproto_connection *con = msg->connection;
+	if (con->state != IPROTO_CONNECTION_ALIVE) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return msg;
+	}
+	tx_accept_wpos(con, &msg->wpos);
+	tx_fiber_init(con->session, msg->header.sync);
 	return msg;
 }
 
@@ -1510,6 +1578,8 @@ static void
 tx_reply_iproto_error(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	struct obuf *out = msg->connection->tx.p_obuf;
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
@@ -1530,6 +1600,8 @@ static void
 tx_process1(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1556,6 +1628,8 @@ static void
 tx_process_select(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	struct obuf *out;
 	struct obuf_svp svp;
 	struct port port;
@@ -1610,6 +1684,8 @@ static void
 tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1689,6 +1765,8 @@ static void
 tx_process_misc(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
 	if (tx_check_schema(msg->header.schema_version))
@@ -1732,6 +1810,8 @@ static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	struct obuf *out;
 	struct port port;
 	struct sql_bind *bind = NULL;
@@ -1828,6 +1908,8 @@ static void
 tx_process_replication(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg->connection->state != IPROTO_CONNECTION_ALIVE)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
@@ -1938,50 +2020,80 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_connection *con)
+{
+	/*
+	 * INSTANCE_UUID is guaranteed to be inited before this moment.
+	 * We start listening either in local_recovery() or bootstrap().
+	 * The INSTANCE_UUID is ensured to be inited in the beginning of
+	 * both methods. In case of local_recovery() it is verified that
+	 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+	 * In bootstrap() INSTANCE_UUID is either taken from the
+	 * instance_uuid box.cfg{} param or created on the spot.
+	 */
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(con->greeting_buf, tarantool_version_id(),
+			&INSTANCE_UUID, con->salt, IPROTO_SALT_SIZE);
+	assert(evio_has_fd(&con->output));
+	ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
  */
 static void
 tx_process_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
+
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return;
+	}
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL) {
 		tx_reply_error(msg);
 		msg->close_connection = true;
+		return;
+	}
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (!rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0) {
+			tx_reply_error(msg);
+			msg->close_connection = true;
+		}
 	}
 }
 
 /**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
  */
 static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 */
+		iproto_msg_delete(msg);
+		return;
+	}
+	if (msg->close_connection || con->state == IPROTO_CONNECTION_FAILED) {
 		struct obuf *out = msg->wpos.obuf;
 		int64_t nwr = sio_writev(con->output.fd, out->iov,
 					 obuf_iovcnt(out));
@@ -1998,20 +2110,12 @@ net_send_greeting(struct cmsg *m)
 		return;
 	}
 	con->wend = msg->wpos;
-	/*
-	 * Connect is synchronous, so no one could have been
-	 * messing up with the connection while it was in
-	 * progress.
-	 */
-	assert(evio_has_fd(&con->output));
-	/* Handshake OK, start reading input. */
-	ev_feed_event(con->loop, &con->output, EV_WRITE);
 	iproto_msg_delete(msg);
 }
 
 static const struct cmsg_hop connect_route[] = {
 	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
+	{ net_finish_connect, NULL },
 };
 
 /** }}} */
@@ -2042,6 +2146,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
+	iproto_process_connect(con);
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a050..6389b27bc8 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
  | - Connection is not established, state is "auth"
  | ...
 
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
 box.schema.user.drop('test')
  | ---
  | ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf0..55ea43f26f 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
 
 req_during_auth()
 
--- Check the same for 'initial' state.
-ok, err = nil
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
-ok, err
-c:close()
 box.schema.user.drop('test')
-- 
2.17.1


