From mboxrd@z Thu Jan 1 00:00:00 1970
To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org
Cc: mechanik20051988
Date: Mon, 9 Aug 2021 17:37:57 +0300
Message-Id: <76a20cba3d19c381fa7598083231919250fcfdeb.1628519206.git.mechanik20051988@tarantool.org>
X-Mailer: git-send-email 2.20.1
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject:
[Tarantool-patches] [PATCH v2 5/8] iproto: implement streams in iproto
List-Id: Tarantool development patches
From: mechanik20051988 via Tarantool-patches
Reply-To: mechanik20051988

From: mechanik20051988

Implement streams in iproto. Each connection has a hash table of streams. When a new request arrives with a non-zero stream ID, we look up the stream with that ID in the table and create it if it does not exist. The request is placed in the stream's queue of pending requests, and if that queue was empty when the request arrived, the request is pushed to the tx thread for processing. When a request belonging to a stream returns to the network thread after processing is completed, we take the next request out of the queue of pending requests and send it to the tx thread for processing. If there are no pending requests, we remove the stream object from the hash table and destroy it. Requests with a zero stream ID are processed the old way.

Part of #5860

@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them are processed sequentially: the execution of the next request in a stream does not start until the previous one is completed. To separate requests that belong to streams from those that do not, we use the stream ID field in the binary iproto protocol: requests with a non-zero stream ID belong to some stream. A stream ID is unique within a connection and indicates which stream the request belongs to. Streams from different connections may have the same IDs.
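For reviewers who want the queueing rule in isolation, here is a minimal standalone sketch of the behaviour described above. It is not the code from this patch: every name in it (toy_connection, toy_stream, toy_request_arrive, toy_request_done) is hypothetical, a fixed-size array stands in for the per-connection hash table, and requests are plain non-negative integers instead of iproto messages.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { MAX_STREAMS = 8, MAX_PENDING = 16 };

/* Toy stream: a FIFO of request ids; the head is the request in flight. */
struct toy_stream {
	bool in_use;
	uint64_t id;
	int pending[MAX_PENDING];
	size_t head;
	size_t count;
};

/* Toy connection: a small array stands in for the stream hash table. */
struct toy_connection {
	struct toy_stream streams[MAX_STREAMS];
};

static struct toy_stream *
toy_stream_find(struct toy_connection *con, uint64_t id)
{
	for (size_t i = 0; i < MAX_STREAMS; i++) {
		if (con->streams[i].in_use && con->streams[i].id == id)
			return &con->streams[i];
	}
	return NULL;
}

static struct toy_stream *
toy_stream_new(struct toy_connection *con, uint64_t id)
{
	for (size_t i = 0; i < MAX_STREAMS; i++) {
		struct toy_stream *s = &con->streams[i];
		if (!s->in_use) {
			s->in_use = true;
			s->id = id;
			s->head = 0;
			s->count = 0;
			return s;
		}
	}
	return NULL;
}

/*
 * A request arrived. Returns 0 if it may be dispatched right away
 * (no stream, or the stream was idle), 1 if it is postponed behind
 * earlier requests of the same stream, -1 if there is no room.
 */
static int
toy_request_arrive(struct toy_connection *con, uint64_t stream_id, int req)
{
	if (stream_id == 0)
		return 0;
	struct toy_stream *s = toy_stream_find(con, stream_id);
	if (s == NULL) {
		s = toy_stream_new(con, stream_id);
		if (s == NULL)
			return -1;
	}
	if (s->count == MAX_PENDING)
		return -1;
	bool was_empty = s->count == 0;
	s->pending[(s->head + s->count) % MAX_PENDING] = req;
	s->count++;
	return was_empty ? 0 : 1;
}

/*
 * The in-flight request of a stream finished. Returns the next
 * request to dispatch, or -1 if the stream became idle and was
 * destroyed (request ids are assumed to be non-negative).
 */
static int
toy_request_done(struct toy_connection *con, uint64_t stream_id)
{
	struct toy_stream *s = toy_stream_find(con, stream_id);
	assert(s != NULL && s->count > 0);
	s->head = (s->head + 1) % MAX_PENDING;
	s->count--;
	if (s->count == 0) {
		s->in_use = false;
		return -1;
	}
	return s->pending[s->head];
}
```

With a zero-initialized toy_connection, two requests sent to stream 7 yield 0 and then 1 from toy_request_arrive(); the first toy_request_done() hands back the second request, and the second call returns -1 and frees the stream slot. The finished request stays at the head of the queue until it completes, which is what lets the "was the queue empty" check double as the "is the stream busy" check.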
--- src/box/errcode.h | 1 + src/box/iproto.cc | 228 ++++++++++++++++++++++++++++++++++++++++- src/lib/core/errinj.h | 2 + test/box/errinj.result | 2 + test/box/error.result | 1 + 5 files changed, 229 insertions(+), 5 deletions(-) diff --git a/src/box/errcode.h b/src/box/errcode.h index ef2b2e9b1..f8fda23c1 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -281,6 +281,7 @@ struct errcode_record { /*226 */_(ER_NOT_LEADER, "The instance is not a leader. New leader is %u")\ /*227 */_(ER_SYNC_QUEUE_UNCLAIMED, "The synchronous transaction queue doesn't belong to any instance")\ /*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\ + /*229 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/iproto.cc b/src/box/iproto.cc index dcf60e1be..3b792130b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -64,6 +64,8 @@ #include "execute.h" #include "errinj.h" #include "tt_static.h" +#include "salad/stailq.h" +#include "assoc.h" enum { IPROTO_SALT_SIZE = 32, @@ -74,6 +76,21 @@ enum { ENDPOINT_NAME_MAX = 10 }; +struct iproto_connection; + +struct iproto_stream { + /** + * Queue of pending requests (iproto messages) for this stream, + * processed sequentially. This field is accessible only from + * iproto thread. Queue items have iproto_msg type. + */ + struct stailq pending_requests; + /** Id of this stream, used as a key in streams hash table */ + uint64_t id; + /** This stream connection */ + struct iproto_connection *connection; +}; + /** * A position in connection output buffer. * Since we use rotating buffers to recycle memory, @@ -136,6 +153,7 @@ struct iproto_thread { */ struct mempool iproto_msg_pool; struct mempool iproto_connection_pool; + struct mempool iproto_stream_pool; /* * List of stopped connections */ @@ -304,6 +322,16 @@ struct iproto_msg * and the connection must be closed.
*/ bool close_connection; + /** + * A stailq_entry to hold the message in a stream. + * Messages in a stream are processed sequentially. Before processing, + * every message is added to the queue of pending requests. If the queue + * was empty, the message starts processing; otherwise it waits until + * all previous messages are processed. + */ + struct stailq_entry in_stream; + /** Stream that owns this message, or NULL. */ + struct iproto_stream *stream; }; static struct iproto_msg * @@ -505,6 +533,11 @@ struct iproto_connection */ enum iproto_connection_state state; struct rlist in_stop_list; + /** + * Hash table that holds all streams for this connection. + * This field is accessible only from iproto thread. + */ + struct mh_i64ptr_t *streams; /** * Kharon is used to implement box.session.push(). * When a new push is ready, tx uses kharon to notify @@ -572,6 +605,48 @@ struct iproto_connection } while (0); #endif +/* + * TODO(gh-6293): Implement necessary statistics for iproto streams + * and remove it from errinj.
+ */ +static inline void +errinj_stream_count_add(MAYBE_UNUSED int val) +{ +#ifndef NDEBUG + struct errinj *inj = + errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT); + __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST); +#endif +} + +static inline void +errinj_stream_msg_count_add(MAYBE_UNUSED int val) +{ +#ifndef NDEBUG + struct errinj *inj = + errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT); + __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST); +#endif +} + +static struct iproto_stream * +iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id) +{ + struct iproto_thread *iproto_thread = connection->iproto_thread; + struct iproto_stream *stream = (struct iproto_stream *) + mempool_alloc(&iproto_thread->iproto_stream_pool); + if (stream == NULL) { + diag_set(OutOfMemory, sizeof(*stream), + "mempool_alloc", "stream"); + return NULL; + } + errinj_stream_count_add(1); + stailq_create(&stream->pending_requests); + stream->id = stream_id; + stream->connection = connection; + return stream; +} + /** * Return true if we have not enough spare messages * in the message pool. @@ -591,6 +666,14 @@ iproto_msg_delete(struct iproto_msg *msg) iproto_resume(iproto_thread); } +static void +iproto_stream_delete(struct iproto_stream *stream) +{ + assert(stailq_empty(&stream->pending_requests)); + errinj_stream_count_add(-1); + mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream); +} + static struct iproto_msg * iproto_msg_new(struct iproto_connection *con) { @@ -609,6 +692,7 @@ iproto_msg_new(struct iproto_connection *con) } msg->close_connection = false; msg->connection = con; + msg->stream = NULL; rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1); return msg; } @@ -836,6 +920,63 @@ iproto_connection_input_buffer(struct iproto_connection *con) return new_ibuf; } +/** + * Check if message belongs to stream (stream_id != 0), and if it + * is so create new stream or get stream from connection streams + * hash table. 
Put the message into the stream's pending messages list. + * @retval 0 - the message is ready to push to TX thread (either if + * stream_id is not set (is zero) or the stream is not + * processing other messages). + * 1 - the message is postponed because its stream is busy + * processing previous message(s). + * -1 - memory error. + */ +static int +iproto_msg_start_processing_in_stream(struct iproto_msg *msg) +{ + uint64_t stream_id = msg->header.stream_id; + if (stream_id == 0) + return 0; + + struct iproto_connection *con = msg->connection; + struct iproto_stream *stream = NULL; + mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0); + if (pos == mh_end(con->streams)) { + stream = iproto_stream_new(msg->connection, msg->header.stream_id); + if (stream == NULL) + return -1; + struct mh_i64ptr_node_t node; + node.key = stream_id; + node.val = stream; + pos = mh_i64ptr_put(con->streams, &node, NULL, NULL); + if (pos == mh_end(con->streams)) { + iproto_stream_delete(stream); + diag_set(OutOfMemory, pos + 1, "mh_streams_put", + "mh_streams_node"); + return -1; + } + } + /* + * Not all messages belong to a stream. We can't determine which + * messages belong to stream in `iproto_msg_new`, so we increment + * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it. + * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT + * only if msg->stream != NULL. + */ + errinj_stream_msg_count_add(1); + stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val; + msg->stream = stream; + /* + * If the request queue in the stream is not empty, it means + * that some previous message wasn't processed yet. Regardless + * of this, we put the message in the queue, but we start processing + * the message only if the message queue in the stream was empty. + */ + bool was_not_empty = !stailq_empty(&stream->pending_requests); + stailq_add_tail_entry(&stream->pending_requests, msg, in_stream); + return was_not_empty ?
1 : 0; +} + /** * Enqueue all requests which were read up. If a request limit is * reached - stop the connection input even if not the whole batch @@ -845,7 +986,7 @@ iproto_connection_input_buffer(struct iproto_connection *con) * @param in Buffer to parse. * * @retval 0 Success. - * @retval -1 Invalid MessagePack error. + * @retval -1 Invalid MessagePack or memory error. */ static inline int iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) @@ -898,12 +1039,25 @@ err_msgpack: msg->len = reqend - reqstart; /* total request length */ iproto_msg_decode(msg, &pos, reqend, &stop_input); + + int rc = iproto_msg_start_processing_in_stream(msg); + if (rc < 0) { + iproto_msg_delete(msg); + return -1; + } /* - * This can't throw, but should not be - * done in case of exception. + * rc > 0, means that stream pending requests queue is not + * empty, skip push. */ - cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base); - n_requests++; + if (rc == 0) { + /* + * This can't throw, but should not be + * done in case of exception. 
+ */ + cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base); + n_requests++; + } + /* Request is parsed */ assert(reqend > reqstart); assert(con->parse_size >= (size_t) (reqend - reqstart)); @@ -1145,6 +1299,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd) diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con"); return NULL; } + con->streams = mh_i64ptr_new(); + if (con->streams == NULL) { + diag_set(OutOfMemory, sizeof(*(con->streams)), + "mh_streams_new", "streams"); + mempool_free(&con->iproto_thread->iproto_connection_pool, con); + return NULL; + } con->iproto_thread = iproto_thread; con->input.data = con->output.data = con; con->loop = loop(); @@ -1193,6 +1354,9 @@ iproto_connection_delete(struct iproto_connection *con) con->obuf[0].iov[0].iov_base == NULL); assert(con->obuf[1].pos == 0 && con->obuf[1].iov[0].iov_base == NULL); + + assert(mh_size(con->streams) == 0); + mh_i64ptr_delete(con->streams); mempool_free(&con->iproto_thread->iproto_connection_pool, con); } @@ -1240,7 +1404,9 @@ static void iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, bool *stop_input) { + uint64_t stream_id; uint8_t type; + bool request_is_not_for_stream; struct iproto_thread *iproto_thread = msg->connection->iproto_thread; if (xrow_header_decode(&msg->header, pos, reqend, true)) @@ -1248,6 +1414,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, assert(*pos == reqend); type = msg->header.type; + stream_id = msg->header.stream_id; + request_is_not_for_stream = + ((type > IPROTO_TYPE_STAT_MAX && + type != IPROTO_PING) || type == IPROTO_AUTH); + + if (stream_id != 0 && request_is_not_for_stream) { + diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM, + iproto_type_name(type)); + goto error; + } /* * Parse request before putting it into the queue @@ -1873,12 +2049,52 @@ tx_process_replication(struct cmsg *m) } } +static void +iproto_msg_finish_processing_in_stream(struct iproto_msg 
*msg) +{ + struct iproto_connection *con = msg->connection; + struct iproto_stream *stream = msg->stream; + + assert(stream != NULL); + struct iproto_msg *tmp = + stailq_shift_entry(&stream->pending_requests, + struct iproto_msg, in_stream); + assert(tmp == msg); + (void)tmp; + errinj_stream_msg_count_add(-1); + + if (stailq_empty(&stream->pending_requests)) { + struct mh_i64ptr_node_t node = { stream->id, NULL }; + mh_i64ptr_remove(con->streams, &node, 0); + iproto_stream_delete(stream); + } else { + /* + * If there are new messages for this stream + * then schedule their processing. + */ + struct iproto_msg *next = + stailq_first_entry(&stream->pending_requests, + struct iproto_msg, + in_stream); + assert(next != NULL); + next->wpos = con->wpos; + cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base); + cpipe_flush_input(&con->iproto_thread->tx_pipe); + } +} + static void net_send_msg(struct cmsg *m) { struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; + struct iproto_stream *stream = msg->stream; + + if (stream == NULL) + goto send_msg; + iproto_msg_finish_processing_in_stream(msg); +send_msg: if (msg->len != 0) { /* Discard request (see iproto_enqueue_batch()). 
*/ msg->p_ibuf->rpos += msg->len; @@ -2066,6 +2282,8 @@ net_cord_f(va_list ap) sizeof(struct iproto_msg)); mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc, sizeof(struct iproto_connection)); + mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc, + sizeof(struct iproto_stream)); evio_service_init(loop(), &iproto_thread->binary, "binary", iproto_on_accept, iproto_thread); diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index 3fe4c7c22..01b3eddef 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -155,6 +155,8 @@ struct errinj { _(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\ _(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\ _(ERRINJ_NETBOX_IO_DELAY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT, {.iparam = 0}) \ + _(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT, {.iparam = 0}) \ ENUM0(errinj_id, ERRINJ_LIST); extern struct errinj errinjs[]; diff --git a/test/box/errinj.result b/test/box/errinj.result index adb682ac3..8b45cdefc 100644 --- a/test/box/errinj.result +++ b/test/box/errinj.result @@ -59,6 +59,8 @@ evals - ERRINJ_INDEX_ALLOC: false - ERRINJ_INDEX_RESERVE: false - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1 + - ERRINJ_IPROTO_STREAM_COUNT: 0 + - ERRINJ_IPROTO_STREAM_MSG_COUNT: 0 - ERRINJ_IPROTO_TX_DELAY: false - ERRINJ_IPROTO_WRITE_ERROR_DELAY: false - ERRINJ_LOG_ROTATE: false diff --git a/test/box/error.result b/test/box/error.result index b7ac7a138..f80fdfed5 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -447,6 +447,7 @@ t; | 226: box.error.NOT_LEADER | 227: box.error.SYNC_QUEUE_UNCLAIMED | 228: box.error.SYNC_QUEUE_FOREIGN + | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM | ... test_run:cmd("setopt delimiter ''"); -- 2.20.1