From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
Date: Sat, 29 Dec 2018 13:57:04 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] Re: [PATCH 1/5] recovery: stop writing to xstream on system error
Message-ID: <20181229105704.4oy2gn7hiyni5xvo@esperanza>
References: <20181229090909.GD17043@chai> <20181229095052.2glj66mjb3teo73c@esperanza>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <20181229095052.2glj66mjb3teo73c@esperanza>
To: Konstantin Osipov
Cc: tarantool-patches@freelists.org
List-ID: 

On Sat, Dec 29, 2018 at 12:50:52PM +0300, Vladimir Davydov wrote:
> On Sat, Dec 29, 2018 at 12:09:09PM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov [18/12/29 10:00]:
> > 
> > force_recovery as an option should only affect local recovery, not
> > relays. Why is it set for relay xlog?
> 
> For the record. Discussed f2f and agreed that it seems weird to set
> force_recovery for relay threads; however, changing this behavior now
> may break existing customers. So we should push this patch as is to
> 1.10, but for 2.1 stop setting the force_recovery flag instead.

Here's the alternative fix that will go to 2.1:

https://github.com/tarantool/tarantool/commits/dv/gh-3910-fix-replication-crash-2.1

>From b641dd89d650b02af3f1adee3c3f0579893a1296 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov
Date: Sat, 29 Dec 2018 13:49:43 +0300
Subject: [PATCH] recovery: ignore box.cfg.force_recovery in relay threads

If the force_recovery flag is set, recover_xlog() ignores any errors
returned by xstream_write(), even SocketError or FiberIsCancelled.
This can result in a permanent replication breakdown, as described
below.

Suppose there's a master and a replica, and the master has the
force_recovery flag set. The replica gets stalled on a WAL write while
applying a row fetched from the master. As a result, it stops sending
ACKs. In the meantime, the master writes a lot of new rows to its WAL,
so the relay thread sending changes to the replica fills up all the
space available in the network buffer and blocks on the replication
socket. Note that at this moment a packet fragment may already have
been written to the socket.

The WAL delay on the replica takes long enough for replication to
break on timeout: the relay reader fiber on the master doesn't receive
an ACK from the replica in time and cancels the relay writer fiber.
The relay writer fiber wakes up and returns to recover_xlog(), which
happily continues to scan the xlog, attempting to send more rows
(since force_recovery is set), failing, and complaining to the log.

While the relay thread is still scanning the log, the replica finishes
the long WAL write and reads more data from the socket, freeing up
some space in the network buffer for the relay to write more rows. The
relay thread, which happens to be still in recover_xlog(), writes a
new row to the socket right after the packet fragment it had written
when it was cancelled, effectively corrupting the stream and breaking
replication with an unrecoverable error, e.g.

  xrow.c:99 E> ER_INVALID_MSGPACK: Invalid MsgPack - packet header

Actually, taking force_recovery into account in relay threads looks
dubious: after all, this option was implemented to allow starting a
tarantool instance when its local data are corrupted, not to force
replication from a corrupted data set. The latter is dangerous anyway;
it's better to rebootstrap replicas in case of master data corruption.
So let's ignore the force_recovery option in relay threads.
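To make the failure mode concrete, here's a rough sketch of the loop
shape in question. This is an illustration only, not the actual
recovery.cc code: next_row() is a made-up stand-in for the real xlog
cursor, and force_recovery stands for however the flag is stored on
the recovery context.

	/* Illustrative sketch, not real tarantool code. */
	struct xrow_header row;
	while (next_row(r, &row) == 0) {	/* next_row() is hypothetical */
		if (xstream_write(stream, &row) != 0) {
			if (!force_recovery)
				diag_raise();	/* stop on the first error */
			/*
			 * With force_recovery, the error is merely logged
			 * and the scan continues, even when it is a
			 * SocketError or FiberIsCancelled.
			 */
			diag_log();
		}
	}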
It's difficult to write a test for this case, since too many conditions
have to be satisfied simultaneously for the issue to occur. Injecting
errors doesn't really help here and would look artificial, because it'd
rely too much on the implementation. So I'm committing this one without
a test case for the race itself.

Part of #3910

diff --git a/src/box/relay.cc b/src/box/relay.cc
index a01c2a2e..a799f23d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -302,8 +302,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		relay_delete(relay);
 	});
 
-	relay->r = recovery_new(cfg_gets("wal_dir"),
-				cfg_geti("force_recovery"),
+	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 				start_vclock);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
@@ -610,8 +609,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	relay_start(relay, fd, sync, relay_send_row);
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
 
-	relay->r = recovery_new(cfg_gets("wal_dir"),
-				cfg_geti("force_recovery"),
+	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 				replica_clock);
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
new file mode 100644
index 00000000..f5045285
--- /dev/null
+++ b/test/replication/force_recovery.result
@@ -0,0 +1,110 @@
+test_run = require('test_run').new()
+---
+...
+fio = require('fio')
+---
+...
+--
+-- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
+--
+_ = box.schema.space.create('test')
+---
+...
+_ = box.space.test:create_index('primary')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+-- Deploy a replica.
+test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server test")
+---
+- true
+...
+-- Stop the replica and wait for the relay thread to exit.
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
+---
+- true
+...
+-- Delete an xlog file that is needed by the replica.
+box.snapshot()
+---
+- ok
+...
+xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
+---
+...
+box.space.test:replace{1}
+---
+- [1]
+...
+box.snapshot()
+---
+- ok
+...
+box.space.test:replace{2}
+---
+- [2]
+...
+fio.unlink(xlog)
+---
+- true
+...
+-- Check that even though box.cfg.force_recovery is set,
+-- replication will still fail due to LSN gap.
+box.cfg{force_recovery = true}
+---
+...
+test_run:cmd("start server test")
+---
+- true
+...
+test_run:cmd("switch test")
+---
+- true
+...
+box.space.test:select()
+---
+- []
+...
+box.info.replication[1].upstream.status == 'stopped' or box.info
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.cfg{force_recovery = false}
+---
+...
+-- Cleanup.
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.space.test:drop()
+---
+...
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
new file mode 100644
index 00000000..54307814
--- /dev/null
+++ b/test/replication/force_recovery.test.lua
@@ -0,0 +1,43 @@
+test_run = require('test_run').new()
+fio = require('fio')
+
+--
+-- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
+--
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('primary')
+box.schema.user.grant('guest', 'replication')
+
+-- Deploy a replica.
+test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server test")
+
+-- Stop the replica and wait for the relay thread to exit.
+test_run:cmd("stop server test")
+test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
+
+-- Delete an xlog file that is needed by the replica.
+box.snapshot()
+xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
+box.space.test:replace{1}
+box.snapshot()
+box.space.test:replace{2}
+fio.unlink(xlog)
+
+-- Check that even though box.cfg.force_recovery is set,
+-- replication will still fail due to LSN gap.
+box.cfg{force_recovery = true}
+test_run:cmd("start server test")
+test_run:cmd("switch test")
+box.space.test:select()
+box.info.replication[1].upstream.status == 'stopped' or box.info
+test_run:cmd("switch default")
+box.cfg{force_recovery = false}
+
+-- Cleanup.
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 984d2e81..fc7c0c46 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -7,6 +7,7 @@
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
     "wal_rw_stress.test.lua": {},
+    "force_recovery.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
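For anyone who wants to try the new test locally, it should go through
the usual test-run harness from the test/ directory; the exact
invocation may vary with your setup, but something along the lines of:

	./test-run.py replication/force_recovery.test.lua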