[tarantool-patches] [PATCH] evio: fix timeout calculations

Serge Petrenko sergepetrenko at tarantool.org
Thu Mar 14 18:57:12 MSK 2019



> 14 марта 2019 г., в 18:16, Serge Petrenko <sergepetrenko at tarantool.org> написал(а):
> 
> The function evio_timeout_update() failed to update the starting time
> point, which led to timeouts happening much faster than they should if
> there were consecutive calls to the function.
> This led, for example, to the applier timing out in 0.2 seconds while
> reading a row several megabytes in size, even if replication_timeout was
> set to 15 seconds.
> 
> Closes #4042
> ---
> https://github.com/tarantool/tarantool/tree/sp/gh-4042-applier-timeout
> https://github.com/tarantool/tarantool/issues/4042
> 
> src/box/xrow_io.cc                         |  4 +-
> src/lib/core/coio.cc                       | 18 ++--
> src/lib/core/coio.h                        |  2 +-
> src/lib/core/evio.h                        |  5 +-
> test/replication/long_row_timeout.result   | 98 ++++++++++++++++++++++
> test/replication/long_row_timeout.test.lua | 43 ++++++++++
> test/replication/replica_big.lua           | 12 +++
> test/replication/suite.cfg                 |  1 +
> 8 files changed, 169 insertions(+), 14 deletions(-)
> create mode 100644 test/replication/long_row_timeout.result
> create mode 100644 test/replication/long_row_timeout.test.lua
> create mode 100644 test/replication/replica_big.lua
> 
> diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
> index 4b6e68bc1..48707982b 100644
> --- a/src/box/xrow_io.cc
> +++ b/src/box/xrow_io.cc
> @@ -71,7 +71,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> 	/* Read fixed header */
> 	if (ibuf_used(in) < 1)
> 		coio_breadn_timeout(coio, in, 1, delay);
> -	coio_timeout_update(start, &delay);
> +	coio_timeout_update(&start, &delay);
> 
> 	/* Read length */
> 	if (mp_typeof(*in->rpos) != MP_UINT) {
> @@ -81,7 +81,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> 	if (to_read > 0)
> 		coio_breadn_timeout(coio, in, to_read, delay);
> -	coio_timeout_update(start, &delay);
> +	coio_timeout_update(&start, &delay);
> 
> 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
> 
> diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
> index 6b94cf2a6..e88d724d5 100644
> --- a/src/lib/core/coio.cc
> +++ b/src/lib/core/coio.cc
> @@ -210,7 +210,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
> 		if (!uri->host_hint) freeaddrinfo(ai);
> 		else free(ai_local.ai_addr);
> 	});
> -	evio_timeout_update(loop(), start, &delay);
> +	evio_timeout_update(loop(), &start, &delay);
> 
> 	coio_timeout_init(&start, &delay, timeout);
> 	assert(! evio_has_fd(coio));
> @@ -232,7 +232,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
> 		}
> 		ai = ai->ai_next;
> 		ev_now_update(loop);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> 
> 	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
> @@ -278,7 +278,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
> 		fiber_testcancel();
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> }
> 
> @@ -338,7 +338,7 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
> 		fiber_testcancel();
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> }
> 
> @@ -433,7 +433,7 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
> 
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> }
> 
> @@ -499,7 +499,7 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
> 
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> 	return total;
> }
> @@ -545,7 +545,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
> 		fiber_testcancel();
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> }
> 
> @@ -590,7 +590,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
> 		fiber_testcancel();
> 		if (is_timedout)
> 			tnt_raise(TimedOut);
> -		coio_timeout_update(start, &delay);
> +		coio_timeout_update(&start, &delay);
> 	}
> }
> 
> @@ -759,7 +759,7 @@ coio_write_fd_timeout(int fd, const void *data, size_t size, ev_tstamp timeout)
> 		}
> 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
> 			coio_wait(fd, COIO_WRITE, delay);
> -			evio_timeout_update(loop, start, &delay);
> +			evio_timeout_update(loop, &start, &delay);
> 		} else if (errno != EINTR) {
> 			diag_set(SocketError, sio_socketname(fd), "write");
> 			return -1;
> diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
> index 104abf720..6a2337689 100644
> --- a/src/lib/core/coio.h
> +++ b/src/lib/core/coio.h
> @@ -88,7 +88,7 @@ coio_timeout_init(ev_tstamp *start, ev_tstamp *delay,
> }
> 
> static inline void
> -coio_timeout_update(ev_tstamp start, ev_tstamp *delay)
> +coio_timeout_update(ev_tstamp *start, ev_tstamp *delay)
> {
> 	return evio_timeout_update(loop(), start, delay);
> }
> diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h
> index bd48235bc..c86be9e6a 100644
> --- a/src/lib/core/evio.h
> +++ b/src/lib/core/evio.h
> @@ -148,9 +148,10 @@ evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,
> }
> 
> static inline void
> -evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
> +evio_timeout_update(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay)
> {
> -	ev_tstamp elapsed = ev_monotonic_now(loop) - start;
> +	ev_tstamp elapsed = ev_monotonic_now(loop) - *start;
> +	*start += elapsed;
> 	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
> }
> 
> diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
> new file mode 100644
> index 000000000..690fdcbb4
> --- /dev/null
> +++ b/test/replication/long_row_timeout.result
> @@ -0,0 +1,98 @@
> +fiber = require('fiber')
> +---
> +...
> +digest = require('digest')
> +---
> +...
> +test_run = require('test_run').new()
> +---
> +...
> +--
> +-- gh-4042 applier read times out too fast when reading large rows.
> +--
> +box.schema.user.grant('guest', 'replication')
> +---
> +...
> +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
> +---
> +- true
> +...
> +test_run:cmd('start server replica')
> +---
> +- true
> +...
> +box.info.replication[2].downstream.status
> +---
> +- follow
> +...
> +tup_sz = box.cfg.memtx_max_tuple_size
> +---
> +...
> +box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
> +---
> +...
> +-- insert some big rows which cannot be read in one go, so applier yields
> +-- on read a couple of times.
> +s = box.schema.space.create('test')
> +---
> +...
> +_ = s:create_index('pk')
> +---
> +...
> +for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
> +---
> +...
> +-- replication_disconnect_timeout is 4 * replication_timeout, check that
> +-- replica doesn't time out too early.
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +ok = true;
> +---
> +...
> +start = fiber.time();
> +---
> +...
> +while fiber.time() - start < 3 * box.cfg.replication_timeout do
> +    if box.info.replication[2].downstream.status ~= 'follow' then
> +        ok = false
> +        break
> +    end
> +    fiber.sleep(0.001)
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +ok
> +---
> +- true
> +...
> +s:drop()
> +---
> +...
> +test_run:cmd('stop server replica')
> +---
> +- true
> +...
> +test_run:cmd('cleanup server replica')
> +---
> +- true
> +...
> +test_run:cmd('delete server replica')
> +---
> +- true
> +...
> +test_run:cleanup_cluster()
> +---
> +...
> +-- cannot decrease memtx_memory to its original value.
> +box.cfg{memtx_max_tuple_size = tup_sz}
> +---
> +...
> +box.schema.user.revoke('guest', 'replication')
> +---
> +...
> diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
> new file mode 100644
> index 000000000..21a522018
> --- /dev/null
> +++ b/test/replication/long_row_timeout.test.lua
> @@ -0,0 +1,43 @@
> +fiber = require('fiber')
> +digest = require('digest')
> +test_run = require('test_run').new()
> +
> +--
> +-- gh-4042 applier read times out too fast when reading large rows.
> +--
> +box.schema.user.grant('guest', 'replication')
> +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
> +test_run:cmd('start server replica')
> +box.info.replication[2].downstream.status
> +tup_sz = box.cfg.memtx_max_tuple_size
> +box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
> +
> +-- insert some big rows which cannot be read in one go, so applier yields
> +-- on read a couple of times.
> +s = box.schema.space.create('test')
> +_ = s:create_index('pk')
> +for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
> +-- replication_disconnect_timeout is 4 * replication_timeout, check that
> +-- replica doesn't time out too early.
> +test_run:cmd('setopt delimiter ";"')
> +ok = true;
> +start = fiber.time();
> +while fiber.time() - start < 3 * box.cfg.replication_timeout do
> +    if box.info.replication[2].downstream.status ~= 'follow' then
> +        ok = false
> +        break
> +    end
> +    fiber.sleep(0.001)
> +end;
> +test_run:cmd('setopt delimiter ""');
> +
> +ok
> +
> +s:drop()
> +test_run:cmd('stop server replica')
> +test_run:cmd('cleanup server replica')
> +test_run:cmd('delete server replica')
> +test_run:cleanup_cluster()
> +-- cannot decrease memtx_memory to its original value.
> +box.cfg{memtx_max_tuple_size = tup_sz}
> +box.schema.user.revoke('guest', 'replication')
> diff --git a/test/replication/replica_big.lua b/test/replication/replica_big.lua
> new file mode 100644
> index 000000000..b768420fd
> --- /dev/null
> +++ b/test/replication/replica_big.lua
> @@ -0,0 +1,12 @@
> +#!/usr/bin/env tarantool
> +
> +box.cfg({
> +    listen                      = os.getenv("LISTEN"),
> +    replication                 = os.getenv("MASTER"),
> +    memtx_memory                = 1024 * 1024 * 1024,
> +    memtx_max_tuple_size        = 21 * 1024 * 1024,
> +    replication_timeout         = 0.1,
> +    replication_connect_timeout = 0.5,
> +})
> +
> +require('console').listen(os.getenv('ADMIN'))
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index 5e8809731..91e884ece 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -9,6 +9,7 @@
>     "wal_rw_stress.test.lua": {},
>     "force_recovery.test.lua": {},
>     "on_schema_init.test.lua": {},
> +    "long_row_timeout.test.lua": {},
>     "*": {
>         "memtx": {"engine": "memtx"},
>         "vinyl": {"engine": "vinyl"}
> -- 
> 2.17.2 (Apple Git-113)
> 
> 

Rewrote the test a little so that it uses less memtx_memory in the main testing instance.
Here’s the diff.

diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
index 690fdcbb4..3f93f1913 100644
--- a/test/replication/long_row_timeout.result
+++ b/test/replication/long_row_timeout.result
@@ -28,7 +28,7 @@ box.info.replication[2].downstream.status
 tup_sz = box.cfg.memtx_max_tuple_size
 ---
 ...
-box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
 ---
 ...
 -- insert some big rows which cannot be read in one go, so applier yields
@@ -39,7 +39,7 @@ s = box.schema.space.create('test')
 _ = s:create_index('pk')
 ---
 ...
-for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
 ---
 ...
 -- replication_disconnect_timeout is 4 * replication_timeout, check that
@@ -89,7 +89,6 @@ test_run:cmd('delete server replica')
 test_run:cleanup_cluster()
 ---
 ...
--- cannot decrease memtx_memory to its original value.
 box.cfg{memtx_max_tuple_size = tup_sz}
 ---
 ...
diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
index 21a522018..3993f1657 100644
--- a/test/replication/long_row_timeout.test.lua
+++ b/test/replication/long_row_timeout.test.lua
@@ -10,13 +10,13 @@ test_run:cmd('create server replica with rpl_master=default, script="replication
 test_run:cmd('start server replica')
 box.info.replication[2].downstream.status
 tup_sz = box.cfg.memtx_max_tuple_size
-box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
 
 -- insert some big rows which cannot be read in one go, so applier yields
 -- on read a couple of times.
 s = box.schema.space.create('test')
 _ = s:create_index('pk')
-for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
 -- replication_disconnect_timeout is 4 * replication_timeout, check that
 -- replica doesn't time out too early.
 test_run:cmd('setopt delimiter ";"')
@@ -38,6 +38,5 @@ test_run:cmd('stop server replica')
 test_run:cmd('cleanup server replica')
 test_run:cmd('delete server replica')
 test_run:cleanup_cluster()
--- cannot decrease memtx_memory to its original value.
 box.cfg{memtx_max_tuple_size = tup_sz}
 box.schema.user.revoke('guest', 'replication')


More information about the Tarantool-patches mailing list