[Tarantool-patches] [PATCH v9 3/3] iproto: move greeting from tx thread to iproto

Ilya Kosarev i.kosarev at tarantool.org
Thu Dec 24 23:13:03 MSK 2020


On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to process connection to tx thread
through cbus. This meant that any connection interaction involves tx
thread even before we get to send the greeting to the client.
Consequently, the client might reach the instance with enormous number
of connections, leading to the file descriptors limit exhaustion in
case of unresponsive tx thread (for example, when building secondary
index) and extra tx_process_connects afterwords, even in case the
instance doesn't fail with too many open files error.
This patch allows iproto to accept connection and send greeting by
itself. Thus the connection is being established and stays in
fetch_schema state while tx thread is unresponsive. It also starts
reading input and can process with closing the connection if needed
(starting from closing the descriptor).

Closes #3776
---
 src/box/iproto.cc                             | 187 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 --
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 146 insertions(+), 67 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index f7330af21..b92a0433b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -356,6 +356,11 @@ enum iproto_connection_state {
 	 * takes an already active socket in a constructor.
 	 */
 	IPROTO_CONNECTION_ALIVE,
+	/**
+	 * Connection is failed in iproto, but tx doesn't know
+	 * about it yet. Used for unsuccessful greeting.
+	 */
+	IPROTO_CONNECTION_FAILED,
 	/**
 	 * Socket was closed, a notification is sent to the TX
 	 * thread to close the session.
@@ -453,6 +458,8 @@ struct iproto_connection
 	 * meaningless.
 	 */
 	size_t parse_size;
+	/** Iproto buffer used to send greeting. */
+	char greeting_buf[IPROTO_GREETING_SIZE];
 	/**
 	 * Nubmer of active long polling requests that have already
 	 * discarded their arguments in order not to stall other
@@ -679,7 +686,8 @@ iproto_connection_close(struct iproto_connection *con)
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
 		cpipe_push(&tx_pipe, &con->disconnect_msg);
-		assert(con->state == IPROTO_CONNECTION_ALIVE);
+		assert(con->state == IPROTO_CONNECTION_ALIVE ||
+		       con->state == IPROTO_CONNECTION_FAILED);
 		con->state = IPROTO_CONNECTION_CLOSED;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
@@ -963,7 +971,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 	 * otherwise we might deplete the fiber pool in tx
 	 * thread and deadlock.
 	 */
-	if (iproto_check_msg_max()) {
+	if (iproto_check_msg_max() && con->session != NULL) {
 		iproto_connection_stop_msg_max_limit(con);
 		return;
 	}
@@ -1091,6 +1099,58 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+/**
+ * Flush greeting from it's buffer. It requires separate flush function
+ * because greeting is being sent from iproto thread which can't use obuf
+ * and thus we can send greeting without involving tx.
+ */
+static int
+iproto_connection_greeting_flush(struct iproto_connection *con)
+{
+	struct iovec greeting;
+	greeting.iov_base = &con->greeting_buf;
+	greeting.iov_len = IPROTO_GREETING_SIZE;
+	ssize_t nwr = sio_writev(con->output.fd, &greeting, 1);
+
+	if (nwr > 0) {
+		/* Count statistics */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		if (nwr < IPROTO_GREETING_SIZE) {
+			diag_set(SocketError, sio_socketname(con->output.fd),
+				 "partial greeting write %d", nwr);
+			return -1;
+		}
+		return 1;
+	} else if (nwr < 0 && !sio_wouldblock(errno)) {
+		return -1;
+	}
+
+	return 0;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+			      int /* revents */)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *)watcher->data;
+	int rc = iproto_connection_greeting_flush(con);
+	if (rc <= 0) {
+		if (rc == 0) {
+			ev_io_start(loop, &con->output);
+		} else {
+			diag_log();
+			con->state = IPROTO_CONNECTION_FAILED;
+		}
+		return;
+	}
+	ev_io_stop(con->loop, &con->output);
+	ev_io_init(&con->output, iproto_connection_on_output,
+		   con->output.fd, EV_WRITE);
+	/* Start reading input. */
+	ev_feed_event(con->loop, &con->input, EV_READ);
+}
+
 static struct iproto_connection *
 iproto_connection_new(int fd)
 {
@@ -1103,7 +1163,7 @@ iproto_connection_new(int fd)
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
-	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+	ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
 	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1484,8 +1544,16 @@ static inline struct iproto_msg *
 tx_accept_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
-	tx_accept_wpos(msg->connection, &msg->wpos);
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	struct iproto_connection *con = msg->connection;
+	if (con->state != IPROTO_CONNECTION_ALIVE) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return NULL;
+	}
+	tx_accept_wpos(con, &msg->wpos);
+	tx_fiber_init(con->session, msg->header.sync);
 	return msg;
 }
 
