[PATCH] evio: fix timeout calculations

Serge Petrenko sergepetrenko at tarantool.org
Thu Mar 14 18:16:41 MSK 2019


The function evio_timeout_update() failed to update the starting time
point, which led to timeouts happening much faster than they should when
there were consecutive calls to the function.
This led, for example, to the applier timing out while reading a
several-megabyte row in 0.2 seconds even if replication_timeout was set
to 15 seconds.

Closes #4042
---
https://github.com/tarantool/tarantool/tree/sp/gh-4042-applier-timeout
https://github.com/tarantool/tarantool/issues/4042

 src/box/xrow_io.cc                         |  4 +-
 src/lib/core/coio.cc                       | 18 ++--
 src/lib/core/coio.h                        |  2 +-
 src/lib/core/evio.h                        |  5 +-
 test/replication/long_row_timeout.result   | 98 ++++++++++++++++++++++
 test/replication/long_row_timeout.test.lua | 43 ++++++++++
 test/replication/replica_big.lua           | 12 +++
 test/replication/suite.cfg                 |  1 +
 8 files changed, 169 insertions(+), 14 deletions(-)
 create mode 100644 test/replication/long_row_timeout.result
 create mode 100644 test/replication/long_row_timeout.test.lua
 create mode 100644 test/replication/replica_big.lua

diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 4b6e68bc1..48707982b 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -71,7 +71,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	/* Read fixed header */
 	if (ibuf_used(in) < 1)
 		coio_breadn_timeout(coio, in, 1, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
@@ -81,7 +81,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn_timeout(coio, in, to_read, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index 6b94cf2a6..e88d724d5 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -210,7 +210,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		if (!uri->host_hint) freeaddrinfo(ai);
 		else free(ai_local.ai_addr);
 	});
-	evio_timeout_update(loop(), start, &delay);
+	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
@@ -232,7 +232,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		}
 		ai = ai->ai_next;
 		ev_now_update(loop);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 
 	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
@@ -278,7 +278,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -338,7 +338,7 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -433,7 +433,7 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -499,7 +499,7 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 	return total;
 }
@@ -545,7 +545,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -590,7 +590,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -759,7 +759,7 @@ coio_write_fd_timeout(int fd, const void *data, size_t size, ev_tstamp timeout)
 		}
 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
 			coio_wait(fd, COIO_WRITE, delay);
-			evio_timeout_update(loop, start, &delay);
+			evio_timeout_update(loop, &start, &delay);
 		} else if (errno != EINTR) {
 			diag_set(SocketError, sio_socketname(fd), "write");
 			return -1;
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 104abf720..6a2337689 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -88,7 +88,7 @@ coio_timeout_init(ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-coio_timeout_update(ev_tstamp start, ev_tstamp *delay)
+coio_timeout_update(ev_tstamp *start, ev_tstamp *delay)
 {
 	return evio_timeout_update(loop(), start, delay);
 }
diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h
index bd48235bc..c86be9e6a 100644
--- a/src/lib/core/evio.h
+++ b/src/lib/core/evio.h
@@ -148,9 +148,10 @@ evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
+evio_timeout_update(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay)
 {
-	ev_tstamp elapsed = ev_monotonic_now(loop) - start;
+	ev_tstamp elapsed = ev_monotonic_now(loop) - *start;
+	*start += elapsed;
 	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
 }
 
diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
new file mode 100644
index 000000000..690fdcbb4
--- /dev/null
+++ b/test/replication/long_row_timeout.result
@@ -0,0 +1,98 @@
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+test_run = require('test_run').new()
+---
+...
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
+---
+- true
+...
+test_run:cmd('start server replica')
+---
+- true
+...
+box.info.replication[2].downstream.status
+---
+- follow
+...
+tup_sz = box.cfg.memtx_max_tuple_size
+---
+...
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+---
+...
+-- insert some big rows which cannot be read in one go, so applier yields
+-- on read a couple of times.
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+---
+...
+-- replication_disconnect_timeout is 4 * replication_timeout, check that
+-- replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+ok = true;
+---
+...
+start = fiber.time();
+---
+...
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+ok
+---
+- true
+...
+s:drop()
+---
+...
+test_run:cmd('stop server replica')
+---
+- true
+...
+test_run:cmd('cleanup server replica')
+---
+- true
+...
+test_run:cmd('delete server replica')
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+-- cannot decrease memtx_memory to its original value.
+box.cfg{memtx_max_tuple_size = tup_sz}
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
new file mode 100644
index 000000000..21a522018
--- /dev/null
+++ b/test/replication/long_row_timeout.test.lua
@@ -0,0 +1,43 @@
+fiber = require('fiber')
+digest = require('digest')
+test_run = require('test_run').new()
+
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_big.lua"')
+test_run:cmd('start server replica')
+box.info.replication[2].downstream.status
+tup_sz = box.cfg.memtx_max_tuple_size
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024, memtx_memory = 1024 * 1024 * 1024}
+
+-- insert some big rows which cannot be read in one go, so applier yields
+-- on read a couple of times.
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+for i = 1,5 do box.space.test:insert{i, require('digest').urandom(20 * 1024 * 1024)} end
+-- replication_disconnect_timeout is 4 * replication_timeout, check that
+-- replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+ok = true;
+start = fiber.time();
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+test_run:cmd('setopt delimiter ""');
+
+ok
+
+s:drop()
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+-- cannot decrease memtx_memory to its original value.
+box.cfg{memtx_max_tuple_size = tup_sz}
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/replica_big.lua b/test/replication/replica_big.lua
new file mode 100644
index 000000000..b768420fd
--- /dev/null
+++ b/test/replication/replica_big.lua
@@ -0,0 +1,12 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+    listen                      = os.getenv("LISTEN"),
+    replication                 = os.getenv("MASTER"),
+    memtx_memory                = 1024 * 1024 * 1024,
+    memtx_max_tuple_size        = 21 * 1024 * 1024,
+    replication_timeout         = 0.1,
+    replication_connect_timeout = 0.5,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5e8809731..91e884ece 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -9,6 +9,7 @@
     "wal_rw_stress.test.lua": {},
     "force_recovery.test.lua": {},
     "on_schema_init.test.lua": {},
+    "long_row_timeout.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list