[Tarantool-patches] [PATCH v4] iproto: make iproto thread more independent from tx
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Oct 28 01:06:58 MSK 2020
Спасибо за патч!
Смотри 8 комментов далее по тексту.
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5ec..45b1db6ff 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -280,6 +283,11 @@ static struct cord net_cord;
> * in the tx thread.
> */
> static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
1. В патче почти все нормально в целом, кроме вот этой темы, что один
и тот же obuf сначала создается для ипрото потока, и потом тот же
obuf надо создавать для tx потока. Причем пока он принадлежит ипрото,
он все еще находится в struct iproto_connection.tx. Зачем ипрото этот
obuf вообще? Он ведь туда ничего кроме ошибок не может писать. А
ошибки мы пишем напрямую через sio_writev, когда они происходят в iproto
потоке. Так во всяком случае было раньше. И если оставить этот костыль -
это цена того, что obuf останется принадлежать tx потоку, то я за.
> struct rmean *rmean_net;
>
> @@ -650,7 +667,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
> * other parts of the connection.
> */
> con->state = IPROTO_CONNECTION_DESTROYED;
> - cpipe_push(&tx_pipe, &con->destroy_msg);
> + if (con->session != NULL)
> + cpipe_push(&tx_pipe, &con->destroy_msg);
> + else {
2. Если else имеет {}, if тоже должен их иметь.
> + /*
> + * In case session was not created we can safely destroy
> + * not involving tx thread. Thus we also need to destroy
> + * obuf, which still belongs to iproto thread.
> + */
> + obuf_destroy(&con->obuf[0]);
> + obuf_destroy(&con->obuf[1]);
3. Комментарии в struct iproto_connection до сих пор утверждают,
что obuf принадлежит ипрото треду полностью.
> + iproto_connection_delete(con);
> + }
> }
> @@ -1134,10 +1182,6 @@ iproto_connection_delete(struct iproto_connection *con)
> assert(!evio_has_fd(&con->input));
> assert(con->session == NULL);
> assert(con->state == IPROTO_CONNECTION_DESTROYED);
> - /*
> - * The output buffers must have been deleted
> - * in tx thread.
> - */
4. Не надо это вообще без коммента оставлять. Надо нормально расписать,
как и кто теперь удаляет эти буферы.
> ibuf_destroy(&con->ibuf[0]);
> ibuf_destroy(&con->ibuf[1]);
> assert(con->obuf[0].pos == 0 &&
> @@ -1167,13 +1211,13 @@ static void
> tx_process_sql(struct cmsg *msg);
>
> static void
> -tx_reply_error(struct iproto_msg *msg);
> +reply_error(struct iproto_msg *msg);
>
> static void
> tx_reply_iproto_error(struct cmsg *m);
>
> static void
> -net_send_msg(struct cmsg *msg);
> +obuf_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
5. Функция называется obuf, и при этом ни один из аргументов не obuf.
Выглядит не очень. Комментарий у нее тоже устарел - он говорит, что
wpos - Last flushed write position, received from iproto thread. Но она
теперь зовется из ипрото треда иногда.
Еще сильно не хватает нормального разделения, как понять, кому сейчас
принадлежат obuf-ы. Только по session != NULL это не понять - она
становится NULL после уничтожения сессии. Но обуфы от этого не стали
ипрото треду принадлежать. Это сильно путает.
Скорее всего это надо делать через новые состояния. Переименовать
IPROTO_CONNECTION_ALIVE в IPROTO_CONNECTION_NEW. Это когда только создано.
Обуфы принадлежат ипрото. Потом добавить состояние
IPROTO_CONNECTION_ALIVE - это когда уже ушли запросы в тх тред, и обуфы
принадлежат ему.
Тогда можно будет убрать obuf_destroy(), например, из
iproto_connection_try_to_start_destroy(). Сейчас ты там делаешь:
obuf_destroy(&con->obuf[0]);
obuf_destroy(&con->obuf[1]);
iproto_connection_delete(con);
В итоге у тебя delete не делает часть работы. И не способен проверить,
что работа сделана, так как состояние и session != NULL там ни о чем не
говорят.
Еще obuf_destroy() можно будет скрыть из iproto_enqueue_batch() в
новую функцию типа iproto_connection_activate(), которая будет проверять,
что предыдущее состояние - это NEW, будет ставить ACTIVE, и уничтожать
обуфы. Если она уже ACTIVE, то ничего не будет делать.
Это все имеет смысл, если есть какие-то доказательства, что нельзя
перестать использовать obuf в iproto потоке. Пока я вижу только одно
применение ему в iproto потоке - чтоб записать ошибку в iproto_msg_decode.
> static void
> net_send_error(struct cmsg *msg);
> @@ -1244,7 +1288,19 @@ static const struct cmsg_hop error_route[] = {
> { net_send_error, NULL },
> };
>
> -static void
> +/**
> + * Decode the request from @a pos to @a reqend and put it into the queue.
> + * @param msg[inout] iproto message.
> + * @param pos[inout] the start of the request to parse.
> + * @param reqend the end of the request to parse.
> + * @param stop_input[out] in case JOIN is running we need to stop IO.
> + *
> + * @retval -1 No route was inited. Decoding was not successfulll and
6. successfulll -> successful.
> + * the session is not inited, thus we will process the error in iproto.
> + * @retval 0 The route corresponding to the request or the error route
> + * was successfully inited.
> + */
> +static int
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input)
> {
> @@ -1382,10 +1448,6 @@ tx_process_destroy(struct cmsg *m)
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> - /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> - */
7. Комментарий не надо удалять. Напиши новый с объяснением, почему
здесь эти буферы можно удалять. И можно добавить проверку, что
state == IPROTO_CONNECTION_DESTROYED.
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> }
> @@ -1865,6 +1964,30 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_check_valid(struct iproto_msg *msg)
> +{
> + if (!msg->close_connection)
> + return false;
8. Функция называется check_valid, а на самом деле она удостоверяется,
что коннект not valid. Не выглядит странно это тебе?
> + struct iproto_connection *con = msg->connection;
> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> + obuf_iovcnt(msg->wpos.obuf));
> + if (nwr > 0) {
> + /* Count statistics. */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> + diag_log();
> + }
> + assert(iproto_connection_is_idle(con));
> + iproto_connection_close(con);
> + iproto_msg_delete(msg);
> + return true;
> +}
More information about the Tarantool-patches
mailing list