@@ -1510,6 +1578,8 @@ static void
 tx_reply_iproto_error(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out = msg->connection->tx.p_obuf;
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
@@ -1530,6 +1600,8 @@ static void
 tx_process1(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1556,6 +1628,8 @@ static void
 tx_process_select(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out;
 	struct obuf_svp svp;
 	struct port port;
@@ -1610,6 +1684,8 @@ static void
 tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1689,6 +1765,8 @@ static void
 tx_process_misc(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
 	if (tx_check_schema(msg->header.schema_version))
@@ -1732,6 +1810,8 @@ static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct obuf *out;
 	struct port port;
 	struct sql_bind *bind = NULL;
@@ -1828,6 +1908,8 @@ static void
 tx_process_replication(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
+	if (msg == NULL)
+		return;
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
@@ -1938,50 +2020,80 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_connection *con)
+{
+	/*
+	 * INSTANCE_UUID is guaranteed to be inited before this moment.
+	 * We start listening either in local_recovery() or bootstrap().
+	 * The INSTANCE_UUID is ensured to be inited in the beginning of
+	 * both methods. In case of local_recovery() it is verified that
+	 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+	 * In bootstrap() INSTANCE_UUID is either taken from the
+	 * instance_uuid box.cfg{} param or created on the spot.
+	 */
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(con->greeting_buf, tarantool_version_id(),
+			&INSTANCE_UUID, con->salt, IPROTO_SALT_SIZE);
+	assert(evio_has_fd(&con->output));
+	ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
  */
 static void
 tx_process_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
+
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 * No action required in this case.
+		 */
+		return;
+	}
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL) {
 		tx_reply_error(msg);
 		msg->close_connection = true;
+		return;
+	}
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (!rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0) {
+			tx_reply_error(msg);
+			msg->close_connection = true;
+		}
 	}
 }
 
 /**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
  */
 static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
+	if (con->state != IPROTO_CONNECTION_ALIVE &&
+	    con->state != IPROTO_CONNECTION_FAILED) {
+		/*
+		 * Connection might be closed from iproto already.
+		 */
+		iproto_msg_delete(msg);
+		return;
+	}
+	if (msg->close_connection || con->state == IPROTO_CONNECTION_FAILED) {
 		struct obuf *out = msg->wpos.obuf;
 		int64_t nwr = sio_writev(con->output.fd, out->iov,
 					 obuf_iovcnt(out));
@@ -1998,20 +2110,12 @@ net_send_greeting(struct cmsg *m)
 		return;
 	}
 	con->wend = msg->wpos;
-	/*
-	 * Connect is synchronous, so no one could have been
-	 * messing up with the connection while it was in
-	 * progress.
-	 */
-	assert(evio_has_fd(&con->output));
-	/* Handshake OK, start reading input. */
-	ev_feed_event(con->loop, &con->output, EV_WRITE);
 	iproto_msg_delete(msg);
 }
 
 static const struct cmsg_hop connect_route[] = {
 	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
+	{ net_finish_connect, NULL },
 };
 
 /** }}} */
@@ -2042,6 +2146,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
+	iproto_process_connect(con);
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a05..6389b27bc 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
  | - Connection is not established, state is "auth"
  | ...
 
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
 box.schema.user.drop('test')
  | ---
  | ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf..55ea43f26 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
 
 req_during_auth()
 
--- Check the same for 'initial' state.
-ok, err = nil
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
-ok, err
-c:close()
 box.schema.user.drop('test')
-- 
2.17.1



More information about the Tarantool-patches mailing list