[Tarantool-patches] [PATCH v3] iproto: make iproto thread more independent from tx
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Oct 2 02:45:51 MSK 2020
Hi! Thanks for the patch! Very nice! Extreme niceness! Optimistic connection handling!
See 17 small minor comments below.
1. On my machine the branch fails nearly all the iproto-related tests,
some with crashes. Also, when I tried to run it in the console, I got
'Peer closed' right after netbox.connect.
However, I see CI is green, and I am not sure why it passes there. On my
machine all networking is totally broken on your branch.
I fixed it by adding this:
@@ -578,6 +578,7 @@ iproto_msg_new(struct iproto_connection *con)
return NULL;
}
msg->connection = con;
+ msg->close_connection = false;
rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
return msg;
}
It seems to be your bug: previously the flag was used only on
connection, and initialized only in iproto_on_accept(). Now you
use it on each request, but do not initialize it on each request. It
must have been sheer luck that it didn't fail on any CI job.
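To illustrate why the missing initialization bites only sometimes, here is a minimal sketch. The types msg_sketch and connection_sketch are hypothetical, simplified stand-ins and not the real iproto structures; the memset() with 0xff only mimics a message object reused from a pool with stale contents.

```cpp
#include <cassert>
#include <cstdlib>
#include <cstring>

/* Hypothetical, simplified stand-ins for the real iproto structures. */
struct connection_sketch {
    int fd;
};

struct msg_sketch {
    connection_sketch *connection;
    bool close_connection;
};

/*
 * Allocate a message and initialize every field that is read on the
 * per-request path. The memory is filled with garbage first to mimic
 * an object taken from a pool, where old contents may survive.
 */
static msg_sketch *
msg_sketch_new(connection_sketch *con)
{
    msg_sketch *msg = static_cast<msg_sketch *>(std::malloc(sizeof(*msg)));
    if (msg == NULL)
        return NULL;
    std::memset(msg, 0xff, sizeof(*msg));
    msg->connection = con;
    /* Without this line the flag would keep whatever the memory held. */
    msg->close_connection = false;
    return msg;
}
```

If the allocator happens to hand out zeroed memory, the bug stays hidden, which may be one reason a CI job can pass while a local run fails.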
On 26.09.2020 00:53, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index).
2. It is not really a leak. Just too many clients waiting to be
processed. The descriptors were not lost completely.
As for the issue - why not simply postpone listening until
recovery is complete?
And how does this patch solve it, if the connections are still kept,
and so are their descriptors? How did you validate the problem was gone?
It seems the descriptors are still created, and still are not closed.
If the problem is not gone, then why does this patch do what it does?
> There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
3. What are these other cases? I can think only of SWIM in iproto thread,
but it is not related at all.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
>
> Part of #3776
>
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now being executed only with the first request. Though the triggers
> are still the first thing to be executed on a new connection. While it
> is quite a synthetic case to establish a connection without making
> any requests it is technically possible and now your triggers won't be
> executed in this case. Some request is required to start their
> execution.
4. I am not sure I understand. According to #3776, the problem was that
the connections were being made. So they didn't send any requests? Then
the case is not as synthetic as it seems?
What was the problem with preserving the original behaviour? For example,
you could delay the on_connect execution. The TX thread could tell
when recovery is done, and iproto would then send the pending connections to it.
> src/box/iproto.cc | 317 +++++++++++++++++++++-----------
> test/box-py/bad_trigger.result | 1 +
> test/box-py/bad_trigger.test.py | 22 ++-
> test/sql/prepared.result | 4 +
> test/sql/prepared.test.lua | 1 +
> 5 files changed, 239 insertions(+), 106 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5ec..9f98fce86 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -463,6 +474,7 @@ struct iproto_connection
> struct ev_io output;
> /** Logical session. */
> struct session *session;
> + bool init_failed;
5. Flags need 'is_' prefix.
6. What is this? Why is it needed? Usually we leave a comment
on each struct member. Otherwise it is hard to understand why
some of them are needed.
7. The field is accessed from the tx thread only, and sits close to
fields mutated from the iproto thread. That leads to a false-sharing
problem. Please, move it to the iproto_connection.tx sub-struct.
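A minimal sketch of the layout idea behind comment 7. All names here (net_part_sketch, tx_part_sketch, CACHE_LINE_SKETCH) are hypothetical, and the 64-byte cache line is an assumption that holds on many but not all CPUs. It does not claim that the real iproto_connection.tx sub-struct is aligned this way; it only shows how per-thread fields can be kept on separate cache lines.

```cpp
#include <cassert>
#include <cstddef>

/* Assumed cache line size; 64 bytes is common but not universal. */
constexpr std::size_t CACHE_LINE_SKETCH = 64;

/* Fields mutated by the network thread. */
struct net_part_sketch {
    int fd;
    std::size_t parse_size;
};

/* Fields touched only by the tx thread; starts on its own cache line. */
struct alignas(CACHE_LINE_SKETCH) tx_part_sketch {
    bool is_init_failed;
};

struct connection_layout_sketch {
    net_part_sketch net;
    tx_part_sketch tx;
};

/* The tx part never shares a cache line with the net part. */
static_assert(offsetof(connection_layout_sketch, tx) % CACHE_LINE_SKETCH == 0,
              "tx part must start on a cache line boundary");
```

The cost is padding: the struct grows to a multiple of the cache line size, which is usually acceptable for a per-connection object.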
> ev_loop *loop;
> /**
> * Pre-allocated disconnect msg. Is sent right after
> @@ -677,9 +700,19 @@ iproto_connection_close(struct iproto_connection *con)
> * is done only once.
> */
> con->p_ibuf->wpos -= con->parse_size;
> - cpipe_push(&tx_pipe, &con->disconnect_msg);
> assert(con->state == IPROTO_CONNECTION_ALIVE);
> con->state = IPROTO_CONNECTION_CLOSED;
> + rlist_del(&con->in_stop_list);
> + if (con->session != NULL) {
> + cpipe_push(&tx_pipe, &con->disconnect_msg);
> + } else {
> + /*
> + * In case session was not created we can safely
> + * try to start destroy not involving tx thread.
> + */
> + iproto_connection_try_to_start_destroy(con);
> + }
> + return;
8. You wouldn't need the 'return' and wouldn't need to duplicate the
'rlist_del(&con->in_stop_list);' call, if you moved
'rlist_del(&con->in_stop_list);' from the end of this function
to its beginning.
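The shape suggested in comment 8 could look roughly like the sketch below. It uses hypothetical, simplified types: a boolean stands in for the in_stop_list link, and counters stand in for cpipe_push() and iproto_connection_try_to_start_destroy(). The remaining branch of the real function is omitted.

```cpp
#include <cassert>

enum conn_state_sketch {
    CONN_ALIVE_SKETCH,
    CONN_CLOSED_SKETCH,
    CONN_PENDING_DESTROY_SKETCH
};

/* Hypothetical stand-in for the connection object. */
struct conn_sketch {
    conn_state_sketch state;
    bool has_session;
    bool is_in_stop_list;
    int disconnect_msgs_pushed; /* stands in for cpipe_push() */
    int destroy_attempts;       /* stands in for try_to_start_destroy() */
};

static void
conn_sketch_close(conn_sketch *con)
{
    /* Unlink first, so no branch has to repeat it or return early. */
    con->is_in_stop_list = false;
    if (con->state == CONN_ALIVE_SKETCH) {
        con->state = CONN_CLOSED_SKETCH;
        if (con->has_session)
            con->disconnect_msgs_pushed++;
        else
            con->destroy_attempts++;
    } else if (con->state == CONN_PENDING_DESTROY_SKETCH) {
        con->destroy_attempts++;
    }
}
```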
> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
> iproto_connection_try_to_start_destroy(con);
> } else {
> @@ -809,6 +842,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> assert(rlist_empty(&con->in_stop_list));
> int n_requests = 0;
> bool stop_input = false;
> + bool obuf_in_iproto = (con->session == NULL);
9. We always name flags with an 'is_' prefix or a similar one like 'has_',
'does_'. Please, follow that convention.
> const char *errmsg;
> while (con->parse_size != 0 && !stop_input) {
> if (iproto_check_msg_max()) {
> @@ -1314,13 +1367,24 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> (uint32_t) type);
> goto error;
> }
> - return;
> + return 0;
> error:
> /** Log and send the error. */
> diag_log();
> - diag_create(&msg->diag);
> - diag_move(&fiber()->diag, &msg->diag);
> - cmsg_init(&msg->base, error_route);
> + if (msg->connection->session != NULL) {
> + diag_create(&msg->diag);
> + diag_move(&fiber()->diag, &msg->diag);
> + cmsg_init(&msg->base, error_route);
> + return 1;
10. Why do you need 3 return values? You only check for -1. So you
could return 0 here like it was before, and -1 below. In the
calling code you would check != 0 like we do everywhere.
> + }
> + /*
> + * In case session was not created we can process error path
> + * without tx thread.
> + */
> + tx_accept_wpos(msg->connection, &msg->wpos);
> + tx_reply_error(msg);
11. There is a naming convention that all tx_ functions are always called
in the TX thread. All net_ and iproto_ functions are called in the IProto
thread. Let's not violate the rule and obfuscate the iproto.cc code
even more than it is. Please, come up with new names reflecting where
the functions are used. Or at least not specifying it as tx_.
> + net_send_msg(&(msg->base));
12. The function is called 'iproto_msg_decode'. Let's leave it to do decoding,
and not send anything from it. Sending just violates its purpose and name.
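Comments 10 and 12 together amount to the following shape, sketched here with hypothetical names (request_sketch, decode_sketch, process_input_sketch) and a toy one-byte "protocol" that has nothing to do with the real iproto encoding. The decoder returns 0 or -1 and performs no I/O; the caller checks != 0 and owns the error reply.

```cpp
#include <cassert>
#include <cstddef>

struct request_sketch {
    unsigned type;
};

/* Toy limit for the sketch, not a real iproto constant. */
enum { REQUEST_TYPE_MAX_SKETCH = 16 };

/* Pure decoding: 0 on success, -1 on error. Nothing is sent from here. */
static int
decode_sketch(const unsigned char *data, std::size_t len, request_sketch *out)
{
    if (len < 1 || data[0] >= REQUEST_TYPE_MAX_SKETCH)
        return -1;
    out->type = data[0];
    return 0;
}

/*
 * The caller decides what to do on failure. A counter stands in for
 * actually sending an error reply to the client.
 */
static int
process_input_sketch(const unsigned char *data, std::size_t len,
                     int *errors_sent)
{
    request_sketch req;
    if (decode_sketch(data, len, &req) != 0) {
        ++*errors_sent;
        return -1;
    }
    return 0;
}
```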
> + return -1;
> }
>
> static void
> @@ -1478,13 +1538,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - tx_accept_wpos(msg->connection, &msg->wpos);
> - tx_fiber_init(msg->connection->session, msg->header.sync);
> - return msg;
> + *msg = (struct iproto_msg *) m;
13. Why do you change the return type and add an out parameter? You
could just return NULL in case of an error.
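A sketch of the signature proposed in comment 13, with hypothetical simplified types. The can_init field is an invented stand-in for whether connection initialization in tx would succeed. On failure the function returns NULL, and a caller that still needs the message can cast the cmsg pointer itself.

```cpp
#include <cassert>
#include <cstddef>

/* Hypothetical stand-ins; can_init simulates the init outcome. */
struct accept_conn_sketch {
    bool is_init_failed;
    bool has_session;
    bool can_init;
};

struct accept_msg_sketch {
    accept_conn_sketch *connection;
    bool close_connection;
};

/* Returns the accepted message, or NULL if the connection is unusable. */
static accept_msg_sketch *
accept_msg_sketch_fn(void *m)
{
    accept_msg_sketch *msg = static_cast<accept_msg_sketch *>(m);
    accept_conn_sketch *con = msg->connection;
    /* A previous init attempt failed, no need to retry. */
    if (con->is_init_failed)
        return NULL;
    if (!con->has_session) {
        if (!con->can_init) {
            con->is_init_failed = true;
            msg->close_connection = true;
            return NULL;
        }
        con->has_session = true;
    }
    return msg;
}
```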
> + /*
> + * In case connection init failed we don't need to try anymore.
> + */
> + if ((*msg)->connection->init_failed)
> + return -1;
> + /*
> + * In case session was not created we need to init connection in tx and
> + * create it here.
> + */
> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> + (*msg)->connection->init_failed = true;
> + (*msg)->close_connection = true;
> + return -1;
> + }
> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> + return 0;
> }
>
> /**
> @@ -1507,7 +1581,14 @@ tx_reply_error(struct iproto_msg *msg)
> static void
> tx_reply_iproto_error(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + /*
> + * We don't need to check tx_accept_msg() return value here
> + * as far as if we might only process iproto error in tx
> + * in case connection session is already created and
> + * thus tx_accept_msg() can't fail.
> + */
> + tx_accept_msg(m, &msg);
14. Well, if it fails, you have msg == NULL, and the code below will
crash. Otherwise it should be an assertion, not just a comment.
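For comment 14, a sketch of turning the explanatory comment into a checked invariant. It assumes an accept function that returns NULL on failure, as proposed in comment 13; all names are hypothetical and a counter stands in for encoding the error reply.

```cpp
#include <cassert>
#include <cstddef>

struct reply_conn_sketch {
    bool has_session;
    int errors_replied; /* stands in for writing the error reply */
};

struct reply_msg_sketch {
    reply_conn_sketch *connection;
};

/* Returns the message, or NULL when the connection has no session. */
static reply_msg_sketch *
reply_accept_sketch(void *m)
{
    reply_msg_sketch *msg = static_cast<reply_msg_sketch *>(m);
    return msg->connection->has_session ? msg : NULL;
}

static void
reply_error_sketch(void *m)
{
    reply_msg_sketch *msg = reply_accept_sketch(m);
    /*
     * This path is reached only when the session already exists,
     * so the accept can't fail. The invariant is checked here
     * instead of being described in a comment only.
     */
    assert(msg != NULL);
    msg->connection->errors_replied++;
}
```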
> struct obuf *out = msg->connection->tx.p_obuf;
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> @@ -1865,6 +1961,29 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)
15. The function is called 'connection_fail' and literally does nothing
when the connection is fine. Does not this name look wrong to you?
> +{
> + if (!msg->close_connection)
> + return false;
> + struct iproto_connection *con = msg->connection;
> + int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> + obuf_iovcnt(msg->wpos.obuf));
16. You can't just write to a socket whenever you want. It may
not be writable. Please, use libev events and callbacks to send anything.
You can do a write only when you get an EV_WRITE event from libev, which
in turn gets it from select/poll/epoll/kqueue/whatever-else-is-provided
by the OS.
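To make the point of comment 16 concrete without pulling in libev, the sketch below uses plain POSIX poll() as a stand-in for the readiness notification. This is a deliberate substitution: in iproto the write would live in an EV_WRITE callback instead of a blocking poll() call. The function name is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Write only after the OS reports the descriptor as writable.
 * Returns the number of bytes written, 0 if the descriptor was not
 * reported writable within timeout_ms, or -1 on error.
 */
static ssize_t
write_when_writable_sketch(int fd, const void *buf, size_t len,
                           int timeout_ms)
{
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;
    int rc = poll(&pfd, 1, timeout_ms);
    if (rc < 0)
        return -1;
    if (rc == 0 || (pfd.revents & POLLOUT) == 0)
        return 0;
    return write(fd, buf, len);
}
```

Even after a writability event the write may be partial, so a real sender keeps the remaining data queued until the next event.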
> + if (nwr > 0) {
> + /* Count statistics. */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> + diag_log();
> + }
> + iproto_connection_close(con);
> + iproto_msg_delete(msg);
> + return true;
> +}
> +
> static void
> net_send_msg(struct cmsg *m)
> {
> @@ -1936,81 +2066,60 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
> */
> static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> struct obuf *out = msg->connection->tx.p_obuf;
> - try { /* connect. */
> - con->session = session_create(SESSION_TYPE_BINARY);
> - if (con->session == NULL)
> - diag_raise();
> - con->session->meta.connection = con;
> - tx_fiber_init(con->session, 0);
> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> - /* TODO: dirty read from tx thread */
> - struct tt_uuid uuid = INSTANCE_UUID;
> - random_bytes(con->salt, IPROTO_SALT_SIZE);
> - greeting_encode(greeting, tarantool_version_id(), &uuid,
> - con->salt, IPROTO_SALT_SIZE);
> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> - if (! rlist_empty(&session_on_connect)) {
> - if (session_run_on_connect_triggers(con->session) != 0)
> - diag_raise();
> - }
> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> + /* TODO: dirty read from tx thread */
> + struct tt_uuid uuid = INSTANCE_UUID;
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
17. Sorry, but wtf?! This data right now may be being written by TX thread,
so you literally may read and return *garbage* here. It is not acceptable.
It is not even a short term solution, it is just a bug, if you can't prove
the value is already initialized and constant by that moment.
If listen starts before recovery, and UUID is stored in the xlogs, then it
is definitely a bug.
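One way to make such a cross-thread read safe is to publish the value once with release/acquire ordering, sketched below with hypothetical names. This is an illustration of the general technique, not what Tarantool does with INSTANCE_UUID, and it is valid only if the value is written once and never changed after being published.

```cpp
#include <atomic>
#include <cassert>
#include <cstring>

/* Hypothetical stand-in for struct tt_uuid. */
struct uuid_sketch {
    unsigned char bytes[16];
};

static uuid_sketch instance_uuid_sketch;
static std::atomic<bool> is_uuid_ready_sketch{false};

/* Called once by the owner thread, after the value is final. */
static void
publish_uuid_sketch(const uuid_sketch *src)
{
    std::memcpy(&instance_uuid_sketch, src, sizeof(*src));
    /* Release: the copy above becomes visible before the flag. */
    is_uuid_ready_sketch.store(true, std::memory_order_release);
}

/* Called by a reader thread. Returns false until the value is published. */
static bool
read_uuid_sketch(uuid_sketch *dst)
{
    /* Acquire: pairs with the release store in publish_uuid_sketch(). */
    if (!is_uuid_ready_sketch.load(std::memory_order_acquire))
        return false;
    std::memcpy(dst, &instance_uuid_sketch, sizeof(*dst));
    return true;
}
```

With this shape the reader has an explicit "not ready yet" outcome, so it can postpone the greeting instead of sending a possibly half-written value.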
> + greeting_encode(greeting, tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> + != IPROTO_GREETING_SIZE) {
> + diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> + "greeting obuf", "dup");
> + iproto_reply_error(out, diag_last_error(&fiber()->diag),
> + msg->header.sync, ::schema_version);
> iproto_wpos_create(&msg->wpos, out);
> - } catch (Exception *e) {
> - tx_reply_error(msg);
> msg->close_connection = true;
> - }
> -}