[Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto

Vladimir Davydov vdavydov at tarantool.org
Fri Aug 6 13:30:04 MSK 2021


On Thu, Aug 05, 2021 at 09:17:42PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
> 
> Implement streams in iproto. There is a hash table of streams for
> each connection. When a new request comes with a non-zero stream ID,
> we look for a stream with that ID in the table and create it if it
> does not exist. The request is placed in the stream's queue of pending
> requests, and if this queue was empty at the time of its receipt, it
> is pushed to the tx thread for processing. When a request belonging to
> a stream returns to the network thread after processing is completed,
> we take the next request out of the queue of pending requests and send
> it for processing to the tx thread. If there are no pending requests,
> we remove the stream object from the hash table and destroy it.
> Requests with zero stream ID are processed in the old way.

Disclaimer: I think that request streaming is a property of the
scheduler, not the networking protocol. IMO streams should be
implemented in tx, as a part of the fiber pool. Still, we've agreed to
proceed with committing the existing implementation for the sake of our
release policy.

A few minor review comments are inline.

> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index d2854677f..6c8c00256 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -278,6 +278,7 @@ struct errcode_record {
>  	/*223 */_(ER_INTERFERING_PROMOTE,	"Instance with replica id %u was promoted first") \
>  	/*224 */_(ER_ELECTION_DISABLED,		"Elections were turned off")\
>  	/*225 */_(ER_TXN_ROLLBACK,		"Transaction was rolled back") \
> +	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \

You can use iproto_type_name() to report the request type name instead
of id in this error message, which is more user-friendly, e.g.

"Unable to process AUTH request in stream".

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3ed641eea..3642cbd02 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,6 +63,8 @@
>  #include "execute.h"
>  #include "errinj.h"
>  #include "tt_static.h"
> +#include "salad/stailq.h"
> +#include "assoc.h"
>  
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -73,6 +75,21 @@ enum {
>  	 ENDPOINT_NAME_MAX = 10
>  };
>  
> +struct iproto_connection;
> +
> +struct iproto_stream {
> +	/**
> +	 * Queue of pending requests (iproto messages) for this stream,
> +	 * processed sequentially. This field is accessible only from
> +	 * the iproto thread. Queue items have iproto_msg type.
> +	 */
> +	struct stailq pending_requests;
> +	/** Id of this stream, used as a key in streams hash table */
> +	uint64_t id;
> +	/** This stream connection */
> +	struct iproto_connection *connection;
> +};
> +
>  /**
>   * A position in connection output buffer.
>   * Since we use rotating buffers to recycle memory,
> @@ -135,6 +152,7 @@ struct iproto_thread {
>  	 */
>  	struct mempool iproto_msg_pool;
>  	struct mempool iproto_connection_pool;
> +	struct mempool stream_pool;

s/stream_pool/iproto_stream_pool

for consistency with other pool names.

>  	/*
>  	 * List of stopped connections
>  	 */
> @@ -303,6 +321,16 @@ struct iproto_msg
>  	 * and the connection must be closed.
>  	 */
>  	bool close_connection;
> +	/**
> +	 * A stailq_entry to hold message in stream.
> +	 * All messages in a stream are processed sequentially. Before
> +	 * processing, a message is added to the queue of pending requests.
> +	 * If this queue was empty, the message is processed right away,
> +	 * otherwise it waits until all previous messages are processed.
> +	 */
> +	struct stailq_entry in_stream;
> +	/** Stream that owns this message, or NULL. */
> +	struct iproto_stream *stream;
>  };
>  
>  static struct iproto_msg *
> @@ -504,6 +532,11 @@ struct iproto_connection
>  	 */
>  	enum iproto_connection_state state;
>  	struct rlist in_stop_list;
> +	/**
> +	 * Hash table that holds all streams for this connection.
> +	 * This field is accessible only from the iproto thread.
> +	 */
> +	struct mh_i64ptr_t *streams;
>  	/**
>  	 * Kharon is used to implement box.session.push().
>  	 * When a new push is ready, tx uses kharon to notify
> @@ -557,6 +590,44 @@ struct iproto_connection
>  	struct iproto_thread *iproto_thread;
>  };
>  
> +static inline void
> +errinj_stream_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}
> +
> +static inline void
> +errinj_stream_msg_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}

Please add a TODO in the code with a reference to the ticket you
created:

/*
 * TODO(gh-6293): ...
 */

> +
> +static struct iproto_stream *
> +iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> +{
> +	struct iproto_thread *iproto_thread = connection->iproto_thread;
> +	struct iproto_stream *stream = (struct iproto_stream *)
> +		mempool_alloc(&iproto_thread->stream_pool);
> +	if (stream == NULL) {
> +		diag_set(OutOfMemory, sizeof(*stream),
> +			 "mempool_alloc", "stream");
> +		return NULL;
> +	}
> +	errinj_stream_count_add(1);
> +	stailq_create(&stream->pending_requests);
> +	stream->id = stream_id;
> +	stream->connection = connection;
> +	return stream;
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -576,6 +647,14 @@ iproto_msg_delete(struct iproto_msg *msg)
>  	iproto_resume(iproto_thread);
>  }
>  
> +static void
> +iproto_stream_delete(struct iproto_stream *stream)
> +{
> +	assert(stailq_empty(&stream->pending_requests));
> +	errinj_stream_count_add(-1);
> +	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
> +}
> +
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> @@ -594,6 +673,7 @@ iproto_msg_new(struct iproto_connection *con)
>  	}
>  	msg->close_connection = false;
>  	msg->connection = con;
> +	msg->stream = NULL;
>  	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
>  	return msg;
>  }
> @@ -821,6 +901,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>  
> +/**
> + * Check if the message belongs to a stream (stream_id != 0), and if
> + * so, create a new stream or look it up in the connection's streams
> + * hash table. Put the message to the stream's pending messages list.
> + * @retval 0 - the message is ready to push to TX thread (either if
> + *             stream_id is not set (is zero) or the stream is not
> + *             processing other messages).
> + *         1 - the message is postponed because its stream is busy
> + *             processing previous message(s).
> + *        -1 - memory error.
> + */
> +static int
> +iproto_msg_set_stream(struct iproto_msg *msg)
> +{
> +	uint64_t stream_id = msg->header.stream_id;
> +	if (stream_id == 0)
> +		return 0;
> +
> +	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = NULL;
> +	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
> +	if (pos == mh_end(con->streams)) {
> +		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
> +		if (stream == NULL)
> +			return -1;
> +		struct mh_i64ptr_node_t node;
> +		node.key = stream_id;
> +		node.val = stream;
> +		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
> +		if (pos == mh_end(con->streams)) {
> +			iproto_stream_delete(stream);
> +			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
> +				 "mh_streams_node");
> +			return -1;
> +		}
> +	}
> +	/*
> +	 * Not all messages belong to a stream. We can't determine which
> +	 * messages belong to a stream in `iproto_msg_new`, so we increment
> +	 * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
> +	 * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT
> +	 * only if msg->stream != NULL.
> +	 */
> +	errinj_stream_msg_count_add(1);
> +	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> +	msg->stream = stream;
> +	/*
> +	 * If the request queue in the stream is not empty, it means
> +	 * that some previous message wasn't processed yet. Regardless
> +	 * of this, we put the message in the queue, but we start processing
> +	 * the message only if the message queue in the stream was empty.
> +	 */
> +	bool was_not_empty = !stailq_empty(&stream->pending_requests);
> +	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
> +	return was_not_empty ? 1 : 0;
> +}
> +
>  /**
>   * Enqueue all requests which were read up. If a request limit is
>   * reached - stop the connection input even if not the whole batch
> @@ -830,7 +967,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>   * @param in Buffer to parse.
>   *
>   * @retval  0 Success.
> - * @retval -1 Invalid MessagePack error.
> + * @retval -1 Invalid MessagePack or memory error.
>   */
>  static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> @@ -883,12 +1020,25 @@ err_msgpack:
>  		msg->len = reqend - reqstart; /* total request length */
>  
>  		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> +
> +		int rc = iproto_msg_set_stream(msg);
> +		if (rc < 0) {
> +			iproto_msg_delete(msg);
> +			return -1;
> +		}
>  		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> +		 * rc > 0 means that the stream's pending requests queue is
> +		 * not empty, so skip the push.
>  		 */
> -		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> -		n_requests++;
> +		if (rc == 0) {
> +			/*
> +			 * This can't throw, but should not be
> +			 * done in case of exception.
> +			 */
> +			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> +			n_requests++;
> +		}
> +
>  		/* Request is parsed */
>  		assert(reqend > reqstart);
>  		assert(con->parse_size >= (size_t) (reqend - reqstart));
> @@ -1130,6 +1280,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
>  		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
>  		return NULL;
>  	}
> +	con->streams = mh_i64ptr_new();
> +	if (con->streams == NULL) {
> +		diag_set(OutOfMemory, sizeof(*(con->streams)),
> +			 "mh_streams_new", "streams");
> +		mempool_free(&con->iproto_thread->iproto_connection_pool, con);
> +		return NULL;
> +	}
>  	con->iproto_thread = iproto_thread;
>  	con->input.data = con->output.data = con;
>  	con->loop = loop();
> @@ -1178,6 +1335,9 @@ iproto_connection_delete(struct iproto_connection *con)
>  	       con->obuf[0].iov[0].iov_base == NULL);
>  	assert(con->obuf[1].pos == 0 &&
>  	       con->obuf[1].iov[0].iov_base == NULL);
> +
> +	assert(mh_size(con->streams) == 0);
> +	mh_i64ptr_delete(con->streams);
>  	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
>  }
>  
> @@ -1225,7 +1385,9 @@ static void
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> +	uint64_t stream_id;
>  	uint8_t type;
> +	bool request_is_not_for_stream;
>  	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>  
>  	if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1233,6 +1395,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	assert(*pos == reqend);
>  
>  	type = msg->header.type;
> +	stream_id = msg->header.stream_id;
> +	request_is_not_for_stream =
> +		((type > IPROTO_TYPE_STAT_MAX &&
> +		 type != IPROTO_PING) || type == IPROTO_AUTH);
> +
> +	if (stream_id != 0 && request_is_not_for_stream) {
> +		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
> +			 (uint32_t) type);
> +		goto error;
> +	}
>  
>  	/*
>  	 * Parse request before putting it into the queue
> @@ -1326,6 +1498,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>  
> +

Extra newline. Please remove.

>  static void
>  tx_process_disconnect(struct cmsg *m)
>  {
> @@ -1857,7 +2030,38 @@ net_send_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = msg->stream;
> +	struct iproto_msg *tmp;
> +
> +	if (stream == NULL)
> +		goto send_msg;
> +
> +	tmp = stailq_shift_entry(&stream->pending_requests,
> +				 struct iproto_msg, in_stream);
> +	assert(tmp == msg);
> +	(void)tmp;
> +	errinj_stream_msg_count_add(-1);
> +
> +	if (stailq_empty(&stream->pending_requests)) {
> +		struct mh_i64ptr_node_t node = { stream->id, NULL };
> +		mh_i64ptr_remove(con->streams, &node, 0);
> +		iproto_stream_delete(stream);
> +	} else {
> +		/*
> +		 * If there are new messages for this stream
> +		 * then schedule their processing.
> +		 */
> +		struct iproto_msg *next =
> +			stailq_first_entry(&stream->pending_requests,
> +					   struct iproto_msg,
> +					   in_stream);
> +		assert(next != NULL);
> +		next->wpos = con->wpos;
> +		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
> +		cpipe_flush_input(&con->iproto_thread->tx_pipe);
> +	}

Would be nice to factor this out into a separate helper function.
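
For example (a rough sketch based on the code above; the helper name
net_finish_stream_msg is just a suggestion):

static void
net_finish_stream_msg(struct iproto_msg *msg)
{
	struct iproto_connection *con = msg->connection;
	struct iproto_stream *stream = msg->stream;
	/* Pop this message from the head of the stream queue. */
	struct iproto_msg *tmp =
		stailq_shift_entry(&stream->pending_requests,
				   struct iproto_msg, in_stream);
	assert(tmp == msg);
	(void)tmp;
	errinj_stream_msg_count_add(-1);
	if (stailq_empty(&stream->pending_requests)) {
		/* No more pending requests - drop the stream. */
		struct mh_i64ptr_node_t node = { stream->id, NULL };
		mh_i64ptr_remove(con->streams, &node, 0);
		iproto_stream_delete(stream);
	} else {
		/* Schedule processing of the next pending request. */
		struct iproto_msg *next =
			stailq_first_entry(&stream->pending_requests,
					   struct iproto_msg, in_stream);
		next->wpos = con->wpos;
		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
		cpipe_flush_input(&con->iproto_thread->tx_pipe);
	}
}

Then net_send_msg() would only need:

	if (msg->stream != NULL)
		net_finish_stream_msg(msg);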

>  
> +send_msg:
>  	if (msg->len != 0) {
>  		/* Discard request (see iproto_enqueue_batch()). */
>  		msg->p_ibuf->rpos += msg->len;
> @@ -2045,6 +2249,8 @@ net_cord_f(va_list  ap)
>  		       sizeof(struct iproto_msg));
>  	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
>  		       sizeof(struct iproto_connection));
> +	mempool_create(&iproto_thread->stream_pool, &cord()->slabc,
> +		       sizeof(struct iproto_stream));
>  
>  	evio_service_init(loop(), &iproto_thread->binary, "binary",
>  			  iproto_on_accept, iproto_thread);

