Tarantool development patches archive
* [Tarantool-patches] [PATCH v9 0/3] iproto: greeting enhancement
@ 2020-12-24 20:13 Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
                   ` (2 more replies)
  0 siblings, 3 replies; 7+ messages in thread
From: Ilya Kosarev @ 2020-12-24 20:13 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

These patches move greeting from tx thread to iproto and introduce
improvements into iproto logic.
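For context, the greeting is the fixed 128-byte banner the server writes before it reads anything from the client. The sketch below is not Tarantool's `greeting_encode()`. It assumes the layout described in the iproto protocol documentation: two 64-byte lines, the first holding the version, the protocol and the instance UUID, the second holding the base64-encoded salt, each padded with spaces and ending in `\n`. The helper names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Sizes assumed from the iproto protocol documentation. */
enum { GREETING_SIZE = 128, GREETING_LINE = 64, SALT_SIZE = 32 };

/* Standard base64 (RFC 4648) encoder; returns the number of chars written. */
static size_t
base64_encode_sketch(const unsigned char *in, size_t len, char *out)
{
	static const char tbl[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
	size_t o = 0;
	for (size_t i = 0; i < len; i += 3) {
		unsigned v = in[i] << 16;
		if (i + 1 < len)
			v |= in[i + 1] << 8;
		if (i + 2 < len)
			v |= in[i + 2];
		out[o++] = tbl[(v >> 18) & 63];
		out[o++] = tbl[(v >> 12) & 63];
		out[o++] = i + 1 < len ? tbl[(v >> 6) & 63] : '=';
		out[o++] = i + 2 < len ? tbl[v & 63] : '=';
	}
	return o;
}

/*
 * Hypothetical encoder: line 1 is "Tarantool <version> (Binary) <uuid>",
 * line 2 is the base64 salt; both are space-padded to 63 chars + '\n'.
 */
static void
sketch_greeting_encode(char *buf, const char *version, const char *uuid,
		       const unsigned char *salt)
{
	memset(buf, ' ', GREETING_SIZE);
	int n = snprintf(buf, GREETING_LINE, "Tarantool %s (Binary) %s",
			 version, uuid);
	assert(n > 0 && n < GREETING_LINE);
	buf[n] = ' '; /* overwrite the NUL written by snprintf() */
	buf[GREETING_LINE - 1] = '\n';
	size_t len = base64_encode_sketch(salt, SALT_SIZE, buf + GREETING_LINE);
	assert(len < (size_t)GREETING_LINE); /* 32 bytes -> 44 chars */
	(void)len;
	buf[GREETING_SIZE - 1] = '\n';
}
```

Because the size is fixed, the server can send the greeting as one buffer without any framing, which is what makes it cheap to produce in the iproto thread.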

Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776

Changes in v2:
 - docbot request provided
 - ChangeLog provided
 - net_send_msg() used instead of net_send_error() where needed
 - error cases made clear in iproto_msg_decode()
 - replaced net_check_connection() with iproto_connection_fail() predicate
 - improved tx session initialization fail processing to avoid extra tries
 - fixed obuf_dup() fail processing in iproto_process_connect()
 - added some comments
 - some comments style fixed

Changes in v3:
 - added braces to the confusing one-line if statement
 - updated ChangeLog
 - added description for iproto_msg_decode()
 - simplified error processing in iproto_msg_decode()

Changes in v4:
 - fixed msg->close_connection initialization bug
 - fixed false-sharing problem in iproto_connection struct
 - added needed assertion
 - added needed comments
 - names refactoring
 - simplified patch a bit: removed extra return value, extra 
 
Changes in v5:
 - reworked to avoid lazy initialization and extra changes

Changes in v6:
 - some changes are picked out into separate commits
 - main commit message rewritten to become more comprehensive
 - removed some extra changes
 - style fixes
 - greeting allocation fixed

Changes in v7:
 - fixed functions signatures & added comments
 - partial greeting write taken into account
 - added needed state

Changes in v8:
 - style fix
 - iproto now starts to read right after greeting and can close the socket

Changes in v9:
 - simplified processing of closed connections in tx (no extra state reading)
 - removed extra iproto_connection_stop_msg_max_limit() not applicable call

Ilya Kosarev (3):
  iproto: move msg fields initialization to iproto_msg_new()
  iproto: fix comment and add assert on destruction
  iproto: move greeting from tx thread to iproto

 src/box/iproto.cc                             | 194 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 --
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 150 insertions(+), 70 deletions(-)

-- 
2.17.1

^ permalink raw reply	[flat|nested] 7+ messages in thread

* [Tarantool-patches] [PATCH v9 1/3] iproto: move msg fields initialization to iproto_msg_new()
  2020-12-24 20:13 [Tarantool-patches] [PATCH v9 0/3] iproto: greeting enhancement Ilya Kosarev
@ 2020-12-24 20:13 ` Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
  2 siblings, 0 replies; 7+ messages in thread
From: Ilya Kosarev @ 2020-12-24 20:13 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

The msg->close_connection flag was only initialized in
iproto_on_accept(), while the other struct iproto_msg fields are
initialized in iproto_msg_new(). This is potentially dangerous for new
logic involving the msg->close_connection flag, so its initialization
is now moved to iproto_msg_new().
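Why initialization belongs in the constructor: message objects are typically recycled through a pool, so a field that is set on only one code path keeps whatever value the previous owner left in it. The toy sketch below uses a hypothetical LIFO free list (`msg_pool_new()` and `msg_pool_delete()` are illustrative names, not Tarantool's mempool API) and resets the `close_connection` flag in the constructor.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

struct toy_msg {
	bool close_connection;
	void *connection;
	struct toy_msg *next_free; /* free-list link while unused */
};

/* Hypothetical pool: freed objects are kept and handed out again as-is. */
static struct toy_msg *free_list = NULL;

static struct toy_msg *
msg_pool_new(void *connection)
{
	struct toy_msg *msg = free_list;
	if (msg != NULL)
		free_list = msg->next_free;
	else if ((msg = malloc(sizeof(*msg))) == NULL)
		return NULL;
	/* Initialize every field here, whichever path requested the msg. */
	msg->close_connection = false;
	msg->connection = connection;
	msg->next_free = NULL;
	return msg;
}

static void
msg_pool_delete(struct toy_msg *msg)
{
	/* Memory is not cleared: without the reset above, the next owner
	 * would see the stale close_connection value. */
	msg->next_free = free_list;
	free_list = msg;
}
```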

Part of #3776
---
 src/box/iproto.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5ec..6a1e50922 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -565,6 +565,7 @@ iproto_msg_new(struct iproto_connection *con)
 			 "connection %s", sio_socketname(con->input.fd));
 		return NULL;
 	}
+	msg->close_connection = false;
 	msg->connection = con;
 	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
 	return msg;
@@ -2040,7 +2041,6 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
-	msg->close_connection = false;
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
-- 
2.17.1

* [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction
  2020-12-24 20:13 [Tarantool-patches] [PATCH v9 0/3] iproto: greeting enhancement Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
@ 2020-12-24 20:13 ` Ilya Kosarev
  2020-12-28 11:49   ` Vladislav Shpilevoy
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
  2 siblings, 1 reply; 7+ messages in thread
From: Ilya Kosarev @ 2020-12-24 20:13 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

The comment in tx_process_destroy() about obuf destruction was wrong.
Memory for the obufs is actually allocated from the tx-owned slab cache,
and tx_process_destroy() obviously runs in tx, so the comment is fixed
to reflect reality.
It is also implied that the connection is in the
IPROTO_CONNECTION_DESTROYED state in tx_process_destroy(). Now this is
verified with assert().

Part of #3776
---
 src/box/iproto.cc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 6a1e50922..f7330af21 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1379,13 +1379,14 @@ tx_process_destroy(struct cmsg *m)
 {
 	struct iproto_connection *con =
 		container_of(m, struct iproto_connection, destroy_msg);
+	assert(con->state == IPROTO_CONNECTION_DESTROYED);
 	if (con->session) {
 		session_destroy(con->session);
 		con->session = NULL; /* safety */
 	}
 	/*
-	 * Got to be done in iproto thread since
-	 * that's where the memory is allocated.
+	 * obuf is destroyed in the tx thread because that is where
+	 * it was allocated.
 	 */
 	obuf_destroy(&con->obuf[0]);
 	obuf_destroy(&con->obuf[1]);
-- 
2.17.1

* [Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto
  2020-12-24 20:13 [Tarantool-patches] [PATCH v9 0/3] iproto: greeting enhancement Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 1/3] iproto: move msg fields initialization to iproto_msg_new() Ilya Kosarev
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev
@ 2020-12-24 20:13 ` Ilya Kosarev
  2020-12-28 11:21   ` Vladislav Shpilevoy
  2 siblings, 1 reply; 7+ messages in thread
From: Ilya Kosarev @ 2020-12-24 20:13 UTC (permalink / raw)
  To: v.shpilevoy, alyapunov; +Cc: tarantool-patches

On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to pass the connection to the tx thread
through cbus. This meant that any connection interaction involved the
tx thread even before we got to send the greeting to the client.
Consequently, a client could reach the instance with an enormous number
of connections, leading to file descriptor limit exhaustion in case of
an unresponsive tx thread (for example, while building a secondary
index) and to extra tx_process_connect() calls afterwards, even if the
instance doesn't fail with a "too many open files" error.
This patch allows iproto to accept a connection and send the greeting
by itself. Thus the connection is established and stays in the
fetch_schema state while the tx thread is unresponsive. Iproto also
starts reading input and can proceed with closing the connection if
needed (starting with closing the descriptor).
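The greeting write has three outcomes in the patch's iproto_connection_greeting_flush(): the whole greeting was written, the socket is not writable yet and the write is retried on the next EV_WRITE, or an error occurred (a partial write included). A minimal POSIX sketch of that contract follows. It uses plain writev() instead of Tarantool's sio_writev(), and greeting_flush_sketch() is a hypothetical name.

```c
#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

enum { GREETING_SIZE = 128 };

/*
 * Returns 1 if the whole greeting was written, 0 if the socket is not
 * writable yet (retry on the next EV_WRITE), -1 on error.
 * Assumes the process ignores SIGPIPE; otherwise writing to a closed
 * peer kills the process instead of failing with EPIPE.
 */
static int
greeting_flush_sketch(int fd, const char *buf)
{
	struct iovec iov = { .iov_base = (void *)buf, .iov_len = GREETING_SIZE };
	ssize_t nwr = writev(fd, &iov, 1);
	if (nwr == GREETING_SIZE)
		return 1;
	if (nwr > 0)
		return -1; /* partial write: treated as an error, as in the patch */
	if (nwr == 0)
		return 0;
	if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
		return 0; /* transient: wait for the socket to become writable */
	return -1;
}
```

Treating a short write as fatal keeps the connection state trivial (there is no offset to remember); the alternative would be to store how many bytes were already sent and resume from there on the next writable event.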

Closes #3776
---
 src/box/iproto.cc                             | 187 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 --
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 146 insertions(+), 67 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index f7330af21..b92a0433b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -356,6 +356,11 @@ enum iproto_connection_state {
 	 * takes an already active socket in a constructor.
 	 */
 	IPROTO_CONNECTION_ALIVE,
+	/**
+	 * Connection is failed in iproto, but tx doesn't know
+	 * about it yet. Used for unsuccessful greeting.
+	 */
+	IPROTO_CONNECTION_FAILED,
 	/**
 	 * Socket was closed, a notification is sent to the TX
 	 * thread to close the session.
@@ -453,6 +458,8 @@ struct iproto_connection
 	 * meaningless.
 	 */
 	size_t parse_size;
+	/** Iproto buffer used to send greeting. */
+	char greeting_buf[IPROTO_GREETING_SIZE];
 	/**
 	 * Nubmer of active long polling requests that have already
 	 * discarded their arguments in order not to stall other
@@ -679,7 +686,8 @@ iproto_connection_close(struct iproto_connection *con)
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
 		cpipe_push(&tx_pipe, &con->disconnect_msg);
-		assert(con->state == IPROTO_CONNECTION_ALIVE);
+		assert(con->state == IPROTO_CONNECTION_ALIVE ||
+		       con->state == IPROTO_CONNECTION_FAILED);
 		con->state = IPROTO_CONNECTION_CLOSED;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
@@ -963,7 +971,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 	 * otherwise we might deplete the fiber pool in tx
 	 * thread and deadlock.
 	 */
-	if (iproto_check_msg_max()) {
+	if (iproto_check_msg_max() && con->session != NULL) {
 		iproto_connection_stop_msg_max_limit(con);
 		return;
 	}
@@ -1091,6 +1099,58 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+/**
+ * Flush greeting from its buffer. It requires a separate flush function
+ * because greeting is being sent from iproto thread which can't use obuf
+ * and thus we can send greeting without involving tx.
+ */
+static int
+iproto_connection_greeting_flush(struct iproto_connection *con)
+{
+	struct iovec greeting;
+	greeting.iov_base = &con->greeting_buf;
+	greeting.iov_len = IPROTO_GREETING_SIZE;
+	ssize_t nwr = sio_writev(con->output.fd, &greeting, 1);
+
+	if (nwr > 0) {
+		/* Count statistics */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		if (nwr < IPROTO_GREETING_SIZE) {
+			diag_set(SocketError, sio_socketname(con->output.fd),
+				 "partial greeting write %zd", nwr);
+			return -1;
+		}
+		return 1;
+	} else if (nwr < 0 && !sio_wouldblock(errno)) {
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+			      int /* revents */)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *)watcher->data;
+	int rc = iproto_connection_greeting_flush(con);
+	if (rc <= 0) {
+		if (rc == 0) {
+			ev_io_start(loop, &con->output);
+		} else {
+			diag_log();
+			con->state = IPROTO_CONNECTION_FAILED;
+		}
+		return;
+	}
+	ev_io_stop(con->loop, &con->output);
+	ev_io_init(&con->output, iproto_connection_on_output,
+		   con->output.fd, EV_WRITE);
+	/* Start reading input. */
+	ev_feed_event(con->loop, &con->input, EV_READ);
+}
+
 static struct iproto_connection *
 iproto_connection_new(int fd)
 {
@@ -1103,7 +1163,7 @@ iproto_connection_new(int fd)
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
-	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+	ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
 	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
 tx_accept_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
-	tx_accept_wpos(msg->connection, &msg->wpos);
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	struct iproto_connection *con = msg->connection;
+	if (con->state != IPROTO_CONNECTION_ALIVE) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return NULL;
+	}
+	tx_accept_wpos(con, &msg->wpos);
+	tx_fiber_init(con->session, msg->header.sync);
 	return msg;
 }
 
@@ -1510,6 +1578,8 @@ static void
 tx_reply_iproto_error(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out = msg->connection->tx.p_obuf;
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
@@ -1530,6 +1600,8 @@ static void
 tx_process1(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1556,6 +1628,8 @@ static void
 tx_process_select(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out;
 	struct obuf_svp svp;
 	struct port port;
@@ -1610,6 +1684,8 @@ static void
 tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1689,6 +1765,8 @@ static void
 tx_process_misc(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
 	if (tx_check_schema(msg->header.schema_version))
@@ -1732,6 +1810,8 @@ static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out;
 	struct port port;
 	struct sql_bind *bind = NULL;
@@ -1828,6 +1908,8 @@ static void
 tx_process_replication(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
@@ -1938,50 +2020,80 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_connection *con)
+{
+	/*
+	 * INSTANCE_UUID is guaranteed to be inited before this moment.
+	 * We start listening either in local_recovery() or bootstrap().
+	 * The INSTANCE_UUID is ensured to be inited in the beginning of
+	 * both methods. In case of local_recovery() it is verified that
+	 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+	 * In bootstrap() INSTANCE_UUID is either taken from the
+	 * instance_uuid box.cfg{} param or created on the spot.
+	 */
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(con->greeting_buf, tarantool_version_id(),
+			&INSTANCE_UUID, con->salt, IPROTO_SALT_SIZE);
+	assert(evio_has_fd(&con->output));
+	ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
  */
 static void
 tx_process_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
+
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return;
+	}
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL) {
 		tx_reply_error(msg);
 		msg->close_connection = true;
+		return;
+	}
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (!rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0) {
+			tx_reply_error(msg);
+			msg->close_connection = true;
+		}
 	}
 }
 
 /**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
  */
 static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 */
+		iproto_msg_delete(msg);
+		return;
+	}
+	if (msg->close_connection || con->state == IPROTO_CONNECTION_FAILED) {
 		struct obuf *out = msg->wpos.obuf;
 		int64_t nwr = sio_writev(con->output.fd, out->iov,
 					 obuf_iovcnt(out));
@@ -1998,20 +2110,12 @@ net_send_greeting(struct cmsg *m)
 		return;
 	}
 	con->wend = msg->wpos;
-	/*
-	 * Connect is synchronous, so no one could have been
-	 * messing up with the connection while it was in
-	 * progress.
-	 */
-	assert(evio_has_fd(&con->output));
-	/* Handshake OK, start reading input. */
-	ev_feed_event(con->loop, &con->output, EV_WRITE);
 	iproto_msg_delete(msg);
 }
 
 static const struct cmsg_hop connect_route[] = {
 	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
+	{ net_finish_connect, NULL },
 };
 
 /** }}} */
@@ -2042,6 +2146,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
+	iproto_process_connect(con);
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a05..6389b27bc 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
  | - Connection is not established, state is "auth"
  | ...
 
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
 box.schema.user.drop('test')
  | ---
  | ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf..55ea43f26 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
 
 req_during_auth()
 
--- Check the same for 'initial' state.
-ok, err = nil
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
-ok, err
-c:close()
 box.schema.user.drop('test')
-- 
2.17.1

* Re: [Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto Ilya Kosarev
@ 2020-12-28 11:21   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 7+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-28 11:21 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

Hi! Thanks for the patch!

We are getting closer and closer to the final solution!

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index f7330af21..b92a0433b 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -963,7 +971,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>  	 * otherwise we might deplete the fiber pool in tx
>  	 * thread and deadlock.
>  	 */
> -	if (iproto_check_msg_max()) {
> +	if (iproto_check_msg_max() && con->session != NULL) {

Ok, I see why it didn't close anything before. It didn't even try to read,
because my tests used up the entire net_msg limit, and only then closed the socket.
On the server side the close wasn't detected, because sio_read() was never called.

It is better now, but it leads to another issue - you allow allocating an infinite
number of network messages until the connection is established. Net_msg max
was here for a reason. That will lead to OOM eventually if the connection count is
big and they send a lot of messages, or a few big messages, because you will
allocate net_msg objects and also will consume ibuf and keep it until TX
responds. Even if the connection is already closed.
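The point about the limit can be made concrete: the bound only helps if it is checked on every read path, including connections whose session does not exist yet. A minimal sketch of such a budget follows, with hypothetical names (struct msg_budget, msg_budget_acquire()); it bounds the number of pending messages by max no matter how many connections are still waiting for TX, but not the ibuf memory each one may hold.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a net_msg_max-style budget. */
struct msg_budget {
	size_t in_flight;
	size_t max;
};

/* Take one slot before reading a request; false means "stop reading". */
static bool
msg_budget_acquire(struct msg_budget *b)
{
	if (b->in_flight >= b->max)
		return false;
	b->in_flight++;
	return true;
}

/* Give the slot back once TX has answered and the message is freed. */
static void
msg_budget_release(struct msg_budget *b)
{
	assert(b->in_flight > 0);
	b->in_flight--;
}
```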

I tried to find a way to resolve this. For example, don't read anything
until TX acks the connection. But it has 2 issues: there is no cross-platform
way to find out if a socket is closed without calling read() on it and consuming
all pending data. And this way is basically an ugly version of the next step
of what to do with IProto - make TX control its behaviour and introduce a
proper state machine for TX.
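The read() limitation mentioned here is ordinary POSIX stream-socket behaviour: a peer close is observed as read() returning 0, and only after every byte the peer sent before closing has been consumed. A small sketch with a hypothetical drain_until_eof() helper (a blocking descriptor is assumed) shows that detecting the close means consuming the pending requests.

```c
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Read until EOF; returns the number of bytes consumed before EOF was
 * seen, or -1 on error. read() reports EOF (returns 0) only after all
 * data the peer sent before closing has been read.
 */
static ssize_t
drain_until_eof(int fd, char *buf, size_t size)
{
	ssize_t total = 0;
	for (;;) {
		ssize_t n = read(fd, buf, size);
		if (n == 0)
			return total; /* peer closed and nothing is left */
		if (n < 0)
			return -1;
		total += n; /* pending data had to be consumed first */
	}
}
```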

With that said, it looks that this patch won't fix the issue with the current
approach. Or rather it fixes the issue, but simply introduces another one, not
less severe, about memory instead of descriptors.

It seems you need to implement TX control over IProto now. Make it tell IProto
that it should stop accepting new connections and stop sending messages to TX.

It should not even hurt replication, because if TX does not dispatch IProto
messages, that includes service messages as well, and therefore there is no
difference whether you don't send them to TX, or send them and they are not dispatched.


After writing the text above, I had a discussion with Mons to sync about what
to do here, and in similar places, such as applier fibers running the entire
process out of memory when they read their sockets too aggressively (that
should persuade you that OOM on network messages is real).

For one-thread communications that can be solved with a simple queue limit.
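In the single-threaded case such a limit can be as simple as a fixed-capacity FIFO whose push fails when it is full, so the producer (for example, a fiber reading a socket) stops reading until the consumer catches up. A hypothetical ring-buffer sketch, not code from Tarantool:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { QUEUE_CAP = 4 }; /* hypothetical limit */

struct bounded_queue {
	int items[QUEUE_CAP];
	size_t head;  /* index of the oldest element */
	size_t count; /* number of stored elements */
};

/* Refuse new work instead of growing: the producer must back off. */
static bool
queue_push(struct bounded_queue *q, int item)
{
	if (q->count == QUEUE_CAP)
		return false;
	q->items[(q->head + q->count) % QUEUE_CAP] = item;
	q->count++;
	return true;
}

static bool
queue_pop(struct bounded_queue *q, int *item)
{
	if (q->count == 0)
		return false;
	*item = q->items[q->head];
	q->head = (q->head + 1) % QUEUE_CAP;
	q->count--;
	return true;
}
```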

For our case we already have a queue limit - net_msg_max. But apparently it is
not enough. We need a proper state machine for the entire thread. You need
to find all the states the TX thread can be in, create a diagram, and describe
what it does in each state, and what it can't do. For example, obviously,
during local recovery the TX thread does not yield, and therefore does not handle
messages from other threads (because it does not return control to the scheduler
fiber) (although you had better validate this statement, we may have missed
something).

After we have the states, we can decide what to do in each state with other
threads. For example, before starting local recovery we sync with IProto thread.
We make it stop generating new messages, handle all pending messages, and start
recovery. After local recovery we let IProto work normally, like now in master
branch.
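One way to picture this proposal: TX publishes its current state, and IProto looks up what it is allowed to do in that state. This is a minimal sketch under loud assumptions. The three states, the policy fields and all names (enum tx_state, iproto_current_policy()) are hypothetical; the real state list would come from the diagram described above, and the sync step (draining pending messages before recovery) is not modelled.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical TX states; the real list would come from the diagram. */
enum tx_state {
	TX_STATE_INITIAL,
	TX_STATE_LOCAL_RECOVERY,
	TX_STATE_RUNNING,
	TX_STATE_MAX,
};

/* What IProto may do while TX is in a given state. */
struct iproto_policy {
	bool accept_connections;
	bool forward_to_tx;
};

static const struct iproto_policy policy_table[TX_STATE_MAX] = {
	[TX_STATE_INITIAL]        = { false, false },
	[TX_STATE_LOCAL_RECOVERY] = { false, false },
	[TX_STATE_RUNNING]        = { true,  true  },
};

/* Written by the TX thread, read by the IProto thread. */
static _Atomic int tx_state = TX_STATE_INITIAL;

/* Move TX forward; fails if TX is not currently in state `from`. */
static bool
tx_state_transition(enum tx_state from, enum tx_state to)
{
	assert(to > from); /* only forward transitions in this sketch */
	int expected = from;
	return atomic_compare_exchange_strong(&tx_state, &expected, (int)to);
}

/* Called by IProto before accepting or forwarding anything. */
static struct iproto_policy
iproto_current_policy(void)
{
	return policy_table[atomic_load(&tx_state)];
}
```

A table keyed by state keeps the per-state rules in one place, which also makes it a natural hook for the user-visible state triggers mentioned below.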

Also you need to find out why listen is even started before local
recovery. Because it seems that if we have data to recover locally, it happens
regardless of replication settings. And that makes me wonder why the hell we
even start listening so early then. Anyway, during local recovery we can't accept
replication messages either, AFAIK (but that can be false).

It is worth checking if this commit was really needed:
https://github.com/tarantool/tarantool/commit/601bb92a173943185b9050dab4572dc3e6f8e9a1

It could be just that Kostja said we need to do that, and Vova did, because
it was easier than arguing, but there was no real necessity for it.

It would be good to make the state machine for the TX thread global, because now it is
inside the memtx engine if I remember correctly. But it does not really belong to the
memtx engine. The states are also going to be used to run triggers installed by users,
from what I understood after talking to Mons.

* Re: [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction
  2020-12-24 20:13 ` [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction Ilya Kosarev
@ 2020-12-28 11:49   ` Vladislav Shpilevoy
  2020-12-28 11:49     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 7+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-28 11:49 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

First 2 commits are pushed to master, 2.6, 2.6, and 1.10
as obvious.

* Re: [Tarantool-patches] [PATCH v9 2/3] iproto: fix comment and add assert on destruction
  2020-12-28 11:49   ` Vladislav Shpilevoy
@ 2020-12-28 11:49     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 7+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-28 11:49 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches

Sorry, "2.5 and 2.6", not "2.6 and 2.6".

On 28.12.2020 14:49, Vladislav Shpilevoy via Tarantool-patches wrote:
> First 2 commits are pushed to master, 2.6, 2.6, and 1.10
> as obvious.
> 
