Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH] iproto: make iproto thread more independent from tx
@ 2020-08-14 10:47 Ilya Kosarev
  2020-09-14 16:17 ` Leonid Vasiliev
  0 siblings, 1 reply; 3+ messages in thread
From: Ilya Kosarev @ 2020-08-14 10:47 UTC (permalink / raw)
  To: alyapunov; +Cc: tarantool-patches

On connection, an evio service callback is invoked to accept it. The
next step after acceptance was to pass the connection to the tx thread
through cbus. This meant that any connection interaction involved the
tx thread even before we got to decode what the client wants
from us. Consequently, a number of problems appear. The main one
is that we might get a descriptor leak in case of an unresponsive tx
thread (for example, when building a secondary index). There are some
other cases where we might not want to spend precious tx time to process
the connection in case iproto can do it all alone.
This patch allows iproto to accept a connection and send the greeting by
itself. The connection is initialized in the tx thread when the real
request comes through iproto_msg_decode(). In case the request type was
not recognized we can also send a reply with an error without using tx.
It is planned to add more iproto logic to prevent extra interaction with
the tx thread. This patch already solves the descriptor leakage problem
to some extent, since the connection is established and stays in the
fetch_schema state while the tx thread is unresponsive.
The other user visible change is that on_connect triggers won't run on
connections that don't provide any input, as reflected in
bad_trigger.test.py.

Part of #3776
---
Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776

 src/box/iproto.cc               | 287 ++++++++++++++++++++------------
 test/box-py/bad_trigger.result  |   1 +
 test/box-py/bad_trigger.test.py |  22 ++-
 3 files changed, 202 insertions(+), 108 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5eca..a027d15c1d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
 static void
 iproto_resume(void);
 
-static void
+static int
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input);
 
@@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume();
 }
 
+static inline void
+iproto_connection_delete(struct iproto_connection *con);
+
 /**
  * A single global queue for all requests in all connections. All
  * requests from all connections are processed concurrently.
@@ -280,6 +283,11 @@ static struct cord net_cord;
  * in the tx thread.
  */
 static struct slab_cache net_slabc;
+/**
+ * Slab cache used for allocating memory for output network buffers
+ * in the iproto thread.
+ */
+static struct slab_cache iproto_slabc;
 
 struct rmean *rmean_net;
 
@@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
 	"REQUESTS",
 };
 
+static int
+tx_init_connect(struct iproto_msg *msg);
+
 static void
 tx_process_destroy(struct cmsg *m);
 
@@ -650,7 +661,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
 	 * other parts of the connection.
 	 */
 	con->state = IPROTO_CONNECTION_DESTROYED;
-	cpipe_push(&tx_pipe, &con->destroy_msg);
+	if (con->session != NULL)
+		cpipe_push(&tx_pipe, &con->destroy_msg);
+	else {
+		/*
+		* In case session was not created we can safely destroy
+		* not involving tx thread. Thus we also need to destroy
+		* obuf, which still belongs to iproto thread.
+		*/
+		obuf_destroy(&con->obuf[0]);
+		obuf_destroy(&con->obuf[1]);
+		iproto_connection_delete(con);
+	}
 }
 
 /**
@@ -677,9 +699,18 @@ iproto_connection_close(struct iproto_connection *con)
 		 * is done only once.
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
-		cpipe_push(&tx_pipe, &con->disconnect_msg);
 		assert(con->state == IPROTO_CONNECTION_ALIVE);
 		con->state = IPROTO_CONNECTION_CLOSED;
+		rlist_del(&con->in_stop_list);
+		if (con->session != NULL)
+			cpipe_push(&tx_pipe, &con->disconnect_msg);
+		else
+			/*
+			* In case session was not created we can safely
+			* try to start destroy not involving tx thread.
+			*/
+			iproto_connection_try_to_start_destroy(con);
+		return;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
 	} else {
@@ -809,6 +840,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 	assert(rlist_empty(&con->in_stop_list));
 	int n_requests = 0;
 	bool stop_input = false;
+	bool obuf_in_iproto = (con->session == NULL);
 	const char *errmsg;
 	while (con->parse_size != 0 && !stop_input) {
 		if (iproto_check_msg_max()) {
@@ -853,12 +885,20 @@ err_msgpack:
 
 		msg->len = reqend - reqstart; /* total request length */
 
-		iproto_msg_decode(msg, &pos, reqend, &stop_input);
-		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
-		 */
-		cpipe_push_input(&tx_pipe, &msg->base);
+		if (iproto_msg_decode(msg, &pos, reqend, &stop_input) == 0) {
+			if (obuf_in_iproto) {
+				/*
+				* If session was not created yet and obuf is
+				* still in iproto we need to destroy it. New
+				* one will be created in tx thread if needed.
+				*/
+				obuf_destroy(&con->obuf[0]);
+				obuf_destroy(&con->obuf[1]);
+				obuf_in_iproto = false;
+			}
+			cpipe_push_input(&tx_pipe, &msg->base);
+		}
+
 		n_requests++;
 		/* Request is parsed */
 		assert(reqend > reqstart);
@@ -1105,8 +1145,8 @@ iproto_connection_new(int fd)
 	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
-	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
-	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+	obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
+	obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
 	con->p_ibuf = &con->ibuf[0];
 	con->tx.p_obuf = &con->obuf[0];
 	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
@@ -1134,10 +1174,6 @@ iproto_connection_delete(struct iproto_connection *con)
 	assert(!evio_has_fd(&con->input));
 	assert(con->session == NULL);
 	assert(con->state == IPROTO_CONNECTION_DESTROYED);
-	/*
-	 * The output buffers must have been deleted
-	 * in tx thread.
-	 */
 	ibuf_destroy(&con->ibuf[0]);
 	ibuf_destroy(&con->ibuf[1]);
 	assert(con->obuf[0].pos == 0 &&
@@ -1172,6 +1208,9 @@ tx_reply_error(struct iproto_msg *msg);
 static void
 tx_reply_iproto_error(struct cmsg *m);
 
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
+
 static void
 net_send_msg(struct cmsg *msg);
 
@@ -1244,7 +1283,7 @@ static const struct cmsg_hop error_route[] = {
 	{ net_send_error, NULL },
 };
 
-static void
+static int
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
@@ -1314,13 +1353,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 			 (uint32_t) type);
 		goto error;
 	}
-	return;
+	return 0;
 error:
 	/** Log and send the error. */
 	diag_log();
 	diag_create(&msg->diag);
 	diag_move(&fiber()->diag, &msg->diag);
-	cmsg_init(&msg->base, error_route);
+	if (msg->connection->session != NULL) {
+		cmsg_init(&msg->base, error_route);
+		return 0;
+	}
+	/*
+	* In case session was not created we can process error path
+	* without tx thread.
+	*/
+	tx_accept_wpos(msg->connection, &msg->wpos);
+	struct obuf *out = msg->connection->tx.p_obuf;
+	iproto_reply_error(out, diag_last_error(&msg->diag),
+			   msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	net_send_error(&(msg->base));
+	return -1;
 }
 
 static void
@@ -1382,10 +1435,6 @@ tx_process_destroy(struct cmsg *m)
 		session_destroy(con->session);
 		con->session = NULL; /* safety */
 	}
-	/*
-	 * Got to be done in iproto thread since
-	 * that's where the memory is allocated.
-	 */
 	obuf_destroy(&con->obuf[0]);
 	obuf_destroy(&con->obuf[1]);
 }
@@ -1478,13 +1527,21 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 	}
 }
 
-static inline struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static inline int
+tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	tx_accept_wpos(msg->connection, &msg->wpos);
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-	return msg;
+	*msg = (struct iproto_msg *) m;
+	/*
+	* In case session was not created we need to init connection in tx and
+	* create it here.
+	*/
+	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
+		(*msg)->close_connection = true;
+		return -1;
+	}
+	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
+	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
+	return 0;
 }
 
 /**
@@ -1507,7 +1564,8 @@ tx_reply_error(struct iproto_msg *msg)
 static void
 tx_reply_iproto_error(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	tx_accept_msg(m, &msg);
 	struct obuf *out = msg->connection->tx.p_obuf;
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
@@ -1527,7 +1585,9 @@ tx_inject_delay(void)
 static void
 tx_process1(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1553,17 +1613,20 @@ error:
 static void
 tx_process_select(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	struct request *req;
 	struct obuf *out;
 	struct obuf_svp svp;
 	struct port port;
 	int count;
 	int rc;
-	struct request *req = &msg->dml;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
 	tx_inject_delay();
+	req = &msg->dml;
 	rc = box_select(req->space_id, req->index_id,
 			req->iterator, req->offset, req->limit,
 			req->key, req->key_end, &port);
@@ -1607,7 +1670,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
 static void
 tx_process_call(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1686,13 +1751,15 @@ error:
 static void
 tx_process_misc(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
-	struct iproto_connection *con = msg->connection;
-	struct obuf *out = con->tx.p_obuf;
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
 	try {
+		struct iproto_connection *con = msg->connection;
+		struct obuf *out = con->tx.p_obuf;
 		struct ballot ballot;
 		switch (msg->header.type) {
 		case IPROTO_AUTH:
@@ -1729,7 +1796,7 @@ error:
 static void
 tx_process_sql(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
 	struct obuf *out;
 	struct port port;
 	struct sql_bind *bind = NULL;
@@ -1738,6 +1805,8 @@ tx_process_sql(struct cmsg *m)
 	uint32_t len;
 	bool is_unprepare = false;
 
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 	assert(msg->header.type == IPROTO_EXECUTE ||
@@ -1825,7 +1894,11 @@ error:
 static void
 tx_process_replication(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0) {
+		tx_reply_error(msg);
+		return;
+	}
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
@@ -1865,6 +1938,29 @@ tx_process_replication(struct cmsg *m)
 	}
 }
 
+static int
+net_check_connection(struct iproto_msg *msg)
+{
+	if (!msg->close_connection)
+		return 0;
+
+	struct iproto_connection *con = msg->connection;
+	struct obuf *out = msg->wpos.obuf;
+	int64_t nwr = sio_writev(con->output.fd, out->iov,
+				 obuf_iovcnt(out));
+
+	if (nwr > 0) {
+		/* Count statistics. */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
+		diag_log();
+	}
+	assert(iproto_connection_is_idle(con));
+	iproto_connection_close(con);
+	iproto_msg_delete(msg);
+	return -1;
+}
+
 static void
 net_send_msg(struct cmsg *m)
 {
@@ -1881,6 +1977,9 @@ net_send_msg(struct cmsg *m)
 	}
 	con->wend = msg->wpos;
 
+	if (net_check_connection(msg) != 0)
+		return;
+
 	if (evio_has_fd(&con->output)) {
 		if (! ev_is_active(&con->output))
 			ev_feed_event(con->loop, &con->output, EV_WRITE);
@@ -1910,6 +2009,10 @@ net_end_join(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 
 	msg->p_ibuf->rpos += msg->len;
+
+	if (net_check_connection(msg) != 0)
+		return;
+
 	iproto_msg_delete(msg);
 
 	assert(! ev_is_active(&con->input));
@@ -1928,6 +2031,10 @@ net_end_subscribe(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 
 	msg->p_ibuf->rpos += msg->len;
+
+	if (net_check_connection(msg) != 0)
+		return;
+
 	iproto_msg_delete(msg);
 
 	assert(! ev_is_active(&con->input));
@@ -1936,81 +2043,49 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: prepare greeting for it.
  */
 static void
-tx_process_connect(struct cmsg *m)
+iproto_process_connect(struct iproto_msg *msg)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
-		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
-		tx_reply_error(msg);
-		msg->close_connection = true;
-	}
-}
-
-/**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
- */
-static void
-net_send_greeting(struct cmsg *m)
-{
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
-		struct obuf *out = msg->wpos.obuf;
-		int64_t nwr = sio_writev(con->output.fd, out->iov,
-					 obuf_iovcnt(out));
-
-		if (nwr > 0) {
-			/* Count statistics. */
-			rmean_collect(rmean_net, IPROTO_SENT, nwr);
-		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
-			diag_log();
-		}
-		assert(iproto_connection_is_idle(con));
-		iproto_connection_close(con);
-		iproto_msg_delete(msg);
-		return;
-	}
+	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
+	struct tt_uuid uuid = INSTANCE_UUID;
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(greeting, tarantool_version_id(), &uuid,
+			con->salt, IPROTO_SALT_SIZE);
+	obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
+	iproto_wpos_create(&msg->wpos, out);
 	con->wend = msg->wpos;
-	/*
-	 * Connect is synchronous, so no one could have been
-	 * messing up with the connection while it was in
-	 * progress.
-	 */
 	assert(evio_has_fd(&con->output));
-	/* Handshake OK, start reading input. */
 	ev_feed_event(con->loop, &con->output, EV_WRITE);
 	iproto_msg_delete(msg);
 }
 
-static const struct cmsg_hop connect_route[] = {
-	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
-};
+static int
+tx_init_connect(struct iproto_msg *msg)
+{
+	struct iproto_connection *con = msg->connection;
+	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
+	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+	con->tx.p_obuf = &con->obuf[0];
+	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
+	iproto_wpos_create(&con->wend, con->tx.p_obuf);
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL)
+		return -1;
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (! rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0)
+			return -1;
+	}
+
+	return 0;
+}
 
 /** }}} */
 
@@ -2037,11 +2112,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 		mempool_free(&iproto_connection_pool, con);
 		return -1;
 	}
-	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
 	msg->close_connection = false;
-	cpipe_push(&tx_pipe, &msg->base);
+	iproto_process_connect(msg);
 	return 0;
 }
 
@@ -2054,6 +2128,8 @@ static struct evio_service binary; /* iproto binary listener */
 static int
 net_cord_f(va_list /* ap */)
 {
+	slab_cache_create(&iproto_slabc, &runtime);
+
 	mempool_create(&iproto_msg_pool, &cord()->slabc,
 		       sizeof(struct iproto_msg));
 	mempool_create(&iproto_connection_pool, &cord()->slabc,
@@ -2297,7 +2373,8 @@ iproto_listen(const char *uri)
 size_t
 iproto_mem_used(void)
 {
-	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
+	return slab_cache_used(&net_cord.slabc)
+	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
 }
 
 size_t
diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
index 5d064b7648..bfa9c2b759 100644
--- a/test/box-py/bad_trigger.result
+++ b/test/box-py/bad_trigger.result
@@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
 - function
 ...
 greeting:  True
+Nothing to read yet:  Resource temporarily unavailable
 fixheader:  True
 error code 32
 error message:  [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
index 7d200b9218..7f45f5e713 100644
--- a/test/box-py/bad_trigger.test.py
+++ b/test/box-py/bad_trigger.test.py
@@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
 from lib.tarantool_connection import TarantoolConnection
 from tarantool import NetworkError
 from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
-    REQUEST_TYPE_ERROR
+    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
 import socket
 import msgpack
 
@@ -26,9 +26,25 @@ s = conn.socket
 # Read greeting
 print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
 
-# Read error packet
+# Check socket
 IPROTO_FIXHEADER_SIZE = 5
-fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+s.setblocking(False)
+fixheader = None
+try:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+except socket.error as err:
+    print 'Nothing to read yet:', str(err).split(']')[1]
+else:
+    print 'Received fixheader'
+s.setblocking(True)
+
+# Send ping
+query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
+s.send(msgpack.dumps(len(query)) + query)
+
+# Read error packet
+if not fixheader:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
 print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
 unpacker.feed(fixheader)
 packet_len = unpacker.unpack()
-- 
2.17.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [Tarantool-patches] [PATCH] iproto: make iproto thread more independent from tx
  2020-08-14 10:47 [Tarantool-patches] [PATCH] iproto: make iproto thread more independent from tx Ilya Kosarev
@ 2020-09-14 16:17 ` Leonid Vasiliev
  2020-09-18 13:10   ` Ilya Kosarev
  0 siblings, 1 reply; 3+ messages in thread
From: Leonid Vasiliev @ 2020-09-14 16:17 UTC (permalink / raw)
  To: Ilya Kosarev, alyapunov; +Cc: tarantool-patches


Hi! Thank you for the patch.
Please see some comments/questions below:

- Please add @ChangeLog
- Looks like "@TarantoolBot document" needs to be added to update
https://www.tarantool.io/en/doc/2.5/reference/reference_lua/net_box/#net-box-on-connect
- How do you check the problem is gone? Any stress test?

On 14.08.2020 13:47, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index). There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.

- Is it ok from exploitation point of view? Did you have a conversation
with Mons?

> 
> Part of #3776
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
> 
>   src/box/iproto.cc               | 287 ++++++++++++++++++++------------
>   test/box-py/bad_trigger.result  |   1 +
>   test/box-py/bad_trigger.test.py |  22 ++-
>   3 files changed, 202 insertions(+), 108 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..a027d15c1d 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
- "on_connect trigger must be processed before any other request on this
connection." (from comment) - It looks like it's worth clarifying this
guarantee.

> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>   static void
>   iproto_resume(void);
>   
> -static void
> +static int
>   iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   		  bool *stop_input);
>   
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>   	iproto_resume();
>   }
>   
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
> +
>   /**
>    * A single global queue for all requests in all connections. All
>    * requests from all connections are processed concurrently.
> @@ -280,6 +283,11 @@ static struct cord net_cord;
>    * in the tx thread.
>    */
>   static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
>   
>   struct rmean *rmean_net;
>   
> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>   	"REQUESTS",
>   };
>   
> +static int
> +tx_init_connect(struct iproto_msg *msg);
> +
>   static void
>   tx_process_destroy(struct cmsg *m);
>   
> @@ -650,7 +661,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>   	 * other parts of the connection.
>   	 */
>   	con->state = IPROTO_CONNECTION_DESTROYED;
> -	cpipe_push(&tx_pipe, &con->destroy_msg);
> +	if (con->session != NULL)
> +		cpipe_push(&tx_pipe, &con->destroy_msg);
> +	else {
> +		/*
> +		* In case session was not created we can safely destroy
> +		* not involving tx thread. Thus we also need to destroy
> +		* obuf, which still belongs to iproto thread.
> +		*/
> +		obuf_destroy(&con->obuf[0]);
> +		obuf_destroy(&con->obuf[1]);
> +		iproto_connection_delete(con);
> +	}
>   }
>   
>   /**
> @@ -677,9 +699,18 @@ iproto_connection_close(struct iproto_connection *con)
>   		 * is done only once.
>   		 */
>   		con->p_ibuf->wpos -= con->parse_size;
> -		cpipe_push(&tx_pipe, &con->disconnect_msg);
>   		assert(con->state == IPROTO_CONNECTION_ALIVE);
>   		con->state = IPROTO_CONNECTION_CLOSED;
> +		rlist_del(&con->in_stop_list);

- This is already done below (rlist_del()). Or is there some kind of trick?

> +		if (con->session != NULL)
> +			cpipe_push(&tx_pipe, &con->disconnect_msg);
> +		else
> +			/*
> +			* In case session was not created we can safely
> +			* try to start destroy not involving tx thread.
> +			*/
> +			iproto_connection_try_to_start_destroy(con);
> +		return;
>   	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>   		iproto_connection_try_to_start_destroy(con);
>   	} else {
> @@ -809,6 +840,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>   	assert(rlist_empty(&con->in_stop_list));
>   	int n_requests = 0;
>   	bool stop_input = false;
> +	bool obuf_in_iproto = (con->session == NULL);
>   	const char *errmsg;
>   	while (con->parse_size != 0 && !stop_input) {
>   		if (iproto_check_msg_max()) {
> @@ -853,12 +885,20 @@ err_msgpack:
>   
>   		msg->len = reqend - reqstart; /* total request length */
>   
> -		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> -		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> -		 */
> -		cpipe_push_input(&tx_pipe, &msg->base);
> +		if (iproto_msg_decode(msg, &pos, reqend, &stop_input) == 0) {
> +			if (obuf_in_iproto) {
> +				/*
> +				* If session was not created yet and obuf is
> +				* still in iproto we need to destroy it. New
> +				* one will be created in tx thread if needed.
> +				*/
> +				obuf_destroy(&con->obuf[0]);
> +				obuf_destroy(&con->obuf[1]);
> +				obuf_in_iproto = false;
> +			}
> +			cpipe_push_input(&tx_pipe, &msg->base);
> +		}
> +
>   		n_requests++;
>   		/* Request is parsed */
>   		assert(reqend > reqstart);
> @@ -1105,8 +1145,8 @@ iproto_connection_new(int fd)
>   	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>   	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>   	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> -	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> -	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>   	con->p_ibuf = &con->ibuf[0];
>   	con->tx.p_obuf = &con->obuf[0];
>   	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> @@ -1134,10 +1174,6 @@ iproto_connection_delete(struct iproto_connection *con)
>   	assert(!evio_has_fd(&con->input));
>   	assert(con->session == NULL);
>   	assert(con->state == IPROTO_CONNECTION_DESTROYED);
> -	/*
> -	 * The output buffers must have been deleted
> -	 * in tx thread.
> -	 */
>   	ibuf_destroy(&con->ibuf[0]);
>   	ibuf_destroy(&con->ibuf[1]);
>   	assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1208,9 @@ tx_reply_error(struct iproto_msg *msg);
>   static void
>   tx_reply_iproto_error(struct cmsg *m);
>   
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
>   static void
>   net_send_msg(struct cmsg *msg);
>   
> @@ -1244,7 +1283,7 @@ static const struct cmsg_hop error_route[] = {
>   	{ net_send_error, NULL },
>   };
>   
> -static void
> +static int
>   iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   		  bool *stop_input)
>   {
> @@ -1314,13 +1353,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>   			 (uint32_t) type);
>   		goto error;
>   	}
> -	return;
> +	return 0;
>   error:
>   	/** Log and send the error. */
>   	diag_log();
>   	diag_create(&msg->diag);
>   	diag_move(&fiber()->diag, &msg->diag);

- Unneeded in case session == NULL. Use net_send_msg() instead of
net_send_error().

> -	cmsg_init(&msg->base, error_route);
> +	if (msg->connection->session != NULL) {
> +		cmsg_init(&msg->base, error_route);
> +		return 0;

- Maybe return -1 here (decode failure)? Otherwise, what does the return
value mean?

> +	}
> +	/*
> +	* In case session was not created we can process error path
> +	* without tx thread.
> +	*/
> +	tx_accept_wpos(msg->connection, &msg->wpos);
> +	struct obuf *out = msg->connection->tx.p_obuf;
> +	iproto_reply_error(out, diag_last_error(&msg->diag),
> +			   msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	net_send_error(&(msg->base));
> +	return -1;
>   }
>   
>   static void
> @@ -1382,10 +1435,6 @@ tx_process_destroy(struct cmsg *m)
>   		session_destroy(con->session);
>   		con->session = NULL; /* safety */
>   	}
> -	/*
> -	 * Got to be done in iproto thread since
> -	 * that's where the memory is allocated.
> -	 */
>   	obuf_destroy(&con->obuf[0]);
>   	obuf_destroy(&con->obuf[1]);
>   }
> @@ -1478,13 +1527,21 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>   	}
>   }
>   
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>   {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -	return msg;
> +	*msg = (struct iproto_msg *) m;
> +	/*
> +	* In case session was not created we need to init connection in tx and
> +	* create it here.
> +	*/
> +	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> +		(*msg)->close_connection = true;
> +		return -1;
> +	}
> +	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> +	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> +	return 0;
>   }
>   
>   /**
> @@ -1507,7 +1564,8 @@ tx_reply_error(struct iproto_msg *msg)
>   static void
>   tx_reply_iproto_error(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	tx_accept_msg(m, &msg);

- Please add a comment why the return value check is unnecessary.

>   	struct obuf *out = msg->connection->tx.p_obuf;
>   	iproto_reply_error(out, diag_last_error(&msg->diag),
>   			   msg->header.sync, ::schema_version);
> @@ -1527,7 +1585,9 @@ tx_inject_delay(void)
>   static void
>   tx_process1(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
> @@ -1553,17 +1613,20 @@ error:
>   static void
>   tx_process_select(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	struct request *req;

- Unneeded change.

>   	struct obuf *out;
>   	struct obuf_svp svp;
>   	struct port port;
>   	int count;
>   	int rc;
> -	struct request *req = &msg->dml;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
>   	tx_inject_delay();
> +	req = &msg->dml;
>   	rc = box_select(req->space_id, req->index_id,
>   			req->iterator, req->offset, req->limit,
>   			req->key, req->key_end, &port);
> @@ -1607,7 +1670,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>   static void
>   tx_process_call(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
> @@ -1686,13 +1751,15 @@ error:
>   static void
>   tx_process_misc(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> -	struct iproto_connection *con = msg->connection;
> -	struct obuf *out = con->tx.p_obuf;
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   
>   	try {
> +		struct iproto_connection *con = msg->connection;
> +		struct obuf *out = con->tx.p_obuf;
>   		struct ballot ballot;
>   		switch (msg->header.type) {
>   		case IPROTO_AUTH:
> @@ -1729,7 +1796,7 @@ error:
>   static void
>   tx_process_sql(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
>   	struct obuf *out;
>   	struct port port;
>   	struct sql_bind *bind = NULL;
> @@ -1738,6 +1805,8 @@ tx_process_sql(struct cmsg *m)
>   	uint32_t len;
>   	bool is_unprepare = false;
>   
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>   	if (tx_check_schema(msg->header.schema_version))
>   		goto error;
>   	assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1894,11 @@ error:
>   static void
>   tx_process_replication(struct cmsg *m)
>   {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0) {
> +		tx_reply_error(msg);
> +		return;
> +	}
>   	struct iproto_connection *con = msg->connection;
>   	struct ev_io io;
>   	coio_create(&io, con->input.fd);
> @@ -1865,6 +1938,29 @@ tx_process_replication(struct cmsg *m)
>   	}
>   }
>   
> +static int
> +net_check_connection(struct iproto_msg *msg)

- Seems like the "bool" type should be used for net_check_connection
(https://www.tarantool.io/en/doc/1.10/dev_guide/c_style_guide/#chapter-16-function-return-values-and-names).
In the function, you not only check a connection status, but also close
it if necessary. I think it would be nice to add a description of the
function.

> +{
> +	if (!msg->close_connection)
> +		return 0;
> +
> +	struct iproto_connection *con = msg->connection;
> +	struct obuf *out = msg->wpos.obuf;
> +	int64_t nwr = sio_writev(con->output.fd, out->iov,
> +				 obuf_iovcnt(out));
> +
> +	if (nwr > 0) {
> +		/* Count statistics. */
> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> +		diag_log();
> +	}
> +	assert(iproto_connection_is_idle(con));
> +	iproto_connection_close(con);
> +	iproto_msg_delete(msg);
> +	return -1;
> +}
> +
>   static void
>   net_send_msg(struct cmsg *m)
>   {
> @@ -1881,6 +1977,9 @@ net_send_msg(struct cmsg *m)
>   	}
>   	con->wend = msg->wpos;
>   
> +	if (net_check_connection(msg) != 0)
> +		return;
> +
>   	if (evio_has_fd(&con->output)) {
>   		if (! ev_is_active(&con->output))
>   			ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2009,10 @@ net_end_join(struct cmsg *m)
>   	struct iproto_connection *con = msg->connection;
>   
>   	msg->p_ibuf->rpos += msg->len;
> +
> +	if (net_check_connection(msg) != 0)
> +		return;
> +
>   	iproto_msg_delete(msg);
>   
>   	assert(! ev_is_active(&con->input));
> @@ -1928,6 +2031,10 @@ net_end_subscribe(struct cmsg *m)
>   	struct iproto_connection *con = msg->connection;
>   
>   	msg->p_ibuf->rpos += msg->len;
> +
> +	if (net_check_connection(msg) != 0)
> +		return;
> +
>   	iproto_msg_delete(msg);
>   
>   	assert(! ev_is_active(&con->input));
> @@ -1936,81 +2043,49 @@ net_end_subscribe(struct cmsg *m)
>   }
>   
>   /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
>    */
>   static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
>   {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>   	struct iproto_connection *con = msg->connection;
>   	struct obuf *out = msg->connection->tx.p_obuf;
> -	try {              /* connect. */

- Why has the "try - catch" been deleted?

> -		con->session = session_create(SESSION_TYPE_BINARY);
> -		if (con->session == NULL)
> -			diag_raise();
> -		con->session->meta.connection = con;
> -		tx_fiber_init(con->session, 0);
> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> -		/* TODO: dirty read from tx thread */
> -		struct tt_uuid uuid = INSTANCE_UUID;
> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
> -				con->salt, IPROTO_SALT_SIZE);
> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> -		if (! rlist_empty(&session_on_connect)) {
> -			if (session_run_on_connect_triggers(con->session) != 0)
> -				diag_raise();
> -		}
> -		iproto_wpos_create(&msg->wpos, out);
> -	} catch (Exception *e) {
> -		tx_reply_error(msg);
> -		msg->close_connection = true;
> -	}
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	struct iproto_connection *con = msg->connection;
> -	if (msg->close_connection) {
> -		struct obuf *out = msg->wpos.obuf;
> -		int64_t nwr = sio_writev(con->output.fd, out->iov,
> -					 obuf_iovcnt(out));
> -
> -		if (nwr > 0) {
> -			/* Count statistics. */
> -			rmean_collect(rmean_net, IPROTO_SENT, nwr);
> -		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> -			diag_log();
> -		}
> -		assert(iproto_connection_is_idle(con));
> -		iproto_connection_close(con);
> -		iproto_msg_delete(msg);
> -		return;
> -	}
> +	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> +	struct tt_uuid uuid = INSTANCE_UUID;
> +	random_bytes(con->salt, IPROTO_SALT_SIZE);
> +	greeting_encode(greeting, tarantool_version_id(), &uuid,
> +			con->salt, IPROTO_SALT_SIZE);
> +	obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> +	iproto_wpos_create(&msg->wpos, out);
>   	con->wend = msg->wpos;
> -	/*
> -	 * Connect is synchronous, so no one could have been
> -	 * messing up with the connection while it was in
> -	 * progress.
> -	 */
>   	assert(evio_has_fd(&con->output));
> -	/* Handshake OK, start reading input. */
>   	ev_feed_event(con->loop, &con->output, EV_WRITE);
>   	iproto_msg_delete(msg);
>   }
>   
> -static const struct cmsg_hop connect_route[] = {
> -	{ tx_process_connect, &net_pipe },
> -	{ net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> +	struct iproto_connection *con = msg->connection;
> +	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	con->tx.p_obuf = &con->obuf[0];
> +	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> +	iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> +	con->session = session_create(SESSION_TYPE_BINARY);
> +	if (con->session == NULL)
> +		return -1;
> +	con->session->meta.connection = con;
> +
> +	tx_fiber_init(con->session, 0);
> +	if (! rlist_empty(&session_on_connect)) {
> +		if (session_run_on_connect_triggers(con->session) != 0)
> +			return -1;
> +	}
> +
> +	return 0;
> +}
>   
>   /** }}} */
>   
> @@ -2037,11 +2112,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>   		mempool_free(&iproto_connection_pool, con);
>   		return -1;
>   	}
> -	cmsg_init(&msg->base, connect_route);
>   	msg->p_ibuf = con->p_ibuf;
>   	msg->wpos = con->wpos;
>   	msg->close_connection = false;
> -	cpipe_push(&tx_pipe, &msg->base);
> +	iproto_process_connect(msg);
>   	return 0;
>   }
>   
> @@ -2054,6 +2128,8 @@ static struct evio_service binary; /* iproto binary listener */
>   static int
>   net_cord_f(va_list /* ap */)
>   {
> +	slab_cache_create(&iproto_slabc, &runtime);
> +
>   	mempool_create(&iproto_msg_pool, &cord()->slabc,
>   		       sizeof(struct iproto_msg));
>   	mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2373,8 @@ iproto_listen(const char *uri)
>   size_t
>   iproto_mem_used(void)
>   {
> -	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> +	return slab_cache_used(&net_cord.slabc)
> +	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>   }
>   
>   size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>   - function
>   ...
>   greeting:  True
> +Nothing to read yet:  Resource temporarily unavailable
>   fixheader:  True
>   error code 32
>   error message:  [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..7f45f5e713 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>   from lib.tarantool_connection import TarantoolConnection
>   from tarantool import NetworkError
>   from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> -    REQUEST_TYPE_ERROR
> +    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>   import socket
>   import msgpack
>   
> @@ -26,9 +26,25 @@ s = conn.socket
>   # Read greeting
>   print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>   
> -# Read error packet
> +# Check soscket

- Typo "socket"

>   IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> +    print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> +    print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>   print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>   unpacker.feed(fixheader)
>   packet_len = unpacker.unpack()
> 

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [Tarantool-patches] [PATCH] iproto: make iproto thread more independent from tx
  2020-09-14 16:17 ` Leonid Vasiliev
