From mboxrd@z Thu Jan 1 00:00:00 1970
Date: Fri, 6 Aug 2021 13:30:04 +0300
To: mechanik20051988
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org, mechanik20051988
Message-ID: <20210806103004.rlddqk5neurwsv2r@esperanza>
References: <29a71e8ded595c28414e405a68c8927227fa28cb.1628184138.git.mechanik20.05.1988@gmail.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <29a71e8ded595c28414e405a68c8927227fa28cb.1628184138.git.mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto
List-Id: Tarantool development patches
From: Vladimir Davydov via Tarantool-patches
Reply-To: Vladimir Davydov
Errors-To: tarantool-patches-bounces@dev.tarantool.org
Sender: "Tarantool-patches"

On Thu, Aug 05, 2021 at 09:17:42PM +0300, mechanik20051988 wrote:
> From: mechanik20051988
>
> Implement streams in iproto. There is a hash table of streams for
> each connection. When a new request comes with a non-zero stream ID,
> we look for the stream with such ID in this table and if it does not
> exist, we create it. The request is placed in the queue of pending
> requests, and if this queue was empty at the time of its receipt, it
> is pushed to the tx thread for processing. When a request belonging to
> stream returns to the network thread after processing is completed, we
> take the next request out of the queue of pending requests and send it
> for processing to tx thread. If there are no pending requests we remove
> stream object from hash table and destroy it. Requests with zero stream
> ID are processed in the old way.

Disclaimer: I think that request streaming is a property of the
scheduler, not the networking protocol. IMO streams should be
implemented in tx, as a part of the fiber pool. Still, we've agreed to
proceed with committing the existing implementation for the sake of our
release policy.

A few minor review comments are inline.

> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index d2854677f..6c8c00256 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -278,6 +278,7 @@ struct errcode_record {
>  	/*223 */_(ER_INTERFERING_PROMOTE, "Instance with replica id %u was promoted first") \
>  	/*224 */_(ER_ELECTION_DISABLED, "Elections were turned off")\
>  	/*225 */_(ER_TXN_ROLLBACK, "Transaction was rolled back") \
> +	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \

You can use iproto_type_name() to report the request type name instead
of its id in this error message, which is more user-friendly, e.g.
"Unable to process AUTH request in stream".

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3ed641eea..3642cbd02 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -63,6 +63,8 @@
>  #include "execute.h"
>  #include "errinj.h"
>  #include "tt_static.h"
> +#include "salad/stailq.h"
> +#include "assoc.h"
>
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -73,6 +75,21 @@ enum {
>  	ENDPOINT_NAME_MAX = 10
>  };
>
> +struct iproto_connection;
> +
> +struct iproto_stream {
> +	/**
> +	 * Queue of pending requests (iproto messages) for this stream,
> +	 * processed sequentially. This field is accessible only from
> +	 * iproto thread. Queue items have iproto_msg type.
> +	 */
> +	struct stailq pending_requests;
> +	/** Id of this stream, used as a key in streams hash table */
> +	uint64_t id;
> +	/** This stream connection */
> +	struct iproto_connection *connection;
> +};
> +
>  /**
>   * A position in connection output buffer.
>   * Since we use rotating buffers to recycle memory,
> @@ -135,6 +152,7 @@ struct iproto_thread {
>  	 */
>  	struct mempool iproto_msg_pool;
>  	struct mempool iproto_connection_pool;
> +	struct mempool stream_pool;

s/stream_pool/iproto_stream_pool for consistency with other pool names.

>  	/*
>  	 * List of stopped connections
>  	 */
> @@ -303,6 +321,16 @@ struct iproto_msg
>  	 * and the connection must be closed.
>  	 */
>  	bool close_connection;
> +	/**
> +	 * A stailq_entry to hold message in stream.
> +	 * All messages processed in stream sequentially. Before processing
> +	 * all messages added to queue of pending requests. If this queue
> +	 * was empty message begins to be processed, otherwise it waits until
> +	 * all previous messages are processed.
> +	 */
> +	struct stailq_entry in_stream;
> +	/** Stream that owns this message, or NULL. */
> +	struct iproto_stream *stream;
>  };
>
>  static struct iproto_msg *
> @@ -504,6 +532,11 @@ struct iproto_connection
>  	 */
>  	enum iproto_connection_state state;
>  	struct rlist in_stop_list;
> +	/**
> +	 * Hash table that holds all streams for this connection.
> +	 * This field is accessible only from iproto thread.
> +	 */
> +	struct mh_i64ptr_t *streams;
>  	/**
>  	 * Kharon is used to implement box.session.push().
>  	 * When a new push is ready, tx uses kharon to notify
> @@ -557,6 +590,44 @@ struct iproto_connection
>  	struct iproto_thread *iproto_thread;
>  };
>
> +static inline void
> +errinj_stream_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}
> +
> +static inline void
> +errinj_stream_msg_count_add(MAYBE_UNUSED int val)
> +{
> +#ifndef NDEBUG
> +	struct errinj *inj =
> +		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
> +	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
> +#endif
> +}

Please add a TODO in the code with a reference to the ticket you
created:

/*
 * TODO(gh-6293): ...
 */

> +
> +static struct iproto_stream *
> +iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> +{
> +	struct iproto_thread *iproto_thread = connection->iproto_thread;
> +	struct iproto_stream *stream = (struct iproto_stream *)
> +		mempool_alloc(&iproto_thread->stream_pool);
> +	if (stream == NULL) {
> +		diag_set(OutOfMemory, sizeof(*stream),
> +			 "mempool_alloc", "stream");
> +		return NULL;
> +	}
> +	errinj_stream_count_add(1);
> +	stailq_create(&stream->pending_requests);
> +	stream->id = stream_id;
> +	stream->connection = connection;
> +	return stream;
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -576,6 +647,14 @@ iproto_msg_delete(struct iproto_msg *msg)
>  	iproto_resume(iproto_thread);
>  }
>
> +static void
> +iproto_stream_delete(struct iproto_stream *stream)
> +{
> +	assert(stailq_empty(&stream->pending_requests));
> +	errinj_stream_count_add(-1);
> +	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
> +}
> +
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> @@ -594,6 +673,7 @@ iproto_msg_new(struct iproto_connection *con)
>  	}
>  	msg->close_connection = false;
>  	msg->connection = con;
> +	msg->stream = NULL;
>  	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
>  	return msg;
>  }
> @@ -821,6 +901,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>  	return new_ibuf;
>  }
>
> +/**
> + * Check if message belongs to stream (stream_id != 0), and if it
> + * is so create new stream or get stream from connection streams
> + * hash table. Put message to stream pending messages list.
> + * @retval 0 - the message is ready to push to TX thread (either if
> + *             stream_id is not set (is zero) or the stream is not
> + *             processing other messages).
> + *         1 - the message is postponed because its stream is busy
> + *             processing previous message(s).
> + *        -1 - memory error.
> + */
> +static int
> +iproto_msg_set_stream(struct iproto_msg *msg)
> +{
> +	uint64_t stream_id = msg->header.stream_id;
> +	if (stream_id == 0)
> +		return 0;
> +
> +	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = NULL;
> +	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
> +	if (pos == mh_end(con->streams)) {
> +		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
> +		if (stream == NULL)
> +			return -1;
> +		struct mh_i64ptr_node_t node;
> +		node.key = stream_id;
> +		node.val = stream;
> +		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
> +		if (pos == mh_end(con->streams)) {
> +			iproto_stream_delete(stream);
> +			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
> +				 "mh_streams_node");
> +			return -1;
> +		}
> +	}
> +	/*
> +	 * Not all messages belong to stream. We can't determine which
> +	 * messages belong to stream in `iproto_msg_new`, so we increment
> +	 * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
> +	 * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT
> +	 * only if msg->stream != NULL.
> +	 */
> +	errinj_stream_msg_count_add(1);
> +	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> +	msg->stream = stream;
> +	/*
> +	 * If the request queue in the stream is not empty, it means
> +	 * that some previous message wasn't processed yet. Regardless
> +	 * of this, we put the message in the queue, but we start processing
> +	 * the message only if the message queue in the stream was empty.
> +	 */
> +	bool was_not_empty = !stailq_empty(&stream->pending_requests);
> +	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
> +	return was_not_empty ? 1 : 0;
> +}
> +
>  /**
>   * Enqueue all requests which were read up. If a request limit is
>   * reached - stop the connection input even if not the whole batch
> @@ -830,7 +967,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
>   * @param in Buffer to parse.
>   *
>   * @retval  0 Success.
> - * @retval -1 Invalid MessagePack error.
> + * @retval -1 Invalid MessagePack or memory error.
>   */
>  static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> @@ -883,12 +1020,25 @@ err_msgpack:
>  		msg->len = reqend - reqstart; /* total request length */
>
>  		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> +
> +		int rc = iproto_msg_set_stream(msg);
> +		if (rc < 0) {
> +			iproto_msg_delete(msg);
> +			return -1;
> +		}
>  		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> +		 * rc > 0, means that stream pending requests queue is not
> +		 * empty, skip push.
>  		 */
> -		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> -		n_requests++;
> +		if (rc == 0) {
> +			/*
> +			 * This can't throw, but should not be
> +			 * done in case of exception.
> +			 */
> +			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
> +			n_requests++;
> +		}
> +
>  		/* Request is parsed */
>  		assert(reqend > reqstart);
>  		assert(con->parse_size >= (size_t) (reqend - reqstart));
> @@ -1130,6 +1280,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
>  		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
>  		return NULL;
>  	}
> +	con->streams = mh_i64ptr_new();
> +	if (con->streams == NULL) {
> +		diag_set(OutOfMemory, sizeof(*(con->streams)),
> +			 "mh_streams_new", "streams");
> +		mempool_free(&con->iproto_thread->iproto_connection_pool, con);
> +		return NULL;
> +	}
>  	con->iproto_thread = iproto_thread;
>  	con->input.data = con->output.data = con;
>  	con->loop = loop();
> @@ -1178,6 +1335,9 @@ iproto_connection_delete(struct iproto_connection *con)
>  	       con->obuf[0].iov[0].iov_base == NULL);
>  	assert(con->obuf[1].pos == 0 &&
>  	       con->obuf[1].iov[0].iov_base == NULL);
> +
> +	assert(mh_size(con->streams) == 0);
> +	mh_i64ptr_delete(con->streams);
>  	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
>  }
>
> @@ -1225,7 +1385,9 @@ static void
>  iproto_msg_decode(struct iproto_msg
*msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> +	uint64_t stream_id;
>  	uint8_t type;
> +	bool request_is_not_for_stream;
>  	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>
>  	if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1233,6 +1395,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	assert(*pos == reqend);
>
>  	type = msg->header.type;
> +	stream_id = msg->header.stream_id;
> +	request_is_not_for_stream =
> +		((type > IPROTO_TYPE_STAT_MAX &&
> +		 type != IPROTO_PING) || type == IPROTO_AUTH);
> +
> +	if (stream_id != 0 && request_is_not_for_stream) {
> +		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
> +			 (uint32_t) type);
> +		goto error;
> +	}
>
>  	/*
>  	 * Parse request before putting it into the queue
> @@ -1326,6 +1498,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>
> +

Extra new line. Please remove.

>  static void
>  tx_process_disconnect(struct cmsg *m)
>  {
> @@ -1857,7 +2030,38 @@ net_send_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
> +	struct iproto_stream *stream = msg->stream;
> +	struct iproto_msg *tmp;
> +
> +	if (stream == NULL)
> +		goto send_msg;
> +
> +	tmp = stailq_shift_entry(&stream->pending_requests,
> +				 struct iproto_msg, in_stream);
> +	assert(tmp == msg);
> +	(void)tmp;
> +	errinj_stream_msg_count_add(-1);
> +
> +	if (stailq_empty(&stream->pending_requests)) {
> +		struct mh_i64ptr_node_t node = { stream->id, NULL };
> +		mh_i64ptr_remove(con->streams, &node, 0);
> +		iproto_stream_delete(stream);
> +	} else {
> +		/*
> +		 * If there are new messages for this stream
> +		 * then schedule their processing.
> +		 */
> +		struct iproto_msg *next =
> +			stailq_first_entry(&stream->pending_requests,
> +					   struct iproto_msg,
> +					   in_stream);
> +		assert(next != NULL);
> +		next->wpos = con->wpos;
> +		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
> +		cpipe_flush_input(&con->iproto_thread->tx_pipe);
> +	}

Would be nice to factor this out into a separate helper function.

>
> +send_msg:
>  	if (msg->len != 0) {
>  		/* Discard request (see iproto_enqueue_batch()). */
>  		msg->p_ibuf->rpos += msg->len;
> @@ -2045,6 +2249,8 @@ net_cord_f(va_list ap)
>  		       sizeof(struct iproto_msg));
>  	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
>  		       sizeof(struct iproto_connection));
> +	mempool_create(&iproto_thread->stream_pool, &cord()->slabc,
> +		       sizeof(struct iproto_stream));
>
>  	evio_service_init(loop(), &iproto_thread->binary, "binary",
>  			  iproto_on_accept, iproto_thread);
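
Regarding the helper for the stream hand-off in net_send_msg(): below is
a simplified standalone model of the shape I'd expect, not the actual
code. All sketch_ names are hypothetical, and a hand-rolled list stands
in for stailq. The real helper would also drop the stream from the hash
table and push the next message to the tx pipe; here it just returns the
next message (or NULL when the stream is drained) so the caller decides.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down stand-in for struct iproto_msg. */
struct sketch_msg {
	struct sketch_msg *next;
	int id;
};

/* Hypothetical stand-in for struct iproto_stream: FIFO of pending messages. */
struct sketch_stream {
	struct sketch_msg *first;
	/* Points at the 'next' slot of the tail, or at 'first' if empty. */
	struct sketch_msg **last;
};

static void
sketch_stream_create(struct sketch_stream *s)
{
	s->first = NULL;
	s->last = &s->first;
}

/*
 * Append a message. Returns 1 if the stream was busy (the message has
 * to wait) and 0 if the message may be processed right away.
 */
static int
sketch_stream_push(struct sketch_stream *s, struct sketch_msg *msg)
{
	int was_busy = s->first != NULL;
	msg->next = NULL;
	*s->last = msg;
	s->last = &msg->next;
	return was_busy;
}

/*
 * The helper itself: the head message 'done' has been processed. Pop it
 * and return the next message to schedule, or NULL if nothing is
 * pending, in which case the caller may destroy the stream.
 */
static struct sketch_msg *
sketch_stream_complete(struct sketch_stream *s, struct sketch_msg *done)
{
	assert(s->first == done);
	s->first = done->next;
	if (s->first == NULL)
		s->last = &s->first;
	done->next = NULL;
	return s->first;
}
```

With something like this, the stream branch in net_send_msg() shrinks to
a single call plus a check of its result, and the goto can go away.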