From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org, mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto
Date: Fri, 6 Aug 2021 13:30:04 +0300
Message-ID: <20210806103004.rlddqk5neurwsv2r@esperanza>
In-Reply-To: <29a71e8ded595c28414e405a68c8927227fa28cb.1628184138.git.mechanik20.05.1988@gmail.com>

On Thu, Aug 05, 2021 at 09:17:42PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Implement streams in iproto. There is a hash table of streams for
> each connection. When a new request comes with a non-zero stream ID,
> we look for the stream with such ID in this table and if it does not
> exist, we create it. The request is placed in the queue of pending
> requests, and if this queue was empty at the time of its receipt, it
> is pushed to the tx thread for processing. When a request belonging to
> stream returns to the network thread after processing is completed, we
> take the next request out of the queue of pending requests and send it
> for processing to tx thread. If there is no pending requests we remove
> stream object from hash table and destroy it. Requests with zero stream
> ID are processed in the old way.

Disclaimer: I think that request streaming is a property of the
scheduler, not the networking protocol. IMO streams should be
implemented in tx, as a part of the fiber pool. Still, we've agreed to
proceed with committing the existing implementation for the sake of our
release policy.

A few minor review comments are inline.
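
For readers of the thread, the queueing rule from the commit message can be modeled as follows. This is the contract that iproto_msg_set_stream() documents further down: 0 means push now, 1 means postponed, -1 means out of memory. It is a simplified, self-contained sketch, not the patch code. All model_* names are hypothetical, a linked list stands in for the mh_i64ptr hash table, and a hand-rolled FIFO stands in for stailq.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for iproto_msg. */
struct model_msg {
	uint64_t stream_id;      /* 0 means "not part of a stream" */
	struct model_msg *next;  /* link in the stream's pending queue */
};

/* Hypothetical stand-in for iproto_stream. */
struct model_stream {
	uint64_t id;
	struct model_msg *head;  /* request currently processed by tx */
	struct model_msg *tail;  /* last queued request */
	struct model_stream *next;
};

/* A list instead of the per-connection hash table, for brevity. */
struct model_conn {
	struct model_stream *streams;
};

static struct model_stream *
model_stream_find(struct model_conn *con, uint64_t id)
{
	for (struct model_stream *s = con->streams; s != NULL; s = s->next) {
		if (s->id == id)
			return s;
	}
	return NULL;
}

/*
 * Returns 0 if the message should be pushed to tx right away,
 * 1 if it waits behind earlier requests of the same stream,
 * -1 on allocation failure.
 */
static int
model_msg_set_stream(struct model_conn *con, struct model_msg *msg)
{
	msg->next = NULL;
	if (msg->stream_id == 0)
		return 0; /* zero stream ID: processed the old way */
	struct model_stream *s = model_stream_find(con, msg->stream_id);
	if (s == NULL) {
		/* First request with this ID: create the stream lazily. */
		s = calloc(1, sizeof(*s));
		if (s == NULL)
			return -1;
		s->id = msg->stream_id;
		s->next = con->streams;
		con->streams = s;
	}
	/* Always enqueue; only a request that lands in an empty queue runs. */
	int busy = s->head != NULL;
	if (busy)
		s->tail->next = msg;
	else
		s->head = msg;
	s->tail = msg;
	return busy ? 1 : 0;
}
```

In this model, two requests with stream ID 7 yield 0 and then 1. Requests with stream ID 0, or with a fresh ID such as 8, yield 0 and go to tx immediately.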

> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index d2854677f..6c8c00256 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -278,6 +278,7 @@ struct errcode_record {
>  	/*223 */_(ER_INTERFERING_PROMOTE, "Instance with replica id %u was promoted first") \
>  	/*224 */_(ER_ELECTION_DISABLED, "Elections were turned off")\
>  	/*225 */_(ER_TXN_ROLLBACK, "Transaction was rolled back") \
> +	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \

You can use iproto_type_name() to report the request type name instead
of id in this error message, which is more user-friendly, e.g.
"Unable to process AUTH request in stream".

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3ed641eea..3642cbd02 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,6 +63,8 @@
>  #include "execute.h"
>  #include "errinj.h"
>  #include "tt_static.h"
> +#include "salad/stailq.h"
> +#include "assoc.h"
>
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -73,6 +75,21 @@ enum {
>  	ENDPOINT_NAME_MAX = 10
>  };
>
> +struct iproto_connection;
> +
> +struct iproto_stream {
> +	/**
> +	 * Queue of pending requests (iproto messages) for this stream,
> +	 * processed sequentially. This field is accesable only from
> +	 * iproto thread. Queue items has iproto_msg type.
> +	 */
> +	struct stailq pending_requests;
> +	/** Id of this stream, used as a key in streams hash table */
> +	uint64_t id;
> +	/** This stream connection */
> +	struct iproto_connection *connection;
> +};
> +
>  /**
>   * A position in connection output buffer.
>   * Since we use rotating buffers to recycle memory,
> @@ -135,6 +152,7 @@ struct iproto_thread {
>  	 */
>  	struct mempool iproto_msg_pool;
>  	struct mempool iproto_connection_pool;
> +	struct mempool stream_pool;

s/stream_pool/iproto_stream_pool for consistency with other pool names.

>  	/*
>  	 * List of stopped connections
>  	 */
> @@ -303,6 +321,16 @@ struct iproto_msg
>  	 * and the connection must be closed.
>  	 */
>  	bool close_connection;
> +	/**
> +	 * A stailq_entry to hold message in stream.
> +	 * All messages processed in stream sequently. Before processing
> +	 * all messages added to queue of pending requests. If this queue
> +	 * was empty message begins to be processed, otherwise it waits until
> +	 * all previous messages are processed.
> +	 */
> +	struct stailq_entry in_stream;
> +	/** Stream that owns this message, or NULL. */
> +	struct iproto_stream *stream;
>  };
>
>  static struct iproto_msg *
> @@ -504,6 +532,11 @@ struct iproto_connection
>  	 */
>  	enum iproto_connection_state state;
>  	struct rlist in_stop_list;
> +	/**
> +	 * Hash table that holds all streams for this connection.
> +	 * This field is accesable only from iproto thread.
> +	 */
> +	struct mh_i64ptr_t *streams;
>  	/**
>  	 * Kharon is used to implement box.session.push().
>  	 * When a new push is ready, tx uses kharon to notify
> @@ -557,6 +590,44 @@ struct iproto_connection
>  	struct iproto_thread *iproto_thread;
>  };
>
> +static inline void
> +errinj_stream_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}
> +
> +static inline void
> +errinj_stream_msg_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}

Please add a TODO in the code with a reference to the ticket you
created:

	/*
	 * TODO(gh-6293): ...
	 */

> +
> +static struct iproto_stream *
> +iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> +{
> +	struct iproto_thread *iproto_thread = connection->iproto_thread;
> +	struct iproto_stream *stream = (struct iproto_stream *)
> +		mempool_alloc(&iproto_thread->stream_pool);
> +	if (stream == NULL) {
> +		diag_set(OutOfMemory, sizeof(*stream),
> +			 "mempool_alloc", "stream");
> +		return NULL;
> +	}
> +	errinj_stream_count_add(1);
> +	stailq_create(&stream->pending_requests);
> +	stream->id = stream_id;
> +	stream->connection = connection;
> +	return stream;
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -576,6 +647,14 @@ iproto_msg_delete(struct iproto_msg *msg)
>  	iproto_resume(iproto_thread);
>  }
>
> +static void
> +iproto_stream_delete(struct iproto_stream *stream)
> +{
> +	assert(stailq_empty(&stream->pending_requests));
> +	errinj_stream_count_add(-1);
> +	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
> +}
> +
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> @@ -594,6 +673,7 @@ iproto_msg_new(struct iproto_connection *con)
>  	}
>  	msg->close_connection = false;
>  	msg->connection = con;
> +	msg->stream = NULL;
>  	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
>  	return msg;
>  }
> @@ -821,6 +901,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>
> +/**
> + * Check if message belongs to stream (stream_id != 0), and if it
> + * is so create new stream or get stream from connection streams
> + * hash table. Put message to stream pending messages list.
> + * @retval 0 - the message is ready to push to TX thread (either if
> + *             stream_id is not set (is zero) or the stream is not
> + *             processing other messages).
> + *         1 - the message is postponed because its stream is busy
> + *             processing previous message(s).
> + *        -1 - memory error.
> + */
> +static int
> +iproto_msg_set_stream(struct iproto_msg *msg)
> +{
> +	uint64_t stream_id = msg->header.stream_id;
> +	if (stream_id == 0)
> +		return 0;
> +
> +	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = NULL;
> +	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
> +	if (pos == mh_end(con->streams)) {
> +		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
> +		if (stream == NULL)
> +			return -1;
> +		struct mh_i64ptr_node_t node;
> +		node.key = stream_id;
> +		node.val = stream;
> +		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
> +		if (pos == mh_end(con->streams)) {
> +			iproto_stream_delete(stream);
> +			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
> +				 "mh_streams_node");
> +			return -1;
> +		}
> +	}
> +	/*
> +	 * Not all messages belongs to stream. We can't determine which
> +	 * messages belong to stream in `iproto_msg_new`, so we increment
> +	 * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
> +	 * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT
> +	 * only if msg->stream != NULL.
> +	 */
> +	errinj_stream_msg_count_add(1);
> +	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> +	msg->stream = stream;
> +	/*
> +	 * If the request queue in the stream is not empty, it means
> +	 * that some previous message wasn't processed yet. Regardless
> +	 * of this, we put the message in the queue, but we start processing
> +	 * the message only if the message queue in the stream was empty.
> +	 */
> +	bool was_not_empty = !stailq_empty(&stream->pending_requests);
> +	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
> +	return was_not_empty ? 1 : 0;
> +}
> +
>  /**
>   * Enqueue all requests which were read up. If a request limit is
>   * reached - stop the connection input even if not the whole batch
> @@ -830,7 +967,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>   * @param in Buffer to parse.
>   *
>   * @retval 0 Success.
> - * @retval -1 Invalid MessagePack error.
> + * @retval -1 Invalid MessagePack or memory error.
>   */
>  static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> @@ -883,12 +1020,25 @@ err_msgpack:
>  		msg->len = reqend - reqstart; /* total request length */
>
>  		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> +
> +		int rc = iproto_msg_set_stream(msg);
> +		if (rc < 0) {
> +			iproto_msg_delete(msg);
> +			return -1;
> +		}
>  		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> +		 * rc > 0, means that stream pending requests queue is not
> +		 * empty, skip push.
>  		 */
> -		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> -		n_requests++;
> +		if (rc == 0) {
> +			/*
> +			 * This can't throw, but should not be
> +			 * done in case of exception.
> +			 */
> +			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> +			n_requests++;
> +		}
> +
>  		/* Request is parsed */
>  		assert(reqend > reqstart);
>  		assert(con->parse_size >= (size_t) (reqend - reqstart));
> @@ -1130,6 +1280,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
>  		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
>  		return NULL;
>  	}
> +	con->streams = mh_i64ptr_new();
> +	if (con->streams == NULL) {
> +		diag_set(OutOfMemory, sizeof(*(con->streams)),
> +			 "mh_streams_new", "streams");
> +		mempool_free(&con->iproto_thread->iproto_connection_pool, con);
> +		return NULL;
> +	}
>  	con->iproto_thread = iproto_thread;
>  	con->input.data = con->output.data = con;
>  	con->loop = loop();
> @@ -1178,6 +1335,9 @@ iproto_connection_delete(struct iproto_connection *con)
>  	       con->obuf[0].iov[0].iov_base == NULL);
>  	assert(con->obuf[1].pos == 0 &&
>  	       con->obuf[1].iov[0].iov_base == NULL);
> +
> +	assert(mh_size(con->streams) == 0);
> +	mh_i64ptr_delete(con->streams);
>  	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
>  }
>
> @@ -1225,7 +1385,9 @@ static void
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> +	uint64_t stream_id;
>  	uint8_t type;
> +	bool request_is_not_for_stream;
>  	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>
>  	if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1233,6 +1395,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	assert(*pos == reqend);
>
>  	type = msg->header.type;
> +	stream_id = msg->header.stream_id;
> +	request_is_not_for_stream =
> +		((type > IPROTO_TYPE_STAT_MAX &&
> +		 type != IPROTO_PING) || type == IPROTO_AUTH);
> +
> +	if (stream_id != 0 && request_is_not_for_stream) {
> +		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
> +			 (uint32_t) type);
> +		goto error;
> +	}
>
>  	/*
>  	 * Parse request before putting it into the queue
> @@ -1326,6 +1498,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  		fiber_set_user(f, &session->credentials);
>  	}
>
> +

Extra new line. Please remove.

>  static void
>  tx_process_disconnect(struct cmsg *m)
>  {
> @@ -1857,7 +2030,38 @@ net_send_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = msg->stream;
> +	struct iproto_msg *tmp;
> +
> +	if (stream == NULL)
> +		goto send_msg;
> +
> +	tmp = stailq_shift_entry(&stream->pending_requests,
> +				 struct iproto_msg, in_stream);
> +	assert(tmp == msg);
> +	(void)tmp;
> +	errinj_stream_msg_count_add(-1);
> +
> +	if (stailq_empty(&stream->pending_requests)) {
> +		struct mh_i64ptr_node_t node = { stream->id, NULL };
> +		mh_i64ptr_remove(con->streams, &node, 0);
> +		iproto_stream_delete(stream);
> +	} else {
> +		/*
> +		 * If there are new messages for this stream
> +		 * then schedule their processing.
> +		 */
> +		struct iproto_msg *next =
> +			stailq_first_entry(&stream->pending_requests,
> +					   struct iproto_msg,
> +					   in_stream);
> +		assert(next != NULL);
> +		next->wpos = con->wpos;
> +		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
> +		cpipe_flush_input(&con->iproto_thread->tx_pipe);
> +	}

Would be nice to factor this out into a separate helper function.

>
> +send_msg:
>  	if (msg->len != 0) {
>  		/* Discard request (see iproto_enqueue_batch()). */
>  		msg->p_ibuf->rpos += msg->len;
> @@ -2045,6 +2249,8 @@ net_cord_f(va_list ap)
>  		     sizeof(struct iproto_msg));
>  	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
>  		       sizeof(struct iproto_connection));
> +	mempool_create(&iproto_thread->stream_pool, &cord()->slabc,
> +		       sizeof(struct iproto_stream));
>
>  	evio_service_init(loop(), &iproto_thread->binary, "binary",
>  			  iproto_on_accept, iproto_thread);
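
One possible shape for the helper suggested for net_send_msg() is sketched below. It is simplified and self-contained, and the demo_* names are hypothetical. A plain singly linked FIFO stands in for stailq. The hash table removal and the cpipe calls are deliberately left to the caller, so the helper only manages the queue.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for iproto_msg. */
struct demo_msg {
	struct demo_msg *next;
};

/* Hypothetical stand-in for iproto_stream. */
struct demo_stream {
	uint64_t id;
	struct demo_msg *head; /* message that just came back from tx */
	struct demo_msg *tail;
};

/*
 * Detach the finished message from its stream and return the next
 * pending message, which the caller should push to tx. A NULL return
 * means the queue is drained and the caller should drop the stream.
 */
static struct demo_msg *
demo_stream_msg_done(struct demo_stream *stream, struct demo_msg *msg)
{
	/* Only the head of the queue can be in flight. */
	assert(stream->head == msg);
	stream->head = msg->next;
	if (stream->head == NULL)
		stream->tail = NULL;
	msg->next = NULL;
	return stream->head;
}
```

With a helper like this, the stream branch of net_send_msg() reduces to two cases:
- On a NULL return, call mh_i64ptr_remove() and iproto_stream_delete().
- Otherwise, set next->wpos and hand next to cpipe_push_input() and cpipe_flush_input().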