[Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto

Ilya Kosarev i.kosarev at tarantool.org
Wed Dec 9 01:09:02 MSK 2020


On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to pass the connection to the tx thread
through cbus. This meant that any interaction with a connection
involved the tx thread even before we got to decode what the client
wants from us. Consequently, a number of problems appear. The main one
is that we might get a descriptor leak in case of an unresponsive
tx thread (for example, when building a secondary index).
This patch allows iproto to accept the connection and send the greeting
by itself. Thus the connection is established and stays in the
fetch_schema state while the tx thread is unresponsive. It solves the
descriptor leakage problem.

Closes #3776
---
Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776

Changes in v2:
 - docbot request provided
 - ChangeLog provided
 - net_send_msg() used instead of net_send_error() where needed
 - error cases made clear in iproto_msg_decode()
 - replaced net_check_connection() with iproto_connection_fail() predicate
 - improved tx session initialization fail processing to avoid extra tries
 - fixed obuf_dup() fail processing in iproto_process_connect()
 - added some comments
 - some comments style fixed

Changes in v3:
 - added braces to the confusing one-line if statement
 - updated ChangeLog
 - added description for iproto_msg_decode()
 - simplified error processing in iproto_msg_decode()

Changes in v4:
 - fixed msg->close_connection initialization bug
 - fixed false-sharing problem in iproto_connection struct
 - added needed assertion
 - added needed comments
 - names refactoring
 - simplified patch a bit: removed extra return value, extra code

Changes in v5:
 - reworked to avoid lazy initialization and extra changes

 src/box/iproto.cc                             | 147 ++++++++++++++----
 test/app/gh-4787-netbox-empty-errmsg.result   |  18 ---
 test/app/gh-4787-netbox-empty-errmsg.test.lua |   8 -
 3 files changed, 113 insertions(+), 60 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5eca..8c122dc58d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume();
 }
 
+static inline void
+iproto_connection_delete(struct iproto_connection *con);
+
 /**
  * A single global queue for all requests in all connections. All
  * requests from all connections are processed concurrently.
@@ -453,6 +456,8 @@ struct iproto_connection
 	 * meaningless.
 	 */
 	size_t parse_size;
+	/** Iproto buffer used to send greeting. */
+	struct iovec iproto_output_buf;
 	/**
 	 * Nubmer of active long polling requests that have already
 	 * discarded their arguments in order not to stall other
@@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
 		return NULL;
 	}
 	msg->connection = con;
+	msg->close_connection = false;
 	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
 	return msg;
 }
@@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+static int
+iproto_buf_flush(struct iproto_connection *con)
+{
+	int fd = con->output.fd;
+	ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
+
+	if (nwr > 0) {
+		/* Count statistics */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+		return 1;
+	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
+		diag_raise();
+	}
+
+	return nwr;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+			      int /* revents */)
+{
+	struct iproto_connection *con = (struct iproto_connection *) watcher->data;
+	try {
+		int rc;
+		while ((rc = iproto_buf_flush(con)) <= 0) {
+			if (rc != 0) {
+				ev_io_start(loop, &con->output);
+				return;
+			}
+		}
+		if (ev_is_active(&con->output))
+			ev_io_stop(con->loop, &con->output);
+		ev_io_init(&con->output, iproto_connection_on_output,
+			   con->output.fd, EV_WRITE);
+		if (con->input.cb != iproto_connection_on_input)
+			ev_io_init(&con->input, iproto_connection_on_input,
+				   con->input.fd, EV_READ);
+		else
+			ev_feed_event(loop, &con->input, EV_READ);
+	} catch (Exception *e) {
+		e->log();
+		con->state = IPROTO_CONNECTION_CLOSED;
+	}
+}
+
 static struct iproto_connection *
 iproto_connection_new(int fd)
 {
@@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
 	}
 	con->input.data = con->output.data = con;
 	con->loop = loop();
