Tarantool development patches archive
 help / color / mirror / Atom feed
From: mechanik20051988 via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org,
	mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto
Date: Thu,  5 Aug 2021 21:17:42 +0300	[thread overview]
Message-ID: <29a71e8ded595c28414e405a68c8927227fa28cb.1628184138.git.mechanik20.05.1988@gmail.com> (raw)
In-Reply-To: <cover.1628184138.git.mechanik20.05.1988@gmail.com>

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

Implement streams in iproto. There is a hash table of streams for
each connection. When a new request comes with a non-zero stream ID,
we look for the stream with that ID in this table and, if it does not
exist, we create it. The request is placed in the queue of pending
requests, and if this queue was empty at the time of its receipt, it
is pushed to the tx thread for processing. When a request belonging to
a stream returns to the network thread after processing is completed, we
take the next request out of the queue of pending requests and send it
to the tx thread for processing. If there are no pending requests, we
remove the stream object from the hash table and destroy it. Requests
with zero stream ID are processed in the old way.

Part of #5860

@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them
are processed sequentially. The execution of the next request
in a stream will not start until the previous one is completed.
To distinguish requests that belong to a stream from those that do
not, we use the stream ID field in the binary iproto protocol: requests
with a non-zero stream ID belong to some stream. A stream ID is unique
within the connection and indicates which stream the request
belongs to. For streams from different connections, the IDs may
be the same.
---
 src/box/errcode.h      |   1 +
 src/box/iproto.cc      | 216 ++++++++++++++++++++++++++++++++++++++++-
 src/lib/core/errinj.h  |   2 +
 test/box/errinj.result |   2 +
 test/box/error.result  |   1 +
 5 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index d2854677f..6c8c00256 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -278,6 +278,7 @@ struct errcode_record {
 	/*223 */_(ER_INTERFERING_PROMOTE,	"Instance with replica id %u was promoted first") \
 	/*224 */_(ER_ELECTION_DISABLED,		"Elections were turned off")\
 	/*225 */_(ER_TXN_ROLLBACK,		"Transaction was rolled back") \
+	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3ed641eea..3642cbd02 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,6 +63,8 @@
 #include "execute.h"
 #include "errinj.h"
 #include "tt_static.h"
+#include "salad/stailq.h"
+#include "assoc.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -73,6 +75,21 @@ enum {
 	 ENDPOINT_NAME_MAX = 10
 };
 
+struct iproto_connection;
+
+struct iproto_stream {
+	/**
+	 * Queue of pending requests (iproto messages) for this stream,
+	 * processed sequentially. This field is accessible only from
+	 * the iproto thread. Queue items have the iproto_msg type.
+	 */
+	struct stailq pending_requests;
+	/** Id of this stream, used as a key in streams hash table */
+	uint64_t id;
+	/** This stream connection */
+	struct iproto_connection *connection;
+};
+
 /**
  * A position in connection output buffer.
  * Since we use rotating buffers to recycle memory,
@@ -135,6 +152,7 @@ struct iproto_thread {
 	 */
 	struct mempool iproto_msg_pool;
 	struct mempool iproto_connection_pool;
+	struct mempool stream_pool;
 	/*
 	 * List of stopped connections
 	 */
@@ -303,6 +321,16 @@ struct iproto_msg
 	 * and the connection must be closed.
 	 */
 	bool close_connection;
+	/**
+	 * A stailq_entry to hold the message in a stream.
+	 * All messages in a stream are processed sequentially. Before
+	 * processing, each message is added to the queue of pending requests.
+	 * If this queue was empty, the message begins to be processed;
+	 * otherwise it waits until all previous messages are processed.
+	 */
+	struct stailq_entry in_stream;
+	/** Stream that owns this message, or NULL. */
+	struct iproto_stream *stream;
 };
 
 static struct iproto_msg *
@@ -504,6 +532,11 @@ struct iproto_connection
 	 */
 	enum iproto_connection_state state;
 	struct rlist in_stop_list;
+	/**
+	 * Hash table that holds all streams for this connection.
+	 * This field is accessible only from the iproto thread.
+	 */
+	struct mh_i64ptr_t *streams;
 	/**
 	 * Kharon is used to implement box.session.push().
 	 * When a new push is ready, tx uses kharon to notify
@@ -557,6 +590,44 @@ struct iproto_connection
 	struct iproto_thread *iproto_thread;
 };
 
+static inline void
+errinj_stream_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static inline void
+errinj_stream_msg_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static struct iproto_stream *
+iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
+{
+	struct iproto_thread *iproto_thread = connection->iproto_thread;
+	struct iproto_stream *stream = (struct iproto_stream *)
+		mempool_alloc(&iproto_thread->stream_pool);
+	if (stream == NULL) {
+		diag_set(OutOfMemory, sizeof(*stream),
+			 "mempool_alloc", "stream");
+		return NULL;
+	}
+	errinj_stream_count_add(1);
+	stailq_create(&stream->pending_requests);
+	stream->id = stream_id;
+	stream->connection = connection;
+	return stream;
+}
+
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
@@ -576,6 +647,14 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume(iproto_thread);
 }
 
+static void
+iproto_stream_delete(struct iproto_stream *stream)
+{
+	assert(stailq_empty(&stream->pending_requests));
+	errinj_stream_count_add(-1);
+	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
+}
+
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
@@ -594,6 +673,7 @@ iproto_msg_new(struct iproto_connection *con)
 	}
 	msg->close_connection = false;
 	msg->connection = con;
+	msg->stream = NULL;
 	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
 	return msg;
 }
@@ -821,6 +901,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
+/**
+ * Check if the message belongs to a stream (stream_id != 0), and if
+ * so, create a new stream or get the existing one from the connection's
+ * streams hash table. Put the message in the stream's pending list.
+ * @retval 0 - the message is ready to push to TX thread (either if
+ *             stream_id is not set (is zero) or the stream is not
+ *             processing other messages).
+ *         1 - the message is postponed because its stream is busy
+ *             processing previous message(s).
+ *        -1 - memory error.
+ */
+static int
+iproto_msg_set_stream(struct iproto_msg *msg)
+{
+	uint64_t stream_id = msg->header.stream_id;
+	if (stream_id == 0)
+		return 0;
+
+	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = NULL;
+	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
+	if (pos == mh_end(con->streams)) {
+		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
+		if (stream == NULL)
+			return -1;
+		struct mh_i64ptr_node_t node;
+		node.key = stream_id;
+		node.val = stream;
+		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
+		if (pos == mh_end(con->streams)) {
+			iproto_stream_delete(stream);
+			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
+				 "mh_streams_node");
+			return -1;
+		}
+	}
+	/*
+	 * Not all messages belong to a stream. We can't determine which
+	 * messages belong to a stream in `iproto_msg_new`, so we increment
+	 * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
+	 * The counter is decremented in `net_send_msg`, when the message
+	 * is removed from the stream's queue of pending requests.
+	 */
+	errinj_stream_msg_count_add(1);
+	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+	msg->stream = stream;
+	/*
+	 * If the request queue in the stream is not empty, it means
+	 * that some previous message wasn't processed yet. Regardless
+	 * of this, we put the message in the queue, but we start processing
+	 * the message only if the message queue in the stream was empty.
+	 */
+	bool was_not_empty = !stailq_empty(&stream->pending_requests);
+	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
+	return was_not_empty ? 1 : 0;
+}
+
 /**
  * Enqueue all requests which were read up. If a request limit is
  * reached - stop the connection input even if not the whole batch
@@ -830,7 +967,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
  * @param in Buffer to parse.
  *
  * @retval  0 Success.
- * @retval -1 Invalid MessagePack error.
+ * @retval -1 Invalid MessagePack or memory error.
  */
 static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
@@ -883,12 +1020,25 @@ err_msgpack:
 		msg->len = reqend - reqstart; /* total request length */
 
 		iproto_msg_decode(msg, &pos, reqend, &stop_input);
+
+		int rc = iproto_msg_set_stream(msg);
+		if (rc < 0) {
+			iproto_msg_delete(msg);
+			return -1;
+		}
 		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
+		 * rc > 0 means that the stream's queue of pending requests
+		 * was not empty, so skip the push.
 		 */
-		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
-		n_requests++;
+		if (rc == 0) {
+			/*
+			 * This can't throw, but should not be
+			 * done in case of exception.
+			 */
+			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
+			n_requests++;
+		}
+
 		/* Request is parsed */
 		assert(reqend > reqstart);
 		assert(con->parse_size >= (size_t) (reqend - reqstart));
@@ -1130,6 +1280,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
 		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
 		return NULL;
 	}
+	con->streams = mh_i64ptr_new();
+	if (con->streams == NULL) {
+		diag_set(OutOfMemory, sizeof(*(con->streams)),
+			 "mh_streams_new", "streams");
+		mempool_free(&con->iproto_thread->iproto_connection_pool, con);
+		return NULL;
+	}
 	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
@@ -1178,6 +1335,9 @@ iproto_connection_delete(struct iproto_connection *con)
 	       con->obuf[0].iov[0].iov_base == NULL);
 	assert(con->obuf[1].pos == 0 &&
 	       con->obuf[1].iov[0].iov_base == NULL);
+
+	assert(mh_size(con->streams) == 0);
+	mh_i64ptr_delete(con->streams);
 	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
 }
 
@@ -1225,7 +1385,9 @@ static void
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
+	uint64_t stream_id;
 	uint8_t type;
+	bool request_is_not_for_stream;
 	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1233,6 +1395,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	assert(*pos == reqend);
 
 	type = msg->header.type;
+	stream_id = msg->header.stream_id;
+	request_is_not_for_stream =
+		((type > IPROTO_TYPE_STAT_MAX &&
+		 type != IPROTO_PING) || type == IPROTO_AUTH);
+
+	if (stream_id != 0 && request_is_not_for_stream) {
+		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
+			 (uint32_t) type);
+		goto error;
+	}
 
 	/*
 	 * Parse request before putting it into the queue
@@ -1326,6 +1498,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
 	fiber_set_user(f, &session->credentials);
 }
 
+
 static void
 tx_process_disconnect(struct cmsg *m)
 {
@@ -1857,7 +2030,38 @@ net_send_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = msg->stream;
+	struct iproto_msg *tmp;
+
+	if (stream == NULL)
+		goto send_msg;
+
+	tmp = stailq_shift_entry(&stream->pending_requests,
+				 struct iproto_msg, in_stream);
+	assert(tmp == msg);
+	(void)tmp;
+	errinj_stream_msg_count_add(-1);
+
+	if (stailq_empty(&stream->pending_requests)) {
+		struct mh_i64ptr_node_t node = { stream->id, NULL };
+		mh_i64ptr_remove(con->streams, &node, 0);
+		iproto_stream_delete(stream);
+	} else {
+		/*
+		 * If there are new messages for this stream
+		 * then schedule their processing.
+		 */
+		struct iproto_msg *next =
+			stailq_first_entry(&stream->pending_requests,
+					   struct iproto_msg,
+					   in_stream);
+		assert(next != NULL);
+		next->wpos = con->wpos;
+		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
+		cpipe_flush_input(&con->iproto_thread->tx_pipe);
+	}
 
+send_msg:
 	if (msg->len != 0) {
 		/* Discard request (see iproto_enqueue_batch()). */
 		msg->p_ibuf->rpos += msg->len;
@@ -2045,6 +2249,8 @@ net_cord_f(va_list  ap)
 		       sizeof(struct iproto_msg));
 	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
 		       sizeof(struct iproto_connection));
+	mempool_create(&iproto_thread->stream_pool, &cord()->slabc,
+		       sizeof(struct iproto_stream));
 
 	evio_service_init(loop(), &iproto_thread->binary, "binary",
 			  iproto_on_accept, iproto_thread);
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 3fe4c7c22..01b3eddef 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -155,6 +155,8 @@ struct errinj {
 	_(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_NETBOX_IO_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT, {.iparam = 0}) \
+	_(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT, {.iparam = 0}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index adb682ac3..8b45cdefc 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,6 +59,8 @@ evals
   - ERRINJ_INDEX_ALLOC: false
   - ERRINJ_INDEX_RESERVE: false
   - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
+  - ERRINJ_IPROTO_STREAM_COUNT: 0
+  - ERRINJ_IPROTO_STREAM_MSG_COUNT: 0
   - ERRINJ_IPROTO_TX_DELAY: false
   - ERRINJ_IPROTO_WRITE_ERROR_DELAY: false
   - ERRINJ_LOG_ROTATE: false
diff --git a/test/box/error.result b/test/box/error.result
index 25def757e..7984f6bb3 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -444,6 +444,7 @@ t;
  |   223: box.error.INTERFERING_PROMOTE
  |   224: box.error.ELECTION_DISABLED
  |   225: box.error.TXN_ROLLBACK
+ |   226: box.error.UNABLE_TO_PROCESS_IN_STREAM
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.20.1


  parent reply	other threads:[~2021-08-05 18:19 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-08-05 18:17 [Tarantool-patches] [PATCH 0/7] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 1/7] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
2021-08-06  8:20   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 2/7] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
2021-08-06  8:33   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 3/7] txn: detach transaction from fiber mechanik20051988 via Tarantool-patches
2021-08-06  8:51   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` mechanik20051988 via Tarantool-patches [this message]
2021-08-06 10:30   ` [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-06 12:03   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-06 12:59   ` Vladimir Davydov via Tarantool-patches
2021-08-09 10:39     ` Vladimir Davydov via Tarantool-patches
2021-08-09 10:40       ` [Tarantool-patches] [PATCH 1/2] xrow: remove unused call_request::header Vladimir Davydov via Tarantool-patches
2021-08-09 10:40         ` [Tarantool-patches] [PATCH 2/2] iproto: clear request::header for client requests Vladimir Davydov via Tarantool-patches
2021-08-09 11:27           ` Evgeny Mekhanik via Tarantool-patches
2021-08-09 11:26         ` [Tarantool-patches] [PATCH 1/2] xrow: remove unused call_request::header Evgeny Mekhanik via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
2021-08-06 14:04   ` Vladimir Davydov via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=29a71e8ded595c28414e405a68c8927227fa28cb.1628184138.git.mechanik20.05.1988@gmail.com \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=mechanik20.05.1988@gmail.com \
    --cc=mechanik20051988@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox