[Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto

Ilya Kosarev i.kosarev at tarantool.org
Sun Dec 13 02:38:57 MSK 2020


Hi,
Thanks for your review!
 
Sent v6 of the patch (patchset) considering your comments. Some answers below. 
>Пятница, 11 декабря 2020, 0:52 +03:00 от Vladislav Shpilevoy <v.shpilevoy at tarantool.org>:
> 
>Hi! Thanks for the patch!
>
>I suggest you to invite Alexander L. to start a second review. We
>don't have much time until release.
>
>See 13 comments below.
>
>On 08.12.2020 23:09, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it.The
>> next step after acception was to process connection to tx thread
>
>1. acception -> acceptance.
Ok, right, fixed in v6 of the patch.
>
>Also missing whitespace in the first line after 'it.'.
>
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive
>> tx thread (for example, when building secondary index).
>
>2. It wasn't a leak. Socket close events were queued infinitely until
>tx finished bootstrap. Please, provide more info here. Now it is
>just misleading.
>
>On the whole, the commit message is super miser. Imagine how will it
>look to somebody who didn't do any reviews and didn't discuss it before.
>The problem is far from trivial, but from the commit message it looks
>like somebody just forgot a close() somewhere.
Ok, the commit message is rewritten in v6.
>
>> This patch allows iproto to accept connection and send greeting by
>> itself. Thus the connection is being established and stays in
>> fetch_schema state while tx thread is unresponsive. It solves
>> descriptors leakage problem.
>>
>> Closes #3776
>> ---
>> src/box/iproto.cc | 147 ++++++++++++++----
>> test/app/gh-4787-netbox-empty-errmsg.result | 18 ---
>> test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -
>> 3 files changed, 113 insertions(+), 60 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5eca..8c122dc58d 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>> iproto_resume();
>> }
>>
>> +static inline void
>> +iproto_connection_delete(struct iproto_connection *con);
>
>3. This additional declaration is not necessary.
Right, it was extra change, removed in v6.
>
>> +
>> /**
>> * A single global queue for all requests in all connections. All
>> * requests from all connections are processed concurrently.
>> @@ -453,6 +456,8 @@ struct iproto_connection
>> * meaningless.
>> */
>> size_t parse_size;
>> + /** Iproto buffer used to send greeting. */
>> + struct iovec iproto_output_buf;
>> /**
>> * Nubmer of active long polling requests that have already
>> * discarded their arguments in order not to stall other
>> @@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
>> return NULL;
>> }
>> msg->connection = con;
>> + msg->close_connection = false;
>
>4. You don't need this change now. However still looks dangerous. So I would
>move it to a separate commit.
Ok, moved to separate commit.
 
>
>> rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
>> return msg;
>> }
>> @@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
>> }
>> }
>>
>> +static int
>> +iproto_buf_flush(struct iproto_connection *con)
>> +{
>> + int fd = con->output.fd;
>> + ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
>> +
>> + if (nwr > 0) {
>> + /* Count statistics */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + return 1;
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>
>5. We don't use whitespaces after unary operators. The same in some
>other places. Please, fix all of them.
Right, copied old codestyle, fixed in v6.
>
>> + diag_raise();
>
>6. Please, don't use exceptions in the new code. I know you
>copy-pasted it from iproto_flush, but it does not mean it is good.
>
>> + }
>> +
>> + return nwr;
>> +}
>> +
>> +static void
>> +iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
>> + int /* revents */)
>> +{
>> + struct iproto_connection *con = (struct iproto_connection *) watcher->data;
>
>7. This is out of 80 symbols.
Right, fixed in v6.
>
>> + try {
>> + int rc;
>> + while ((rc = iproto_buf_flush(con)) <= 0) {
>> + if (rc != 0) {
>> + ev_io_start(loop, &con->output);
>> + return;
>> + }
>> + }
>> + if (ev_is_active(&con->output))
>> + ev_io_stop(con->loop, &con->output);
>
>8. ev_io_stop() is safe to call on a non-active ev_watcher. You don't
>need this 'if'.
Right, fixed in v6.
>
>> + ev_io_init(&con->output, iproto_connection_on_output,
>> + con->output.fd, EV_WRITE);
>> + if (con->input.cb != iproto_connection_on_input)
>> + ev_io_init(&con->input, iproto_connection_on_input,
>> + con->input.fd, EV_READ);
>> + else
>> + ev_feed_event(loop, &con->input, EV_READ);
>
>9. This code block above I don't understand. Why is it conditional? Why
>cb is not equal to iproto_connection_on_input always? And why do you use
>a callback as a flag that it is time to start reading. I thought
>you are supposed to use the connection state for that.
Right, it is not really needed. Fixed in v6.
>
>> + } catch (Exception *e) {
>> + e->log();
>> + con->state = IPROTO_CONNECTION_CLOSED;
>> + }
>> +}
>> +
>> static struct iproto_connection *
>> iproto_connection_new(int fd)
>> {
>> @@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
>> }
>> con->input.data = con->output.data = con;
>> con->loop = loop();
>> - ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
>> - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>> + ev_io_init(&con->input, NULL, fd, EV_READ);
>
>10. Why did you nullify the callback? Ev_io_init does not lead to event
>listen start. It only initializes the struct.
Well, I was using it as a flag. But it is neither good nor really needed, fixed in v6.
>
>> + ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
>> obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> @@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
>> {
>> struct iproto_connection *con =
>> container_of(m, struct iproto_connection, destroy_msg);
>> + assert(con->state == IPROTO_CONNECTION_DESTROYED);
>
>11. Please, introduce the states in a separate commit. I think I
>already asked for that when we discussed the previous version,
>but not sure.
Nothing is introduced, it is just a comment fixed and assertion added.
Moved to separate commit.
>
>> if (con->session) {
>> session_destroy(con->session);
>> con->session = NULL; /* safety */
>> }
>> /*
>> - * Got to be done in iproto thread since
>> - * that's where the memory is allocated.
>> + * obuf is being destroyed in tx thread cause it is where
>> + * it was allocated.
>> */
>> obuf_destroy(&con->obuf[0]);
>> obuf_destroy(&con->obuf[1]);
>> @@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: send greeting for it.
>> + */
>> +static void
>> +iproto_process_connect(struct iproto_msg *msg)
>> +{
>> + struct iproto_connection *con = msg->connection;
>> + con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
>
>12. There is a serious problem in that allocation. Static allocator is
>very volatile. What is returned from static_alloc(), does not belong to
>you. This memory is heavily reused, and can be harnessed only for very
>short living memory if you are sure there are no other static allocs
>around.
>
>Here you allocate a long-living object on it. So it will be just overridden
>in case of any serious load, because all new connections will call this
>alloc, and the static buffer will be recycled and turned into a pile of
>garbage.
All right, my bad, I decided to put it into struct iproto_connection as char
buffer as long as it is already 2176, and thus with greeting buf it is 2304.
struct iproto connection itself is allocated with mempool on cord slab cache.
>
>> + con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
>> + /*
>> + * INSTANCE_UUID is guaranteed to be inited before this moment.
>> + * We start listening either in local_recovery() or bootstrap().
>> + * The INSTANCE_UUID is ensured to be inited in the beginning of
>> + * both methods. In case of local_recovery() it is verified that
>> + * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
>> + * In bootstrap() INSTANCE_UUID is either taken from the
>> + * instance_uuid box.cfg{} param or created on the spot.
>> + */
>> + struct tt_uuid uuid = INSTANCE_UUID;
>
>13. Why do you need to copy it on the stack? Why can't you use pointer
>at INSTANCE_UUID variable?
Right, fixed in v6.
>
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>> + greeting_encode((char *)con->iproto_output_buf.iov_base,
>> + tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + assert(evio_has_fd(&con->output));
>> + ev_feed_event(con->loop, &con->output, EV_WRITE);
>> +}
 
--
Ilya Kosarev
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20201213/36274f33/attachment.html>


More information about the Tarantool-patches mailing list