Tarantool development patches archive
 help / color / mirror / Atom feed
From: Konstantin Osipov <kostja@tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: [tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching
Date: Fri, 4 May 2018 11:26:07 +0300	[thread overview]
Message-ID: <20180504082607.GA5623@atlas> (raw)
In-Reply-To: <00583e6bbf9ca53c3a44114351b9fdb9e539321d.1525381393.git.v.shpilevoy@tarantool.org>

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/05/04 00:07]:
> IProto connection stops input reading, when active request count

Vlad, while the gerund is a valid grammatical form in English, it is
rarely used in colloquial speech. Please try to avoid using it
unless you know what you're doing. Sorry for nitpicking.

Instead of trying to construct a complex sentence, simply try
to split your point into two smaller ones. English, despite being a
member of the Indo-European family along with Russian, lost its
grammatical cases and their endings a long time ago. Without case
endings, complex sentences are extremely hard to parse,
so they fell out of use.

> is reached. But when multiple requests are in a batch, the IProto
> does not check the limit, so it can be violated.
> 
> Lets check the limit during batch parsing after each message too,
> not only once before parsing.
> ---
>  src/box/iproto.cc             | 143 +++++++++++++++++++++++++++---------------
>  test/box/errinj.result        |  69 ++++++++++++++++++++
>  test/box/errinj.test.lua      |  29 +++++++++
>  test/box/net_msg_max.result   |  21 ++++++-
>  test/box/net_msg_max.test.lua |  13 +++-
>  5 files changed, 222 insertions(+), 53 deletions(-)
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 9ccaf1dc7..dd7f97ecf 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -192,9 +192,13 @@ static struct mempool iproto_msg_pool;
>  static struct iproto_msg *
>  iproto_msg_new(struct iproto_connection *con)
>  {
> +	ERROR_INJECT(ERRINJ_TESTING, { return NULL; });
>  	struct iproto_msg *msg =
> -		(struct iproto_msg *) mempool_alloc_xc(&iproto_msg_pool);
> -	msg->connection = con;
> +		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
> +	if (msg != NULL)
> +		msg->connection = con;
> +	else
> +		diag_set(OutOfMemory, sizeof(*msg), "mempool_alloc", "msg");

Nitpick: usually handling the error first and dealing with the
rest later makes it easier to understand what's going on.

This idiom would also make the code easier to parse in the future, when more
stuff is added to the "else" branch:

if (msg == NULL) {
    diag_set()
    return NULL;
}

msg->connection = con;
return msg;


>  }
>  
> -/** Enqueue all requests which were read up. */
> -static inline void
> +/**
> + * Enqueue all requests which were read up. If a requests limit is

request limit; request is an adjective here - when a noun is used
as an adjective, its singular form is used.

> + * reached - stop the connection input even if not the whole batch
> + * is enqueued. Else try to read more feeding read event to the
> + * event loop.
> + * @param con Connection to enqueue in.
> + * @param in Buffer to parse.
> + *
> + * @retval  0 Success.
> + * @retval -1 Invalid MessagePack error.
> + */
> +static inline int
>  iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  {
>  	int n_requests = 0;
>  	bool stop_input = false;
> -	while (con->parse_size && stop_input == false) {
> +	while (con->parse_size != 0 && !stop_input && !iproto_check_msg_max()) {
>  		const char *reqstart = in->wpos - con->parse_size;
>  		const char *pos = reqstart;
>  		/* Read request length. */
>  		if (mp_typeof(*pos) != MP_UINT) {
>  			cpipe_flush_input(&tx_pipe);
> -			tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -				  "packet length");
> +			diag_set(ClientError, ER_INVALID_MSGPACK,
> +				 "packet length");
> +			return -1;
>  		}
>  		if (mp_check_uint(pos, in->wpos) >= 0)
>  			break;
> @@ -597,6 +588,14 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		if (reqend > in->wpos)
>  			break;
>  		struct iproto_msg *msg = iproto_msg_new(con);
> +		if (msg == NULL) {
> +			/*
> +			 * Do not tread it as an error - just wait
> +			 * until some of requests are finished.
> +			 */
> +			iproto_connection_stop(con);
> +			return 0;
> +		}
>  		msg->p_ibuf = con->p_ibuf;
>  		msg->wpos = con->wpos;
>  
> @@ -644,9 +643,49 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  		 * requests, keep reading input, if only to avoid
>  		 * a deadlock on this connection.
>  		 */
> -		ev_feed_event(con->loop, &con->input, EV_READ);
> +		if (iproto_check_msg_max())
> +			iproto_connection_stop(con);
> +		else
> +			ev_feed_event(con->loop, &con->input, EV_READ);

Looks like a double check for msg max in the same loop. Could you
please try to avoid that?

>  	}
>  	cpipe_flush_input(&tx_pipe);
> +	return 0;
> +}
> +
> +/**
> + * Enqueue connection's pending requests. Completely ressurect the

Please turn on spell checking in your editor.

> + * connection, if it has no more requests, and the limit still is
> + * not reached.
> + */
> +static void
> +iproto_connection_resume(struct iproto_connection *con)
> +{
> +	assert(! iproto_check_msg_max());
> +	rlist_del(&con->in_stop_list);
> +	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
> +		struct error *e = box_error_last();
> +		iproto_write_error(con->input.fd, e, ::schema_version, 0);
> +		error_log(e);
> +		iproto_connection_close(con);
> +	}
> +}
> +
> +/**
> + * Throttle the queue to the tx thread and ensure the fiber pool
> + * in tx thread is not depleted by a flood of incoming requests:
> + * resume a stopped connection only if there is a spare message
> + * object in the message pool.
> + */
> +static void
> +iproto_resume()
> +{
> +	while (!iproto_check_msg_max() && !rlist_empty(&stopped_connections)) {
> +		struct iproto_connection *con =
> +			rlist_first_entry(&stopped_connections,
> +					  struct iproto_connection,
> +					  in_stop_list);
> +		iproto_connection_resume(con);
> +	}

By the look of it, resume() resumes all stopped connections. 

This is not the case, however, as long as iproto_connection_stop() is called
*after* reading input, so most likely with a request already
sitting in the input buffer. The tricky relationship between
iproto_resume() and the timing of calling iproto_connection_stop() 
requires an explanation. The comment needs a brush-up as well.

>  static void
> @@ -657,16 +696,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>  		(struct iproto_connection *) watcher->data;
>  	int fd = con->input.fd;
>  	assert(fd >= 0);
> -	if (! rlist_empty(&con->in_stop_list)) {
> -		/* Resumed stopped connection. */
> -		rlist_del(&con->in_stop_list);
> -		/*
> -		 * This connection may have no input, so
> -		 * resume one more connection which might have
> -		 * input.
> -		 */
> -		iproto_resume();
> -	}
> +	assert(rlist_empty(&con->in_stop_list));

Nice.

>  	/*
>  	 * Throttle if there are too many pending requests,
>  	 * otherwise we might deplete the fiber pool in tx
> @@ -703,7 +733,8 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
>  		in->wpos += nrd;
>  		con->parse_size += nrd;
>  		/* Enqueue all requests which are fully read up. */
> -		iproto_enqueue_batch(con, in);
> +		if (iproto_enqueue_batch(con, in) != 0)
> +			diag_raise();
>  	} catch (Exception *e) {
>  		/* Best effort at sending the error message to the client. */
>  		iproto_write_error(fd, e, ::schema_version, 0);
> @@ -801,7 +832,11 @@ static struct iproto_connection *
>  iproto_connection_new(int fd)
>  {
>  	struct iproto_connection *con = (struct iproto_connection *)
> -		mempool_alloc_xc(&iproto_connection_pool);
> +		mempool_alloc(&iproto_connection_pool);
> +	if (con == NULL) {
> +		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
> +		return NULL;
> +	}
>  	con->input.data = con->output.data = con;
>  	con->loop = loop();
>  	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
> @@ -1471,7 +1506,8 @@ net_end_join(struct cmsg *m)
>  	 * Enqueue any messages if they are in the readahead
>  	 * queue. Will simply start input otherwise.
>  	 */
> -	iproto_enqueue_batch(con, msg->p_ibuf);
> +	if (iproto_enqueue_batch(con, msg->p_ibuf) != 0)
> +		iproto_connection_close(con);
>  }
>  
>  static void
> @@ -1574,20 +1610,29 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>  {
>  	(void) addr;
>  	(void) addrlen;
> -	struct iproto_connection *con;
> -
> -	con = iproto_connection_new(fd);
> +	struct iproto_msg *msg;
> +	struct iproto_connection *con = iproto_connection_new(fd);
> +	if (con == NULL)
> +		goto error_conn;
>  	/*
>  	 * Ignore msg allocation failure - the queue size is
>  	 * fixed so there is a limited number of msgs in
>  	 * use, all stored in just a few blocks of the memory pool.
>  	 */
> -	struct iproto_msg *msg = iproto_msg_new(con);
> +	msg = iproto_msg_new(con);
> +	if (msg == NULL)
> +		goto error_msg;
>  	cmsg_init(&msg->base, connect_route);
>  	msg->p_ibuf = con->p_ibuf;
>  	msg->wpos = con->wpos;
>  	msg->close_connection = false;
>  	cpipe_push(&tx_pipe, &msg->base);
> +	return;
> +error_msg:
> +	mempool_free(&iproto_connection_pool, con);
> +error_conn:
> +	close(fd);
> +	return;
>  }
>  
>  static struct evio_service binary; /* iproto binary listener */
> diff --git a/test/box/errinj.result b/test/box/errinj.result
> index 5b4bc23a3..5da3e2642 100644
> --- a/test/box/errinj.result
> +++ b/test/box/errinj.result
> @@ -1150,6 +1150,75 @@ cn:close()
>  s:drop()
>  ---
>  ...
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +---
> +...
> +finished = 0
> +---
> +...
> +continue = false
> +---
> +...
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +cn = net_box.connect(box.cfg.listen)
> +---
> +...
> +function long_poll() cn:call('long_poll_f') end
> +---
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +while started ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +---
> +- ok
> +...
> +_ = fiber.create(long_poll)
> +---
> +...
> +fiber.sleep(0.1)

Please avoid 0.1 sleeps entirely. If you're waiting for anything,
please wait in a loop with the wait timeout set to 0.01 seconds.
> +---
> +...
> +started == 1
> +---
> +- true
> +...
> +continue = true
> +---
> +...
> +errinj.set("ERRINJ_TESTING", false)
> +---
> +- ok
> +...
> +while finished ~= 2 do fiber.sleep(0.01) end

I don't understand how exactly you're testing that the connection
is stopped when the pool is exhausted.

It seems you simply wait for your long-poll fiber to block, then
resume it, then wait for it to finish.

> +---
> +...
> +cn:close()
> +---
> +...
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>  ---
>  ...
> diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
> index e1460d1b6..365d535c2 100644
> --- a/test/box/errinj.test.lua
> +++ b/test/box/errinj.test.lua
> @@ -390,6 +390,35 @@ ok, err
>  cn:close()
>  s:drop()
>  
> +--
> +-- In messages memory pool is over, stop the connection, until the
> +-- pool has free memory.
> +--
> +started = 0
> +finished = 0
> +continue = false
> +test_run:cmd('setopt delimiter ";"')
> +function long_poll_f()
> +    started = started + 1
> +    f = fiber.self()
> +    while not continue do fiber.sleep(0.01) end
> +    finished = finished + 1
> +end;
> +test_run:cmd('setopt delimiter ""');
> +cn = net_box.connect(box.cfg.listen)
> +function long_poll() cn:call('long_poll_f') end
> +_ = fiber.create(long_poll)
> +while started ~= 1 do fiber.sleep(0.01) end
> +-- Simulate OOM for new requests.
> +errinj.set("ERRINJ_TESTING", true)
> +_ = fiber.create(long_poll)
> +fiber.sleep(0.1)
> +started == 1
> +continue = true
> +errinj.set("ERRINJ_TESTING", false)
> +while finished ~= 2 do fiber.sleep(0.01) end
> +cn:close()
> +
>  box.schema.user.revoke('guest', 'read,write,execute','universe')
>  
>  --
> diff --git a/test/box/net_msg_max.result b/test/box/net_msg_max.result
> index dde2016b7..de22bcbb9 100644
> --- a/test/box/net_msg_max.result
> +++ b/test/box/net_msg_max.result
> @@ -70,10 +70,13 @@ end;
>  ...
>  -- Wait until 'active' stops growing - it means, that the input
>  -- is blocked.
> +-- No more messages.
>  function wait_active(value)
>  	while value ~= active do
>  		fiber.sleep(0.01)
>  	end
> +	fiber.sleep(0.01)
> +	assert(value == active)
>  end;
>  ---
>  ...
> @@ -99,11 +102,23 @@ run_workers(conn2)
>  wait_active(run_max * 2)
>  ---
>  ...
> -active == run_max * 2 or active
> +wait_finished(active)
>  ---
> -- true
>  ...
> -wait_finished(active)
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +---
> +...
> +run_workers(conn)
> +---
> +...
> +wait_active(limit + 1)
> +---
> +...
> +wait_finished(run_max)
>  ---
>  ...
>  conn2:close()
> diff --git a/test/box/net_msg_max.test.lua b/test/box/net_msg_max.test.lua
> index 560e37017..39f8f53f7 100644
> --- a/test/box/net_msg_max.test.lua
> +++ b/test/box/net_msg_max.test.lua
> @@ -44,6 +44,9 @@ function wait_active(value)
>  	while value ~= active do
>  		fiber.sleep(0.01)
>  	end
> +	fiber.sleep(0.01)
> +-- No more messages.
> +	assert(value == active)
>  end;
>  
>  function wait_finished(needed)
> @@ -58,9 +61,17 @@ test_run:cmd("setopt delimiter ''");
>  run_workers(conn)
>  run_workers(conn2)
>  wait_active(run_max * 2)
> -active == run_max * 2 or active
>  wait_finished(active)
>  
> +--
> +-- Test that each message in a batch is checked. When a limit is
> +-- reached, other messages must be processed later.
> +--
> +run_max = limit * 5
> +run_workers(conn)
> +wait_active(limit + 1)
> +wait_finished(run_max)
> +
>  conn2:close()
>  conn:close()

Thank you for working on this patch,

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

  reply	other threads:[~2018-05-04  8:26 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-05-03 21:05 [tarantool-patches] [PATCH v2 0/2] IProto fix and net_msg_max option Vladislav Shpilevoy
2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 1/2] iproto: fix error with unstoppable batching Vladislav Shpilevoy
2018-05-04  8:26   ` Konstantin Osipov [this message]
2018-05-04 11:56     ` [tarantool-patches] " Vladislav Shpilevoy
2018-05-03 21:05 ` [tarantool-patches] [PATCH v2 2/2] iproto: allow to configure IPROTO_MSG_MAX Vladislav Shpilevoy
2018-05-04  8:46   ` [tarantool-patches] " Konstantin Osipov
2018-05-04 11:56     ` Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180504082607.GA5623@atlas \
    --to=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='[tarantool-patches] Re: [PATCH v2 1/2] iproto: fix error with unstoppable batching' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox