[Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Dec 11 00:52:39 MSK 2020
Hi! Thanks for the patch!
I suggest you invite Alexander L. to start a second review. We
don't have much time until the release.
See 13 comments below.
On 08.12.2020 23:09, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it.The
> next step after acception was to process connection to tx thread
1. acception -> acceptance.
Also, a space is missing after 'it.' in the first line.
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive
> tx thread (for example, when building secondary index).
2. It wasn't a leak. Socket close events were queued indefinitely until
tx finished bootstrap. Please, provide more info here. As written, it
is just misleading.
On the whole, the commit message is far too meager. Imagine how it will
look to somebody who didn't do any reviews and didn't discuss it before.
The problem is far from trivial, but from the commit message it looks
like somebody just forgot a close() somewhere.
> This patch allows iproto to accept connection and send greeting by
> itself. Thus the connection is being established and stays in
> fetch_schema state while tx thread is unresponsive. It solves
> descriptors leakage problem.
>
> Closes #3776
> ---
> src/box/iproto.cc | 147 ++++++++++++++----
> test/app/gh-4787-netbox-empty-errmsg.result | 18 ---
> test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -
> 3 files changed, 113 insertions(+), 60 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..8c122dc58d 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
> iproto_resume();
> }
>
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
3. This additional declaration is not necessary.
> +
> /**
> * A single global queue for all requests in all connections. All
> * requests from all connections are processed concurrently.
> @@ -453,6 +456,8 @@ struct iproto_connection
> * meaningless.
> */
> size_t parse_size;
> + /** Iproto buffer used to send greeting. */
> + struct iovec iproto_output_buf;
> /**
> * Nubmer of active long polling requests that have already
> * discarded their arguments in order not to stall other
> @@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
> return NULL;
> }
> msg->connection = con;
> + msg->close_connection = false;
4. You don't need this change now. However, it still looks dangerous, so
I would move it to a separate commit.
> rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
> return msg;
> }
> @@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
> }
> }
>
> +static int
> +iproto_buf_flush(struct iproto_connection *con)
> +{
> + int fd = con->output.fd;
> + ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
> +
> + if (nwr > 0) {
> + /* Count statistics */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + return 1;
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
5. We don't put whitespace after unary operators. The same applies in
some other places. Please, fix all of them.
> + diag_raise();
6. Please, don't use exceptions in the new code. I know you
copy-pasted it from iproto_flush, but it does not mean it is good.
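For example, the flush could report failure through its return value and
leave the decision to the caller. This is only a sketch: greeting_out and
greeting_flush() are hypothetical names, and plain write() stands in for
sio_writev() so that the example is self-contained.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper type: a greeting buffer plus a write offset. */
struct greeting_out {
	const char *data;
	size_t len;
	size_t pos;
};

/*
 * Hypothetical exception-free flush.
 * Returns 1 when the whole greeting has been written, 0 when the
 * socket would block and the caller should wait for the next
 * EV_WRITE event, -1 on a real error (errno is left for the caller).
 */
static int
greeting_flush(int fd, struct greeting_out *out)
{
	assert(out != NULL && out->pos <= out->len);
	while (out->pos < out->len) {
		ssize_t nwr = write(fd, out->data + out->pos,
				    out->len - out->pos);
		if (nwr > 0) {
			/* Remember the progress so a retry resumes here. */
			out->pos += (size_t)nwr;
			continue;
		}
		if (nwr < 0 && errno == EINTR)
			continue;
		if (nwr == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
			return 0;
		return -1;
	}
	return 1;
}
```

The caller would then mark the connection as closed on -1 and keep the
EV_WRITE watcher armed on 0, with no try/catch around the call.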
> + }
> +
> + return nwr;
> +}
> +
> +static void
> +iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
> + int /* revents */)
> +{
> + struct iproto_connection *con = (struct iproto_connection *) watcher->data;
7. This line is longer than 80 characters.
> + try {
> + int rc;
> + while ((rc = iproto_buf_flush(con)) <= 0) {
> + if (rc != 0) {
> + ev_io_start(loop, &con->output);
> + return;
> + }
> + }
> + if (ev_is_active(&con->output))
> + ev_io_stop(con->loop, &con->output);
8. ev_io_stop() is safe to call on a non-active ev_watcher. You don't
need this 'if'.
> + ev_io_init(&con->output, iproto_connection_on_output,
> + con->output.fd, EV_WRITE);
> + if (con->input.cb != iproto_connection_on_input)
> + ev_io_init(&con->input, iproto_connection_on_input,
> + con->input.fd, EV_READ);
> + else
> + ev_feed_event(loop, &con->input, EV_READ);
9. I don't understand the code block above. Why is it conditional? Why
isn't cb always equal to iproto_connection_on_input? And why do you use
a callback as a flag that it is time to start reading? I thought
you were supposed to use the connection state for that.
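Roughly what I mean by using the state, as a sketch. All names here
(conn_state, CONN_GREETING and so on) are hypothetical and are not the
states from the patch:

```c
#include <stdbool.h>

/* Hypothetical connection states, for illustration only. */
enum conn_state {
	CONN_GREETING,	/* greeting is not fully sent yet */
	CONN_ALIVE,	/* greeting sent, requests may be read */
	CONN_CLOSED,	/* write failed or the peer went away */
};

struct conn {
	enum conn_state state;
};

/* The state, not the installed callback, says whether to read. */
static bool
conn_can_read(const struct conn *c)
{
	return c->state == CONN_ALIVE;
}

/* Called once the greeting has been flushed completely. */
static void
conn_on_greeting_sent(struct conn *c)
{
	if (c->state == CONN_GREETING)
		c->state = CONN_ALIVE;
}

/* Called when sending the greeting failed. */
static void
conn_on_write_error(struct conn *c)
{
	c->state = CONN_CLOSED;
}
```

With something like this the input watcher can always have
iproto_connection_on_input as its callback, and the callback itself
decides what to do based on the state.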
> + } catch (Exception *e) {
> + e->log();
> + con->state = IPROTO_CONNECTION_CLOSED;
> + }
> +}
> +
> static struct iproto_connection *
> iproto_connection_new(int fd)
> {
> @@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
> }
> con->input.data = con->output.data = con;
> con->loop = loop();
> - ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
> - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
> + ev_io_init(&con->input, NULL, fd, EV_READ);
10. Why did you nullify the callback? ev_io_init() does not start
listening for events. It only initializes the struct.
> + ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> @@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
> {
> struct iproto_connection *con =
> container_of(m, struct iproto_connection, destroy_msg);
> + assert(con->state == IPROTO_CONNECTION_DESTROYED);
11. Please, introduce the states in a separate commit. I think I
already asked for that when we discussed the previous version,
but I am not sure.
> if (con->session) {
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> + * obuf is being destroyed in tx thread cause it is where
> + * it was allocated.
> */
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> @@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: send greeting for it.
> + */
> +static void
> +iproto_process_connect(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
12. There is a serious problem with that allocation. The static
allocator is very volatile. What static_alloc() returns does not belong
to you. This memory is heavily reused and can be used only for very
short-lived data, and only if you are sure there are no other static
allocs around.
Here you allocate a long-living object on it. So it will simply be
overwritten under any serious load, because all new connections will
call this alloc, and the static buffer will be recycled and turned into
a pile of garbage.
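A toy model of the effect, assuming the static buffer is reused
cyclically. This is not the real allocator: toy_static_alloc(), the
buffer size and the greeting size below are all made up for the
illustration.

```c
#include <stddef.h>
#include <string.h>

/* Made-up sizes: the buffer fits exactly two greetings. */
enum { TOY_STATIC_SIZE = 256, TOY_GREETING_SIZE = 128 };

static char toy_static_buf[TOY_STATIC_SIZE];
static size_t toy_static_pos;

/* Toy cyclic allocator: wraps to the start when the buffer is full. */
static char *
toy_static_alloc(size_t size)
{
	if (size > TOY_STATIC_SIZE)
		return NULL;
	if (toy_static_pos + size > TOY_STATIC_SIZE)
		toy_static_pos = 0;
	char *res = toy_static_buf + toy_static_pos;
	toy_static_pos += size;
	return res;
}

/*
 * Allocate a greeting for the first connection, then let
 * other_connections more connections allocate theirs.
 * Returns 1 if the first greeting is still intact, 0 otherwise.
 */
static int
toy_greeting_survives(int other_connections)
{
	toy_static_pos = 0;
	char *first = toy_static_alloc(TOY_GREETING_SIZE);
	memset(first, 'A', TOY_GREETING_SIZE);
	for (int i = 0; i < other_connections; i++) {
		char *other = toy_static_alloc(TOY_GREETING_SIZE);
		memset(other, 'B', TOY_GREETING_SIZE);
	}
	return first[0] == 'A';
}
```

In this model the first greeting survives one more connection but not
two: the third allocation wraps around and lands on top of it before it
is necessarily sent.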
> + con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
> + /*
> + * INSTANCE_UUID is guaranteed to be inited before this moment.
> + * We start listening either in local_recovery() or bootstrap().
> + * The INSTANCE_UUID is ensured to be inited in the beginning of
> + * both methods. In case of local_recovery() it is verified that
> + * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
> + * In bootstrap() INSTANCE_UUID is either taken from the
> + * instance_uuid box.cfg{} param or created on the spot.
> + */
> + struct tt_uuid uuid = INSTANCE_UUID;
13. Why do you need to copy it onto the stack? Why can't you use a
pointer to the INSTANCE_UUID variable?
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
> + greeting_encode((char *)con->iproto_output_buf.iov_base,
> + tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + assert(evio_has_fd(&con->output));
> + ev_feed_event(con->loop, &con->output, EV_WRITE);
> +}