[Tarantool-patches] [PATCH] replication: stop pushing TimedOut error to the replica
Serge Petrenko
sergepetrenko at tarantool.org
Fri Jul 9 10:40:48 MSK 2021
Every error that happens while the master processes a join or subscribe
request is sent to the replica for better diagnostics.
This could lead to the following situation with the TimedOut error:
it could be written on top of a half-written row and make the replica stop
replication with an ER_INVALID_MSGPACK error. The error is unrecoverable,
and the only way to resume replication after it happens is to reset
box.cfg.replication.
Here's what happened:
1) The replica is under heavy load, meaning its event loop is occupied by
some fiber that doesn't yield control to others.
2) applier and other fibers aren't scheduled while the event loop is
blocked. This means applier doesn't send heartbeat messages to the
master and doesn't read any data coming from the master.
3) The master's unread data piles up: first in the replica's receive buffer,
then in the master's send buffer.
4) Once the master's send buffer is full, the corresponding socket stops
being writeable and the relay yields, waiting for the socket to become
writeable again. The send buffer might contain a partially written
row by now.
5) A replication timeout happens on the master, because it hasn't heard
from the replica for a while. An exception is raised, and the exception is
pushed to the replica's socket. Now two situations are possible:
a) the socket becomes writeable by the time the exception is raised.
In this case the exception is written to the buffer right after
a partially written row. Once the replica receives the half-written
row with the exception written on top, it errors with
ER_INVALID_MSGPACK. Replication is broken.
b) the socket still isn't writeable (the most probable scenario).
The exception isn't written to the socket and the connection is
closed. The replica eventually receives a partially written row and
retries the connection to the master normally.
In order to prevent case a) from happening, let's not push TimedOut
errors to the socket at all. They're the only errors that could be
raised while a row is being written, i.e. the only errors that could
lead to the situation described in 5a.
Closes #4040
---
https://github.com/tarantool/tarantool/issues/4040
https://github.com/tarantool/tarantool/compare/sp/gh-4040-invalid-msgpack
.../unreleased/gh-4040-invalid-msgpack.md | 4 +
src/box/applier.cc | 2 +
src/box/iproto.cc | 6 +
src/box/xrow.c | 9 +
src/lib/core/errinj.h | 3 +
test/box/errinj.result | 5 +-
test/replication/errinj.result | 12 +-
test/replication/errinj.test.lua | 8 +-
.../gh-4040-invalid-msgpack.result | 180 ++++++++++++++++++
.../gh-4040-invalid-msgpack.test.lua | 74 +++++++
test/replication/suite.cfg | 1 +
test/replication/suite.ini | 2 +-
12 files changed, 296 insertions(+), 10 deletions(-)
create mode 100644 changelogs/unreleased/gh-4040-invalid-msgpack.md
create mode 100644 test/replication/gh-4040-invalid-msgpack.result
create mode 100644 test/replication/gh-4040-invalid-msgpack.test.lua
diff --git a/changelogs/unreleased/gh-4040-invalid-msgpack.md b/changelogs/unreleased/gh-4040-invalid-msgpack.md
new file mode 100644
index 000000000..1a89fefae
--- /dev/null
+++ b/changelogs/unreleased/gh-4040-invalid-msgpack.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fixed replication occasionally stopping with `ER_INVALID_MSGPACK` when the
+ replica is under high load (gh-4040).
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 07fe7f5c7..a9b6cacbc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -635,6 +635,8 @@ applier_read_tx_row(struct applier *applier, double timeout)
struct xrow_header *row = &tx_row->row;
+ ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
+
coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
applier->lag = ev_now(loop()) - row->tm;
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index ac1cb6081..3ed641eea 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1840,6 +1840,12 @@ tx_process_replication(struct cmsg *m)
}
} catch (SocketError *e) {
return; /* don't write error response to prevent SIGPIPE */
+ } catch (TimedOut *e) {
+ /*
+ * In case of a timeout the error could come after a partially
+ * written row. Do not push it on top.
+ */
+ return;
} catch (Exception *e) {
iproto_write_error(con->input.fd, e, ::schema_version,
msg->header.sync);
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 16cb2484c..fae9861a5 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,6 +43,7 @@
#include "scramble.h"
#include "iproto_constants.h"
#include "mpstream/mpstream.h"
+#include "errinj.h"
static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
@@ -549,6 +550,12 @@ iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
size_t region_svp = region_used(region);
mpstream_iproto_encode_error(&stream, e);
+ struct errinj *inj = errinj(ERRINJ_IPROTO_WRITE_ERROR_LARGE,
+ ERRINJ_INT);
+ if (inj != NULL && inj->iparam > 0) {
+ char garbage[inj->iparam];
+ mpstream_encode_strn(&stream, garbage, inj->iparam);
+ }
mpstream_flush(&stream);
if (is_error)
goto cleanup;
@@ -564,6 +571,8 @@ iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
schema_version, payload_size);
ssize_t unused;
+
+ ERROR_INJECT_YIELD(ERRINJ_IPROTO_WRITE_ERROR_DELAY);
unused = write(fd, header, sizeof(header));
unused = write(fd, payload, payload_size);
(void) unused;
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 359174b16..0d8ec967d 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -152,6 +152,9 @@ struct errinj {
_(ERRINJ_STDIN_ISATTY, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_SNAP_COMMIT_FAIL, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_SINGLE_THREAD_STAT, ERRINJ_INT, {.iparam = -1}) \
+ _(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
+ _(ERRINJ_IPROTO_WRITE_ERROR_LARGE, ERRINJ_INT, {.iparam = -1})\
+ _(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\
ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 43daf5f0f..e5193a3d3 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -43,7 +43,8 @@ end
...
evals
---
-- - ERRINJ_APPLIER_SLOW_ACK: false
+- - ERRINJ_APPLIER_READ_TX_ROW_DELAY: false
+ - ERRINJ_APPLIER_SLOW_ACK: false
- ERRINJ_AUTO_UPGRADE: false
- ERRINJ_BUILD_INDEX: -1
- ERRINJ_BUILD_INDEX_DELAY: false
@@ -59,6 +60,8 @@ evals
- ERRINJ_INDEX_RESERVE: false
- ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
- ERRINJ_IPROTO_TX_DELAY: false
+ - ERRINJ_IPROTO_WRITE_ERROR_DELAY: false
+ - ERRINJ_IPROTO_WRITE_ERROR_LARGE: -1
- ERRINJ_LOG_ROTATE: false
- ERRINJ_MEMTX_DELAY_GC: false
- ERRINJ_PORT_DUMP: false
diff --git a/test/replication/errinj.result b/test/replication/errinj.result
index f04a38c45..9d13f6aa7 100644
--- a/test/replication/errinj.result
+++ b/test/replication/errinj.result
@@ -275,8 +275,9 @@ test_run:cmd("switch replica")
fiber = require'fiber'
---
...
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
---
+- true
...
test_run:cmd("switch default")
---
@@ -312,8 +313,9 @@ box.info.replication[1].upstream.lag < 1
- true
...
-- wait for ack timeout
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
---
+- true
...
test_run:cmd("switch default")
---
@@ -450,8 +452,9 @@ test_run:cmd("switch replica_timeout")
-- due to infinite read timeout connection never breaks,
-- replica shows state 'follow' so old behaviour hangs
-- here in infinite loop.
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='timed out'})
---
+- true
...
test_run:cmd("switch default")
---
@@ -530,8 +533,9 @@ test_run:cmd("switch replica_timeout")
fiber = require('fiber')
---
...
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='timed out'})
---
+- true
...
test_run:cmd("stop server default")
---
diff --git a/test/replication/errinj.test.lua b/test/replication/errinj.test.lua
index 53637e248..19234ab35 100644
--- a/test/replication/errinj.test.lua
+++ b/test/replication/errinj.test.lua
@@ -118,7 +118,7 @@ box.cfg{replication_timeout = 0.0001}
test_run:cmd("start server replica")
test_run:cmd("switch replica")
fiber = require'fiber'
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
test_run:cmd("switch default")
-- Disable heartbeat messages on the master so as not
@@ -132,7 +132,7 @@ box.info.replication[1].upstream.status
box.info.replication[1].upstream.lag > 0
box.info.replication[1].upstream.lag < 1
-- wait for ack timeout
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
test_run:cmd("switch default")
errinj.set("ERRINJ_RELAY_REPORT_INTERVAL", 0)
@@ -188,7 +188,7 @@ test_run:cmd("switch replica_timeout")
-- due to infinite read timeout connection never breaks,
-- replica shows state 'follow' so old behaviour hangs
-- here in infinite loop.
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='timed out'})
test_run:cmd("switch default")
test_run:cmd("stop server replica_timeout")
@@ -221,7 +221,7 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
test_run:cmd("start server replica_timeout with args='0.00001 0.5'")
test_run:cmd("switch replica_timeout")
fiber = require('fiber')
-while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
+test_run:wait_upstream(1, {status='disconnected', message_re='timed out'})
test_run:cmd("stop server default")
test_run:cmd("deploy server default")
diff --git a/test/replication/gh-4040-invalid-msgpack.result b/test/replication/gh-4040-invalid-msgpack.result
new file mode 100644
index 000000000..4348ebd90
--- /dev/null
+++ b/test/replication/gh-4040-invalid-msgpack.result
@@ -0,0 +1,180 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-4040. ER_INVALID_MSGPACK on a replica when master's relay times out after
+-- not being able to write a full row to the socket.
+--
+test_run:cmd('create server master with script="replication/master1.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('create server replica with rpl_master=master,\
+ script="replication/replica_timeout.lua"')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('start server master')
+ | ---
+ | - true
+ | ...
+test_run:switch('master')
+ | ---
+ | - true
+ | ...
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+box.cfg{replication_timeout=0.5}
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+test_run:cmd('start server replica with args="1000"')
+ | ---
+ | - true
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+
+box.cfg{log_level=6}
+ | ---
+ | ...
+sign = box.info.signature
+ | ---
+ | ...
+box.error.injection.set('ERRINJ_APPLIER_READ_TX_ROW_DELAY', true)
+ | ---
+ | - ok
+ | ...
+test_run:switch('master')
+ | ---
+ | - true
+ | ...
+
+-- Find the send buffer size. Testing uses Unix domain sockets. Create such a
+-- socket and assume relay's socket has the same parameters.
+socket = require('socket')
+ | ---
+ | ...
+soc = socket('AF_UNIX', 'SOCK_STREAM', 0)
+ | ---
+ | ...
+bufsize = soc:getsockopt('SOL_SOCKET', 'SO_SNDBUF')
+ | ---
+ | ...
+require('log').info("SO_SNDBUF size is %d", bufsize)
+ | ---
+ | ...
+-- Master shouldn't try to write the error while the socket isn't writeable.
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_DELAY', true)
+ | ---
+ | - ok
+ | ...
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_LARGE', bufsize)
+ | ---
+ | - ok
+ | ...
+-- Generate enough data to fill the sendbuf.
+-- This will make the relay yield in the middle of writing a row waiting for the
+-- socket to become writeable.
+tbl = {1}
+ | ---
+ | ...
+filler = string.rep('b', 100)
+ | ---
+ | ...
+for i = 2, 0.7 * bufsize / 100 do\
+ tbl[i] = filler\
+end
+ | ---
+ | ...
+for i = 1,10 do\
+ tbl[1] = i\
+ box.space.test:replace(tbl)\
+end
+ | ---
+ | ...
+
+-- Wait for the timeout to happen. The relay's send buffer should full by now
+-- and contain a half-written row.
+test_run:wait_downstream(2, {status='stopped'})
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Wait until replica starts receiving the data.
+-- This will make master's socket writeable again.
+box.error.injection.set('ERRINJ_APPLIER_READ_TX_ROW_DELAY', false)
+ | ---
+ | - ok
+ | ...
+test_run:wait_cond(function() return box.info.signature > sign end)
+ | ---
+ | - true
+ | ...
+
+test_run:switch('master')
+ | ---
+ | - true
+ | ...
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_DELAY', false)
+ | ---
+ | - ok
+ | ...
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_LARGE', -1)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+
+-- There shouldn't be any errors other than the connection reset.
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
+ | ---
+ | - true
+ | ...
+assert(test_run:grep_log('replica', 'ER_INVALID_MSGPACK') == nil)
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server master')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server master')
+ | ---
+ | - true
+ | ...
+
diff --git a/test/replication/gh-4040-invalid-msgpack.test.lua b/test/replication/gh-4040-invalid-msgpack.test.lua
new file mode 100644
index 000000000..0e51affac
--- /dev/null
+++ b/test/replication/gh-4040-invalid-msgpack.test.lua
@@ -0,0 +1,74 @@
+test_run = require('test_run').new()
+
+--
+-- gh-4040. ER_INVALID_MSGPACK on a replica when master's relay times out after
+-- not being able to write a full row to the socket.
+--
+test_run:cmd('create server master with script="replication/master1.lua"')
+test_run:cmd('create server replica with rpl_master=master,\
+ script="replication/replica_timeout.lua"')
+
+test_run:cmd('start server master')
+test_run:switch('master')
+box.schema.user.grant('guest', 'replication')
+box.cfg{replication_timeout=0.5}
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+test_run:cmd('start server replica with args="1000"')
+test_run:switch('replica')
+
+box.cfg{log_level=6}
+sign = box.info.signature
+box.error.injection.set('ERRINJ_APPLIER_READ_TX_ROW_DELAY', true)
+test_run:switch('master')
+
+-- Find the send buffer size. Testing uses Unix domain sockets. Create such a
+-- socket and assume relay's socket has the same parameters.
+socket = require('socket')
+soc = socket('AF_UNIX', 'SOCK_STREAM', 0)
+bufsize = soc:getsockopt('SOL_SOCKET', 'SO_SNDBUF')
+require('log').info("SO_SNDBUF size is %d", bufsize)
+-- Master shouldn't try to write the error while the socket isn't writeable.
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_DELAY', true)
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_LARGE', bufsize)
+-- Generate enough data to fill the sendbuf.
+-- This will make the relay yield in the middle of writing a row waiting for the
+-- socket to become writeable.
+tbl = {1}
+filler = string.rep('b', 100)
+for i = 2, 0.7 * bufsize / 100 do\
+ tbl[i] = filler\
+end
+for i = 1,10 do\
+ tbl[1] = i\
+ box.space.test:replace(tbl)\
+end
+
+-- Wait for the timeout to happen. The relay's send buffer should full by now
+-- and contain a half-written row.
+test_run:wait_downstream(2, {status='stopped'})
+
+test_run:switch('replica')
+-- Wait until replica starts receiving the data.
+-- This will make master's socket writeable again.
+box.error.injection.set('ERRINJ_APPLIER_READ_TX_ROW_DELAY', false)
+test_run:wait_cond(function() return box.info.signature > sign end)
+
+test_run:switch('master')
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_DELAY', false)
+box.error.injection.set('ERRINJ_IPROTO_WRITE_ERROR_LARGE', -1)
+
+test_run:switch('replica')
+
+-- There shouldn't be any errors other than the connection reset.
+test_run:wait_upstream(1, {status='disconnected', message_re='unexpected EOF'})
+assert(test_run:grep_log('replica', 'ER_INVALID_MSGPACK') == nil)
+
+-- Cleanup.
+test_run:switch('default')
+test_run:cmd('stop server replica')
+test_run:cmd('stop server master')
+test_run:cmd('delete server replica')
+test_run:cmd('delete server master')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 69f2f3511..a51a2d51a 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -33,6 +33,7 @@
"on_schema_init.test.lua": {},
"long_row_timeout.test.lua": {},
"join_without_snap.test.lua": {},
+ "gh-4040-invalid-msgpack.test.lua": {},
"gh-4114-local-space-replication.test.lua": {},
"gh-4402-info-errno.test.lua": {},
"gh-4605-empty-password.test.lua": {},
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index 6ae041d12..18981996d 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua anon_register_gap.test.lua gh-5213-qsync-applier-order.test.lua gh-5213-qsync-applier-order-3.test.lua gh-6027-applier-error-show.test.lua gh-6032-promote-wal-write.test.lua gh-6057-qsync-confirm-async-no-wal.test.lua gh-5447-downstream-lag.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua anon_register_gap.test.lua gh-5213-qsync-applier-order.test.lua gh-5213-qsync-applier-order-3.test.lua gh-6027-applier-error-show.test.lua gh-6032-promote-wal-write.test.lua gh-6057-qsync-confirm-async-no-wal.test.lua gh-5447-downstream-lag.test.lua gh-4040-invalid-msgpack.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
use_unix_sockets = True
--
2.30.1 (Apple Git-130)
More information about the Tarantool-patches
mailing list