[Tarantool-patches] [PATCH v4] iproto: make iproto thread more independent from tx

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Oct 28 01:06:58 MSK 2020


Спасибо за патч!

Смотри 8 комментов далее по тексту.

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5ec..45b1db6ff 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -280,6 +283,11 @@ static struct cord net_cord;
>   * in the tx thread.
>   */
>  static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;

1. В патче почти все нормально в целом, кроме вот этой темы, что один
и тот же obuf сначала создается для ипрото потока, и потом тот же
obuf надо создавать для tx потока. Причем пока он принадлежит ипрото,
он все еще находится в struct iproto_connection.tx. Зачем ипрото этот
obuf вообще? Он ведь туда ничего кроме ошибок не может писать. А
ошибки мы пишем напрямую через sio_writev, когда они происходят в iproto
потоке. Так во всяком случае было раньше. И если оставить этот костыль -
это цена того, что obuf останется принадлежать tx потоку, то я за.

>  struct rmean *rmean_net;
>  
> @@ -650,7 +667,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>  	 * other parts of the connection.
>  	 */
>  	con->state = IPROTO_CONNECTION_DESTROYED;
> -	cpipe_push(&tx_pipe, &con->destroy_msg);
> +	if (con->session != NULL)
> +		cpipe_push(&tx_pipe, &con->destroy_msg);
> +	else {

2. Если else имеет {}, if тоже должен их иметь.

> +		/*
> +		 * In case session was not created we can safely destroy
> +		 * not involving tx thread. Thus we also need to destroy
> +		 * obuf, which still belongs to iproto thread.
> +		 */
> +		obuf_destroy(&con->obuf[0]);
> +		obuf_destroy(&con->obuf[1]);

3. Комментарии в struct iproto_connection до сих пор утверждают,
что obuf принадлежит ипрото треду полностью.

> +		iproto_connection_delete(con);
> +	}
>  }
> @@ -1134,10 +1182,6 @@ iproto_connection_delete(struct iproto_connection *con)
>  	assert(!evio_has_fd(&con->input));
>  	assert(con->session == NULL);
>  	assert(con->state == IPROTO_CONNECTION_DESTROYED);
> -	/*
> -	 * The output buffers must have been deleted
> -	 * in tx thread.
> -	 */

4. Не надо это вообще без коммента оставлять. Надо нормально расписать,
как и кто теперь удаляет эти буферы.

>  	ibuf_destroy(&con->ibuf[0]);
>  	ibuf_destroy(&con->ibuf[1]);
>  	assert(con->obuf[0].pos == 0 &&
> @@ -1167,13 +1211,13 @@ static void
>  tx_process_sql(struct cmsg *msg);
>  
>  static void
> -tx_reply_error(struct iproto_msg *msg);
> +reply_error(struct iproto_msg *msg);
>  
>  static void
>  tx_reply_iproto_error(struct cmsg *m);
>  
>  static void
> -net_send_msg(struct cmsg *msg);
> +obuf_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);

5. Функция называется obuf, и при этом ни один из аргументов не obuf.
Выглядит не очень. Комментарий у нее тоже устарел - он говорит, что
wpos - Last flushed write position, received from iproto thread. Но она
теперь зовется из ипрото треда иногда.

Еще сильно не хватает нормального разделения, как понять, кому сейчас
принадлежат obuf-ы. Только по session != NULL это не понять - она
становится NULL после уничтожения сессии. Но обуфы от этого не стали
ипрото треду принадлежать. Это сильно путает.

Скорее всего это надо делать через новые состояния. Переименовать
IPROTO_CONNECTION_ALIVE в IPROTO_CONNECTION_NEW. Это когда только создано.
Обуфы принадлежат ипрото. Потом добавить состояние
IPROTO_CONNECTION_ALIVE - это когда уже ушли запросы в тх тред, и обуфы
принадлежат ему.

Тогда можно будет убрать obuf_destroy(), например, из
iproto_connection_try_to_start_destroy(). Сейчас ты там делаешь:

	obuf_destroy(&con->obuf[0]);
	obuf_destroy(&con->obuf[1]);
	iproto_connection_delete(con);

В итоге у тебя delete не делает часть работы. И не способен проверить,
что работа сделана, так как состояние и session != NULL там ни о чем не
говорят.

Еще obuf_destroy() можно будет скрыть из iproto_enqueue_batch() в
новую функцию типа iproto_connection_activate(), которая будет проверять,
что предыдущее состояние - это NEW, будет ставить ACTIVE, и уничтожать
обуфы. Если она уже ACTIVE, то ничего не будет делать.

Это все имеет смысл, если есть какие-то доказательства, что нельзя
перестать использовать obuf в iproto потоке. Пока я вижу только одно
применение ему в iproto потоке - чтоб записать ошибку в iproto_msg_decode.

>  static void
>  net_send_error(struct cmsg *msg);
> @@ -1244,7 +1288,19 @@ static const struct cmsg_hop error_route[] = {
>  	{ net_send_error, NULL },
>  };
>  
> -static void
> +/**
> + * Decode the request from @a pos to @a reqend and put it into the queue.
> + * @param msg[inout] iproto message.
> + * @param pos[inout] the start of the request to parse.
> + * @param reqend the end of the request to parse.
> + * @param stop_input[out] in case JOIN is running we need to stop IO.
> + *
> + * @retval -1 No route was inited. Decoding was not successfulll and

6. successfulll -> successful.

> + * the session is not inited, thus we will process the error in iproto.
> + * @retval  0 The route corresponding to the request or the error route
> + * was successfully inited.
> + */
> +static int
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> @@ -1382,10 +1448,6 @@ tx_process_destroy(struct cmsg *m)
>  		session_destroy(con->session);
>  		con->session = NULL; /* safety */
>  	}
> -	/*
> -	 * Got to be done in iproto thread since
> -	 * that's where the memory is allocated.
> -	 */

7. Комментарий не надо удалять. Напиши новый с объяснением, почему
здесь эти буферы можно удалять. И можно добавить проверку, что
state == IPROTO_CONNECTION_DESTROYED.

>  	obuf_destroy(&con->obuf[0]);
>  	obuf_destroy(&con->obuf[1]);
>  }
> @@ -1865,6 +1964,30 @@ tx_process_replication(struct cmsg *m)
>  	}
>  }
>  
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_check_valid(struct iproto_msg *msg)
> +{
> +	if (!msg->close_connection)
> +		return false;

8. Функция называется check_valid, а на самом деле она удостоверяется,
что коннект not valid. Не выглядит странно это тебе?

> +	struct iproto_connection *con = msg->connection;
> +	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> +				 obuf_iovcnt(msg->wpos.obuf));
> +	if (nwr > 0) {
> +		/* Count statistics. */
> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> +		diag_log();
> +	}
> +	assert(iproto_connection_is_idle(con));
> +	iproto_connection_close(con);
> +	iproto_msg_delete(msg);
> +	return true;
> +}


More information about the Tarantool-patches mailing list