Tarantool development patches archive
* [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx
@ 2020-09-25 22:53 Ilya Kosarev
  2020-10-01 23:45 ` Vladislav Shpilevoy
  0 siblings, 1 reply; 4+ messages in thread
From: Ilya Kosarev @ 2020-09-25 22:53 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: tarantool-patches

When a connection arrives, an evio service callback is invoked to
accept it. The next step after accepting was to pass the connection
to the tx thread through cbus. This meant that any interaction with
a connection involved the tx thread even before we got to decode
what the client wants from us. Consequently, a number of problems
appear. The main one is that we might get a descriptor leak in case
of an unresponsive tx thread (for example, when building a secondary
index). There are some other cases where we might not want to spend
precious tx time on processing a connection when iproto can do it
all alone.
This patch allows iproto to accept a connection and send the greeting
by itself. The connection is initialized in the tx thread when the
first real request comes through iproto_msg_decode(). In case the
request type is not recognized, we can also send an error reply
without involving tx. It is planned to move more logic into iproto to
avoid extra interaction with the tx thread. This patch already solves
the descriptor leakage problem to some extent, since a connection now
gets established and stays in the fetch_schema state while the tx
thread is unresponsive.
The other user-visible change is that on_connect triggers won't run
for connections that don't provide any input, as reflected in
bad_trigger.test.py.

Part of #3776

@TarantoolBot document
Title: iproto: on_connect triggers execution
Update the documentation for on_connect triggers to reflect that they
are now executed only upon the first request. The triggers are still
the first thing executed on a new connection. While establishing a
connection without making any requests is a rather synthetic case, it
is technically possible, and now the triggers won't be executed for
such a connection. Some request is required to start their execution.
---
Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776

@ChangeLog:
 * Make the iproto thread more independent of tx. Connections are now
 first processed in iproto without involving the tx thread. on_connect
 (and, consequently, on_disconnect) triggers won't run for connections
 that don't provide any input (gh-3776).

Changes in v2:
 - docbot request provided
 - ChangeLog provided
 - net_send_msg() used instead of net_send_error() where needed
 - error cases made clear in iproto_msg_decode()
 - replaced net_check_connection() with iproto_connection_fail() predicate
 - improved tx session initialization fail processing to avoid extra tries
 - fixed obuf_dup() fail processing in iproto_process_connect()
 - added some comments
 - some comments style fixed

Changes in v3:
 - added braces to the confusing one-line if statement
 - updated ChangeLog
 - added description for iproto_msg_decode()
 - simplified error processing in iproto_msg_decode()

 src/box/iproto.cc               | 317 +++++++++++++++++++++-----------
 test/box-py/bad_trigger.result  |   1 +
 test/box-py/bad_trigger.test.py |  22 ++-
 test/sql/prepared.result        |   4 +
 test/sql/prepared.test.lua      |   1 +
 5 files changed, 239 insertions(+), 106 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5ec..9f98fce86 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
 static void
 iproto_resume(void);
 
-static void
+static int
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input);
 
@@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume();
 }
 
+static inline void
+iproto_connection_delete(struct iproto_connection *con);
+
 /**
  * A single global queue for all requests in all connections. All
  * requests from all connections are processed concurrently.
@@ -280,6 +283,11 @@ static struct cord net_cord;
  * in the tx thread.
  */
 static struct slab_cache net_slabc;
+/**
+ * Slab cache used for allocating memory for output network buffers
+ * in the iproto thread.
+ */
+static struct slab_cache iproto_slabc;
 
 struct rmean *rmean_net;
 
@@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
 	"REQUESTS",
 };
 
+static int
+tx_init_connect(struct iproto_msg *msg);
+
 static void
 tx_process_destroy(struct cmsg *m);
 
@@ -463,6 +474,7 @@ struct iproto_connection
 	struct ev_io output;
 	/** Logical session. */
 	struct session *session;
+	bool init_failed;
 	ev_loop *loop;
 	/**
 	 * Pre-allocated disconnect msg. Is sent right after
@@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
 	 * other parts of the connection.
 	 */
 	con->state = IPROTO_CONNECTION_DESTROYED;
-	cpipe_push(&tx_pipe, &con->destroy_msg);
+	if (con->session != NULL)
+		cpipe_push(&tx_pipe, &con->destroy_msg);
+	else {
+		/*
+		 * In case session was not created we can safely destroy
+		 * not involving tx thread. Thus we also need to destroy
+		 * obuf, which still belongs to iproto thread.
+		 */
+		obuf_destroy(&con->obuf[0]);
+		obuf_destroy(&con->obuf[1]);
+		iproto_connection_delete(con);
+	}
 }
 
 /**
@@ -677,9 +700,19 @@ iproto_connection_close(struct iproto_connection *con)
 		 * is done only once.
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
-		cpipe_push(&tx_pipe, &con->disconnect_msg);
 		assert(con->state == IPROTO_CONNECTION_ALIVE);
 		con->state = IPROTO_CONNECTION_CLOSED;
+		rlist_del(&con->in_stop_list);
+		if (con->session != NULL) {
+			cpipe_push(&tx_pipe, &con->disconnect_msg);
+		} else {
+			/*
+			 * In case session was not created we can safely
+			 * try to start destroy not involving tx thread.
+			 */
+			iproto_connection_try_to_start_destroy(con);
+		}
+		return;
 	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
 		iproto_connection_try_to_start_destroy(con);
 	} else {
@@ -809,6 +842,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 	assert(rlist_empty(&con->in_stop_list));
 	int n_requests = 0;
 	bool stop_input = false;
+	bool obuf_in_iproto = (con->session == NULL);
 	const char *errmsg;
 	while (con->parse_size != 0 && !stop_input) {
 		if (iproto_check_msg_max()) {
@@ -853,12 +887,20 @@ err_msgpack:
 
 		msg->len = reqend - reqstart; /* total request length */
 
-		iproto_msg_decode(msg, &pos, reqend, &stop_input);
-		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
-		 */
-		cpipe_push_input(&tx_pipe, &msg->base);
+		if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
+			if (obuf_in_iproto) {
+				/*
+				 * If session was not created yet and obuf is
+				 * still in iproto we need to destroy it. New
+				 * one will be created in tx thread if needed.
+				 */
+				obuf_destroy(&con->obuf[0]);
+				obuf_destroy(&con->obuf[1]);
+				obuf_in_iproto = false;
+			}
+			cpipe_push_input(&tx_pipe, &msg->base);
+		}
+
 		n_requests++;
 		/* Request is parsed */
 		assert(reqend > reqstart);
@@ -1105,8 +1147,8 @@ iproto_connection_new(int fd)
 	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
 	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
-	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
-	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+	obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
+	obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
 	con->p_ibuf = &con->ibuf[0];
 	con->tx.p_obuf = &con->obuf[0];
 	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
@@ -1114,6 +1156,7 @@ iproto_connection_new(int fd)
 	con->parse_size = 0;
 	con->long_poll_count = 0;
 	con->session = NULL;
+	con->init_failed = false;
 	rlist_create(&con->in_stop_list);
 	/* It may be very awkward to allocate at close. */
 	cmsg_init(&con->destroy_msg, destroy_route);
@@ -1134,10 +1177,6 @@ iproto_connection_delete(struct iproto_connection *con)
 	assert(!evio_has_fd(&con->input));
 	assert(con->session == NULL);
 	assert(con->state == IPROTO_CONNECTION_DESTROYED);
-	/*
-	 * The output buffers must have been deleted
-	 * in tx thread.
-	 */
 	ibuf_destroy(&con->ibuf[0]);
 	ibuf_destroy(&con->ibuf[1]);
 	assert(con->obuf[0].pos == 0 &&
@@ -1172,6 +1211,9 @@ tx_reply_error(struct iproto_msg *msg);
 static void
 tx_reply_iproto_error(struct cmsg *m);
 
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
+
 static void
 net_send_msg(struct cmsg *msg);
 
@@ -1244,7 +1286,18 @@ static const struct cmsg_hop error_route[] = {
 	{ net_send_error, NULL },
 };
 
-static void
+/**
+ * Decode the request from @a pos to @a reqend and put it into the queue.
+ * @param msg[inout] iproto message.
+ * @param pos[inout] the start of the request to parse.
+ * @param reqend the end of the request to parse.
+ * @param stop_input[out] in case JOIN is running we need to stop IO.
+ *
+ * @retval -1 Error. Session was not inited, so we do not need to evoke tx.
+ * @retval  1 Error. Session was created, so we have to init the error route.
+ * @retval  0 Success. The route corresponding to the request is inited.
+ */
+static int
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
@@ -1314,13 +1367,24 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 			 (uint32_t) type);
 		goto error;
 	}
-	return;
+	return 0;
 error:
 	/** Log and send the error. */
 	diag_log();
-	diag_create(&msg->diag);
-	diag_move(&fiber()->diag, &msg->diag);
-	cmsg_init(&msg->base, error_route);
+	if (msg->connection->session != NULL) {
+		diag_create(&msg->diag);
+		diag_move(&fiber()->diag, &msg->diag);
+		cmsg_init(&msg->base, error_route);
+		return 1;
+	}
+	/*
+	 * In case session was not created we can process error path
+	 * without tx thread.
+	 */
+	tx_accept_wpos(msg->connection, &msg->wpos);
+	tx_reply_error(msg);
+	net_send_msg(&(msg->base));
+	return -1;
 }
 
 static void
@@ -1382,10 +1446,6 @@ tx_process_destroy(struct cmsg *m)
 		session_destroy(con->session);
 		con->session = NULL; /* safety */
 	}
-	/*
-	 * Got to be done in iproto thread since
-	 * that's where the memory is allocated.
-	 */
 	obuf_destroy(&con->obuf[0]);
 	obuf_destroy(&con->obuf[1]);
 }
@@ -1478,13 +1538,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 	}
 }
 
-static inline struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static inline int
+tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	tx_accept_wpos(msg->connection, &msg->wpos);
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-	return msg;
+	*msg = (struct iproto_msg *) m;
+	/*
+	 * In case connection init failed we don't need to try anymore.
+	 */
+	if ((*msg)->connection->init_failed)
+		return -1;
+	/*
+	 * In case session was not created we need to init connection in tx and
+	 * create it here.
+	 */
+	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
+		(*msg)->connection->init_failed = true;
+		(*msg)->close_connection = true;
+		return -1;
+	}
+	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
+	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
+	return 0;
 }
 
 /**
@@ -1507,7 +1581,14 @@ tx_reply_error(struct iproto_msg *msg)
 static void
 tx_reply_iproto_error(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	/*
+	 * We don't need to check tx_accept_msg() return value here
+	 * as far as if we might only process iproto error in tx
+	 * in case connection session is already created and
+	 * thus tx_accept_msg() can't fail.
+	 */
+	tx_accept_msg(m, &msg);
 	struct obuf *out = msg->connection->tx.p_obuf;
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
@@ -1527,7 +1608,9 @@ tx_inject_delay(void)
 static void
 tx_process1(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1553,17 +1636,20 @@ error:
 static void
 tx_process_select(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	struct request *req;
 	struct obuf *out;
 	struct obuf_svp svp;
 	struct port port;
 	int count;
 	int rc;
-	struct request *req = &msg->dml;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
 	tx_inject_delay();
+	req = &msg->dml;
 	rc = box_select(req->space_id, req->index_id,
 			req->iterator, req->offset, req->limit,
 			req->key, req->key_end, &port);
@@ -1607,7 +1693,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
 static void
 tx_process_call(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1686,13 +1774,15 @@ error:
 static void
 tx_process_misc(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
-	struct iproto_connection *con = msg->connection;
-	struct obuf *out = con->tx.p_obuf;
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
 	try {
+		struct iproto_connection *con = msg->connection;
+		struct obuf *out = con->tx.p_obuf;
 		struct ballot ballot;
 		switch (msg->header.type) {
 		case IPROTO_AUTH:
@@ -1729,7 +1819,7 @@ error:
 static void
 tx_process_sql(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
 	struct obuf *out;
 	struct port port;
 	struct sql_bind *bind = NULL;
@@ -1738,6 +1828,8 @@ tx_process_sql(struct cmsg *m)
 	uint32_t len;
 	bool is_unprepare = false;
 
+	if (tx_accept_msg(m, &msg) != 0)
+		goto error;
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 	assert(msg->header.type == IPROTO_EXECUTE ||
@@ -1825,7 +1917,11 @@ error:
 static void
 tx_process_replication(struct cmsg *m)
 {
-	struct iproto_msg *msg = tx_accept_msg(m);
+	struct iproto_msg *msg;
+	if (tx_accept_msg(m, &msg) != 0) {
+		tx_reply_error(msg);
+		return;
+	}
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
@@ -1865,6 +1961,29 @@ tx_process_replication(struct cmsg *m)
 	}
 }
 
+/**
+ * Check connection health and try to send an error to the client
+ * in case of internal connection init or on_connect trigger failure.
+ */
+static bool
+iproto_connection_fail(struct iproto_msg *msg)
+{
+	if (!msg->close_connection)
+		return false;
+	struct iproto_connection *con = msg->connection;
+	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
+				 obuf_iovcnt(msg->wpos.obuf));
+	if (nwr > 0) {
+		/* Count statistics. */
+		rmean_collect(rmean_net, IPROTO_SENT, nwr);
+	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
+		diag_log();
+	}
+	iproto_connection_close(con);
+	iproto_msg_delete(msg);
+	return true;
+}
+
 static void
 net_send_msg(struct cmsg *m)
 {
@@ -1881,6 +2000,9 @@ net_send_msg(struct cmsg *m)
 	}
 	con->wend = msg->wpos;
 
+	if (iproto_connection_fail(msg))
+		return;
+
 	if (evio_has_fd(&con->output)) {
 		if (! ev_is_active(&con->output))
 			ev_feed_event(con->loop, &con->output, EV_WRITE);
@@ -1910,6 +2032,10 @@ net_end_join(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 
 	msg->p_ibuf->rpos += msg->len;
+
+	if (iproto_connection_fail(msg))
+		return;
+
 	iproto_msg_delete(msg);
 
 	assert(! ev_is_active(&con->input));
@@ -1928,6 +2054,10 @@ net_end_subscribe(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 
 	msg->p_ibuf->rpos += msg->len;
+
+	if (iproto_connection_fail(msg))
+		return;
+
 	iproto_msg_delete(msg);
 
 	assert(! ev_is_active(&con->input));
@@ -1936,81 +2066,60 @@ net_end_subscribe(struct cmsg *m)
 }
 
 /**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: prepare greeting for it.
  */
 static void
-tx_process_connect(struct cmsg *m)
+iproto_process_connect(struct iproto_msg *msg)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = msg->connection->tx.p_obuf;
-	try {              /* connect. */
-		con->session = session_create(SESSION_TYPE_BINARY);
-		if (con->session == NULL)
-			diag_raise();
-		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
-		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
-		/* TODO: dirty read from tx thread */
-		struct tt_uuid uuid = INSTANCE_UUID;
-		random_bytes(con->salt, IPROTO_SALT_SIZE);
-		greeting_encode(greeting, tarantool_version_id(), &uuid,
-				con->salt, IPROTO_SALT_SIZE);
-		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
-		if (! rlist_empty(&session_on_connect)) {
-			if (session_run_on_connect_triggers(con->session) != 0)
-				diag_raise();
-		}
+	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
+	/* TODO: dirty read from tx thread */
+	struct tt_uuid uuid = INSTANCE_UUID;
+	random_bytes(con->salt, IPROTO_SALT_SIZE);
+	greeting_encode(greeting, tarantool_version_id(), &uuid,
+			con->salt, IPROTO_SALT_SIZE);
+	if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
+				 != IPROTO_GREETING_SIZE) {
+		diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
+			 "greeting obuf", "dup");
+		iproto_reply_error(out, diag_last_error(&fiber()->diag),
+				   msg->header.sync, ::schema_version);
 		iproto_wpos_create(&msg->wpos, out);
-	} catch (Exception *e) {
-		tx_reply_error(msg);
 		msg->close_connection = true;
-	}
-}
-
-/**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
- */
-static void
-net_send_greeting(struct cmsg *m)
-{
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	struct iproto_connection *con = msg->connection;
-	if (msg->close_connection) {
-		struct obuf *out = msg->wpos.obuf;
-		int64_t nwr = sio_writev(con->output.fd, out->iov,
-					 obuf_iovcnt(out));
-
-		if (nwr > 0) {
-			/* Count statistics. */
-			rmean_collect(rmean_net, IPROTO_SENT, nwr);
-		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
-			diag_log();
-		}
-		assert(iproto_connection_is_idle(con));
-		iproto_connection_close(con);
-		iproto_msg_delete(msg);
+		iproto_connection_fail(msg);
 		return;
 	}
+	iproto_wpos_create(&msg->wpos, out);
 	con->wend = msg->wpos;
-	/*
-	 * Connect is synchronous, so no one could have been
-	 * messing up with the connection while it was in
-	 * progress.
-	 */
 	assert(evio_has_fd(&con->output));
-	/* Handshake OK, start reading input. */
 	ev_feed_event(con->loop, &con->output, EV_WRITE);
 	iproto_msg_delete(msg);
 }
 
-static const struct cmsg_hop connect_route[] = {
-	{ tx_process_connect, &net_pipe },
-	{ net_send_greeting, NULL },
-};
+static int
+tx_init_connect(struct iproto_msg *msg)
+{
+	struct iproto_connection *con = msg->connection;
+	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
+	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+	con->tx.p_obuf = &con->obuf[0];
+	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
+	iproto_wpos_create(&con->wend, con->tx.p_obuf);
+
+	con->session = session_create(SESSION_TYPE_BINARY);
+	if (con->session == NULL)
+		return -1;
+	con->session->meta.connection = con;
+
+	tx_fiber_init(con->session, 0);
+	if (! rlist_empty(&session_on_connect)) {
+		if (session_run_on_connect_triggers(con->session) != 0)
+			return -1;
+	}
+
+	return 0;
+}
 
 /** }}} */
 
@@ -2037,11 +2146,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
 		mempool_free(&iproto_connection_pool, con);
 		return -1;
 	}
-	cmsg_init(&msg->base, connect_route);
 	msg->p_ibuf = con->p_ibuf;
 	msg->wpos = con->wpos;
 	msg->close_connection = false;
-	cpipe_push(&tx_pipe, &msg->base);
+	iproto_process_connect(msg);
 	return 0;
 }
 
@@ -2054,6 +2162,8 @@ static struct evio_service binary; /* iproto binary listener */
 static int
 net_cord_f(va_list /* ap */)
 {
+	slab_cache_create(&iproto_slabc, &runtime);
+
 	mempool_create(&iproto_msg_pool, &cord()->slabc,
 		       sizeof(struct iproto_msg));
 	mempool_create(&iproto_connection_pool, &cord()->slabc,
@@ -2297,7 +2407,8 @@ iproto_listen(const char *uri)
 size_t
 iproto_mem_used(void)
 {
-	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
+	return slab_cache_used(&net_cord.slabc)
+	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
 }
 
 size_t
diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
index 5d064b764..bfa9c2b75 100644
--- a/test/box-py/bad_trigger.result
+++ b/test/box-py/bad_trigger.result
@@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
 - function
 ...
 greeting:  True
+Nothing to read yet:  Resource temporarily unavailable
 fixheader:  True
 error code 32
 error message:  [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
index 7d200b921..4739dfe13 100644
--- a/test/box-py/bad_trigger.test.py
+++ b/test/box-py/bad_trigger.test.py
@@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
 from lib.tarantool_connection import TarantoolConnection
 from tarantool import NetworkError
 from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
-    REQUEST_TYPE_ERROR
+    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
 import socket
 import msgpack
 
@@ -26,9 +26,25 @@ s = conn.socket
 # Read greeting
 print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
 
-# Read error packet
+# Check socket
 IPROTO_FIXHEADER_SIZE = 5
-fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+s.setblocking(False)
+fixheader = None
+try:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+except socket.error as err:
+    print 'Nothing to read yet:', str(err).split(']')[1]
+else:
+    print 'Received fixheader'
+s.setblocking(True)
+
+# Send ping
+query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
+s.send(msgpack.dumps(len(query)) + query)
+
+# Read error packet
+if not fixheader:
+    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
 print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
 unpacker.feed(fixheader)
 packet_len = unpacker.unpack()
diff --git a/test/sql/prepared.result b/test/sql/prepared.result
index 0db2cc03f..e6d4a947f 100644
--- a/test/sql/prepared.result
+++ b/test/sql/prepared.result
@@ -752,6 +752,10 @@ if is_remote then
 end;
  | ---
  | ...
+test_run:wait_cond(function() return box.info.sql().cache.size == 0 end, 10);
+ | ---
+ | - true
+ | ...
 box.cfg{sql_cache_size = 0};
  | ---
  | ...
diff --git a/test/sql/prepared.test.lua b/test/sql/prepared.test.lua
index d8e8a44cb..798885230 100644
--- a/test/sql/prepared.test.lua
+++ b/test/sql/prepared.test.lua
@@ -271,6 +271,7 @@ if is_remote then
     cn:close()
     cn = remote.connect(box.cfg.listen)
 end;
+test_run:wait_cond(function() return box.info.sql().cache.size == 0 end, 10);
 box.cfg{sql_cache_size = 0};
 box.cfg{sql_cache_size = 3000};
 
-- 
2.17.1


* Re: [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx
  2020-09-25 22:53 [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx Ilya Kosarev
@ 2020-10-01 23:45 ` Vladislav Shpilevoy
  2020-10-19  9:49   ` Ilya Kosarev
  0 siblings, 1 reply; 4+ messages in thread
From: Vladislav Shpilevoy @ 2020-10-01 23:45 UTC (permalink / raw)
  To: Ilya Kosarev; +Cc: tarantool-patches

Hi! Thanks for the patch! Very nice! Extreme niceness! Optimistic connection handling!

See 17 small minor comments below.

1. The branch fails all the iproto-related tests quite hard on my machine,
some with crashes. Also, when I tried to run it in the console, I got
'Peer closed' right after netbox.connect.

However, I see CI is green, and I am not sure why it passes there. On my
machine all networking is totally broken on your branch.

I fixed it by adding this:

	@@ -578,6 +578,7 @@ iproto_msg_new(struct iproto_connection *con)
	 		return NULL;
	 	}
	 	msg->connection = con;
	+	msg->close_connection = false;
	 	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
	 	return msg;
	 }

It seems it is your bug, because previously the flag was used only on
connection, and initialized only in iproto_on_accept(). Now you
use it on each request, but don't initialize it on each request. It
must also have been sheer luck that it didn't fail on any CI job.
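The class of bug described above can be shown in isolation. Below is a
minimal, self-contained sketch (all names such as `msg_new_buggy` are
hypothetical, not Tarantool's real API): a pooled object whose flag is
initialized on only one allocation path carries stale state when the
slot is recycled.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical message object recycled through a pool, mimicking
 * iproto_msg: the pool does not zero memory on reuse. */
struct msg {
	bool close_connection;
	int payload;
};

static struct msg pool_slot; /* one recycled slot, for illustration */

/* Buggy variant: the flag is assumed to be set elsewhere (only on the
 * accept path), so a recycled slot can carry a stale 'true' value. */
static struct msg *
msg_new_buggy(void)
{
	return &pool_slot; /* whatever was left in the slot survives */
}

/* Fixed variant, analogous to the reviewer's fix: initialize the flag
 * on every allocation, not just in iproto_on_accept(). */
static struct msg *
msg_new_fixed(void)
{
	struct msg *m = &pool_slot;
	m->close_connection = false;
	return m;
}
```

With a stale `true` left in the slot, only the fixed variant returns a
message in a known state.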

On 26.09.2020 00:53, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index).

2. It is not really a leak. Just too many clients waiting to be
processed. The descriptors were not lost completely.

As for the issue - what was the problem with not starting to listen until
recovery is complete?

And how does this patch solve it, if the connections are still kept,
and so are their descriptors? How did you validate that the problem is gone?
It seems the descriptors are still created, and still are not closed.

If the problem is not gone, then why does this patch do what it does?

> There are some other cases where we might not want to spend
> precious tx time on processing a connection when iproto can do it
> all alone.

3. What are these other cases? I can think only of SWIM in the iproto
thread, but it is not related at all.

> This patch allows iproto to accept a connection and send the greeting
> by itself. The connection is initialized in the tx thread when the
> first real request comes through iproto_msg_decode(). In case the
> request type is not recognized, we can also send an error reply
> without involving tx. It is planned to move more logic into iproto to
> avoid extra interaction with the tx thread. This patch already solves
> the descriptor leakage problem to some extent, since a connection now
> gets established and stays in the fetch_schema state while the tx
> thread is unresponsive.
> The other user-visible change is that on_connect triggers won't run
> for connections that don't provide any input, as reflected in
> bad_trigger.test.py.
> 
> Part of #3776
> 
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now executed only upon the first request. The triggers are still
> the first thing executed on a new connection. While establishing a
> connection without making any requests is a rather synthetic case, it
> is technically possible, and now the triggers won't be executed for
> such a connection. Some request is required to start their execution.

4. I am not sure I understand. According to 3776 there was a problem that
the connections were made. So they didn't make any requests? And the
case is not as synthetic as it seems?

What was the problem with preserving the original behaviour? For example,
you could delay the on_connect execution. The TX thread could tell
when recovery is done, and iproto would send the pending connections to it.
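The suggestion above can be sketched standalone. This is a hypothetical
illustration, not the actual Tarantool implementation: all names
(`pending_conn`, `iproto_on_tx_ready`, `conn_send_to_tx`) are invented,
and the list stands in for iproto's real data structures and cpipe push.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Park accepted connections in iproto until tx signals that recovery
 * is complete, then flush them all to the tx thread. */
struct pending_conn {
	int fd;
	struct pending_conn *next;
};

static struct pending_conn *pending_head;
static bool tx_ready;
static int sent_to_tx; /* counts connections handed over to tx */

static void
conn_send_to_tx(struct pending_conn *c)
{
	(void)c;
	sent_to_tx++; /* stands in for cpipe_push(&tx_pipe, ...) */
}

static void
iproto_on_accept_sketch(struct pending_conn *c)
{
	if (tx_ready) {
		conn_send_to_tx(c);
		return;
	}
	/* Recovery still in progress: park the connection. */
	c->next = pending_head;
	pending_head = c;
}

/* Called once tx reports that recovery has finished. */
static void
iproto_on_tx_ready(void)
{
	tx_ready = true;
	while (pending_head != NULL) {
		struct pending_conn *c = pending_head;
		pending_head = c->next;
		conn_send_to_tx(c);
	}
}
```

Connections accepted before readiness are queued; afterwards they go
straight to tx, so on_connect triggers would still run for every
connection, just later.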

>  src/box/iproto.cc               | 317 +++++++++++++++++++++-----------
>  test/box-py/bad_trigger.result  |   1 +
>  test/box-py/bad_trigger.test.py |  22 ++-
>  test/sql/prepared.result        |   4 +
>  test/sql/prepared.test.lua      |   1 +
>  5 files changed, 239 insertions(+), 106 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5ec..9f98fce86 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -463,6 +474,7 @@ struct iproto_connection
>  	struct ev_io output;
>  	/** Logical session. */
>  	struct session *session;
> +	bool init_failed;

5. Flags need an 'is_' prefix.

6. What is this? Why is it needed? Usually we leave a comment
on each struct member. Otherwise it is hard to understand why
some of them are needed.

7. The field is accessed from the tx thread only, and is close to
fields mutated from the iproto thread. That leads to a false-sharing
problem. Please, move it to the iproto_connection.tx sub-struct.
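The false-sharing concern can be illustrated with a stub layout
(`conn_sketch` and its members are hypothetical, not the real
iproto_connection): grouping per-thread state into sub-structs and
aligning them to the cache line keeps each thread's writes on its own
line.

```c
#include <assert.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stddef.h>

/* If a tx-only flag sat right next to fields mutated from the iproto
 * thread, both threads would keep bouncing the same cache line.
 * Per-thread sub-structs, aligned to a 64-byte line, avoid that. */
struct conn_sketch {
	struct {
		long bytes_read; /* mutated from the iproto thread */
	} net;
	struct {
		alignas(64) bool init_failed; /* tx-thread-only state */
	} tx;
};
```

With the 64-byte alignment, the tx-only field is guaranteed to live at
least one cache line away from the iproto-mutated one.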

>  	ev_loop *loop;
>  	/**
>  	 * Pre-allocated disconnect msg. Is sent right after
> @@ -677,9 +700,19 @@ iproto_connection_close(struct iproto_connection *con)
>  		 * is done only once.
>  		 */
>  		con->p_ibuf->wpos -= con->parse_size;
> -		cpipe_push(&tx_pipe, &con->disconnect_msg);
>  		assert(con->state == IPROTO_CONNECTION_ALIVE);
>  		con->state = IPROTO_CONNECTION_CLOSED;
> +		rlist_del(&con->in_stop_list);
> +		if (con->session != NULL) {
> +			cpipe_push(&tx_pipe, &con->disconnect_msg);
> +		} else {
> +			/*
> +			 * In case session was not created we can safely
> +			 * try to start destroy not involving tx thread.
> +			 */
> +			iproto_connection_try_to_start_destroy(con);
> +		}
> +		return;

8. You wouldn't need the 'return' and wouldn't need to duplicate the
'rlist_del(&con->in_stop_list);' call if you moved
'rlist_del(&con->in_stop_list);' to the beginning of this function
from its end.
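The restructure being suggested is the common "hoist shared cleanup"
pattern; here is a minimal, hypothetical sketch of it (the stand-in
`unlink_from_stop_list()` plays the role of rlist_del()):

```c
#include <assert.h>
#include <stdbool.h>

static int unlink_calls;

static void
unlink_from_stop_list(void)
{
	unlink_calls++; /* stands in for rlist_del(&con->in_stop_list) */
}

/* Doing the unlink once at the top covers every branch, so no branch
 * needs its own copy of the call or an early 'return' to skip a
 * duplicate at the end of the function. */
static void
connection_close_sketch(bool alive, bool pending_destroy)
{
	unlink_from_stop_list();
	if (alive) {
		/* ... push disconnect msg or start destroy ... */
	} else if (pending_destroy) {
		/* ... start destroy ... */
	} else {
		/* ... connection already closed ... */
	}
}
```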

>  	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>  		iproto_connection_try_to_start_destroy(con);
>  	} else {
> @@ -809,6 +842,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  	assert(rlist_empty(&con->in_stop_list));
>  	int n_requests = 0;
>  	bool stop_input = false;
> +	bool obuf_in_iproto = (con->session == NULL);

9. We always name flags starting with an 'is_' prefix or a similar one like
'has_' or 'does_'. Please, follow that agreement.

>  	const char *errmsg;
>  	while (con->parse_size != 0 && !stop_input) {
>  		if (iproto_check_msg_max()) {
> @@ -1314,13 +1367,24 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  			 (uint32_t) type);
>  		goto error;
>  	}
> -	return;
> +	return 0;
>  error:
>  	/** Log and send the error. */
>  	diag_log();
> -	diag_create(&msg->diag);
> -	diag_move(&fiber()->diag, &msg->diag);
> -	cmsg_init(&msg->base, error_route);
> +	if (msg->connection->session != NULL) {
> +		diag_create(&msg->diag);
> +		diag_move(&fiber()->diag, &msg->diag);
> +		cmsg_init(&msg->base, error_route);
> +		return 1;

10. Why do you need 3 return values? You only check for -1. So you
could return 0 here like it was before, and -1 below. In the
calling code you would check != 0 like we do everywhere.
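The convention referred to here can be sketched as follows;
`decode_sketch()` and `handle_request_sketch()` are hypothetical
stand-ins, not the real iproto functions:

```c
#include <assert.h>

/* Success is 0, failure is -1: the caller never distinguishes
 * between error kinds it does not act on. */
static int
decode_sketch(int request_type)
{
	if (request_type < 0)
		return -1; /* one error value is enough for the caller */
	return 0;
}

static int
handle_request_sketch(int request_type)
{
	if (decode_sketch(request_type) != 0)
		return -1; /* error path already handled in decode */
	return 0; /* on success: push the message to the tx pipe, etc. */
}
```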

> +	}
> +	/*
> +	 * In case session was not created we can process error path
> +	 * without tx thread.
> +	 */
> +	tx_accept_wpos(msg->connection, &msg->wpos);
> +	tx_reply_error(msg);

11. There is a naming convention that all tx_ functions are called in
the TX thread, and all net_ and iproto_ functions in the iproto
thread. Let's not violate the rule and obfuscate the iproto.cc code
even more than it already is. Please, come up with new names reflecting
where the functions are used, or at least not marking them as tx_.

> +	net_send_msg(&(msg->base));

12. The function is called 'iproto_msg_decode'. Let's let it do the
decoding, and not send anything from it. Sending violates its purpose
and name.

> +	return -1;
>  }
>  
>  static void
> @@ -1478,13 +1538,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>  
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>  {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -	return msg;
> +	*msg = (struct iproto_msg *) m;

13. Why do you change the return type and add an out parameter? You
could just return NULL in case of an error.

> +	/*
> +	 * In case connection init failed we don't need to try anymore.
> +	 */
> +	if ((*msg)->connection->init_failed)
> +		return -1;
> +	/*
> +	 * In case session was not created we need to init connection in tx and
> +	 * create it here.
> +	 */
> +	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> +		(*msg)->connection->init_failed = true;
> +		(*msg)->close_connection = true;
> +		return -1;
> +	}
> +	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> +	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> +	return 0;
>  }
>  
>  /**
> @@ -1507,7 +1581,14 @@ tx_reply_error(struct iproto_msg *msg)
>  static void
>  tx_reply_iproto_error(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	/*
> +	 * We don't need to check tx_accept_msg() return value here
> +	 * as far as if we might only process iproto error in tx
> +	 * in case connection session is already created and
> +	 * thus tx_accept_msg() can't fail.
> +	 */
> +	tx_accept_msg(m, &msg);

14. Well, if it fails, you have msg == NULL, and the code below will
crash. Otherwise it should be an assertion, not just a comment.

>  	struct obuf *out = msg->connection->tx.p_obuf;
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
> @@ -1865,6 +1961,29 @@ tx_process_replication(struct cmsg *m)
>  	}
>  }
>  
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)

15. The function is called 'connection_fail' and literally does nothing
when the connection is fine. Does not this name look wrong to you?

> +{
> +	if (!msg->close_connection)
> +		return false;
> +	struct iproto_connection *con = msg->connection;
> +	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> +				 obuf_iovcnt(msg->wpos.obuf));

16. You can't just write to a socket whenever you want. It may be
not writable. Please, use libev events and callbacks to send anything.
You can do a write only when you get EV_WRITE event from libev, who
in turn gets it from select/poll/epoll/kqueue/whatever-else-is-provided
by the OS.
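For illustration only, the pattern the reviewer asks for -- write only once the descriptor is reported writable -- looks roughly like this with plain poll(2) standing in for libev's EV_WRITE machinery (all names below are hypothetical, not Tarantool or libev API):

```c
#include <poll.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Wait until the descriptor is writable -- the moral equivalent of
 * getting EV_WRITE from libev -- and only then write the buffer.
 * Returns bytes written, or -1 on timeout or poll error. */
static ssize_t
write_when_ready(int fd, const char *buf, size_t len, int timeout_ms)
{
	struct pollfd pfd = { .fd = fd, .events = POLLOUT, .revents = 0 };
	if (poll(&pfd, 1, timeout_ms) <= 0 || (pfd.revents & POLLOUT) == 0)
		return -1;
	struct iovec iov = { .iov_base = (void *) buf, .iov_len = len };
	return writev(fd, &iov, 1);
}

/* Demo: a fresh socketpair is writable immediately, so a greeting-sized
 * payload goes out on the first POLLOUT. */
static ssize_t
demo_greeting_write(void)
{
	int fds[2];
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return -1;
	ssize_t n = write_when_ready(fds[0], "greeting", 8, 1000);
	close(fds[0]);
	close(fds[1]);
	return n;
}
```

In libev the same idea is an ev_io watcher for the write event whose callback performs the write; the sketch above only shows why the readiness check must come first.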

> +	if (nwr > 0) {
> +		/* Count statistics. */
> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> +		diag_log();
> +	}
> +	iproto_connection_close(con);
> +	iproto_msg_delete(msg);
> +	return true;
> +}
> +
>  static void
>  net_send_msg(struct cmsg *m)
>  {
> @@ -1936,81 +2066,60 @@ net_end_subscribe(struct cmsg *m)
>  }
>  
>  /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
>   */
>  static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
>  {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
>  	struct obuf *out = msg->connection->tx.p_obuf;
> -	try {              /* connect. */
> -		con->session = session_create(SESSION_TYPE_BINARY);
> -		if (con->session == NULL)
> -			diag_raise();
> -		con->session->meta.connection = con;
> -		tx_fiber_init(con->session, 0);
> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> -		/* TODO: dirty read from tx thread */
> -		struct tt_uuid uuid = INSTANCE_UUID;
> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
> -				con->salt, IPROTO_SALT_SIZE);
> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> -		if (! rlist_empty(&session_on_connect)) {
> -			if (session_run_on_connect_triggers(con->session) != 0)
> -				diag_raise();
> -		}
> +	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> +	/* TODO: dirty read from tx thread */
> +	struct tt_uuid uuid = INSTANCE_UUID;
> +	random_bytes(con->salt, IPROTO_SALT_SIZE);

17. Sorry, but wtf?! This data right now may be being written by TX thread,
so you literally may read and return *garbage* here. It is not acceptable.
It is not even a short term solution, it is just a bug, if you can't prove
the value is already initialized and constant by that moment.

If listen starts before recovery, and UUID is stored in the xlogs, then it
is definitely a bug.

> +	greeting_encode(greeting, tarantool_version_id(), &uuid,
> +			con->salt, IPROTO_SALT_SIZE);
> +	if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> +				 != IPROTO_GREETING_SIZE) {
> +		diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> +			 "greeting obuf", "dup");
> +		iproto_reply_error(out, diag_last_error(&fiber()->diag),
> +				   msg->header.sync, ::schema_version);
>  		iproto_wpos_create(&msg->wpos, out);
> -	} catch (Exception *e) {
> -		tx_reply_error(msg);
>  		msg->close_connection = true;
> -	}
> -}

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx
  2020-10-01 23:45 ` Vladislav Shpilevoy
@ 2020-10-19  9:49   ` Ilya Kosarev
  2020-10-27 22:06     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 4+ messages in thread
From: Ilya Kosarev @ 2020-10-19  9:49 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches



Hi,
 
Thanks for your review!
 
Sent v4 of the patch, addressing the comments and private discussion.
 
Some answers are below.
  
>Пятница, 2 октября 2020, 2:45 +03:00 от Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
> 
>Hi! Thanks for the patch! Very nice! Extreme niceness! Optimistic connection handling!
>
>See 17 small minor comments below.
>
>1. The branch fails all the iproto-related tests quite hard on my machine,
>some with crashes. Also, when I tried to run it in the console, I got
>'Peer closed' right after netbox.connect.
>
>However, I see CI green, and I am not sure why it passes there. On my
>machine all networking is totally broken on your branch.
>
>I fixed it by adding this:
>
>@@ -578,6 +578,7 @@ iproto_msg_new(struct iproto_connection *con)
> 		return NULL;
> 	}
> 	msg->connection = con;
>+	msg->close_connection = false;
> 	rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
> 	return msg;
> }
>
>It seems it is your bug, because previously it was used only on
>connection, and initialized only in iproto_on_accept(). Now you
>use it on each request, but do not initialize it on each request. Also it
>must have been sheer luck that this didn't fail on any CI job.
Right, my fault here. Fixed in v4.
>
>On 26.09.2020 00:53, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it. The
>> next step after acception was to process connection to tx thread
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive tx thread
>> (for example, when building secondary index).
>
>2. It is not a leak really. Just too many clients waiting for being
>processed. Descriptors were not lost completely.
>
>As for the issue - what was the problem not to start listening until
>recovery is complete?
>
>And how does this patch solve it, if the connections are still kept,
>and so their descriptors? How did you validate the problem was gone?
>It seems the descriptors still are created, and still are not closed.
>
>If the problem is not gone, that why does this patch does what it does?
This topic was discussed by voice. Short answer here:
Previously iproto only accepted connections without any
further action, which led to very fast descriptor exhaustion.
We can't avoid listening before recovery: it is needed for replication.
The patch solves it, because the connections are now greeted and
no longer keep reconnecting, wasting sockets. There is a reproducer
in the ticket, which I don't see how to include in test-run, but it shows
the difference.
>> There are some other
>> cases where we might not want to spend precious tx time to process
>> the connection in case iproto can do it all alone.
>
>3. What are these other cases? I can think only of SWIM in iproto thread,
>but it is not related at all.
The point is that we can now add more logic to request
processing in iproto, if needed, for cases where tx is not really needed
to answer. For example, see #4646.
This can also solve some problems with improper tx answers.
>
>> This patch allows iproto to accept connection and send greeting by
>> itself. The connection is initialized in tx thread when the real
>> request comes through iproto_msg_decode(). In case request type was not
>> recognized we can also send reply with an error without using tx. It is
>> planned to add more iproto logic to prevent extra interaction with
>> tx thread. This patch already to some extent solves descriptors leakage
>> problem as far as connection establishes and stays in fetch_schema
>> state while tx thread is unresponsive.
>> The other user visible change is that on_connect triggers won't run on
>> connections that don't provide any input, as reflected in
>> bad_trigger.test.py.
>>
>> Part of #3776
>>
>> @TarantoolBot document
>> Title: iproto: on_connect triggers execution
>> Update the documentation for on_connect triggers to reflect that they
>> are now being executed only with the first request. Though the triggers
>> are still the first thing to be executed on a new connection. While it
>> is quite a synthetic case to establish a connection without making
>> any requests it is technically possible and now your triggers won't be
>> executed in this case. Some request is required to start their
>> execution.
>
>4. I am not sure I understand. According to 3776 there was a problem that
>the connections were made. So they didn't make any requests? And the
>case is not as synthetic as it seems?
They didn't make any requests simply because they weren't even greeted;
instead they were reconnecting all the time. As soon as they are greeted,
they start sending requests.
>
>What was the problem to preserve the original behaviour? For example,
>you could make the on_connect execution delayed. TX thread could tell
>when recovery is done, and iproto would send the pending connections to it.
I think we don’t really need to deal with the connections that don’t send requests.
>
>> src/box/iproto.cc | 317 +++++++++++++++++++++-----------
>> test/box-py/bad_trigger.result | 1 +
>> test/box-py/bad_trigger.test.py | 22 ++-
>> test/sql/prepared.result | 4 +
>> test/sql/prepared.test.lua | 1 +
>> 5 files changed, 239 insertions(+), 106 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5ec..9f98fce86 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -463,6 +474,7 @@ struct iproto_connection
>> struct ev_io output;
>> /** Logical session. */
>> struct session *session;
>> + bool init_failed;
>
>5. Flags need 'is_' prefix.
Right, ok. Done in v4.
>
>6. What is this? Why is it needed? Usually we leave comments
>to each struct member. Otherwise it is hard to understand why
>some of them are needed.
Right, added the comment in v4.
>
>7. The field is accessed from tx thread only, and is close to
>fields mutated from iproto thread. That leads to false-sharing
>problem. Please, move it to iproto_connection.tx sub-struct.
Right! Fixed in v4.
>
>> ev_loop *loop;
>> /**
>> * Pre-allocated disconnect msg. Is sent right after
>> @@ -677,9 +700,19 @@ iproto_connection_close(struct iproto_connection *con)
>> * is done only once.
>> */
>> con->p_ibuf->wpos -= con->parse_size;
>> - cpipe_push(&tx_pipe, &con->disconnect_msg);
>> assert(con->state == IPROTO_CONNECTION_ALIVE);
>> con->state = IPROTO_CONNECTION_CLOSED;
>> + rlist_del(&con->in_stop_list);
>> + if (con->session != NULL) {
>> + cpipe_push(&tx_pipe, &con->disconnect_msg);
>> + } else {
>> + /*
>> + * In case session was not created we can safely
>> + * try to start destroy not involving tx thread.
>> + */
>> + iproto_connection_try_to_start_destroy(con);
>> + }
>> + return;
>
>8. You wouldn't need the 'return' and wouldn't need to duplicate
>'rlist_del(&con->in_stop_list);' call, if you would move
>'rlist_del(&con->in_stop_list);' to the beginning of this function
>from its end.
Right, done in v4.
>
>> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>> iproto_connection_try_to_start_destroy(con);
>> } else {
>> @@ -809,6 +842,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>> assert(rlist_empty(&con->in_stop_list));
>> int n_requests = 0;
>> bool stop_input = false;
>> + bool obuf_in_iproto = (con->session == NULL);
>
>9. We always name flags starting from 'is_' prefix or a similar one like 'has_',
>'does_'. Please, follow that agreement.
All right, added in v4.
>
>> const char *errmsg;
>> while (con->parse_size != 0 && !stop_input) {
>> if (iproto_check_msg_max()) {
>> @@ -1314,13 +1367,24 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>> (uint32_t) type);
>> goto error;
>> }
>> - return;
>> + return 0;
>> error:
>> /** Log and send the error. */
>> diag_log();
>> - diag_create(&msg->diag);
>> - diag_move(&fiber()->diag, &msg->diag);
>> - cmsg_init(&msg->base, error_route);
>> + if (msg->connection->session != NULL) {
>> + diag_create(&msg->diag);
>> + diag_move(&fiber()->diag, &msg->diag);
>> + cmsg_init(&msg->base, error_route);
>> + return 1;
>
>10. Why do you need 3 return values? You only check for -1. So you
>could return 0 here like it was before, and -1 below. In the
>calling code you would check != 0 like we do everywhere.
Well, the idea was that we have 2 different error cases. But actually
it should be 0 if the route was initialized and -1 otherwise. Now it is
like that; fixed in v4.
>
>> + }
>> + /*
>> + * In case session was not created we can process error path
>> + * without tx thread.
>> + */
>> + tx_accept_wpos(msg->connection, &msg->wpos);
>> + tx_reply_error(msg);
>
>11. There is a naming convention, that all tx_ functions are called in
>TX thread always. All net_ and iproto_ functions are called in IProto
>thread. Let's not violate the rule and obfuscate the iproto.cc code
>even more than it already is. Please, come up with new names reflecting where
>the functions are used. Or at least not specifying it as tx_.
Well, this convention is already broken to some extent. For example,
tx_reply_error() itself calls iproto_reply_error() and
iproto_wpos_create(), and tx_process_sql() calls iproto_reply_sql().
Though I agree that it is a good idea to follow the convention. I decided to
introduce reply_error() and obuf_accept_wpos() in v4.
>
>> + net_send_msg(&(msg->base));
>
>12. The function is called 'iproto_msg_decode'. Let's leave it to do the decoding,
>and not send anything from it. It just violates its purpose and name.
Right, fixed in v4. 
>
>> + return -1;
>> }
>>
>> static void
>> @@ -1478,13 +1538,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>> }
>> }
>>
>> -static inline struct iproto_msg *
>> -tx_accept_msg(struct cmsg *m)
>> +static inline int
>> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> - tx_accept_wpos(msg->connection, &msg->wpos);
>> - tx_fiber_init(msg->connection->session, msg->header.sync);
>> - return msg;
>> + *msg = (struct iproto_msg *) m;
>
>13. Why do you change the return type and add an out parameter? You
>could just return NULL in case of an error.
No, I can't, because I will need this msg to process the error; I can't get
NULL instead.
>
>> + /*
>> + * In case connection init failed we don't need to try anymore.
>> + */
>> + if ((*msg)->connection->init_failed)
>> + return -1;
>> + /*
>> + * In case session was not created we need to init connection in tx and
>> + * create it here.
>> + */
>> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
>> + (*msg)->connection->init_failed = true;
>> + (*msg)->close_connection = true;
>> + return -1;
>> + }
>> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
>> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
>> + return 0;
>> }
>>
>> /**
>> @@ -1507,7 +1581,14 @@ tx_reply_error(struct iproto_msg *msg)
>> static void
>> tx_reply_iproto_error(struct cmsg *m)
>> {
>> - struct iproto_msg *msg = tx_accept_msg(m);
>> + struct iproto_msg *msg;
>> + /*
>> + * We don't need to check tx_accept_msg() return value here
>> + * as far as if we might only process iproto error in tx
>> + * in case connection session is already created and
>> + * thus tx_accept_msg() can't fail.
>> + */
>> + tx_accept_msg(m, &msg);
>
>14. Well, if it fails, you have msg == NULL, and the code below will
>crash. Otherwise it should be an assertion, not just a comment.
Right, introduced the assertion in v4.
>
>> struct obuf *out = msg->connection->tx.p_obuf;
>> iproto_reply_error(out, diag_last_error(&msg->diag),
>> msg->header.sync, ::schema_version);
>> @@ -1865,6 +1961,29 @@ tx_process_replication(struct cmsg *m)
>> }
>> }
>>
>> +/**
>> + * Check connection health and try to send an error to the client
>> + * in case of internal connection init or on_connect trigger failure.
>> + */
>> +static bool
>> +iproto_connection_fail(struct iproto_msg *msg)
>
>15. The function is called 'connection_fail' and literally does nothing
>when the connection is fine. Does not this name look wrong to you?
Ok, right, it will be iproto_connection_check_valid() in v4.
>
>> +{
>> + if (!msg->close_connection)
>> + return false;
>> + struct iproto_connection *con = msg->connection;
>> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
>> + obuf_iovcnt(msg->wpos.obuf));
>
>16. You can't just write to a socket whenever you want. It may be
>not writable. Please, use libev events and callbacks to send anything.
>You can do a write only when you get EV_WRITE event from libev, who
>in turn gets it from select/poll/epoll/kqueue/whatever-else-is-provided
>by the OS.
Right. As discussed by voice, it is a special error case. If we do it the
normal way, we won't be able to send the error. I checked the case with
bad_trigger.test and it clearly shows that the client won't receive the
error message if we try to wait for the event here, while sio_writev()
does its job.
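The best-effort semantics described here -- try a single non-blocking writev() and treat a would-block result as non-fatal before closing -- can be sketched in portable POSIX terms (hypothetical names, not the actual sio_* API):

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Best-effort, single-shot error reply: flush whatever fits into the
 * socket buffer, treat a would-block result as "nothing sent, never
 * mind", and let the caller close the connection afterwards.
 * Returns bytes written, 0 on would-block, -1 on a real error. */
static ssize_t
flush_error_reply(int fd, struct iovec *iov, int iovcnt)
{
	ssize_t n = writev(fd, iov, iovcnt);
	if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
		return 0;
	return n;
}

/* Demo: a short error payload fits into an empty socket buffer. */
static ssize_t
demo_flush(void)
{
	int fds[2];
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return -1;
	fcntl(fds[0], F_SETFL, O_NONBLOCK);
	char msg[] = "ER_UNKNOWN_REQUEST_TYPE";
	struct iovec iov = { .iov_base = msg, .iov_len = sizeof(msg) - 1 };
	ssize_t n = flush_error_reply(fds[0], &iov, 1);
	close(fds[0]);
	close(fds[1]);
	return n;
}
```

The trade-off the reviewer objects to is visible here: if the buffer is full, the error reply is silently dropped, which a readiness-driven approach would avoid.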
>
>> + if (nwr > 0) {
>> + /* Count statistics. */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>> + diag_log();
>> + }
>> + iproto_connection_close(con);
>> + iproto_msg_delete(msg);
>> + return true;
>> +}
>> +
>> static void
>> net_send_msg(struct cmsg *m)
>> {
>> @@ -1936,81 +2066,60 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: prepare greeting for it.
>> */
>> static void
>> -tx_process_connect(struct cmsg *m)
>> +iproto_process_connect(struct iproto_msg *msg)
>> {
>> - struct iproto_msg *msg = (struct iproto_msg *) m;
>> struct iproto_connection *con = msg->connection;
>> struct obuf *out = msg->connection->tx.p_obuf;
>> - try { /* connect. */
>> - con->session = session_create(SESSION_TYPE_BINARY);
>> - if (con->session == NULL)
>> - diag_raise();
>> - con->session->meta.connection = con;
>> - tx_fiber_init(con->session, 0);
>> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> - /* TODO: dirty read from tx thread */
>> - struct tt_uuid uuid = INSTANCE_UUID;
>> - random_bytes(con->salt, IPROTO_SALT_SIZE);
>> - greeting_encode(greeting, tarantool_version_id(), &uuid,
>> - con->salt, IPROTO_SALT_SIZE);
>> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
>> - if (! rlist_empty(&session_on_connect)) {
>> - if (session_run_on_connect_triggers(con->session) != 0)
>> - diag_raise();
>> - }
>> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
>> + /* TODO: dirty read from tx thread */
>> + struct tt_uuid uuid = INSTANCE_UUID;
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>
>17. Sorry, but wtf?! This data right now may be being written by TX thread,
>so you literally may read and return *garbage* here. It is not acceptable.
>It is not even a short term solution, it is just a bug, if you can't prove
>the value is already initialized and constant by that moment.
>
>If listen starts before recovery, and UUID is stored in the xlogs, then it
>is definitely a bug.
As discussed by voice, everything is actually fine. My fault for leaving
the wrong comment here (I was not totally sure everything was fine, but the
comment was also wrong before my patch). In v4 it is replaced with a
correct one that explains the situation. Here it is:
/*
 * INSTANCE_UUID is guaranteed to be inited before this moment.
 * We start listening either in local_recovery() or bootstrap().
 * The INSTANCE_UUID is ensured to be inited in the beginning of
 * both methods. In case of local_recovery() it is verified that
 * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
 * In bootstrap() INSTANCE_UUID is either taken from the
 * instance_uuid box.cfg{} param or created on the spot.
 */
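If one wanted to turn that ordering guarantee into an explicit check rather than a comment, a hypothetical guard could look like the following (stub types, not the real tt_uuid API):

```c
#include <stdbool.h>
#include <string.h>

/* Stand-in for struct tt_uuid: 16 raw bytes. */
struct uuid_stub {
	unsigned char data[16];
};

/* A nil UUID is all zeros -- i.e. nothing initialized it yet. */
static bool
uuid_is_nil(const struct uuid_stub *u)
{
	static const struct uuid_stub nil; /* zero-initialized */
	return memcmp(u, &nil, sizeof(nil)) == 0;
}

/* Refuse to encode a greeting until recovery/bootstrap has filled in
 * the instance UUID: 0 when it is ready, -1 otherwise. */
static int
check_uuid_ready(const struct uuid_stub *instance_uuid)
{
	return uuid_is_nil(instance_uuid) ? -1 : 0;
}
```

Such a check (or an assertion built on it) would document the init-order dependency in code instead of relying on a prose comment.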
>
>> + greeting_encode(greeting, tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
>> + != IPROTO_GREETING_SIZE) {
>> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
>> + "greeting obuf", "dup");
>> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
>> + msg->header.sync, ::schema_version);
>> iproto_wpos_create(&msg->wpos, out);
>> - } catch (Exception *e) {
>> - tx_reply_error(msg);
>> msg->close_connection = true;
>> - }
>> -}
--
Ilya Kosarev



* Re: [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx
  2020-10-19  9:49   ` Ilya Kosarev
@ 2020-10-27 22:06     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 4+ messages in thread
From: Vladislav Shpilevoy @ 2020-10-27 22:06 UTC (permalink / raw)
  To: Ilya Kosarev; +Cc: tarantool-patches

Hi. The rest of the review is in Russian so that we understand each other
better.

>     > There are some other
>     > cases where we might not want to spend precious tx time to process
>     > the connection in case iproto can do it all alone.
> 
>     3. What are these other cases? I can think only of SWIM in iproto thread,
>     but it is not related at all.
> 
> The cases are that we can now add more logic to request
> processing in iproto, if needed, when tx is not really needed to answer.
> For example, see #4646.
> This can also solve some problems with improper tx answers. 

Which improper tx answers do you mean?

>     > This patch allows iproto to accept connection and send greeting by
>     > itself. The connection is initialized in tx thread when the real
>     > request comes through iproto_msg_decode(). In case request type was not
>     > recognized we can also send reply with an error without using tx. It is
>     > planned to add more iproto logic to prevent extra interaction with
>     > tx thread. This patch already to some extent solves descriptors leakage
>     > problem as far as connection establishes and stays in fetch_schema
>     > state while tx thread is unresponsive.
>     > The other user visible change is that on_connect triggers won't run on
>     > connections that don't provide any input, as reflected in
>     > bad_trigger.test.py.
>     >
>     > Part of #3776
>     >
>     > @TarantoolBot document
>     > Title: iproto: on_connect triggers execution
>     > Update the documentation for on_connect triggers to reflect that they
>     > are now being executed only with the first request. Though the triggers
>     > are still the first thing to be executed on a new connection. While it
>     > is quite a synthetic case to establish a connection without making
>     > any requests it is technically possible and now your triggers won't be
>     > executed in this case. Some request is required to start their
>     > execution.
> 
>     4. I am not sure I understand. According to 3776 there was a problem that
>     the connections were made. So they didn't make any requests? And the
>     case is not as synthetic as it seems?
> 
> They didn't make any requests simply because they weren't even greeted;
> instead they were reconnecting all the time. As soon as they are greeted,
> they start sending requests.

Not quite. Yes, they were not greeted, but the problem is not whether they
sent requests or not. It is that even when they disconnected, the
disconnect had to go through the tx thread. After your patch it apparently
no longer does. So they still connect, hang, and drop off, but now the
socket closure is caught as EV_READ in the iproto thread and the socket is
closed right there. However, the iproto_connection object, all the buffers,
and struct iproto_msg are still not deleted. This might blow up on memory
even earlier.
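The mechanism mentioned above -- the iproto thread observing a disconnect as a read event -- relies on read() returning 0 at EOF. A minimal POSIX demonstration (hypothetical, not Tarantool code):

```c
#include <sys/socket.h>
#include <unistd.h>

/* When the peer drops the connection, the socket becomes readable and
 * read() returns 0 (EOF) -- which is how a network thread can notice a
 * disconnect from a plain read event, with no other thread involved.
 * Returns 1 if EOF was observed, 0 or -1 otherwise. */
static int
demo_peer_close(void)
{
	int fds[2];
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return -1;
	close(fds[1]);	/* the "client" goes away */
	char byte;
	ssize_t n = read(fds[0], &byte, 1);	/* immediate EOF */
	close(fds[0]);
	return n == 0;
}
```

In libev terms, the closed peer makes the descriptor readable, so the EV_READ callback fires and sees the zero-byte read.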

Still, it is wrong that you send requests to the tx thread right away. TX
should explicitly say that this is allowed; until then, don't even try to
send anything there. Given that an instance may take hours to come up, it
can run out of memory under aggressive reconnects from a pile of clients.

There is time for this now, so in my opinion it is better to do it as one
proper patchset; there is no rush.

>     What was the problem to preserve the original behaviour? For example,
>     you could make the on_connect execution delayed. TX thread could tell
>     when recovery is done, and iproto would send the pending connections to it.
> 
> I think we don’t really need to deal with the connections that don’t send requests.

This is not about what anyone thinks, it is about not breaking
compatibility. It looks like this breaks it. Previously the trigger was
called on every connect, and the user could, for example, reject
connections by raising an error in the trigger. Or log them somehow.

I still see no obstacle to keeping this unbroken. Until the TX thread has
given permission to forward requests to it, we reject everyone.

Once the TX thread has given permission, all still-open connections go to
TX, even if they have no requests. What is the problem with doing that and
not breaking what already works?

>     > const char *errmsg;
>     > while (con->parse_size != 0 && !stop_input) {
>     > if (iproto_check_msg_max()) {
>     > @@ -1314,13 +1367,24 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>     > (uint32_t) type);
>     > goto error;
>     > }
>     > - return;
>     > + return 0;
>     > error:
>     > /** Log and send the error. */
>     > diag_log();
>     > - diag_create(&msg->diag);
>     > - diag_move(&fiber()->diag, &msg->diag);
>     > - cmsg_init(&msg->base, error_route);
>     > + if (msg->connection->session != NULL) {
>     > + diag_create(&msg->diag);
>     > + diag_move(&fiber()->diag, &msg->diag);
>     > + cmsg_init(&msg->base, error_route);
>     > + return 1;
> 
>     10. Why do you need 3 return values? You only check for -1. So you
>     could return 0 here like it was before, and -1 below. In the
>     calling code you would check != 0 like we do everywhere.
> 
> Well, the idea was that we have 2 different error cases. But actually
> it should be 0 if the route was initialized and -1 otherwise. Now it is
> like that; fixed in v4.

The idea is clear: various things can happen inside. But you used only 2
of the cases. What is the point of the third? It was just dead code, and
there is no sense in adding it "just in case".

>     > + }
>     > + /*
>     > + * In case session was not created we can process error path
>     > + * without tx thread.
>     > + */
>     > + tx_accept_wpos(msg->connection, &msg->wpos);
>     > + tx_reply_error(msg);
> 
>     11. There is a naming convention, that all tx_ functions are called in
>     TX thread always. All net_ and iproto_ functions are called in IProto
>     thread. Lets not violate the rule and obfuscate the iproto.cc code
>     even more that it is. Please, come up with new names reflecting where
>     the functions are used. Or at least not specifying it as tx_.
> 
> Well, this convention is already broken to some extent. For example,
> tx_reply_error() itself calls iproto_reply_error() and
> iproto_wpos_create(). tx_process_sql() calls iproto_reply_sql().

I may have gotten that wrong. Most likely iproto_ marks the shared
functions, net_ the ones for the network thread, and tx_ the ones for the
main thread. With some exceptions, which most likely are simply incorrect.

>     > +{
>     > + if (!msg->close_connection)
>     > + return false;
>     > + struct iproto_connection *con = msg->connection;
>     > + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
>     > + obuf_iovcnt(msg->wpos.obuf));
> 
>     16. You can't just write to a socket whenever you want. It may be
>     not writable. Please, use libev events and callbacks to send anything.
>     You can do a write only when you get EV_WRITE event from libev, who
>     in turn gets it from select/poll/epoll/kqueue/whatever-else-is-provided
>     by the OS.
> 
> Right. As discussed by voice, it is a special error case. If we do it the
> normal way, we won't be able to send the error. I checked the case with
> bad_trigger.test and it clearly shows that the client won't receive the
> error message if we try to wait for the event here, while sio_writev()
> does its job.

Obviously, if you close the socket, you can no longer write to it, and
libev will not deliver any more events for it. Don't close it, and
everything will work.

We discussed by voice that this is not a special case where the error
cannot be sent the normal way; rather, the author of the original code
simply did not bother to do it properly via events and decided to write
to the socket directly, and now this is being carried further. Another
likely reason is that everything is built around obuf, which could not be
written to from the iproto thread at all. I cannot yet say what to do
about this; it depends on what happens with obuf next. See the other
email.


end of thread, other threads:[~2020-10-27 22:06 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-09-25 22:53 [Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx Ilya Kosarev
2020-10-01 23:45 ` Vladislav Shpilevoy
2020-10-19  9:49   ` Ilya Kosarev
2020-10-27 22:06     ` Vladislav Shpilevoy