-	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
-	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+	ev_io_init(&con->input, NULL, fd, EV_READ);
+	ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
 	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
 {
 	struct iproto_connection *con =
 		container_of(m, struct iproto_connection, destroy_msg);
+	assert(con->state == IPROTO_CONNECTION_DESTROYED);
 	if (con->session) {
 		session_destroy(con->session);
 		con->session = NULL; /* safety */
 	}
 	/*
-	 * Got to be done in iproto thread since
-	 * that's where the memory is allocated.
+	 * obuf is being destroyed in tx thread cause it is where
+	 * it was allocated.
 	 */
 	obuf_destroy(&con->obuf[0]);
 	obuf_destroy(&con->obuf[1]);
@@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_msg *msg)
+{
+	struct iproto_connection *con = msg->connection;
+	con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
+	con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
+	/*
+	 * INSTANCE_UUID is guaranteed to be inited before this moment.
+	 * We start listening either in local_recovery() or bootstrap().
+	 * The INSTANCE_UUID is ensured to be inited in the beginning of
+	 * both methods. In case of local_recovery() it is verified that
+	 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+	 * In bootstrap() INSTANCE_UUID is either taken from the
+	 * instance_uuid box.cfg{} param or created on the spot.
+	 */
+	struct tt_uuid uuid = INSTANCE_UUID;
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode((char *)con->iproto_output_buf.iov_base,
+			tarantool_version_id(), &uuid,
+			con->salt, IPROTO_SALT_SIZE);
+	assert(evio_has_fd(&con->output));
+	ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
  */
 static void
 tx_process_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
+	if (con->state != IPROTO_CONNECTION_ALIVE) {
+		msg->close_connection = true;
+		return;
+	}
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL) {
 		tx_reply_error(msg);
 		msg->close_connection = true;
+		return;
+	}
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (! rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0) {
+			tx_reply_error(msg);
+			msg->close_connection = true;
+		}
 	}
 }
 
 /**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
  */
 static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
+	if (msg->close_connection || con->state != IPROTO_CONNECTION_ALIVE) {
 		struct obuf *out = msg->wpos.obuf;
 		int64_t nwr = sio_writev(con->output.fd, out->iov,
 					 obuf_iovcnt(out));
@@ -1991,6 +2065,7 @@ net_send_greeting(struct cmsg *m)
 			diag_log();
 		}
 		assert(iproto_connection_is_idle(con));
+		con->state = IPROTO_CONNECTION_ALIVE;
 		iproto_connection_close(con);
 		iproto_msg_delete(msg);
 		return;
@@ -2003,13 +2078,17 @@ net_send_greeting(struct cmsg *m)
 	 */
 	assert(evio_has_fd(&con->output));
 	/* Handshake OK, start reading input. */
-	ev_feed_event(con->loop, &con->output, EV_WRITE);
+	if (con->input.cb != iproto_connection_on_input)
+		ev_io_init(&con->input, iproto_connection_on_input,
+			   con->input.fd, EV_READ);
+	else
+		ev_feed_event(con->loop, &con->input, EV_READ);
 	iproto_msg_delete(msg);
 }
 
 static const struct cmsg_hop connect_route[] = {
 	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
+	{ net_finish_connect, NULL },
 };
 
 /** }}} */
@@ -2040,7 +2119,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
-	msg->close_connection = false;
+	iproto_process_connect(msg);
 	cpipe_push(&tx_pipe, &msg->base);
 	return 0;
 }
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a050..6389b27bc8 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
  | - Connection is not established, state is "auth"
  | ...
 
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
 box.schema.user.drop('test')
  | ---
  | ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf0..55ea43f26f 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
 
 req_during_auth()
 
--- Check the same for 'initial' state.
-ok, err = nil
-do                                                                              \
-    c = netbox.connect(box.cfg.listen, {wait_connected = false})                \
-    ok, err = pcall(c.call, c, 'echo', {}, {is_async = true})                   \
-end
-ok, err
-c:close()
 box.schema.user.drop('test')
-- 
2.17.1



More information about the Tarantool-patches mailing list