From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D0965446439 for ; Wed, 28 Oct 2020 01:06:59 +0300 (MSK) From: Vladislav Shpilevoy References: <20201019094949.22232-1-i.kosarev@tarantool.org> Message-ID: <722075b7-821c-9b8f-0f6b-1ada811dea47@tarantool.org> Date: Tue, 27 Oct 2020 23:06:58 +0100 MIME-Version: 1.0 In-Reply-To: <20201019094949.22232-1-i.kosarev@tarantool.org> Content-Type: text/plain; charset="utf-8" Content-Language: en-US Content-Transfer-Encoding: 8bit Subject: Re: [Tarantool-patches] [PATCH v4] iproto: make iproto thread more independent from tx List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Ilya Kosarev Cc: tarantool-patches@dev.tarantool.org Спасибо за патч! Смотри 8 комментов далее по тексту. > diff --git a/src/box/iproto.cc b/src/box/iproto.cc > index b8f65e5ec..45b1db6ff 100644 > --- a/src/box/iproto.cc > +++ b/src/box/iproto.cc > @@ -280,6 +283,11 @@ static struct cord net_cord; > * in the tx thread. > */ > static struct slab_cache net_slabc; > +/** > + * Slab cache used for allocating memory for output network buffers > + * in the iproto thread. > + */ > +static struct slab_cache iproto_slabc; 1. В патче почти все нормально в целом, кроме вот этой темы, что один и тот же obuf сначала создается для ипрото потока, и потом тот же obuf надо создавать для tx потока. Причем пока он принадлежит ипрото, он все еще находится в struct iproto_connection.tx. Зачем ипрото этот obuf вообще? Он ведь туда ничего кроме ошибок не может писать. А ошибки мы пишем напрямую через sio_writev, когда они происходят в iproto потоке. Так во всяком случае было раньше. И если оставить этот костыль - это цена того, что obuf останется принадлежать tx потоку, то я за. > struct rmean *rmean_net; > > @@ -650,7 +667,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con) > * other parts of the connection. > */ > con->state = IPROTO_CONNECTION_DESTROYED; > - cpipe_push(&tx_pipe, &con->destroy_msg); > + if (con->session != NULL) > + cpipe_push(&tx_pipe, &con->destroy_msg); > + else { 2. Если else имеет {}, if тоже должен их иметь. > + /* > + * In case session was not created we can safely destroy > + * not involving tx thread. Thus we also need to destroy > + * obuf, which still belongs to iproto thread. > + */ > + obuf_destroy(&con->obuf[0]); > + obuf_destroy(&con->obuf[1]); 3. Комментарии в struct iproto_connection до сих пор утверждают, что obuf принадлежит ипрото треду полностью. > + iproto_connection_delete(con); > + } > } > @@ -1134,10 +1182,6 @@ iproto_connection_delete(struct iproto_connection *con) > assert(!evio_has_fd(&con->input)); > assert(con->session == NULL); > assert(con->state == IPROTO_CONNECTION_DESTROYED); > - /* > - * The output buffers must have been deleted > - * in tx thread. > - */ 4. Не надо это вообще без коммента оставлять. Надо нормально расписать, как и кто теперь удаляет эти буферы. > ibuf_destroy(&con->ibuf[0]); > ibuf_destroy(&con->ibuf[1]); > assert(con->obuf[0].pos == 0 && > @@ -1167,13 +1211,13 @@ static void > tx_process_sql(struct cmsg *msg); > > static void > -tx_reply_error(struct iproto_msg *msg); > +reply_error(struct iproto_msg *msg); > > static void > tx_reply_iproto_error(struct cmsg *m); > > static void > -net_send_msg(struct cmsg *msg); > +obuf_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos); 5. Функция называется obuf, и при этом ни один из аргументов не obuf. Выглядит не очень. Комментарий у нее тоже устарел - он говорит, что wpos - Last flushed write position, received from iproto thread. Но она теперь зовется из ипрото треда иногда. Еще сильно не хватает нормального разделения, как понять, кому сейчас принадлежат obuf-ы. Только по session != NULL это не понять - она становится NULL после уничтожения сессии. Но обуфы от этого не стали ипрото треду принадлежать. Это сильно путает. Скорее всего это надо делать через новые состояния. Переименовать IPROTO_CONNECTION_ALIVE в IPROTO_CONNECTION_NEW. Это когда только создано. Обуфы принадлежат ипрото. Потом добавить состояние IPROTO_CONNECTION_ALIVE - это когда уже ушли запросы в тх тред, и обуфы принадлежат ему. Тогда можно будет убрать obuf_destroy(), например, из iproto_connection_try_to_start_destroy(). Сейчас ты там делаешь: obuf_destroy(&con->obuf[0]); obuf_destroy(&con->obuf[1]); iproto_connection_delete(con); В итоге у тебя delete не делает часть работы. И не способен проверить, что работа сделана, так как состояние и session != NULL там ни о чем не говорят. Еще obuf_destroy() можно будет скрыть из iproto_enqueue_batch() в новую функцию типа iproto_connection_activate(), которая будет проверять, что предыдущее состояние - это NEW, будет ставить ACTIVE, и уничтожать обуфы. Если она уже ACTIVE, то ничего не будет делать. Это все имеет смысл, если есть какие-то доказательства, что нельзя перестать использовать obuf в iproto потоке. Пока я вижу только одно применение ему в iproto потоке - чтоб записать ошибку в iproto_msg_decode. > static void > net_send_error(struct cmsg *msg); > @@ -1244,7 +1288,19 @@ static const struct cmsg_hop error_route[] = { > { net_send_error, NULL }, > }; > > -static void > +/** > + * Decode the request from @a pos to @a reqend and put it into the queue. > + * @param msg[inout] iproto message. > + * @param pos[inout] the start of the request to parse. > + * @param reqend the end of the request to parse. > + * @param stop_input[out] in case JOIN is running we need to stop IO. > + * > + * @retval -1 No route was inited. Decoding was not successfulll and 6. successfulll -> successful. > + * the session is not inited, thus we will process the error in iproto. > + * @retval 0 The route corresponding to the request or the error route > + * was successfully inited. > + */ > +static int > iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, > bool *stop_input) > { > @@ -1382,10 +1448,6 @@ tx_process_destroy(struct cmsg *m) > session_destroy(con->session); > con->session = NULL; /* safety */ > } > - /* > - * Got to be done in iproto thread since > - * that's where the memory is allocated. > - */ 7. Комментарий не надо удалять. Напиши новый с объяснением, почему здесь эти буферы можно удалять. И можно добавить проверку, что state == IPROTO_CONNECTION_DESTROYED. > obuf_destroy(&con->obuf[0]); > obuf_destroy(&con->obuf[1]); > } > @@ -1865,6 +1964,30 @@ tx_process_replication(struct cmsg *m) > } > } > > +/** > + * Check connection health and try to send an error to the client > + * in case of internal connection init or on_connect trigger failure. > + */ > +static bool > +iproto_connection_check_valid(struct iproto_msg *msg) > +{ > + if (!msg->close_connection) > + return false; 8. Функция называется check_valid, а на самом деле она удостоверяется, что коннект not valid. Не выглядит странно это тебе? > + struct iproto_connection *con = msg->connection; > + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov, > + obuf_iovcnt(msg->wpos.obuf)); > + if (nwr > 0) { > + /* Count statistics. */ > + rmean_collect(rmean_net, IPROTO_SENT, nwr); > + } else if (nwr < 0 && ! sio_wouldblock(errno)) { > + diag_log(); > + } > + assert(iproto_connection_is_idle(con)); > + iproto_connection_close(con); > + iproto_msg_delete(msg); > + return true; > +}