[Tarantool-patches] [PATCH v2 5/8] iproto: implement streams in iproto

mechanik20051988 mechanik20051988 at tarantool.org
Mon Aug 9 17:37:57 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Implement streams in iproto. There is a hash table of streams for
each connection. When a new request comes with a non-zero stream ID,
we look for the stream with this ID in the table and, if it does not
exist, we create it. The request is placed in the stream's queue of
pending requests, and if this queue was empty at the time of its
receipt, it is pushed to the tx thread for processing. When a request
belonging to a stream returns to the network thread after processing
is completed, we take the next request out of the queue of pending
requests and send it to the tx thread for processing. If there are no
pending requests, we remove the stream object from the hash table and
destroy it. Requests with zero stream ID are processed in the old way.

Part of #5860

@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them
are processed sequentially. The execution of the next request
in a stream will not start until the previous one is completed.
To separate requests that belong to streams from those that do
not, we use the stream ID field in the binary iproto protocol:
requests with a non-zero stream ID belong to some stream. The
stream ID is unique within the connection and indicates which
stream the request belongs to. For streams from different
connections, the IDs may be the same.
---
 src/box/errcode.h      |   1 +
 src/box/iproto.cc      | 228 ++++++++++++++++++++++++++++++++++++++++-
 src/lib/core/errinj.h  |   2 +
 test/box/errinj.result |   2 +
 test/box/error.result  |   1 +
 5 files changed, 229 insertions(+), 5 deletions(-)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index ef2b2e9b1..f8fda23c1 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -281,6 +281,7 @@ struct errcode_record {
 	/*226 */_(ER_NOT_LEADER,		"The instance is not a leader. New leader is %u")\
 	/*227 */_(ER_SYNC_QUEUE_UNCLAIMED,	"The synchronous transaction queue doesn't belong to any instance")\
 	/*228 */_(ER_SYNC_QUEUE_FOREIGN,	"The synchronous transaction queue belongs to other instance with id %u")\
+	/*229 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index dcf60e1be..3b792130b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -64,6 +64,8 @@
 #include "execute.h"
 #include "errinj.h"
 #include "tt_static.h"
+#include "salad/stailq.h"
+#include "assoc.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -74,6 +76,21 @@ enum {
 	 ENDPOINT_NAME_MAX = 10
 };
 
+struct iproto_connection;
+
+struct iproto_stream {
+	/**
+	 * Queue of pending requests (iproto messages) for this stream,
+	 * processed sequentially. This field is accessible only from
+	 * the iproto thread. Queue items have iproto_msg type.
+	 */
+	struct stailq pending_requests;
+	/** Id of this stream, used as a key in the streams hash table. */
+	uint64_t id;
+	/** Connection this stream belongs to. */
+	struct iproto_connection *connection;
+};
+
 /**
  * A position in connection output buffer.
  * Since we use rotating buffers to recycle memory,
@@ -136,6 +153,7 @@ struct iproto_thread {
 	 */
 	struct mempool iproto_msg_pool;
 	struct mempool iproto_connection_pool;
+	struct mempool iproto_stream_pool;
 	/*
 	 * List of stopped connections
 	 */
@@ -304,6 +322,16 @@ struct iproto_msg
 	 * and the connection must be closed.
 	 */
 	bool close_connection;
+	/**
+	 * Link in the stream's queue of pending requests.
+	 * Messages of a stream are processed sequentially: each one is
+	 * first added to the queue; if the queue was empty, processing
+	 * starts right away, otherwise the message waits until all
+	 * previous messages are processed.
+	 */
+	struct stailq_entry in_stream;
+	/** Stream that owns this message, or NULL. */
+	struct iproto_stream *stream;
 };
 
 static struct iproto_msg *
@@ -505,6 +533,11 @@ struct iproto_connection
 	 */
 	enum iproto_connection_state state;
 	struct rlist in_stop_list;
