Re: [tarantool-patches] [PATCH] evio: fix timeout calculations
From: Serge Petrenko @ 2019-03-14 15:57 UTC
To: Vladimir Davydov; +Cc: tarantool-patches
> On 14 Mar 2019, at 18:16, Serge Petrenko <sergepetrenko@tarantool.org> wrote:
>
> The function evio_timeout_update() failed to update the starting time
> point, which led to timeouts expiring much sooner than they should
> when the function was called several times in a row.
> For example, this led to the applier timing out after about 0.2
> seconds while reading a row several megabytes in size, even though
> replication_timeout was set to 15 seconds.
>
> Closes #4042
> ---
> https://github.com/tarantool/tarantool/tree/sp/gh-4042-applier-timeout
> https://github.com/tarantool/tarantool/issues/4042
>
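> To see how the arithmetic goes wrong, consider three consecutive reads
> of 0.2 seconds each against a 15-second timeout. Below is a standalone
> plain-Lua sketch of the old and the fixed update (not part of the
> patch; the helpers mirror evio_timeout_update(), with 'start' passed
> in and returned instead of updated through a pointer):
>
> -- Old behavior: 'start' is fixed, so every update charges the time
> -- elapsed since the *first* call against the remaining delay.
> local function update_old(now, start, delay)
>     local elapsed = now - start
>     return elapsed >= delay and 0 or delay - elapsed
> end
>
> -- Fixed behavior: 'start' advances to 'now', so each update charges
> -- only the time elapsed since the previous update.
> local function update_new(now, start, delay)
>     local elapsed = now - start
>     return now, (elapsed >= delay and 0 or delay - elapsed)
> end
>
> local delay = 15
> for i = 1, 3 do delay = update_old(0.2 * i, 0, delay) end
> print(delay) -- 13.8: 1.2s charged although only 0.6s have passed
>
> local start = 0
> delay = 15
> for i = 1, 3 do start, delay = update_new(0.2 * i, start, delay) end
> print(delay) -- 14.4: exactly the 0.6s that actually elapsed
>
> With the old code the n-th update charges n times the per-read cost,
> so a read that yields many times exhausts the 15-second budget after
> only a fraction of the wall-clock time, which is how the applier could
> time out in about 0.2 seconds.
>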
> src/box/xrow_io.cc | 4 +-
> src/lib/core/coio.cc | 18 ++--
> src/lib/core/coio.h | 2 +-
> src/lib/core/evio.h | 5 +-
> test/replication/long_row_timeout.result | 98 ++++++++++++++++++++++
> test/replication/long_row_timeout.test.lua | 43 ++++++++++
> test/replication/replica_big.lua | 12 +++
> test/replication/suite.cfg | 1 +
> 8 files changed, 169 insertions(+), 14 deletions(-)
> create mode 100644 test/replication/long_row_timeout.result
> create mode 100644 test/replication/long_row_timeout.test.lua
> create mode 100644 test/replication/replica_big.lua
>
> diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
> index 4b6e68bc1..48707982b 100644
> --- a/src/box/xrow_io.cc
> +++ b/src/box/xrow_io.cc
> @@ -71,7 +71,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> /* Read fixed header */
> if (ibuf_used(in) < 1)
> coio_breadn_timeout(coio, in, 1, delay);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
>
> /* Read length */
> if (mp_typeof(*in->rpos) != MP_UINT) {
> @@ -81,7 +81,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> if (to_read > 0)
> coio_breadn_timeout(coio, in, to_read, delay);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
>
> uint32_t len = mp_decode_uint((const char **) &in->rpos);
>
> diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
> index 6b94cf2a6..e88d724d5 100644
> --- a/src/lib/core/coio.cc
> +++ b/src/lib/core/coio.cc
> @@ -210,7 +210,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
> if (!uri->host_hint) freeaddrinfo(ai);
> else free(ai_local.ai_addr);
> });
> - evio_timeout_update(loop(), start, &delay);
> + evio_timeout_update(loop(), &start, &delay);
>
> coio_timeout_init(&start, &delay, timeout);
> assert(! evio_has_fd(coio));
> @@ -232,7 +232,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
> }
> ai = ai->ai_next;
> ev_now_update(loop);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
>
> tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
> @@ -278,7 +278,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
> fiber_testcancel();
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> }
>
> @@ -338,7 +338,7 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
> fiber_testcancel();
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> }
>
> @@ -433,7 +433,7 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
>
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> }
>
> @@ -499,7 +499,7 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
>
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> return total;
> }
> @@ -545,7 +545,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
> fiber_testcancel();
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> }
>
> @@ -590,7 +590,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
> fiber_testcancel();
> if (is_timedout)
> tnt_raise(TimedOut);
> - coio_timeout_update(start, &delay);
> + coio_timeout_update(&start, &delay);
> }
> }
>
> @@ -759,7 +759,7 @@ coio_write_fd_timeout(int fd, const void *data, size_t size, ev_tstamp timeout)
> }
> if (errno == EAGAIN || errno == EWOULDBLOCK) {
> coio_wait(fd, COIO_WRITE, delay);
> - evio_timeout_update(loop, start, &delay);
> + evio_timeout_update(loop, &start, &delay);
> } else if (errno != EINTR) {
> diag_set(SocketError, sio_socketname(fd), "write");
> return -1;
> diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
> index 104abf720..6a2337689 100644
> --- a/src/lib/core/coio.h
> +++ b/src/lib/core/coio.h
> @@ -88,7 +88,7 @@ coio_timeout_init(ev_tstamp *start, ev_tstamp *delay,
> }
>
> static inline void
> -coio_timeout_update(ev_tstamp start, ev_tstamp *delay)
> +coio_timeout_update(ev_tstamp *start, ev_tstamp *delay)
> {
> return evio_timeout_update(loop(), start, delay);
> }
> diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h
> index bd48235bc..c86be9e6a 100644
> --- a/src/lib/core/evio.h
> +++ b/src/lib/core/evio.h
> @@ -148,9 +148,10 @@ evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,
> }
>
> static inline void
> -evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
> +evio_timeout_update(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay)
> {
> - ev_tstamp elapsed = ev_monotonic_now(loop) - start;
> + ev_tstamp elapsed = ev_monotonic_now(loop) - *start;
> + *start += elapsed;
> *delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
> }
>
> diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
> new file mode 100644
> index 000000000..690fdcbb4
> --- /dev/null
> +++ b/test/replication/long_row_timeout.result
> @@ -0,0 +1,98 @@
> +fiber = require('fiber')
> +---
> +...
> +digest = require('digest')
> +---
> +...
> +test_run = require('test_run').new()
> +---
> +...
> +--
> +-- gh-4042 applier read times out too fast when reading large rows.
> +--
> +box.schema.user.grant('guest', 'replication')
> +---
> +...
> +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
> +---
> +- true
> +...
> +test_run:cmd('start server replica')
> +---
> +- true
> +...
> +box.info.replication[2].downstream.status
> +---
> +- follow
> +...
> +tup_sz = box.cfg.memtx_max_tuple_size
> +---
> +...
> +box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
> +---
> +...
> +-- insert some big rows which cannot be read in one go, so applier yields
> +-- on read a couple of times.
> +s = box.schema.space.create('test')
> +---
> +...
> +_ = s:create_index('pk')
> +---
> +...
> +for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
> +---
> +...
> +-- replication_disconnect_timeout is 4 * replication_timeout, check that
> +-- replica doesn't time out too early.
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +ok = true;
> +---
> +...
> +start = fiber.time();
> +---
> +...
> +while fiber.time() - start < 3 * box.cfg.replication_timeout do
> + if box.info.replication[2].downstream.status ~= 'follow' then
> + ok = false
> + break
> + end
> + fiber.sleep(0.001)
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +ok
> +---
> +- true
> +...
> +s:drop()
> +---
> +...
> +test_run:cmd('stop server replica')
> +---
> +- true
> +...
> +test_run:cmd('cleanup server replica')
> +---
> +- true
> +...
> +test_run:cmd('delete server replica')
> +---
> +- true
> +...
> +test_run:cleanup_cluster()
> +---
> +...
> +-- cannot decrease memtx_memory to its original value.
> +box.cfg{memtx_max_tuple_size = tup_sz}
> +---
> +...
> +box.schema.user.revoke('guest', 'replication')
> +---
> +...
> diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
> new file mode 100644
> index 000000000..21a522018
> --- /dev/null
> +++ b/test/replication/long_row_timeout.test.lua
> @@ -0,0 +1,43 @@
> +fiber = require('fiber')
> +digest = require('digest')
> +test_run = require('test_run').new()
> +
> +--
> +-- gh-4042 applier read times out too fast when reading large rows.
> +--
> +box.schema.user.grant('guest', 'replication')
> +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
> +test_run:cmd('start server replica')
> +box.info.replication[2].downstream.status
> +tup_sz = box.cfg.memtx_max_tuple_size
> +box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
> +
> +-- insert some big rows which cannot be read in one go, so applier yields
> +-- on read a couple of times.
> +s = box.schema.space.create('test')
> +_ = s:create_index('pk')
> +for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
> +-- replication_disconnect_timeout is 4 * replication_timeout, check that
> +-- replica doesn't time out too early.
> +test_run:cmd('setopt delimiter ";"')
> +ok = true;
> +start = fiber.time();
> +while fiber.time() - start < 3 * box.cfg.replication_timeout do
> + if box.info.replication[2].downstream.status ~= 'follow' then
> + ok = false
> + break
> + end
> + fiber.sleep(0.001)
> +end;
> +test_run:cmd('setopt delimiter ""');
> +
> +ok
> +
> +s:drop()
> +test_run:cmd('stop server replica')
> +test_run:cmd('cleanup server replica')
> +test_run:cmd('delete server replica')
> +test_run:cleanup_cluster()
> +-- cannot decrease memtx_memory to its original value.
> +box.cfg{memtx_max_tuple_size = tup_sz}
> +box.schema.user.revoke('guest', 'replication')
> diff --git a/test/replication/replica_big.lua b/test/replication/replica_big.lua
> new file mode 100644
> index 000000000..b768420fd
> --- /dev/null
> +++ b/test/replication/replica_big.lua
> @@ -0,0 +1,12 @@
> +#!/usr/bin/env tarantool
> +
> +box.cfg({
> + listen = os.getenv("LISTEN"),
> + replication = os.getenv("MASTER"),
> + memtx_memory = 1024 * 1024 * 1024,
> + memtx_max_tuple_size = 21 * 1024 * 1024,
> + replication_timeout = 0.1,
> + replication_connect_timeout = 0.5,
> +})
> +
> +require('console').listen(os.getenv('ADMIN'))
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index 5e8809731..91e884ece 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -9,6 +9,7 @@
> "wal_rw_stress.test.lua": {},
> "force_recovery.test.lua": {},
> "on_schema_init.test.lua": {},
> + "long_row_timeout.test.lua": {},
> "*": {
> "memtx": {"engine": "memtx"},
> "vinyl": {"engine": "vinyl"}
> --
> 2.17.2 (Apple Git-113)
>
>
Rewrote the test a little to use less memtx_memory in the main testing instance.
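The point of the change: replace{} on the same key keeps at most one
big tuple alive in the space (insert{} kept all five), and the explicit
collectgarbage() promptly frees the ~20 MB Lua string produced by each
urandom() call, so the master no longer needs the enlarged
memtx_memory. Condensed, the new loop amounts to (a sketch, not the
literal test code):

    digest = require('digest')
    for i = 1, 5 do
        -- overwrite a single tuple instead of accumulating five
        box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)}
        -- drop the temporary 20 MB string right away
        collectgarbage('collect')
    end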
Here’s the diff.
diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
index 690fdcbb4..3f93f1913 100644
--- a/test/replication/long_row_timeout.result
+++ b/test/replication/long_row_timeout.result
@@ -28,7 +28,7 @@ box.info.replication[2].downstream.status
tup_sz = box.cfg.memtx_max_tuple_size
---
...
-box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
---
...
-- insert some big rows which cannot be read in one go, so applier yields
@@ -39,7 +39,7 @@ s = box.schema.space.create('test')
_ = s:create_index('pk')
---
...
-for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
---
...
-- replication_disconnect_timeout is 4 * replication_timeout, check that
@@ -89,7 +89,6 @@ test_run:cmd('delete server replica')
test_run:cleanup_cluster()
---
...
--- cannot decrease memtx_memory to its original value.
box.cfg{memtx_max_tuple_size = tup_sz}
---
...
diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
index 21a522018..3993f1657 100644
--- a/test/replication/long_row_timeout.test.lua
+++ b/test/replication/long_row_timeout.test.lua
@@ -10,13 +10,13 @@ test_run:cmd('create server replica with rpl_master=default, script="replication
test_run:cmd('start server replica')
box.info.replication[2].downstream.status
tup_sz = box.cfg.memtx_max_tuple_size
-box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
-- insert some big rows which cannot be read in one go, so applier yields
-- on read a couple of times.
s = box.schema.space.create('test')
_ = s:create_index('pk')
-for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
-- replication_disconnect_timeout is 4 * replication_timeout, check that
-- replica doesn't time out too early.
test_run:cmd('setopt delimiter ";"')
@@ -38,6 +38,5 @@ test_run:cmd('stop server replica')
test_run:cmd('cleanup server replica')
test_run:cmd('delete server replica')
test_run:cleanup_cluster()
--- cannot decrease memtx_memory to its original value.
box.cfg{memtx_max_tuple_size = tup_sz}
box.schema.user.revoke('guest', 'replication')