[Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto
Vladimir Davydov
vdavydov at tarantool.org
Fri Aug 6 13:30:04 MSK 2021
On Thu, Aug 05, 2021 at 09:17:42PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
>
> Implement streams in iproto. There is a hash table of streams for
> each connection. When a new request comes with a non-zero stream ID,
> we look up the stream with that ID in this table and, if it does not
> exist, we create it. The request is placed in the stream's queue of
> pending requests, and if this queue was empty when the request arrived,
> the request is pushed to the tx thread for processing. When a request
> belonging to a stream returns to the network thread after processing
> is completed, we take the next request out of the queue of pending
> requests and send it to the tx thread for processing. If there are no
> pending requests, we remove the stream object from the hash table and
> destroy it. Requests with a zero stream ID are processed the old way.
Disclaimer: I think that request streaming is a property of the
scheduler, not of the networking protocol. In my opinion, streams should
be implemented in tx, as part of the fiber pool. Still, we've agreed to
proceed with committing the existing implementation for the sake of our
release policy.
A few minor review comments are inline.
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index d2854677f..6c8c00256 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -278,6 +278,7 @@ struct errcode_record {
> /*223 */_(ER_INTERFERING_PROMOTE, "Instance with replica id %u was promoted first") \
> /*224 */_(ER_ELECTION_DISABLED, "Elections were turned off")\
> /*225 */_(ER_TXN_ROLLBACK, "Transaction was rolled back") \
> + /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \
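You can use iproto_type_name() to report the request type name instead
of its numeric id in this error message, which is more user-friendly,
e.g. "Unable to process AUTH request in stream". That is, change the
format string to "Unable to process %s request in stream" and pass
iproto_type_name(type) to diag_set() instead of (uint32_t) type.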
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3ed641eea..3642cbd02 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,6 +63,8 @@
> #include "execute.h"
> #include "errinj.h"
> #include "tt_static.h"
> +#include "salad/stailq.h"
> +#include "assoc.h"
>
> enum {
> IPROTO_SALT_SIZE = 32,
> @@ -73,6 +75,21 @@ enum {
> ENDPOINT_NAME_MAX = 10
> };
>
> +struct iproto_connection;
> +
> +struct iproto_stream {
> + /**
> + * Queue of pending requests (iproto messages) for this stream,
> + * processed sequentially. This field is accesable only from
> + * iproto thread. Queue items has iproto_msg type.
> + */
> + struct stailq pending_requests;
> + /** Id of this stream, used as a key in streams hash table */
> + uint64_t id;
> + /** This stream connection */
> + struct iproto_connection *connection;
> +};
> +
> /**
> * A position in connection output buffer.
> * Since we use rotating buffers to recycle memory,
> @@ -135,6 +152,7 @@ struct iproto_thread {
> */
> struct mempool iproto_msg_pool;
> struct mempool iproto_connection_pool;
> + struct mempool stream_pool;
Please rename stream_pool to iproto_stream_pool
(s/stream_pool/iproto_stream_pool/) for consistency with the other pool
names (iproto_msg_pool, iproto_connection_pool).
> /*
> * List of stopped connections
> */
> @@ -303,6 +321,16 @@ struct iproto_msg
> * and the connection must be closed.
> */
> bool close_connection;
> + /**
> + * A stailq_entry to hold message in stream.
> + * All messages processed in stream sequently. Before processing
> + * all messages added to queue of pending requests. If this queue
> + * was empty message begins to be processed, otherwise it waits until
> + * all previous messages are processed.
> + */
> + struct stailq_entry in_stream;
> + /** Stream that owns this message, or NULL. */
> + struct iproto_stream *stream;
> };
>
> static struct iproto_msg *
> @@ -504,6 +532,11 @@ struct iproto_connection
> */
> enum iproto_connection_state state;
> struct rlist in_stop_list;
> + /**
> + * Hash table that holds all streams for this connection.
> + * This field is accesable only from iproto thread.
> + */
> + struct mh_i64ptr_t *streams;
> /**
> * Kharon is used to implement box.session.push().
> * When a new push is ready, tx uses kharon to notify
> @@ -557,6 +590,44 @@ struct iproto_connection
> struct iproto_thread *iproto_thread;
> };
>
> +static inline void
> +errinj_stream_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> + struct errinj *inj =
> + errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
> + __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}
> +
> +static inline void
> +errinj_stream_msg_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> + struct errinj *inj =
> + errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
> + __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}
Please add a TODO comment to these helpers with a reference to the
ticket you created, e.g.:
/*
 * TODO(gh-6293): ...
 */
> +
> +static struct iproto_stream *
> +iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> +{
> + struct iproto_thread *iproto_thread = connection->iproto_thread;
> + struct iproto_stream *stream = (struct iproto_stream *)
> + mempool_alloc(&iproto_thread->stream_pool);
> + if (stream == NULL) {
> + diag_set(OutOfMemory, sizeof(*stream),
> + "mempool_alloc", "stream");
> + return NULL;
> + }
> + errinj_stream_count_add(1);
> + stailq_create(&stream->pending_requests);
> + stream->id = stream_id;
> + stream->connection = connection;
> + return stream;
> +}
> +
> /**
> * Return true if we have not enough spare messages
> * in the message pool.
> @@ -576,6 +647,14 @@ iproto_msg_delete(struct iproto_msg *msg)
> iproto_resume(iproto_thread);
> }
>
> +static void
> +iproto_stream_delete(struct iproto_stream *stream)
> +{
> + assert(stailq_empty(&stream->pending_requests));
> + errinj_stream_count_add(-1);
> + mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
> +}
> +
> static struct iproto_msg *
> iproto_msg_new(struct iproto_connection *con)
> {
> @@ -594,6 +673,7 @@ iproto_msg_new(struct iproto_connection *con)
> }
> msg->close_connection = false;
> msg->connection = con;
> + msg->stream = NULL;
> rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
> return msg;
> }
> @@ -821,6 +901,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
> return new_ibuf;
> }
>
> +/**
> + * Check if message belongs to stream (stream_id != 0), and if it
> + * is so create new stream or get stream from connection streams
> + * hash table. Put message to stream pending messages list.
> + * @retval 0 - the message is ready to push to TX thread (either if
> + * stream_id is not set (is zero) or the stream is not
> + * processing other messages).
> + * 1 - the message is postponed because its stream is busy
> + * processing previous message(s).
> + * -1 - memory error.
> + */
> +static int
> +iproto_msg_set_stream(struct iproto_msg *msg)
> +{
> + uint64_t stream_id = msg->header.stream_id;
> + if (stream_id == 0)
> + return 0;
> +
> + struct iproto_connection *con = msg->connection;
> + struct iproto_stream *stream = NULL;
> + mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
> + if (pos == mh_end(con->streams)) {
> + stream = iproto_stream_new(msg->connection, msg->header.stream_id);
> + if (stream == NULL)
> + return -1;
> + struct mh_i64ptr_node_t node;
> + node.key = stream_id;
> + node.val = stream;
> + pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
> + if (pos == mh_end(con->streams)) {
> + iproto_stream_delete(stream);
> + diag_set(OutOfMemory, pos + 1, "mh_streams_put",
> + "mh_streams_node");
> + return -1;
> + }
> + }
> + /*
> + * Not all messages belongs to stream. We can't determine which
> + * messages belong to stream in `iproto_msg_new`, so we increment
> + * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
> + * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT
> + * only if msg->stream != NULL.
> + */
> + errinj_stream_msg_count_add(1);
> + stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> + msg->stream = stream;
> + /*
> + * If the request queue in the stream is not empty, it means
> + * that some previous message wasn't processed yet. Regardless
> + * of this, we put the message in the queue, but we start processing
> + * the message only if the message queue in the stream was empty.
> + */
> + bool was_not_empty = !stailq_empty(&stream->pending_requests);
> + stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
> + return was_not_empty ? 1 : 0;
> +}
> +
> /**
> * Enqueue all requests which were read up. If a request limit is
> * reached - stop the connection input even if not the whole batch
> @@ -830,7 +967,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
> * @param in Buffer to parse.
> *
> * @retval 0 Success.
> - * @retval -1 Invalid MessagePack error.
> + * @retval -1 Invalid MessagePack or memory error.
> */
> static inline int
> iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> @@ -883,12 +1020,25 @@ err_msgpack:
> msg->len = reqend - reqstart; /* total request length */
>
> iproto_msg_decode(msg, &pos, reqend, &stop_input);
> +
> + int rc = iproto_msg_set_stream(msg);
> + if (rc < 0) {
> + iproto_msg_delete(msg);
> + return -1;
> + }
> /*
> - * This can't throw, but should not be
> - * done in case of exception.
> + * rc > 0, means that stream pending requests queue is not
> + * empty, skip push.
> */
> - cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> - n_requests++;
> + if (rc == 0) {
> + /*
> + * This can't throw, but should not be
> + * done in case of exception.
> + */
> + cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> + n_requests++;
> + }
> +
> /* Request is parsed */
> assert(reqend > reqstart);
> assert(con->parse_size >= (size_t) (reqend - reqstart));
> @@ -1130,6 +1280,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
> diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
> return NULL;
> }
> + con->streams = mh_i64ptr_new();
> + if (con->streams == NULL) {
> + diag_set(OutOfMemory, sizeof(*(con->streams)),
> + "mh_streams_new", "streams");
> + mempool_free(&con->iproto_thread->iproto_connection_pool, con);
> + return NULL;
> + }
> con->iproto_thread = iproto_thread;
> con->input.data = con->output.data = con;
> con->loop = loop();
> @@ -1178,6 +1335,9 @@ iproto_connection_delete(struct iproto_connection *con)
> con->obuf[0].iov[0].iov_base == NULL);
> assert(con->obuf[1].pos == 0 &&
> con->obuf[1].iov[0].iov_base == NULL);
> +
> + assert(mh_size(con->streams) == 0);
> + mh_i64ptr_delete(con->streams);
> mempool_free(&con->iproto_thread->iproto_connection_pool, con);
> }
>
> @@ -1225,7 +1385,9 @@ static void
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input)
> {
> + uint64_t stream_id;
> uint8_t type;
> + bool request_is_not_for_stream;
> struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>
> if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1233,6 +1395,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> assert(*pos == reqend);
>
> type = msg->header.type;
> + stream_id = msg->header.stream_id;
> + request_is_not_for_stream =
> + ((type > IPROTO_TYPE_STAT_MAX &&
> + type != IPROTO_PING) || type == IPROTO_AUTH);
> +
> + if (stream_id != 0 && request_is_not_for_stream) {
> + diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
> + (uint32_t) type);
> + goto error;
> + }
>
> /*
> * Parse request before putting it into the queue
> @@ -1326,6 +1498,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
> fiber_set_user(f, &session->credentials);
> }
>
> +
Extra blank line. Please remove it.
> static void
> tx_process_disconnect(struct cmsg *m)
> {
> @@ -1857,7 +2030,38 @@ net_send_msg(struct cmsg *m)
> {
> struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> + struct iproto_stream *stream = msg->stream;
> + struct iproto_msg *tmp;
> +
> + if (stream == NULL)
> + goto send_msg;
> +
> + tmp = stailq_shift_entry(&stream->pending_requests,
> + struct iproto_msg, in_stream);
> + assert(tmp == msg);
> + (void)tmp;
> + errinj_stream_msg_count_add(-1);
> +
> + if (stailq_empty(&stream->pending_requests)) {
> + struct mh_i64ptr_node_t node = { stream->id, NULL };
> + mh_i64ptr_remove(con->streams, &node, 0);
> + iproto_stream_delete(stream);
> + } else {
> + /*
> + * If there are new messages for this stream
> + * then schedule their processing.
> + */
> + struct iproto_msg *next =
> + stailq_first_entry(&stream->pending_requests,
> + struct iproto_msg,
> + in_stream);
> + assert(next != NULL);
> + next->wpos = con->wpos;
> + cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
> + cpipe_flush_input(&con->iproto_thread->tx_pipe);
> + }
It would be nice to factor this stream handling out of net_send_msg()
into a separate helper function.
>
> +send_msg:
> if (msg->len != 0) {
> /* Discard request (see iproto_enqueue_batch()). */
> msg->p_ibuf->rpos += msg->len;
> @@ -2045,6 +2249,8 @@ net_cord_f(va_list ap)
> sizeof(struct iproto_msg));
> mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
> sizeof(struct iproto_connection));
> + mempool_create(&iproto_thread->stream_pool, &cord()->slabc,
> + sizeof(struct iproto_stream));
>
> evio_service_init(loop(), &iproto_thread->binary, "binary",
> iproto_on_accept, iproto_thread);
More information about the Tarantool-patches
mailing list