* [Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto
@ 2020-12-08 22:09 Ilya Kosarev
2020-12-10 21:52 ` Vladislav Shpilevoy
0 siblings, 1 reply; 3+ messages in thread
From: Ilya Kosarev @ 2020-12-08 22:09 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches
On connection, an evio service callback is invoked to accept it.The
next step after acception was to process connection to tx thread
through cbus. This meant that any connection interaction involves
tx thread even before we get to decode what does the client want
from us. Consequently, a number of problems appears. The main one
is that we might get descriptor leak in case of unresponsive
tx thread (for example, when building secondary index).
This patch allows iproto to accept connection and send greeting by
itself. Thus the connection is being established and stays in
fetch_schema state while tx thread is unresponsive. It solves
descriptors leakage problem.
Closes #3776
---
Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776
Changes in v2:
- docbot request provided
- ChangeLog provided
- net_send_msg() used instead of net_send_error() where needed
- error cases made clear in iproto_msg_decode()
- replaced net_check_connection() with iproto_connection_fail() predicate
- improved tx session initialization fail processing to avoid extra tries
- fixed obuf_dup() fail processing in iproto_process_connect()
- added some comments
- some comments style fixed
Changes in v3:
- added braces to the confusing one-line if statement
- updated ChangeLog
- added description for iproto_msg_decode()
- simplified error processing in iproto_msg_decode()
Changes in v4:
- fixed msg->close_connection initialization bug
- fixed false-sharing problem in iproto_connection struct
- added needed assertion
- added needed comments
- names refactoring
- simplified patch a bit: removed extra return value, extra code
Changes in v5:
- reworked to avoid lazy initialization and extra changes
src/box/iproto.cc | 147 ++++++++++++++----
test/app/gh-4787-netbox-empty-errmsg.result | 18 ---
test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -
3 files changed, 113 insertions(+), 60 deletions(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5eca..8c122dc58d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
iproto_resume();
}
+static inline void
+iproto_connection_delete(struct iproto_connection *con);
+
/**
* A single global queue for all requests in all connections. All
* requests from all connections are processed concurrently.
@@ -453,6 +456,8 @@ struct iproto_connection
* meaningless.
*/
size_t parse_size;
+ /** Iproto buffer used to send greeting. */
+ struct iovec iproto_output_buf;
/**
* Nubmer of active long polling requests that have already
* discarded their arguments in order not to stall other
@@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
return NULL;
}
msg->connection = con;
+ msg->close_connection = false;
rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
return msg;
}
@@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
}
}
+static int
+iproto_buf_flush(struct iproto_connection *con)
+{
+ int fd = con->output.fd;
+ ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
+
+ if (nwr > 0) {
+ /* Count statistics */
+ rmean_collect(rmean_net, IPROTO_SENT, nwr);
+ return 1;
+ } else if (nwr < 0 && ! sio_wouldblock(errno)) {
+ diag_raise();
+ }
+
+ return nwr;
+}
+
+static void
+iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
+ int /* revents */)
+{
+ struct iproto_connection *con = (struct iproto_connection *) watcher->data;
+ try {
+ int rc;
+ while ((rc = iproto_buf_flush(con)) <= 0) {
+ if (rc != 0) {
+ ev_io_start(loop, &con->output);
+ return;
+ }
+ }
+ if (ev_is_active(&con->output))
+ ev_io_stop(con->loop, &con->output);
+ ev_io_init(&con->output, iproto_connection_on_output,
+ con->output.fd, EV_WRITE);
+ if (con->input.cb != iproto_connection_on_input)
+ ev_io_init(&con->input, iproto_connection_on_input,
+ con->input.fd, EV_READ);
+ else
+ ev_feed_event(loop, &con->input, EV_READ);
+ } catch (Exception *e) {
+ e->log();
+ con->state = IPROTO_CONNECTION_CLOSED;
+ }
+}
+
static struct iproto_connection *
iproto_connection_new(int fd)
{
@@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
}
con->input.data = con->output.data = con;
con->loop = loop();
- ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
- ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
+ ev_io_init(&con->input, NULL, fd, EV_READ);
+ ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
@@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
{
struct iproto_connection *con =
container_of(m, struct iproto_connection, destroy_msg);
+ assert(con->state == IPROTO_CONNECTION_DESTROYED);
if (con->session) {
session_destroy(con->session);
con->session = NULL; /* safety */
}
/*
- * Got to be done in iproto thread since
- * that's where the memory is allocated.
+ * obuf is being destroyed in tx thread cause it is where
+ * it was allocated.
*/
obuf_destroy(&con->obuf[0]);
obuf_destroy(&con->obuf[1]);
@@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
}
/**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: send greeting for it.
+ */
+static void
+iproto_process_connect(struct iproto_msg *msg)
+{
+ struct iproto_connection *con = msg->connection;
+ con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
+ con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
+ /*
+ * INSTANCE_UUID is guaranteed to be inited before this moment.
+ * We start listening either in local_recovery() or bootstrap().
+ * The INSTANCE_UUID is ensured to be inited in the beginning of
+ * both methods. In case of local_recovery() it is verified that
+ * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+ * In bootstrap() INSTANCE_UUID is either taken from the
+ * instance_uuid box.cfg{} param or created on the spot.
+ */
+ struct tt_uuid uuid = INSTANCE_UUID;
+ random_bytes(con->salt, IPROTO_SALT_SIZE);
+ greeting_encode((char *)con->iproto_output_buf.iov_base,
+ tarantool_version_id(), &uuid,
+ con->salt, IPROTO_SALT_SIZE);
+ assert(evio_has_fd(&con->output));
+ ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * Create the session and invoke the on_connect triggers.
*/
static void
tx_process_connect(struct cmsg *m)
{
struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
- struct obuf *out = msg->connection->tx.p_obuf;
- try { /* connect. */
- con->session = session_create(SESSION_TYPE_BINARY);
- if (con->session == NULL)
- diag_raise();
- con->session->meta.connection = con;
- tx_fiber_init(con->session, 0);
- char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
- /* TODO: dirty read from tx thread */
- struct tt_uuid uuid = INSTANCE_UUID;
- random_bytes(con->salt, IPROTO_SALT_SIZE);
- greeting_encode(greeting, tarantool_version_id(), &uuid,
- con->salt, IPROTO_SALT_SIZE);
- obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
- if (! rlist_empty(&session_on_connect)) {
- if (session_run_on_connect_triggers(con->session) != 0)
- diag_raise();
- }
- iproto_wpos_create(&msg->wpos, out);
- } catch (Exception *e) {
+ if (con->state != IPROTO_CONNECTION_ALIVE) {
+ msg->close_connection = true;
+ return;
+ }
+
+ con->session = session_create(SESSION_TYPE_BINARY);
+ if (con->session == NULL) {
tx_reply_error(msg);
msg->close_connection = true;
+ return;
+ }
+ con->session->meta.connection = con;
+
+ tx_fiber_init(con->session, 0);
+ if (! rlist_empty(&session_on_connect)) {
+ if (session_run_on_connect_triggers(con->session) != 0) {
+ tx_reply_error(msg);
+ msg->close_connection = true;
+ }
}
}
/**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
+ * Try to send the client an error upon a failure. Start reading
+ * input in case the connection is inited and all good.
*/
static void
-net_send_greeting(struct cmsg *m)
+net_finish_connect(struct cmsg *m)
{
struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
- if (msg->close_connection) {
+ if (msg->close_connection || con->state != IPROTO_CONNECTION_ALIVE) {
struct obuf *out = msg->wpos.obuf;
int64_t nwr = sio_writev(con->output.fd, out->iov,
obuf_iovcnt(out));
@@ -1991,6 +2065,7 @@ net_send_greeting(struct cmsg *m)
diag_log();
}
assert(iproto_connection_is_idle(con));
+ con->state = IPROTO_CONNECTION_ALIVE;
iproto_connection_close(con);
iproto_msg_delete(msg);
return;
@@ -2003,13 +2078,17 @@ net_send_greeting(struct cmsg *m)
*/
assert(evio_has_fd(&con->output));
/* Handshake OK, start reading input. */
- ev_feed_event(con->loop, &con->output, EV_WRITE);
+ if (con->input.cb != iproto_connection_on_input)
+ ev_io_init(&con->input, iproto_connection_on_input,
+ con->input.fd, EV_READ);
+ else
+ ev_feed_event(con->loop, &con->input, EV_READ);
iproto_msg_delete(msg);
}
static const struct cmsg_hop connect_route[] = {
{ tx_process_connect, &net_pipe },
- { net_send_greeting, NULL },
+ { net_finish_connect, NULL },
};
/** }}} */
@@ -2040,7 +2119,7 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
cmsg_init(&msg->base, connect_route);
msg->p_ibuf = con->p_ibuf;
msg->wpos = con->wpos;
- msg->close_connection = false;
+ iproto_process_connect(msg);
cpipe_push(&tx_pipe, &msg->base);
return 0;
}
diff --git a/test/app/gh-4787-netbox-empty-errmsg.result b/test/app/gh-4787-netbox-empty-errmsg.result
index d30337a050..6389b27bc8 100644
--- a/test/app/gh-4787-netbox-empty-errmsg.result
+++ b/test/app/gh-4787-netbox-empty-errmsg.result
@@ -38,24 +38,6 @@ req_during_auth()
| - Connection is not established, state is "auth"
| ...
--- Check the same for 'initial' state.
-ok, err = nil
- | ---
- | ...
-do \
- c = netbox.connect(box.cfg.listen, {wait_connected = false}) \
- ok, err = pcall(c.call, c, 'echo', {}, {is_async = true}) \
-end
- | ---
- | ...
-ok, err
- | ---
- | - false
- | - Connection is not established, state is "initial"
- | ...
-c:close()
- | ---
- | ...
box.schema.user.drop('test')
| ---
| ...
diff --git a/test/app/gh-4787-netbox-empty-errmsg.test.lua b/test/app/gh-4787-netbox-empty-errmsg.test.lua
index 0eecaa1bf0..55ea43f26f 100755
--- a/test/app/gh-4787-netbox-empty-errmsg.test.lua
+++ b/test/app/gh-4787-netbox-empty-errmsg.test.lua
@@ -21,12 +21,4 @@ end
req_during_auth()
--- Check the same for 'initial' state.
-ok, err = nil
-do \
- c = netbox.connect(box.cfg.listen, {wait_connected = false}) \
- ok, err = pcall(c.call, c, 'echo', {}, {is_async = true}) \
-end
-ok, err
-c:close()
box.schema.user.drop('test')
--
2.17.1
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto
2020-12-08 22:09 [Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto Ilya Kosarev
@ 2020-12-10 21:52 ` Vladislav Shpilevoy
2020-12-12 23:38 ` Ilya Kosarev
0 siblings, 1 reply; 3+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-10 21:52 UTC (permalink / raw)
To: Ilya Kosarev; +Cc: tarantool-patches
Hi! Thanks for the patch!
I suggest you to invite Alexander L. to start a second review. We
don't have much time until release.
See 13 comments below.
On 08.12.2020 23:09, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it.The
> next step after acception was to process connection to tx thread
1. acception -> acceptance.
Also missing whitespace in the first line after 'it.'.
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive
> tx thread (for example, when building secondary index).
2. It wasn't a leak. Socket close events were queued infinitely until
tx finished bootstrap. Please, provide more info here. Now it is
just misleading.
On the whole, the commit message is super miser. Imagine how will it
look to somebody who didn't do any reviews and didn't discuss it before.
The problem is far from trivial, but from the commit message it looks
like somebody just forgot a close() somewhere.
> This patch allows iproto to accept connection and send greeting by
> itself. Thus the connection is being established and stays in
> fetch_schema state while tx thread is unresponsive. It solves
> descriptors leakage problem.
>
> Closes #3776
> ---
> src/box/iproto.cc | 147 ++++++++++++++----
> test/app/gh-4787-netbox-empty-errmsg.result | 18 ---
> test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -
> 3 files changed, 113 insertions(+), 60 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..8c122dc58d 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
> iproto_resume();
> }
>
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
3. This additional declaration is not necessary.
> +
> /**
> * A single global queue for all requests in all connections. All
> * requests from all connections are processed concurrently.
> @@ -453,6 +456,8 @@ struct iproto_connection
> * meaningless.
> */
> size_t parse_size;
> + /** Iproto buffer used to send greeting. */
> + struct iovec iproto_output_buf;
> /**
> * Nubmer of active long polling requests that have already
> * discarded their arguments in order not to stall other
> @@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
> return NULL;
> }
> msg->connection = con;
> + msg->close_connection = false;
4. You don't need this change now. However still looks dangerous. So I would
move it to a separate commit.
> rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
> return msg;
> }
> @@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
> }
> }
>
> +static int
> +iproto_buf_flush(struct iproto_connection *con)
> +{
> + int fd = con->output.fd;
> + ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
> +
> + if (nwr > 0) {
> + /* Count statistics */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + return 1;
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
5. We don't use whitespaces after unary operators. The same in some
other places. Please, fix all of them.
> + diag_raise();
6. Please, don't use exceptions in the new code. I know you
copy-pasted it from iproto_flush, but it does not mean it is good.
> + }
> +
> + return nwr;
> +}
> +
> +static void
> +iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
> + int /* revents */)
> +{
> + struct iproto_connection *con = (struct iproto_connection *) watcher->data;
7. This is out of 80 symbols.
> + try {
> + int rc;
> + while ((rc = iproto_buf_flush(con)) <= 0) {
> + if (rc != 0) {
> + ev_io_start(loop, &con->output);
> + return;
> + }
> + }
> + if (ev_is_active(&con->output))
> + ev_io_stop(con->loop, &con->output);
8. ev_io_stop() is safe to call on a non-active ev_watcher. You don't
need this 'if'.
> + ev_io_init(&con->output, iproto_connection_on_output,
> + con->output.fd, EV_WRITE);
> + if (con->input.cb != iproto_connection_on_input)
> + ev_io_init(&con->input, iproto_connection_on_input,
> + con->input.fd, EV_READ);
> + else
> + ev_feed_event(loop, &con->input, EV_READ);
9. This code block above I don't understand. Why is it conditional? Why
cb is not equal to iproto_connection_on_input always? And why do you use
a callback as a flag that it is time to start reading. I thought
you are supposed to use the connection state for that.
> + } catch (Exception *e) {
> + e->log();
> + con->state = IPROTO_CONNECTION_CLOSED;
> + }
> +}
> +
> static struct iproto_connection *
> iproto_connection_new(int fd)
> {
> @@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
> }
> con->input.data = con->output.data = con;
> con->loop = loop();
> - ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
> - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
> + ev_io_init(&con->input, NULL, fd, EV_READ);
10. Why did you nullify the callback? Ev_io_init does not lead to event
listen start. It only initializes the struct.
> + ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> @@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
> {
> struct iproto_connection *con =
> container_of(m, struct iproto_connection, destroy_msg);
> + assert(con->state == IPROTO_CONNECTION_DESTROYED);
11. Please, introduce the states in a separate commit. I think I
already asked for that when we discussed the previous version,
but not sure.
> if (con->session) {
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> + * obuf is being destroyed in tx thread cause it is where
> + * it was allocated.
> */
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> @@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: send greeting for it.
> + */
> +static void
> +iproto_process_connect(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
12. There is a serious problem in that allocation. Static allocator is
very volatile. What is returned from static_alloc(), does not belong to
you. This memory is heavily reused, and can be harnessed only for very
short living memory if you are sure there are no other static allocs
around.
Here you allocate a long-living object on it. So it will be just overridden
in case of any serious load, because all new connections will call this
alloc, and the static buffer will be recycled and turned into a pile of
garbage.
> + con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
> + /*
> + * INSTANCE_UUID is guaranteed to be inited before this moment.
> + * We start listening either in local_recovery() or bootstrap().
> + * The INSTANCE_UUID is ensured to be inited in the beginning of
> + * both methods. In case of local_recovery() it is verified that
> + * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
> + * In bootstrap() INSTANCE_UUID is either taken from the
> + * instance_uuid box.cfg{} param or created on the spot.
> + */
> + struct tt_uuid uuid = INSTANCE_UUID;
13. Why do you need to copy it on the stack? Why can't you use pointer
at INSTANCE_UUID variable?
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
> + greeting_encode((char *)con->iproto_output_buf.iov_base,
> + tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + assert(evio_has_fd(&con->output));
> + ev_feed_event(con->loop, &con->output, EV_WRITE);
> +}
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto
2020-12-10 21:52 ` Vladislav Shpilevoy
@ 2020-12-12 23:38 ` Ilya Kosarev
0 siblings, 0 replies; 3+ messages in thread
From: Ilya Kosarev @ 2020-12-12 23:38 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
[-- Attachment #1: Type: text/plain, Size: 9281 bytes --]
Hi,
Thanks for your review!
Sent v6 of the patch (patchset) considering your comments. Some answers below.
>Пятница, 11 декабря 2020, 0:52 +03:00 от Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
>
>Hi! Thanks for the patch!
>
>I suggest you to invite Alexander L. to start a second review. We
>don't have much time until release.
>
>See 13 comments below.
>
>On 08.12.2020 23:09, Ilya Kosarev wrote:
>> On connection, an evio service callback is invoked to accept it.The
>> next step after acception was to process connection to tx thread
>
>1. acception -> acceptance.
Ok, right, fixed in v6 of the patch.
>
>Also missing whitespace in the first line after 'it.'.
>
>> through cbus. This meant that any connection interaction involves
>> tx thread even before we get to decode what does the client want
>> from us. Consequently, a number of problems appears. The main one
>> is that we might get descriptor leak in case of unresponsive
>> tx thread (for example, when building secondary index).
>
>2. It wasn't a leak. Socket close events were queued infinitely until
>tx finished bootstrap. Please, provide more info here. Now it is
>just misleading.
>
>On the whole, the commit message is super miser. Imagine how will it
>look to somebody who didn't do any reviews and didn't discuss it before.
>The problem is far from trivial, but from the commit message it looks
>like somebody just forgot a close() somewhere.
Ok, the commit message is rewritten in v6.
>
>> This patch allows iproto to accept connection and send greeting by
>> itself. Thus the connection is being established and stays in
>> fetch_schema state while tx thread is unresponsive. It solves
>> descriptors leakage problem.
>>
>> Closes #3776
>> ---
>> src/box/iproto.cc | 147 ++++++++++++++----
>> test/app/gh-4787-netbox-empty-errmsg.result | 18 ---
>> test/app/gh-4787-netbox-empty-errmsg.test.lua | 8 -
>> 3 files changed, 113 insertions(+), 60 deletions(-)
>>
>> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>> index b8f65e5eca..8c122dc58d 100644
>> --- a/src/box/iproto.cc
>> +++ b/src/box/iproto.cc
>> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>> iproto_resume();
>> }
>>
>> +static inline void
>> +iproto_connection_delete(struct iproto_connection *con);
>
>3. This additional declaration is not necessary.
Right, it was extra change, removed in v6.
>
>> +
>> /**
>> * A single global queue for all requests in all connections. All
>> * requests from all connections are processed concurrently.
>> @@ -453,6 +456,8 @@ struct iproto_connection
>> * meaningless.
>> */
>> size_t parse_size;
>> + /** Iproto buffer used to send greeting. */
>> + struct iovec iproto_output_buf;
>> /**
>> * Nubmer of active long polling requests that have already
>> * discarded their arguments in order not to stall other
>> @@ -566,6 +571,7 @@ iproto_msg_new(struct iproto_connection *con)
>> return NULL;
>> }
>> msg->connection = con;
>> + msg->close_connection = false;
>
>4. You don't need this change now. However still looks dangerous. So I would
>move it to a separate commit.
Ok, moved to separate commit.
>
>> rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
>> return msg;
>> }
>> @@ -1090,6 +1096,51 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
>> }
>> }
>>
>> +static int
>> +iproto_buf_flush(struct iproto_connection *con)
>> +{
>> + int fd = con->output.fd;
>> + ssize_t nwr = sio_writev(fd, &con->iproto_output_buf, 1);
>> +
>> + if (nwr > 0) {
>> + /* Count statistics */
>> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
>> + return 1;
>> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
>
>5. We don't use whitespaces after unary operators. The same in some
>other places. Please, fix all of them.
Right, copied old codestyle, fixed in v6.
>
>> + diag_raise();
>
>6. Please, don't use exceptions in the new code. I know you
>copy-pasted it from iproto_flush, but it does not mean it is good.
>
>> + }
>> +
>> + return nwr;
>> +}
>> +
>> +static void
>> +iproto_connection_on_greeting(ev_loop *loop, struct ev_io *watcher,
>> + int /* revents */)
>> +{
>> + struct iproto_connection *con = (struct iproto_connection *) watcher->data;
>
>7. This is out of 80 symbols.
Right, fixed in v6.
>
>> + try {
>> + int rc;
>> + while ((rc = iproto_buf_flush(con)) <= 0) {
>> + if (rc != 0) {
>> + ev_io_start(loop, &con->output);
>> + return;
>> + }
>> + }
>> + if (ev_is_active(&con->output))
>> + ev_io_stop(con->loop, &con->output);
>
>8. ev_io_stop() is safe to call on a non-active ev_watcher. You don't
>need this 'if'.
Right, fixed in v6.
>
>> + ev_io_init(&con->output, iproto_connection_on_output,
>> + con->output.fd, EV_WRITE);
>> + if (con->input.cb != iproto_connection_on_input)
>> + ev_io_init(&con->input, iproto_connection_on_input,
>> + con->input.fd, EV_READ);
>> + else
>> + ev_feed_event(loop, &con->input, EV_READ);
>
>9. This code block above I don't understand. Why is it conditional? Why
>cb is not equal to iproto_connection_on_input always? And why do you use
>a callback as a flag that it is time to start reading. I thought
>you are supposed to use the connection state for that.
Right, it is not really needed. Fixed in v6.
>
>> + } catch (Exception *e) {
>> + e->log();
>> + con->state = IPROTO_CONNECTION_CLOSED;
>> + }
>> +}
>> +
>> static struct iproto_connection *
>> iproto_connection_new(int fd)
>> {
>> @@ -1101,8 +1152,8 @@ iproto_connection_new(int fd)
>> }
>> con->input.data = con->output.data = con;
>> con->loop = loop();
>> - ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
>> - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>> + ev_io_init(&con->input, NULL, fd, EV_READ);
>
>10. Why did you nullify the callback? Ev_io_init does not lead to event
>listen start. It only initializes the struct.
Well, I was using it as a flag. But it is neither good nor really needed, fixed in v6.
>
>> + ev_io_init(&con->output, iproto_connection_on_greeting, fd, EV_WRITE);
>> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
>> obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
>> @@ -1378,13 +1429,14 @@ tx_process_destroy(struct cmsg *m)
>> {
>> struct iproto_connection *con =
>> container_of(m, struct iproto_connection, destroy_msg);
>> + assert(con->state == IPROTO_CONNECTION_DESTROYED);
>
>11. Please, introduce the states in a separate commit. I think I
>already asked for that when we discussed the previous version,
>but not sure.
Nothing is introduced, it is just a comment fixed and assertion added.
Moved to separate commit.
>
>> if (con->session) {
>> session_destroy(con->session);
>> con->session = NULL; /* safety */
>> }
>> /*
>> - * Got to be done in iproto thread since
>> - * that's where the memory is allocated.
>> + * obuf is being destroyed in tx thread cause it is where
>> + * it was allocated.
>> */
>> obuf_destroy(&con->obuf[0]);
>> obuf_destroy(&con->obuf[1]);
>> @@ -1936,50 +1988,72 @@ net_end_subscribe(struct cmsg *m)
>> }
>>
>> /**
>> - * Handshake a connection: invoke the on-connect trigger
>> - * and possibly authenticate. Try to send the client an error
>> - * upon a failure.
>> + * Handshake a connection: send greeting for it.
>> + */
>> +static void
>> +iproto_process_connect(struct iproto_msg *msg)
>> +{
>> + struct iproto_connection *con = msg->connection;
>> + con->iproto_output_buf.iov_base = static_alloc(IPROTO_GREETING_SIZE);
>
>12. There is a serious problem in that allocation. Static allocator is
>very volatile. What is returned from static_alloc(), does not belong to
>you. This memory is heavily reused, and can be harnessed only for very
>short living memory if you are sure there are no other static allocs
>around.
>
>Here you allocate a long-living object on it. So it will be just overridden
>in case of any serious load, because all new connections will call this
>alloc, and the static buffer will be recycled and turned into a pile of
>garbage.
All right, my bad, I decided to put it into struct iproto_connection as char
buffer as long as it is already 2176, and thus with greeting buf it is 2304.
struct iproto connection itself is allocated with mempool on cord slab cache.
>
>> + con->iproto_output_buf.iov_len = IPROTO_GREETING_SIZE;
>> + /*
>> + * INSTANCE_UUID is guaranteed to be inited before this moment.
>> + * We start listening either in local_recovery() or bootstrap().
>> + * The INSTANCE_UUID is ensured to be inited in the beginning of
>> + * both methods. In case of local_recovery() it is verified that
>> + * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
>> + * In bootstrap() INSTANCE_UUID is either taken from the
>> + * instance_uuid box.cfg{} param or created on the spot.
>> + */
>> + struct tt_uuid uuid = INSTANCE_UUID;
>
>13. Why do you need to copy it on the stack? Why can't you use pointer
>at INSTANCE_UUID variable?
Right, fixed in v6.
>
>> + random_bytes(con->salt, IPROTO_SALT_SIZE);
>> + greeting_encode((char *)con->iproto_output_buf.iov_base,
>> + tarantool_version_id(), &uuid,
>> + con->salt, IPROTO_SALT_SIZE);
>> + assert(evio_has_fd(&con->output));
>> + ev_feed_event(con->loop, &con->output, EV_WRITE);
>> +}
--
Ilya Kosarev
[-- Attachment #2: Type: text/html, Size: 13264 bytes --]
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2020-12-12 23:38 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-12-08 22:09 [Tarantool-patches] [PATCH v5] iproto: move greeting from tx thread to iproto Ilya Kosarev
2020-12-10 21:52 ` Vladislav Shpilevoy
2020-12-12 23:38 ` Ilya Kosarev
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox