[tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri May 4 00:05:19 MSK 2018
An IProto connection stops reading input when the limit on active
requests is reached. But when multiple requests arrive in a batch,
IProto does not check the limit, so it can be violated.
Let's check the limit after each message during batch parsing too,
not only once before parsing.
---
src/box/iproto.cc | 143 +++++++++++++++++++++++++++---------------
test/box/errinj.result | 69 ++++++++++++++++++++
test/box/errinj.test.lua | 29 +++++++++
test/box/net_msg_max.result | 21 ++++++-
test/box/net_msg_max.test.lua | 13 +++-
5 files changed, 222 insertions(+), 53 deletions(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9ccaf1dc7..dd7f97ecf 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
static struct iproto_msg *
iproto_msg_new(struct iproto_connection *con)
{
+ ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
struct iproto_msg *msg =
- (struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
- msg->connection = con;
+ (struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
+ if (msg != NULL)
+ msg->connection = con;
+ else
+ diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");
return msg;
}
@@ -384,30 +388,6 @@ iproto_check_msg_max()
return request_count > IPROTO_MSG_MAX;
}
-/**
- * Throttle the queue to the tx thread and ensure the fiber pool
- * in tx thread is not depleted by a flood of incoming requests:
- * resume a stopped connection only if there is a spare message
- * object in the message pool.
- */
-static void
-iproto_resume()
-{
- /*
- * Most of the time we have nothing to do here: throttling
- * is not active.
- */
- if (rlist_empty(&stopped_connections))
- return;
- if (iproto_check_msg_max())
- return;
-
- struct iproto_connection *con;
- con = rlist_first_entry(&stopped_connections, struct iproto_connection,
- in_stop_list);
- ev_feed_event(con->loop, &con->input, EV_READ);
-}
-
/**
* A connection is idle when the client is gone
* and there are no outstanding msgs in the msg queue.
@@ -575,20 +555,31 @@ iproto_connection_input_buffer(struct iproto_connection *con)
return new_ibuf;
}
-/** Enqueue all requests which were read up. */
-static inline void
+/**
+ * Enqueue all requests which were read up. If the request limit
+ * is reached, stop the connection input even if not the whole
+ * batch is enqueued. Otherwise try to read more by feeding a read
+ * event to the event loop.
+ * @param con Connection to enqueue in.
+ * @param in Buffer to parse.
+ *
+ * @retval 0 Success.
+ * @retval -1 Invalid MessagePack error.
+ */
+static inline int
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
{
int n_requests = 0;
bool stop_input = false;
- while (con->parse_size && stop_input == false) {
+ while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
const char *reqstart = in->wpos - con->parse_size;
const char *pos = reqstart;
/* Read request length. */
if (mp_typeof(*pos) != MP_UINT) {
cpipe_flush_input(&tx_pipe);
- tnt_raise(ClientError, ER_INVALID_MSGPACK,
- "packet length");
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ return -1;
}
if (mp_check_uint(pos, in->wpos) >= 0)
break;
@@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
if (reqend > in->wpos)
break;
struct iproto_msg *msg = iproto_msg_new(con);
+ if (msg == NULL) {
+ /*
+ * Do not treat it as an error - just wait
+ * until some of the requests are finished.
+ */
+ iproto_connection_stop(con);
+ return 0;
+ }
msg->p_ibuf = con->p_ibuf;
msg->wpos = con->wpos;
@@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
* requests, keep reading input, if only to avoid
* a deadlock on this connection.
*/
- ev_feed_event(con->loop, &con->input, EV_READ);
+ if (iproto_check_msg_max())
+ iproto_connection_stop(con);
+ else
+ ev_feed_event(con->loop, &con->input, EV_READ);
}
cpipe_flush_input(&tx_pipe);
+ return 0;
+}
+
+/**
+ * Enqueue connection's pending requests. Completely resurrect the
+ * connection if it has no more requests and the limit is still
+ * not reached.
+ */
+static void
+iproto_connection_resume(struct iproto_connection *con)
+{
+ assert(! iproto_check_msg_max());
+ rlist_del(&con->in_stop_list);
+ if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
+ struct error *e = box_error_last();
+ iproto_write_error(con->input.fd, e, ::schema_version, 0);
+ error_log(e);
+ iproto_connection_close(con);
+ }
+}
+
+/**
+ * Throttle the queue to the tx thread and ensure the fiber pool
+ * in tx thread is not depleted by a flood of incoming requests:
+ * resume a stopped connection only if there is a spare message
+ * object in the message pool.
+ */
+static void
+iproto_resume()
+{
+ while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
+ struct iproto_connection *con =
+ rlist_first_entry(&stopped_connections,
+ struct iproto_connection,
+ in_stop_list);
+ iproto_connection_resume(con);
+ }
}
static void
@@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
(struct iproto_connection *) watcher->data;
int fd = con->input.fd;
assert(fd >= 0);
- if (! rlist_empty(&con->in_stop_list)) {
- /* Resumed stopped connection. */
- rlist_del(&con->in_stop_list);
- /*
- * This connection may have no input, so
- * resume one more connection which might have
- * input.
- */
- iproto_resume();
- }
+ assert(rlist_empty(&con->in_stop_list));
/*
* Throttle if there are too many pending requests,
* otherwise we might deplete the fiber pool in tx
@@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
in->wpos += nrd;
con->parse_size += nrd;
/* Enqueue all requests which are fully read up. */
- iproto_enqueue_batch(con, in);
+ if (iproto_enqueue_batch(con, in) != 0)
+ diag_raise();
} catch (Exception *e) {
/* Best effort at sending the error message to the client. */
iproto_write_error(fd, e, ::schema_version, 0);
@@ -801,7 +832,11 @@ static struct iproto_connection *
iproto_connection_new(int fd)
{
struct iproto_connection *con = (struct iproto_connection *)
- mempool_alloc_xc(&iproto_connection_pool);
+ mempool_alloc(&iproto_connection_pool);
+ if (con == NULL) {
+ diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
+ return NULL;
+ }
con->input.data = con->output.data = con;
con->loop = loop();
ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
* Enqueue any messages if they are in the readahead
* queue. Will simply start input otherwise.
*/
- iproto_enqueue_batch(con, msg->p_ibuf);
+ if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
+ iproto_connection_close(con);
}
static void
@@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
{
(void) addr;
(void) addrlen;
- struct iproto_connection *con;
-
- con = iproto_connection_new(fd);
+ struct iproto_msg *msg;
+ struct iproto_connection *con = iproto_connection_new(fd);
+ if (con == NULL)
+ goto error_conn;
/*
* Ignore msg allocation failure - the queue size is
* fixed so there is a limited number of msgs in
* use, all stored in just a few blocks of the memory pool.
*/
- struct iproto_msg *msg = iproto_msg_new(con);
+ msg = iproto_msg_new(con);
+ if (msg == NULL)
+ goto error_msg;
cmsg_init(&msg->base, connect_route);
msg->p_ibuf = con->p_ibuf;
msg->wpos = con->wpos;
msg->close_connection = false;
cpipe_push(&tx_pipe, &msg->base);
+ return;
+error_msg:
+ mempool_free(&iproto_connection_pool, con);
+error_conn:
+ close(fd);
+ return;
}
static struct evio_service binary; /* iproto binary listener */
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 5b4bc23a3..5da3e2642 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -1150,6 +1150,75 @@ cn:close()
s:drop()
---
...
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+---
+...
+finished = 0
+---
+...
+continue = false
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+function long_poll_f()
+ started = started + 1
+ f = fiber.self()
+ while not continue do fiber.sleep(0.01) end
+ finished = finished + 1
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+cn = net_box.connect(box.cfg.listen)
+---
+...
+function long_poll() cn:call('long_poll_f') end
+---
+...
+_ = fiber.create(long_poll)
+---
+...
+while started ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+---
+- ok
+...
+_ = fiber.create(long_poll)
+---
+...
+fiber.sleep(0.1)
+---
+...
+started == 1
+---
+- true
+...
+continue = true
+---
+...
+errinj.set("ERRINJ_TESTING", false)
+---
+- ok
+...
+while finished ~= 2 do fiber.sleep(0.01) end
+---
+...
+cn:close()
+---
+...
box.schema.user.revoke('guest', 'read,write,execute','universe')
---
...
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index e1460d1b6..365d535c2 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -390,6 +390,35 @@ ok, err
cn:close()
s:drop()
+--
+-- In messages memory pool is over, stop the connection, until the
+-- pool has free memory.
+--
+started = 0
+finished = 0
+continue = false
+test_run:cmd('setopt delimiter ";"')
+function long_poll_f()
+ started = started + 1
+ f = fiber.self()
+ while not continue do fiber.sleep(0.01) end
+ finished = finished + 1
+end;
+test_run:cmd('setopt delimiter ""');
+cn = net_box.connect(box.cfg.listen)
+function long_poll() cn:call('long_poll_f') end
+_ = fiber.create(long_poll)
+while started ~= 1 do fiber.sleep(0.01) end
+-- Simulate OOM for new requests.
+errinj.set("ERRINJ_TESTING", true)
+_ = fiber.create(long_poll)
+fiber.sleep(0.1)
+started == 1
+continue = true
+errinj.set("ERRINJ_TESTING", false)
+while finished ~= 2 do fiber.sleep(0.01) end
+cn:close()
+
box.schema.user.revoke('guest', 'read,write,execute','universe')
--
diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
index dde2016b7..de22bcbb9 100644
--- a/test/box/net_msg_max.result
+++ b/test/box/net_msg_max.result
@@ -70,10 +70,13 @@ end;
...
-- Wait until 'active' stops growing - it means, that the input
-- is blocked.
+-- No more messages.
function wait_active(value)
while value ~= active do
fiber.sleep(0.01)
end
+ fiber.sleep(0.01)
+ assert(value == active)
end;
---
...
@@ -99,11 +102,23 @@ run_workers(conn2)
wait_active(run_max * 2)
---
...
-active == run_max * 2 or active
+wait_finished(active)
---
-- true
...
-wait_finished(active)
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+---
+...
+run_workers(conn)
+---
+...
+wait_active(limit + 1)
+---
+...
+wait_finished(run_max)
---
...
conn2:close()
diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
index 560e37017..39f8f53f7 100644
--- a/test/box/net_msg_max.test.lua
+++ b/test/box/net_msg_max.test.lua
@@ -44,6 +44,9 @@ function wait_active(value)
while value ~= active do
fiber.sleep(0.01)
end
+ fiber.sleep(0.01)
+-- No more messages.
+ assert(value == active)
end;
function wait_finished(needed)
@@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
run_workers(conn)
run_workers(conn2)
wait_active(run_max * 2)
-active == run_max * 2 or active
wait_finished(active)
+--
+-- Test that each message in a batch is checked. When a limit is
+-- reached, other messages must be processed later.
+--
+run_max = limit * 5
+run_workers(conn)
+wait_active(limit + 1)
+wait_finished(run_max)
+
conn2:close()
conn:close()
--
2.15.1 (Apple Git-101)
More information about the Tarantool-patches
mailing list