+	/**
+	 * Hash table that holds all streams for this connection.
+	 * This field is accessible only from the iproto thread.
+	 */
+	struct mh_i64ptr_t *streams;
 	/**
 	 * Kharon is used to implement box.session.push().
 	 * When a new push is ready, tx uses kharon to notify
@@ -572,6 +605,48 @@ struct iproto_connection
 } while (0);
 #endif
 
+/*
+ * Debug-only stream counters kept in errinj (no-op with NDEBUG).
+ * TODO(gh-6293): add proper iproto stream statistics, drop these.
+ */
+static inline void
+errinj_stream_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif /* NDEBUG */
+}
+
+static inline void
+errinj_stream_msg_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif /* NDEBUG */
+}
+
+static struct iproto_stream *
+iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
+{
+	struct iproto_thread *iproto_thread = connection->iproto_thread;
+	struct iproto_stream *stream = (struct iproto_stream *)
+		mempool_alloc(&iproto_thread->iproto_stream_pool);
+	if (stream == NULL) {
+		diag_set(OutOfMemory, sizeof(*stream),
+			 "mempool_alloc", "stream");
+		return NULL; /* diag is set */
+	}
+	errinj_stream_count_add(1); /* debug-only live streams counter */
+	stailq_create(&stream->pending_requests);
+	stream->id = stream_id;
+	stream->connection = connection;
+	return stream;
+}
+
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
@@ -591,6 +666,14 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume(iproto_thread);
 }
 
+static void
+iproto_stream_delete(struct iproto_stream *stream)
+{
+	assert(stailq_empty(&stream->pending_requests));
+	errinj_stream_count_add(-1); /* debug-only live streams counter */
+	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
+}
+
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
@@ -609,6 +692,7 @@ iproto_msg_new(struct iproto_connection *con)
 	}
 	msg->close_connection = false;
 	msg->connection = con;
+	msg->stream = NULL;
 	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
 	return msg;
 }
@@ -836,6 +920,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
+/**
+ * If the message belongs to a stream (stream_id != 0), find its
+ * stream in the connection's hash table, creating it if needed,
+ * and append the message to the stream's pending requests queue.
+ * @retval  0 The message is ready to be pushed to the tx thread:
+ *            either stream_id is zero or the stream queue was
+ *            empty before this message was added.
+ * @retval  1 The message is postponed: the stream queue already
+ *            held earlier message(s).
+ * @retval -1 Memory error, diag is set; the message is not queued.
+ */
+static int
+iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
+{
+	uint64_t stream_id = msg->header.stream_id;
+	if (stream_id == 0)
+		return 0;
+
+	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = NULL;
+	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
+	if (pos == mh_end(con->streams)) {
+		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
+		if (stream == NULL)
+			return -1;
+		struct mh_i64ptr_node_t node;
+		node.key = stream_id;
+		node.val = stream;
+		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
+		if (pos == mh_end(con->streams)) {
+			iproto_stream_delete(stream);
+			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
+				 "mh_streams_node");
+			return -1;
+		}
+	}
+	/*
+	 * Not all messages belong to a stream, and `iproto_msg_new`
+	 * can't tell which ones do, so ERRINJ_IPROTO_STREAM_MSG_COUNT
+	 * is incremented here, once it is known. The matching
+	 * decrement is done when the message leaves the stream queue,
+	 * see `iproto_msg_finish_processing_in_stream`.
+	 */
+	errinj_stream_msg_count_add(1);
+	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+	msg->stream = stream;
+	/*
+	 * If the request queue in the stream is not empty, it means
+	 * that some previous message wasn't processed yet. Regardless
+	 * of this, we put the message in the queue, but we start processing
+	 * the message only if the message queue in the stream was empty.
+	 */
+	bool was_not_empty = !stailq_empty(&stream->pending_requests);
+	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
+	return was_not_empty ? 1 : 0;
+}
+
 /**
  * Enqueue all requests which were read up. If a request limit is
  * reached - stop the connection input even if not the whole batch
@@ -845,7 +986,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
  * @param in Buffer to parse.
  *
  * @retval  0 Success.
- * @retval -1 Invalid MessagePack error.
+ * @retval -1 Invalid MessagePack or memory error.
  */
 static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
@@ -898,12 +1039,25 @@ err_msgpack:
 		msg->len = reqend - reqstart; /* total request length */
 
 		iproto_msg_decode(msg, &pos, reqend, &stop_input);
+
+		int rc = iproto_msg_start_processing_in_stream(msg);
+		if (rc < 0) {
+			iproto_msg_delete(msg);
+			return -1;
+		}
 		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
+		 * rc > 0 means that the stream already had pending
+		 * requests, so the message is not pushed to tx now.
 		 */
-		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
-		n_requests++;
+		if (rc == 0) {
+			/*
+			 * This can't throw, but should not be
+			 * done in case of exception.
+			 */
+			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
+			n_requests++;
+		}
+
 		/* Request is parsed */
 		assert(reqend > reqstart);
 		assert(con->parse_size >= (size_t) (reqend - reqstart));
@@ -1145,6 +1299,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
 		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
 		return NULL;
 	}