@ 2020-09-18 13:10   ` Ilya Kosarev
  0 siblings, 0 replies; 3+ messages in thread
From: Ilya Kosarev @ 2020-09-18 13:10 UTC (permalink / raw)
  To: Leonid Vasiliev; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 23760 bytes --]


Hi,
 
Thanks for your review!
 
See my 12 answers below. I have sent v2 of the patch, which takes your comments into account.
>Понедельник, 14 сентября 2020, 19:17 +03:00 от Leonid Vasiliev < lvasiliev@tarantool.org >:
> 
>
>Hi! Thank you for the patch.
>Look some comments/questions below:
>
>- Please add @ChangeLog
*  
Right, will be fixed in v2.
>- Look like "@TarantoolBot document" needs to be added to update
>https://www.tarantool.io/en/doc/2.5/reference/reference_lua/net_box/#net-box-on-connect
*  
Right, doc bot request will be included in v2 of the patch.
>
>- How do you check the problem is gone? Any stress test?
*  
Yes, here is the test I adapted from Yaroslav’s version:
https://github.com/tarantool/tarantool/issues/3776#issuecomment-617736729
I didn’t find a way to implement it under test-run.
>On 14.08.2020 13:47, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it. The
>> next step after acception was to process connection to tx thread
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive tx thread
>> (for example, when building secondary index). There are some other
>> cases where we might not want to spend precious tx time to process
>> the connection in case iproto can do it all alone.
>> This patch allows iproto to accept connection and send greeting by
>> itself. The connection is initialized in tx thread when the real
>> request comes through iproto_msg_decode(). In case request type was not
>> recognized we can also send reply with an error without using tx. It is
>> planned to add more iproto logic to prevent extra interaction with
>> tx thread. This patch already to some extent solves descriptors leakage
>> problem as far as connection establishes and stays in fetch_schema
>> state while tx thread is unresponsive.
>> The other user visible change is that on_connect triggers won't run on
>> connections that don't provide any input, as reflected in
>> bad_trigger.test.py.
>
>- Is it ok from exploitation point of view? Did you have a conversation
>with Mons?
*  
Well, it was mentioned in a conversation with Mons. I think I will clarify
it once more as soon as possible. Though I am quite sure it is fine, since
any real application provides some input immediately on connection,
thus causing trigger execution.
>
>>
>> Part of #3776
>> ---
>> Branch:  https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
>> Issue:  https://github.com/tarantool/tarantool/issues/3776
>>
>> src/box/iproto.cc | 287 ++++++++++++++++++++------------
>> test/box-py/bad_trigger.result | 1 +
>> test/box-py/bad_trigger.test.py | 22 ++-
>> 3 files changed, 202 insertions(+), 108 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5eca..a027d15c1d 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>- "on_connect trigger must be processed before any other request on this
>connection." (from comment) - It looks like it's worth clarifying this
>guarantee.
*  
Right. This guarantee is kept. The triggers are run in
tx_init_connect(), which is called from tx_accept_msg(), before any other action.
>
>> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>> static void
>> iproto_resume(void);
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input);
>>
>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>> iproto_resume();
>> }
>>
>> +static inline void
>> +iproto_connection_delete(struct iproto_connection *con);
>> +
>> /**
>> * A single global queue for all requests in all connections. All
>> * requests from all connections are processed concurrently.
>> @@ -280,6 +283,11 @@ static struct cord net_cord;
>> * in the tx thread.
>> */
>> static struct slab_cache net_slabc;
>> +/**
>> + * Slab cache used for allocating memory for output network buffers
>> + * in the iproto thread.
>> + */
>> +static struct slab_cache iproto_slabc;
>>
>> struct rmean *rmean_net;
>>
>> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>> "REQUESTS",
>> };
>>
>> +static int
>> +tx_init_connect(struct iproto_msg *msg);
>> +
>> static void
>> tx_process_destroy(struct cmsg *m);
>>
>> @@ -650,7 +661,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>> * other parts of the connection.
>> */
>> con->state = IPROTO_CONNECTION_DESTROYED;
>> - cpipe_push(&tx_pipe, &con->destroy_msg);
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->destroy_msg);
>> + else {
>> + /*
>> + * In case session was not created we can safely destroy
>> + * not involving tx thread. Thus we also need to destroy
>> + * obuf, which still belongs to iproto thread.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + iproto_connection_delete(con);
>> + }
>> }
>>
>> /**
>> @@ -677,9 +699,18 @@ iproto_connection_close(struct iproto_connection *con)
>> * is done only once.
>> */
>> con->p_ibuf->wpos -= con->parse_size;
>> - cpipe_push(&tx_pipe, &con->disconnect_msg);
>> assert(con->state == IPROTO_CONNECTION_ALIVE);
>> con->state = IPROTO_CONNECTION_CLOSED;
>> + rlist_del(&con->in_stop_list);
>
>- Is done below(rlist_del()). Or is there some kind of trick?
*  
Well, this one is fine. rlist_del() used to be called at the end of
iproto_connection_close(), since the remaining disconnect actions
were performed later anyway. Now we might call iproto_connection_try_to_start_destroy()
directly from this conditional branch, so rlist_del() has to be done by that point.
This conditional branch returns at the end, so the second rlist_del() is not called. In
the other cases the second one is still used.
>
>> + if (con->session != NULL)
>> + cpipe_push(&tx_pipe, &con->disconnect_msg);
>> + else
>> + /*
>> + * In case session was not created we can safely
>> + * try to start destroy not involving tx thread.
>> + */
>> + iproto_connection_try_to_start_destroy(con);
>> + return;
>> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>> iproto_connection_try_to_start_destroy(con);
>> } else {
>> @@ -809,6 +840,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> assert(rlist_empty(&con->in_stop_list));
>> int n_requests = 0;
>> bool stop_input = false;
>> + bool obuf_in_iproto = (con->session == NULL);
>> const char *errmsg;
>> while (con->parse_size != 0 && !stop_input) {
>> if (iproto_check_msg_max()) {
>> @@ -853,12 +885,20 @@ err_msgpack:
>>
>> msg->len = reqend - reqstart; /* total request length */
>>
>> - iproto_msg_decode(msg, &pos, reqend, &stop_input);
>> - /*
>> - * This can't throw, but should not be
>> - * done in case of exception.
>> - */
>> - cpipe_push_input(&tx_pipe, &msg->base);
>> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) == 0) {
>> + if (obuf_in_iproto) {
>> + /*
>> + * If session was not created yet and obuf is
>> + * still in iproto we need to destroy it. New
>> + * one will be created in tx thread if needed.
>> + */
>> + obuf_destroy(&con->obuf[0]);
>> + obuf_destroy(&con->obuf[1]);
>> + obuf_in_iproto = false;
>> + }
>> + cpipe_push_input(&tx_pipe, &msg->base);
>> + }
>> +
>> n_requests++;
>> /* Request is parsed */
>> assert(reqend > reqstart);
>> @@ -1105,8 +1145,8 @@ iproto_connection_new(int fd)
>> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
>> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>> con->p_ibuf = &con->ibuf[0];
>> con->tx.p_obuf = &con->obuf[0];
>> iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> @@ -1134,10 +1174,6 @@ iproto_connection_delete(struct iproto_connection *con)
>> assert(!evio_has_fd(&con->input));
>> assert(con->session == NULL);
>> assert(con->state == IPROTO_CONNECTION_DESTROYED);
>> - /*
>> - * The output buffers must have been deleted
>> - * in tx thread.
>> - */
>> ibuf_destroy(&con->ibuf[0]);
>> ibuf_destroy(&con->ibuf[1]);
>> assert(con->obuf[0].pos == 0 &&
>> @@ -1172,6 +1208,9 @@ tx_reply_error(struct iproto_msg *msg);
>> static void
>> tx_reply_iproto_error(struct cmsg *m);
>>
>> +static void
>> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
>> +
>> static void
>> net_send_msg(struct cmsg *msg);
>>
>> @@ -1244,7 +1283,7 @@ static const struct cmsg_hop error_route[] = {
>> { net_send_error, NULL },
>> };
>>
>> -static void
>> +static int
>> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> bool *stop_input)
>> {
>> @@ -1314,13 +1353,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> (uint32_t) type);
>> goto error;
>> }
>> - return;
>> + return 0;
>> error:
>> /** Log and send the error. */
>> diag_log();
>> diag_create(&msg->diag);
>> diag_move(&fiber()->diag, &msg->diag);
>
>- Unneeded in case session == NULL. Use net_send_msg() instead of
>net_send_error().
*  
Right, will be fixed in v2.
>
>> - cmsg_init(&msg->base, error_route);
>> + if (msg->connection->session != NULL) {
>> + cmsg_init(&msg->base, error_route);
>> + return 0;
>
>- May be return -1 (decode fail). What does the return value mean
>otherwise?
*  
Well, yes, I see, this is confusing. Basically, -1 means we have nothing more to do with
tx. I think it is better to introduce two error codes and return 1 here, to separate
this case from the pure iproto case.
>
>> + }
>> + /*
>> + * In case session was not created we can process error path
>> + * without tx thread.
>> + */
>> + tx_accept_wpos(msg->connection, &msg->wpos);
>> + struct obuf *out = msg->connection->tx.p_obuf;
>> + iproto_reply_error(out, diag_last_error(&msg->diag),
>> + msg->header.sync, ::schema_version);
>> + iproto_wpos_create(&msg->wpos, out);
>> + net_send_error(&(msg->base));
>> + return -1;
>> }
>>
>> static void
>> @@ -1382,10 +1435,6 @@ tx_process_destroy(struct cmsg *m)
>> session_destroy(con->session);
>> con->session = NULL; /* safety */
>> }
>> - /*
>> - * Got to be done in iproto thread since
>> - * that's where the memory is allocated.
>> - */
>> obuf_destroy(&con->obuf[0]);
>> obuf_destroy(&con->obuf[1]);
>> }
>> @@ -1478,13 +1527,21 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>> }
>> }
>>
>> -static inline struct iproto_msg *
>> -tx_accept_msg(struct cmsg *m)
>> +static inline int
>> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - tx_accept_wpos(msg->connection, &msg->wpos);
>> - tx_fiber_init(msg->connection->session, msg->header.sync);
>> - return msg;
>> + *msg = (struct iproto_msg *) m;
>> + /*
>> + * In case session was not created we need to init connection in tx and
>> + * create it here.
>> + */
>> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
>> + (*msg)->close_connection = true;
>> + return -1;
>> + }
>> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
>> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
>> + return 0;
>> }
>>
>> /**
>> @@ -1507,7 +1564,8 @@ tx_reply_error(struct iproto_msg *msg)
>> static void
>> tx_reply_iproto_error(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + tx_accept_msg(m, &msg);
>
>- Please add a comment why the return value check is unnecessary.
*  
Right, will be fixed in v2.
>
>> struct obuf *out = msg->connection->tx.p_obuf;
>> iproto_reply_error(out, diag_last_error(&msg->diag),
>> msg->header.sync, ::schema_version);
>> @@ -1527,7 +1585,9 @@ tx_inject_delay(void)
>> static void
>> tx_process1(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1553,17 +1613,20 @@ error:
>> static void
>> tx_process_select(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + struct request *req;
>
>- Unneeded change.
*  
This change is needed because, in C++, a goto must not jump over a variable's
initialization, so the variables have to be declared before the goto.
>
>> struct obuf *out;
>> struct obuf_svp svp;
>> struct port port;
>> int count;
>> int rc;
>> - struct request *req = &msg->dml;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> tx_inject_delay();
>> + req = &msg->dml;
>> rc = box_select(req->space_id, req->index_id,
>> req->iterator, req->offset, req->limit,
>> req->key, req->key_end, &port);
>> @@ -1607,7 +1670,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>> static void
>> tx_process_call(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> @@ -1686,13 +1751,15 @@ error:
>> static void
>> tx_process_misc(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> - struct iproto_connection *con = msg->connection;
>> - struct obuf *out = con->tx.p_obuf;
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>>
>> try {
>> + struct iproto_connection *con = msg->connection;
>> + struct obuf *out = con->tx.p_obuf;
>> struct ballot ballot;
>> switch (msg->header.type) {
>> case IPROTO_AUTH:
>> @@ -1729,7 +1796,7 @@ error:
>> static void
>> tx_process_sql(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> struct obuf *out;
>> struct port port;
>> struct sql_bind *bind = NULL;
>> @@ -1738,6 +1805,8 @@ tx_process_sql(struct cmsg *m)
>> uint32_t len;
>> bool is_unprepare = false;
>>
>> + if (tx_accept_msg(m, &msg) != 0)
>> + goto error;
>> if (tx_check_schema(msg->header.schema_version))
>> goto error;
>> assert(msg->header.type == IPROTO_EXECUTE ||
>> @@ -1825,7 +1894,11 @@ error:
>> static void
>> tx_process_replication(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + if (tx_accept_msg(m, &msg) != 0) {
>> + tx_reply_error(msg);
>> + return;
>> + }
>> struct iproto_connection *con = msg->connection;
>> struct ev_io io;
>> coio_create(&io, con->input.fd);
>> @@ -1865,6 +1938,29 @@ tx_process_replication(struct cmsg *m)
>> }
>> }
>>
>> +static int
>> +net_check_connection(struct iproto_msg *msg)
>
>- Seems like the "bool" type should be used for net_check_connection
>( https://www.tarantool.io/en/doc/1.10/dev_guide/c_style_guide/#chapter-16-function-return-values-and-names ).
>In the function, you not only check a connection status, but also close
>it if necessary. I think it would be nice to add a description of the
>function.
*  
Right. I see, it is confusing now. Will be fixed in v2.
>
>> +{
>> + if (!msg->close_connection)
>> + return 0;
>> +
>> + struct iproto_connection *con = msg->connection;
>> + struct obuf *out = msg->wpos.obuf;
>> + int64_t nwr = sio_writev(con->output.fd, out->iov,
>> + obuf_iovcnt(out));
>> +
>> + if (nwr > 0) {
>> + /* Count statistics. */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> + diag_log();
>> + }
>> + assert(iproto_connection_is_idle(con));
>> + iproto_connection_close(con);
>> + iproto_msg_delete(msg);
>> + return -1;
>> +}
>> +
>> static void
>> net_send_msg(struct cmsg *m)
>> {
>> @@ -1881,6 +1977,9 @@ net_send_msg(struct cmsg *m)
>> }
>> con->wend = msg->wpos;
>>
>> + if (net_check_connection(msg) != 0)
>> + return;
>> +
>> if (evio_has_fd(&con->output)) {
>> if (! ev_is_active(&con->output))
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> @@ -1910,6 +2009,10 @@ net_end_join(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (net_check_connection(msg) != 0)
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1928,6 +2031,10 @@ net_end_subscribe(struct cmsg *m)
>> struct iproto_connection *con = msg->connection;
>>
>> msg->p_ibuf->rpos += msg->len;
>> +
>> + if (net_check_connection(msg) != 0)
>> + return;
>> +
>> iproto_msg_delete(msg);
>>
>> assert(! ev_is_active(&con->input));
>> @@ -1936,81 +2043,49 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: prepare greeting for it.
>> */
>> static void
>> -tx_process_connect(struct cmsg *m)
>> +iproto_process_connect(struct iproto_msg *msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> struct iproto_connection *con = msg->connection;
>> struct obuf *out = msg->connection->tx.p_obuf;
>> - try { /* connect. */
>
>- Why has the "try - catch" been deleted?
*  
Hmm, right, I forgot about the obuf_dup_xc() function. The other potential
throwers were moved. A failure of obuf_dup() is now handled correctly.
>
>> - con->session = session_create(SESSION_TYPE_BINARY);
>> - if (con->session == NULL)
>> - diag_raise();
>> - con->session->meta.connection = con;
>> - tx_fiber_init(con->session, 0);
>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> - /* TODO: dirty read from tx thread */
>> - struct tt_uuid uuid = INSTANCE_UUID;
>> - random_bytes(con->salt, IPROTO_SALT_SIZE);
>> - greeting_encode(greeting, tarantool_version_id(), &uuid,
>> - con->salt, IPROTO_SALT_SIZE);
>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> - if (! rlist_empty(&session_on_connect)) {
>> - if (session_run_on_connect_triggers(con->session) != 0)
>> - diag_raise();
>> - }
>> - iproto_wpos_create(&msg->wpos, out);
>> - } catch (Exception *e) {
>> - tx_reply_error(msg);
>> - msg->close_connection = true;
>> - }
>> -}
>> -
>> -/**
>> - * Send a response to connect to the client or close the
>> - * connection in case on_connect trigger failed.
>> - */
>> -static void
>> -net_send_greeting(struct cmsg *m)
>> -{
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - struct iproto_connection *con = msg->connection;
>> - if (msg->close_connection) {
>> - struct obuf *out = msg->wpos.obuf;
>> - int64_t nwr = sio_writev(con->output.fd, out->iov,
>> - obuf_iovcnt(out));
>> -
>> - if (nwr > 0) {
>> - /* Count statistics. */
>> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> - diag_log();
>> - }
>> - assert(iproto_connection_is_idle(con));
>> - iproto_connection_close(con);
>> - iproto_msg_delete(msg);
>> - return;
>> - }
>> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> + struct tt_uuid uuid = INSTANCE_UUID;
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>> + greeting_encode(greeting, tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> + iproto_wpos_create(&msg->wpos, out);
>> con->wend = msg->wpos;
>> - /*
>> - * Connect is synchronous, so no one could have been
>> - * messing up with the connection while it was in
>> - * progress.
>> - */
>> assert(evio_has_fd(&con->output));
>> - /* Handshake OK, start reading input. */
>> ev_feed_event(con->loop, &con->output, EV_WRITE);
>> iproto_msg_delete(msg);
>> }
>>
>> -static const struct cmsg_hop connect_route[] = {
>> - { tx_process_connect, &net_pipe },
>> - { net_send_greeting, NULL },
>> -};
>> +static int
>> +tx_init_connect(struct iproto_msg *msg)
>> +{
>> + struct iproto_connection *con = msg->connection;
>> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
>> + con->tx.p_obuf = &con->obuf[0];
>> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
>> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
>> +
>> + con->session = session_create(SESSION_TYPE_BINARY);
>> + if (con->session == NULL)
>> + return -1;
>> + con->session->meta.connection = con;
>> +
>> + tx_fiber_init(con->session, 0);
>> + if (! rlist_empty(&session_on_connect)) {
>> + if (session_run_on_connect_triggers(con->session) != 0)
>> + return -1;
>> + }
>> +
>> + return 0;
>> +}
>>
>> /** }}} */
>>
>> @@ -2037,11 +2112,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>> mempool_free(&iproto_connection_pool, con);
>> return -1;
>> }
>> - cmsg_init(&msg->base, connect_route);
>> msg->p_ibuf = con->p_ibuf;
>> msg->wpos = con->wpos;
>> msg->close_connection = false;
>> - cpipe_push(&tx_pipe, &msg->base);
>> + iproto_process_connect(msg);
>> return 0;
>> }
>>
>> @@ -2054,6 +2128,8 @@ static struct evio_service binary; /* iproto binary listener */
>> static int
>> net_cord_f(va_list /* ap */)
>> {
>> + slab_cache_create(&iproto_slabc, &runtime);
>> +
>> mempool_create(&iproto_msg_pool, &cord()->slabc,
>> sizeof(struct iproto_msg));
>> mempool_create(&iproto_connection_pool, &cord()->slabc,
>> @@ -2297,7 +2373,8 @@ iproto_listen(const char *uri)
>> size_t
>> iproto_mem_used(void)
>> {
>> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
>> + return slab_cache_used(&net_cord.slabc)
>> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>> }
>>
>> size_t
>> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
>> index 5d064b7648..bfa9c2b759 100644
>> --- a/test/box-py/bad_trigger.result
>> +++ b/test/box-py/bad_trigger.result
>> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>> - function
>> ...
>> greeting: True
>> +Nothing to read yet: Resource temporarily unavailable
>> fixheader: True
>> error code 32
>> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
>> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
>> index 7d200b9218..7f45f5e713 100644
>> --- a/test/box-py/bad_trigger.test.py
>> +++ b/test/box-py/bad_trigger.test.py
>> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>> from lib.tarantool_connection import TarantoolConnection
>> from tarantool import NetworkError
>> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
>> - REQUEST_TYPE_ERROR
>> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>> import socket
>> import msgpack
>>
>> @@ -26,9 +26,25 @@ s = conn.socket
>> # Read greeting
>> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>>
>> -# Read error packet
>> +# Check soscket
>
>- Typo: "soscket" should be "socket".
>> IPROTO_FIXHEADER_SIZE = 5
>> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +s.setblocking(False)
>> +fixheader = None
>> +try:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> +except socket.error as err:
>> + print 'Nothing to read yet:', str(err).split(']')[1]
>> +else:
>> + print 'Received fixheader'
>> +s.setblocking(True)
>> +
>> +# Send ping
>> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
>> +s.send(msgpack.dumps(len(query)) + query)
>> +
>> +# Read error packet
>> +if not fixheader:
>> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>> unpacker.feed(fixheader)
>> packet_len = unpacker.unpack()
>>
 
 
--
Ilya Kosarev
 
 

[-- Attachment #2: Type: text/html, Size: 32219 bytes --]

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2020-09-18 13:10 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-08-14 10:47 [Tarantool-patches] [PATCH] iproto: make iproto thread more independent from tx Ilya Kosarev
2020-09-14 16:17 ` Leonid Vasiliev
2020-09-18 13:10   ` Ilya Kosarev

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox