[Tarantool-patches] [PATCH v4] iproto: make iproto thread more independent from tx
Ilya Kosarev
i.kosarev at tarantool.org
Mon Oct 19 12:49:49 MSK 2020
On connection, an evio service callback is invoked to accept it. The
next step after acception was to process connection to tx thread
through cbus. This meant that any connection interaction involves
tx thread even before we get to decode what does the client want
from us. Consequently, a number of problems appears. The main one
is that we might get descriptor leak in case of unresponsive tx thread
(for example, when building secondary index). There are some other
cases where we might not want to spend precious tx time to process
the connection in case iproto can do it all alone.
This patch allows iproto to accept connection and send greeting by
itself. The connection is initialized in tx thread when the real
request comes through iproto_msg_decode(). In case request type was not
recognized we can also send reply with an error without using tx. It is
planned to add more iproto logic to prevent extra interaction with
tx thread. This patch already to some extent solves descriptors leakage
problem as far as connection establishes and stays in fetch_schema
state while tx thread is unresponsive.
The other user visible change is that on_connect triggers won't run on
connections that don't provide any input, as reflected in
bad_trigger.test.py.
Part of #3776
@TarantoolBot document
Title: iproto: on_connect triggers execution
Update the documentation for on_connect triggers to reflect that they
are now being executed only with the first request. Though the triggers
are still the first thing to be executed on a new connection. While it
is quite a synthetic case to establish a connection without making
any requests it is technically possible and now your triggers won't be
executed in this case. Some request is required to start their
execution.
---
Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
Issue: https://github.com/tarantool/tarantool/issues/3776
@ChangeLog:
* Make iproto thread more independent from tx. Connections are now
first processed in iproto not involving tx thread. On_connect (and,
consequently, on_disconnect) triggers won't run on connections that
don't provide any input (gh-3776).
Changes in v2:
- docbot request provided
- ChangeLog provided
- net_send_msg() used instead of net_send_error() where needed
- error cases made clear in iproto_msg_decode()
- replaced net_check_connection() with iproto_connection_fail() predicate
- improved tx session initialization fail processing to avoid extra tries
- fixed obuf_dup() fail processing in iproto_process_connect()
- added some comments
- some comments style fixed
Changes in v3:
- added braces to the confusing one-line if statement
- updated ChangeLog
- added description for iproto_msg_decode()
- simplified error processing in iproto_msg_decode()
Changes in v4:
- fixed msg->close_connection initialization bug
- fixed false-sharing problem in iproto_connection struct
- added needed assertion
- added needed comments
- names refactoring
- simplified patch a bit: removed extra return value, extra code
src/box/iproto.cc | 356 +++++++++++++++++++++-----------
test/box-py/bad_trigger.result | 1 +
test/box-py/bad_trigger.test.py | 22 +-
test/sql/prepared.result | 4 +
test/sql/prepared.test.lua | 1 +
5 files changed, 264 insertions(+), 120 deletions(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b8f65e5ec..45b1db6ff 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
static void
iproto_resume(void);
-static void
+static int
iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
bool *stop_input);
@@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
iproto_resume();
}
+static inline void
+iproto_connection_delete(struct iproto_connection *con);
+
/**
* A single global queue for all requests in all connections. All
* requests from all connections are processed concurrently.
@@ -280,6 +283,11 @@ static struct cord net_cord;
* in the tx thread.
*/
static struct slab_cache net_slabc;
+/**
+ * Slab cache used for allocating memory for output network buffers
+ * in the iproto thread.
+ */
+static struct slab_cache iproto_slabc;
struct rmean *rmean_net;
@@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
"REQUESTS",
};
+static int
+tx_init_connect(struct iproto_msg *msg);
+
static void
tx_process_destroy(struct cmsg *m);
@@ -322,6 +333,9 @@ static const struct cmsg_hop disconnect_route[] = {
{ net_finish_disconnect, NULL }
};
+static void
+net_send_msg(struct cmsg *msg);
+
/**
* Kharon is in the dead world (iproto). Schedule an event to
* flush new obuf as reflected in the fresh wpos.
@@ -531,6 +545,8 @@ struct iproto_connection
* return.
*/
bool is_push_pending;
+ /** True if tx failed to create the session. */
+ bool is_init_failed;
} tx;
/** Authentication salt. */
char salt[IPROTO_SALT_SIZE];
@@ -566,6 +582,7 @@ iproto_msg_new(struct iproto_connection *con)
return NULL;
}
msg->connection = con;
+ msg->close_connection = false;
rmean_collect(rmean_net, IPROTO_REQUESTS, 1);
return msg;
}
@@ -650,7 +667,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
* other parts of the connection.
*/
con->state = IPROTO_CONNECTION_DESTROYED;
- cpipe_push(&tx_pipe, &con->destroy_msg);
+ if (con->session != NULL)
+ cpipe_push(&tx_pipe, &con->destroy_msg);
+ else {
+ /*
+ * In case session was not created we can safely destroy
+ * not involving tx thread. Thus we also need to destroy
+ * obuf, which still belongs to iproto thread.
+ */
+ obuf_destroy(&con->obuf[0]);
+ obuf_destroy(&con->obuf[1]);
+ iproto_connection_delete(con);
+ }
}
/**
@@ -661,6 +689,7 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
static inline void
iproto_connection_close(struct iproto_connection *con)
{
+ rlist_del(&con->in_stop_list);
if (evio_has_fd(&con->input)) {
/* Clears all pending events. */
ev_io_stop(con->loop, &con->input);
@@ -677,15 +706,22 @@ iproto_connection_close(struct iproto_connection *con)
* is done only once.
*/
con->p_ibuf->wpos -= con->parse_size;
- cpipe_push(&tx_pipe, &con->disconnect_msg);
assert(con->state == IPROTO_CONNECTION_ALIVE);
con->state = IPROTO_CONNECTION_CLOSED;
+ if (con->session != NULL) {
+ cpipe_push(&tx_pipe, &con->disconnect_msg);
+ } else {
+ /*
+ * In case session was not created we can safely
+ * try to start destroy not involving tx thread.
+ */
+ iproto_connection_try_to_start_destroy(con);
+ }
} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
iproto_connection_try_to_start_destroy(con);
} else {
assert(con->state == IPROTO_CONNECTION_CLOSED);
}
- rlist_del(&con->in_stop_list);
}
static inline struct ibuf *
@@ -809,6 +845,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
assert(rlist_empty(&con->in_stop_list));
int n_requests = 0;
bool stop_input = false;
+ bool is_obuf_in_iproto = (con->session == NULL);
const char *errmsg;
while (con->parse_size != 0 && !stop_input) {
if (iproto_check_msg_max()) {
@@ -853,12 +890,22 @@ err_msgpack:
msg->len = reqend - reqstart; /* total request length */
- iproto_msg_decode(msg, &pos, reqend, &stop_input);
- /*
- * This can't throw, but should not be
- * done in case of exception.
- */
- cpipe_push_input(&tx_pipe, &msg->base);
+ if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != 0) {
+ net_send_msg(&(msg->base));
+ } else {
+ if (is_obuf_in_iproto) {
+ /*
+ * If session was not created yet and obuf is
+ * still in iproto we need to destroy it. New
+ * one will be created in tx thread if needed.
+ */
+ obuf_destroy(&con->obuf[0]);
+ obuf_destroy(&con->obuf[1]);
+ is_obuf_in_iproto = false;
+ }
+ cpipe_push_input(&tx_pipe, &msg->base);
+ }
+
n_requests++;
/* Request is parsed */
assert(reqend > reqstart);
@@ -1105,8 +1152,8 @@ iproto_connection_new(int fd)
ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
- obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
- obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+ obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
+ obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
con->p_ibuf = &con->ibuf[0];
con->tx.p_obuf = &con->obuf[0];
iproto_wpos_create(&con->wpos, con->tx.p_obuf);
@@ -1114,6 +1161,7 @@ iproto_connection_new(int fd)
con->parse_size = 0;
con->long_poll_count = 0;
con->session = NULL;
+ con->tx.is_init_failed = false;
rlist_create(&con->in_stop_list);
/* It may be very awkward to allocate at close. */
cmsg_init(&con->destroy_msg, destroy_route);
@@ -1134,10 +1182,6 @@ iproto_connection_delete(struct iproto_connection *con)
assert(!evio_has_fd(&con->input));
assert(con->session == NULL);
assert(con->state == IPROTO_CONNECTION_DESTROYED);
- /*
- * The output buffers must have been deleted
- * in tx thread.
- */
ibuf_destroy(&con->ibuf[0]);
ibuf_destroy(&con->ibuf[1]);
assert(con->obuf[0].pos == 0 &&
@@ -1167,13 +1211,13 @@ static void
tx_process_sql(struct cmsg *msg);
static void
-tx_reply_error(struct iproto_msg *msg);
+reply_error(struct iproto_msg *msg);
static void
tx_reply_iproto_error(struct cmsg *m);
static void
-net_send_msg(struct cmsg *msg);
+obuf_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
static void
net_send_error(struct cmsg *msg);
@@ -1244,7 +1288,19 @@ static const struct cmsg_hop error_route[] = {
{ net_send_error, NULL },
};
-static void
+/**
+ * Decode the request from @a pos to @a reqend and put it into the queue.
+ * @param msg[inout] iproto message.
+ * @param pos[inout] the start of the request to parse.
+ * @param reqend the end of the request to parse.
+ * @param stop_input[out] in case JOIN is running we need to stop IO.
+ *
+ * @retval -1 No route was inited. Decoding was not successfulll and
+ * the session is not inited, thus we will process the error in iproto.
+ * @retval 0 The route corresponding to the request or the error route
+ * was successfully inited.
+ */
+static int
iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
bool *stop_input)
{
@@ -1314,13 +1370,23 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
(uint32_t) type);
goto error;
}
- return;
+ return 0;
error:
/** Log and send the error. */
diag_log();
- diag_create(&msg->diag);
- diag_move(&fiber()->diag, &msg->diag);
- cmsg_init(&msg->base, error_route);
+ if (msg->connection->session != NULL) {
+ diag_create(&msg->diag);
+ diag_move(&fiber()->diag, &msg->diag);
+ cmsg_init(&msg->base, error_route);
+ return 0;
+ }
+ /*
+ * In case session was not created we can process error path
+ * without tx thread.
+ */
+ obuf_accept_wpos(msg->connection, &msg->wpos);
+ reply_error(msg);
+ return -1;
}
static void
@@ -1382,10 +1448,6 @@ tx_process_destroy(struct cmsg *m)
session_destroy(con->session);
con->session = NULL; /* safety */
}
- /*
- * Got to be done in iproto thread since
- * that's where the memory is allocated.
- */
obuf_destroy(&con->obuf[0]);
obuf_destroy(&con->obuf[1]);
}
@@ -1441,7 +1503,7 @@ tx_discard_input(struct iproto_msg *msg)
/**
* The goal of this function is to maintain the state of
- * two rotating connection output buffers in tx thread.
+ * two rotating connection output buffers.
*
* The function enforces the following rules:
* - if both out buffers are empty, any one is selected;
@@ -1455,7 +1517,7 @@ tx_discard_input(struct iproto_msg *msg)
* thread.
*/
static void
-tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
+obuf_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
{
struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
if (wpos->obuf == con->tx.p_obuf) {
@@ -1478,13 +1540,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
}
}
-static inline struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static inline int
+tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
{
- struct iproto_msg *msg = (struct iproto_msg *) m;
- tx_accept_wpos(msg->connection, &msg->wpos);
- tx_fiber_init(msg->connection->session, msg->header.sync);
- return msg;
+ *msg = (struct iproto_msg *) m;
+ /*
+ * In case connection init failed we don't need to try anymore.
+ */
+ if ((*msg)->connection->tx.is_init_failed)
+ return -1;
+ /*
+ * In case session was not created we need to init connection in tx and
+ * create it here.
+ */
+ if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
+ (*msg)->connection->tx.is_init_failed = true;
+ (*msg)->close_connection = true;
+ return -1;
+ }
+ obuf_accept_wpos((*msg)->connection, &(*msg)->wpos);
+ tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
+ return 0;
}
/**
@@ -1492,7 +1568,7 @@ tx_accept_msg(struct cmsg *m)
* write position. Doesn't throw.
*/
static void
-tx_reply_error(struct iproto_msg *msg)
+reply_error(struct iproto_msg *msg)
{
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&fiber()->diag),
@@ -1507,7 +1583,15 @@ tx_reply_error(struct iproto_msg *msg)
static void
tx_reply_iproto_error(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
+ int rc = tx_accept_msg(m, &msg);
+ /*
+ * tx_accept_msg() can't fail here as far as iproto error
+ * can only be processed in tx in case connection session
+ * is already created.
+ */
+ assert(rc == 0);
+ (void)rc;
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&msg->diag),
msg->header.sync, ::schema_version);
@@ -1527,7 +1611,9 @@ tx_inject_delay(void)
static void
tx_process1(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
+ if (tx_accept_msg(m, &msg) != 0)
+ goto error;
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1547,23 +1633,26 @@ tx_process1(struct cmsg *m)
iproto_wpos_create(&msg->wpos, out);
return;
error:
- tx_reply_error(msg);
+ reply_error(msg);
}
static void
tx_process_select(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
+ struct request *req;
struct obuf *out;
struct obuf_svp svp;
struct port port;
int count;
int rc;
- struct request *req = &msg->dml;
+ if (tx_accept_msg(m, &msg) != 0)
+ goto error;
if (tx_check_schema(msg->header.schema_version))
goto error;
tx_inject_delay();
+ req = &msg->dml;
rc = box_select(req->space_id, req->index_id,
req->iterator, req->offset, req->limit,
req->key, req->key_end, &port);
@@ -1590,7 +1679,7 @@ tx_process_select(struct cmsg *m)
iproto_wpos_create(&msg->wpos, out);
return;
error:
- tx_reply_error(msg);
+ reply_error(msg);
}
static int
@@ -1607,7 +1696,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
static void
tx_process_call(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
+ if (tx_accept_msg(m, &msg) != 0)
+ goto error;
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1680,19 +1771,21 @@ tx_process_call(struct cmsg *m)
iproto_wpos_create(&msg->wpos, out);
return;
error:
- tx_reply_error(msg);
+ reply_error(msg);
}
static void
tx_process_misc(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
- struct iproto_connection *con = msg->connection;
- struct obuf *out = con->tx.p_obuf;
+ struct iproto_msg *msg;
+ if (tx_accept_msg(m, &msg) != 0)
+ goto error;
if (tx_check_schema(msg->header.schema_version))
goto error;
try {
+ struct iproto_connection *con = msg->connection;
+ struct obuf *out = con->tx.p_obuf;
struct ballot ballot;
switch (msg->header.type) {
case IPROTO_AUTH:
@@ -1719,17 +1812,17 @@ tx_process_misc(struct cmsg *m)
}
iproto_wpos_create(&msg->wpos, out);
} catch (Exception *e) {
- tx_reply_error(msg);
+ reply_error(msg);
}
return;
error:
- tx_reply_error(msg);
+ reply_error(msg);
}
static void
tx_process_sql(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
struct obuf *out;
struct port port;
struct sql_bind *bind = NULL;
@@ -1738,6 +1831,8 @@ tx_process_sql(struct cmsg *m)
uint32_t len;
bool is_unprepare = false;
+ if (tx_accept_msg(m, &msg) != 0)
+ goto error;
if (tx_check_schema(msg->header.schema_version))
goto error;
assert(msg->header.type == IPROTO_EXECUTE ||
@@ -1819,13 +1914,17 @@ tx_process_sql(struct cmsg *m)
iproto_wpos_create(&msg->wpos, out);
return;
error:
- tx_reply_error(msg);
+ reply_error(msg);
}
static void
tx_process_replication(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg;
+ if (tx_accept_msg(m, &msg) != 0) {
+ reply_error(msg);
+ return;
+ }
struct iproto_connection *con = msg->connection;
struct ev_io io;
coio_create(&io, con->input.fd);
@@ -1865,6 +1964,30 @@ tx_process_replication(struct cmsg *m)
}
}
+/**
+ * Check connection health and try to send an error to the client
+ * in case of internal connection init or on_connect trigger failure.
+ */
+static bool
+iproto_connection_check_valid(struct iproto_msg *msg)
+{
+ if (!msg->close_connection)
+ return false;
+ struct iproto_connection *con = msg->connection;
+ int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
+ obuf_iovcnt(msg->wpos.obuf));
+ if (nwr > 0) {
+ /* Count statistics. */
+ rmean_collect(rmean_net, IPROTO_SENT, nwr);
+ } else if (nwr < 0 && ! sio_wouldblock(errno)) {
+ diag_log();
+ }
+ assert(iproto_connection_is_idle(con));
+ iproto_connection_close(con);
+ iproto_msg_delete(msg);
+ return true;
+}
+
static void
net_send_msg(struct cmsg *m)
{
@@ -1881,6 +2004,9 @@ net_send_msg(struct cmsg *m)
}
con->wend = msg->wpos;
+ if (iproto_connection_check_valid(msg))
+ return;
+
if (evio_has_fd(&con->output)) {
if (! ev_is_active(&con->output))
ev_feed_event(con->loop, &con->output, EV_WRITE);
@@ -1910,6 +2036,10 @@ net_end_join(struct cmsg *m)
struct iproto_connection *con = msg->connection;
msg->p_ibuf->rpos += msg->len;
+
+ if (iproto_connection_check_valid(msg))
+ return;
+
iproto_msg_delete(msg);
assert(! ev_is_active(&con->input));
@@ -1928,6 +2058,10 @@ net_end_subscribe(struct cmsg *m)
struct iproto_connection *con = msg->connection;
msg->p_ibuf->rpos += msg->len;
+
+ if (iproto_connection_check_valid(msg))
+ return;
+
iproto_msg_delete(msg);
assert(! ev_is_active(&con->input));
@@ -1936,81 +2070,68 @@ net_end_subscribe(struct cmsg *m)
}
/**
- * Handshake a connection: invoke the on-connect trigger
- * and possibly authenticate. Try to send the client an error
- * upon a failure.
+ * Handshake a connection: prepare greeting for it.
*/
static void
-tx_process_connect(struct cmsg *m)
+iproto_process_connect(struct iproto_msg *msg)
{
- struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
struct obuf *out = msg->connection->tx.p_obuf;
- try { /* connect. */
- con->session = session_create(SESSION_TYPE_BINARY);
- if (con->session == NULL)
- diag_raise();
- con->session->meta.connection = con;
- tx_fiber_init(con->session, 0);
- char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
- /* TODO: dirty read from tx thread */
- struct tt_uuid uuid = INSTANCE_UUID;
- random_bytes(con->salt, IPROTO_SALT_SIZE);
- greeting_encode(greeting, tarantool_version_id(), &uuid,
- con->salt, IPROTO_SALT_SIZE);
- obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
- if (! rlist_empty(&session_on_connect)) {
- if (session_run_on_connect_triggers(con->session) != 0)
- diag_raise();
- }
+ char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
+ /*
+ * INSTANCE_UUID is guaranteed to be inited before this moment.
+ * We start listening either in local_recovery() or bootstrap().
+ * The INSTANCE_UUID is ensured to be inited in the beginning of
+ * both methods. In case of local_recovery() it is verified that
+ * INSTANCE_UUID was read from the snapshot in memtx_engine_new().
+ * In bootstrap() INSTANCE_UUID is either taken from the
+ * instance_uuid box.cfg{} param or created on the spot.
+ */
+ struct tt_uuid uuid = INSTANCE_UUID;
+ random_bytes(con->salt, IPROTO_SALT_SIZE);
+ greeting_encode(greeting, tarantool_version_id(), &uuid,
+ con->salt, IPROTO_SALT_SIZE);
+ if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
+ != IPROTO_GREETING_SIZE) {
+ diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
+ "greeting obuf", "dup");
+ iproto_reply_error(out, diag_last_error(&fiber()->diag),
+ msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);
- } catch (Exception *e) {
- tx_reply_error(msg);
msg->close_connection = true;
- }
-}
-
-/**
- * Send a response to connect to the client or close the
- * connection in case on_connect trigger failed.
- */
-static void
-net_send_greeting(struct cmsg *m)
-{
- struct iproto_msg *msg = (struct iproto_msg *) m;
- struct iproto_connection *con = msg->connection;
- if (msg->close_connection) {
- struct obuf *out = msg->wpos.obuf;
- int64_t nwr = sio_writev(con->output.fd, out->iov,
- obuf_iovcnt(out));
-
- if (nwr > 0) {
- /* Count statistics. */
- rmean_collect(rmean_net, IPROTO_SENT, nwr);
- } else if (nwr < 0 && ! sio_wouldblock(errno)) {
- diag_log();
- }
- assert(iproto_connection_is_idle(con));
- iproto_connection_close(con);
- iproto_msg_delete(msg);
+ iproto_connection_check_valid(msg);
return;
}
+ iproto_wpos_create(&msg->wpos, out);
con->wend = msg->wpos;
- /*
- * Connect is synchronous, so no one could have been
- * messing up with the connection while it was in
- * progress.
- */
assert(evio_has_fd(&con->output));
- /* Handshake OK, start reading input. */
ev_feed_event(con->loop, &con->output, EV_WRITE);
iproto_msg_delete(msg);
}
-static const struct cmsg_hop connect_route[] = {
- { tx_process_connect, &net_pipe },
- { net_send_greeting, NULL },
-};
+static int
+tx_init_connect(struct iproto_msg *msg)
+{
+ struct iproto_connection *con = msg->connection;
+ obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
+ obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
+ con->tx.p_obuf = &con->obuf[0];
+ iproto_wpos_create(&con->wpos, con->tx.p_obuf);
+ iproto_wpos_create(&con->wend, con->tx.p_obuf);
+
+ con->session = session_create(SESSION_TYPE_BINARY);
+ if (con->session == NULL)
+ return -1;
+ con->session->meta.connection = con;
+
+ tx_fiber_init(con->session, 0);
+ if (! rlist_empty(&session_on_connect)) {
+ if (session_run_on_connect_triggers(con->session) != 0)
+ return -1;
+ }
+
+ return 0;
+}
/** }}} */
@@ -2037,11 +2158,9 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
mempool_free(&iproto_connection_pool, con);
return -1;
}
- cmsg_init(&msg->base, connect_route);
msg->p_ibuf = con->p_ibuf;
msg->wpos = con->wpos;
- msg->close_connection = false;
- cpipe_push(&tx_pipe, &msg->base);
+ iproto_process_connect(msg);
return 0;
}
@@ -2054,6 +2173,8 @@ static struct evio_service binary; /* iproto binary listener */
static int
net_cord_f(va_list /* ap */)
{
+ slab_cache_create(&iproto_slabc, &runtime);
+
mempool_create(&iproto_msg_pool, &cord()->slabc,
sizeof(struct iproto_msg));
mempool_create(&iproto_connection_pool, &cord()->slabc,
@@ -2144,7 +2265,7 @@ tx_end_push(struct cmsg *m)
struct iproto_kharon *kharon = (struct iproto_kharon *) m;
struct iproto_connection *con =
container_of(kharon, struct iproto_connection, kharon);
- tx_accept_wpos(con, &kharon->wpos);
+ obuf_accept_wpos(con, &kharon->wpos);
con->tx.is_push_sent = false;
if (con->tx.is_push_pending)
tx_begin_push(con);
@@ -2297,7 +2418,8 @@ iproto_listen(const char *uri)
size_t
iproto_mem_used(void)
{
- return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
+ return slab_cache_used(&net_cord.slabc)
+ + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
}
size_t
diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
index 5d064b764..bfa9c2b75 100644
--- a/test/box-py/bad_trigger.result
+++ b/test/box-py/bad_trigger.result
@@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
- function
...
greeting: True
+Nothing to read yet: Resource temporarily unavailable
fixheader: True
error code 32
error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
index 7d200b921..4739dfe13 100644
--- a/test/box-py/bad_trigger.test.py
+++ b/test/box-py/bad_trigger.test.py
@@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
from lib.tarantool_connection import TarantoolConnection
from tarantool import NetworkError
from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
- REQUEST_TYPE_ERROR
+ REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
import socket
import msgpack
@@ -26,9 +26,25 @@ s = conn.socket
# Read greeting
print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
-# Read error packet
+# Check socket
IPROTO_FIXHEADER_SIZE = 5
-fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+s.setblocking(False)
+fixheader = None
+try:
+ fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
+except socket.error as err:
+ print 'Nothing to read yet:', str(err).split(']')[1]
+else:
+ print 'Received fixheader'
+s.setblocking(True)
+
+# Send ping
+query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
+s.send(msgpack.dumps(len(query)) + query)
+
+# Read error packet
+if not fixheader:
+ fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
unpacker.feed(fixheader)
packet_len = unpacker.unpack()
diff --git a/test/sql/prepared.result b/test/sql/prepared.result
index 0db2cc03f..e6d4a947f 100644
--- a/test/sql/prepared.result
+++ b/test/sql/prepared.result
@@ -752,6 +752,10 @@ if is_remote then
end;
| ---
| ...
+test_run:wait_cond(function() return box.info.sql().cache.size == 0 end, 10);
+ | ---
+ | - true
+ | ...
box.cfg{sql_cache_size = 0};
| ---
| ...
diff --git a/test/sql/prepared.test.lua b/test/sql/prepared.test.lua
index d8e8a44cb..798885230 100644
--- a/test/sql/prepared.test.lua
+++ b/test/sql/prepared.test.lua
@@ -271,6 +271,7 @@ if is_remote then
cn:close()
cn = remote.connect(box.cfg.listen)
end;
+test_run:wait_cond(function() return box.info.sql().cache.size == 0 end, 10);
box.cfg{sql_cache_size = 0};
box.cfg{sql_cache_size = 3000};
--
2.17.1
More information about the Tarantool-patches
mailing list