+	con->streams = mh_i64ptr_new();
+	if (con->streams == NULL) { /* con->iproto_thread is not set yet */
+		diag_set(OutOfMemory, sizeof(*(con->streams)),
+			 "mh_streams_new", "streams");
+		mempool_free(&iproto_thread->iproto_connection_pool, con);
+		return NULL;
+	}
 	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
@@ -1193,6 +1354,9 @@ iproto_connection_delete(struct iproto_connection *con)
 	       con->obuf[0].iov[0].iov_base == NULL);
 	assert(con->obuf[1].pos == 0 &&
 	       con->obuf[1].iov[0].iov_base == NULL);
+
+	assert(mh_size(con->streams) == 0);
+	mh_i64ptr_delete(con->streams);
 	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
 }
 
@@ -1240,7 +1404,9 @@ static void
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
+	uint64_t stream_id;
 	uint8_t type;
+	bool request_is_not_for_stream;
 	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1248,6 +1414,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	assert(*pos == reqend);
 
 	type = msg->header.type;
+	stream_id = msg->header.stream_id;
+	request_is_not_for_stream =
+		((type > IPROTO_TYPE_STAT_MAX &&
+		 type != IPROTO_PING) || type == IPROTO_AUTH);
+
+	if (stream_id != 0 && request_is_not_for_stream) {
+		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
+			 iproto_type_name(type));
+		goto error;
+	}
 
 	/*
 	 * Parse request before putting it into the queue
@@ -1873,12 +2049,52 @@ tx_process_replication(struct cmsg *m)
 	}
 }
 
+static void
+iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
+{
+	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = msg->stream;
+
+	assert(stream != NULL);
+	struct iproto_msg *tmp =
+		stailq_shift_entry(&stream->pending_requests,
+				   struct iproto_msg, in_stream);
+	assert(tmp == msg);
+	(void)tmp; /* unused when asserts are compiled out */
+	errinj_stream_msg_count_add(-1); /* debug-only counter */
+
+	if (stailq_empty(&stream->pending_requests)) {
+		struct mh_i64ptr_node_t node = { stream->id, NULL };
+		mh_i64ptr_remove(con->streams, &node, 0);
+		iproto_stream_delete(stream);
+	} else {
+		/*
+		 * The stream still has pending messages: push the
+		 * first of them to tx, the rest keep waiting.
+		 */
+		struct iproto_msg *next =
+			stailq_first_entry(&stream->pending_requests,
+					   struct iproto_msg,
+					   in_stream);
+		assert(next != NULL);
+		next->wpos = con->wpos;
+		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
+		cpipe_flush_input(&con->iproto_thread->tx_pipe);
+	}
+}
+
 static void
 net_send_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = msg->stream;
+
+	if (stream == NULL)
+		goto send_msg;
 
+	iproto_msg_finish_processing_in_stream(msg);
+send_msg:
 	if (msg->len != 0) {
 		/* Discard request (see iproto_enqueue_batch()). */
 		msg->p_ibuf->rpos += msg->len;
@@ -2066,6 +2282,8 @@ net_cord_f(va_list  ap)
 		       sizeof(struct iproto_msg));
 	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
 		       sizeof(struct iproto_connection));
+	mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc,
+		       sizeof(struct iproto_stream));
 
 	evio_service_init(loop(), &iproto_thread->binary, "binary",
 			  iproto_on_accept, iproto_thread);
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 3fe4c7c22..01b3eddef 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -155,6 +155,8 @@ struct errinj {
 	_(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_NETBOX_IO_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT, {.iparam = 0}) \
+	_(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT, {.iparam = 0}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index adb682ac3..8b45cdefc 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,6 +59,8 @@ evals
   - ERRINJ_INDEX_ALLOC: false
   - ERRINJ_INDEX_RESERVE: false
   - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
+  - ERRINJ_IPROTO_STREAM_COUNT: 0
+  - ERRINJ_IPROTO_STREAM_MSG_COUNT: 0
   - ERRINJ_IPROTO_TX_DELAY: false
   - ERRINJ_IPROTO_WRITE_ERROR_DELAY: false
   - ERRINJ_LOG_ROTATE: false
diff --git a/test/box/error.result b/test/box/error.result
index b7ac7a138..f80fdfed5 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -447,6 +447,7 @@ t;
  |   226: box.error.NOT_LEADER
  |   227: box.error.SYNC_QUEUE_UNCLAIMED
  |   228: box.error.SYNC_QUEUE_FOREIGN
+ |   229: box.error.UNABLE_TO_PROCESS_IN_STREAM
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.20.1



More information about the Tarantool-patches mailing list