* [PATCH 0/5] Fix a couple of replication breakdown issues
@ 2018-12-28 21:21 Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
                   ` (5 more replies)
  0 siblings, 6 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

For details see comments to the individual patches and the issue
description.

https://github.com/tarantool/tarantool/issues/3910
https://github.com/tarantool/tarantool/commits/dv/gh-3910-fix-replication-crash

Vladimir Davydov (5):
  recovery: stop writing to xstream on system error
  relay: do not try to scan xlog if exiting
  relay: cleanup error handling
  relay: close xlog cursor in relay thread
  xlog: assure xlog is opened and closed in the same thread

 src/box/recovery.cc |  18 +++++++--
 src/box/relay.cc    | 111 +++++++++++++++++++++++++++++++++++-----------------
 src/box/xlog.c      |   4 ++
 3 files changed, 94 insertions(+), 39 deletions(-)

-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
* [PATCH 1/5] recovery: stop writing to xstream on system error
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
@ 2018-12-28 21:21 ` Vladimir Davydov
  2018-12-29  9:09   ` [tarantool-patches] " Konstantin Osipov
  2018-12-28 21:21 ` [PATCH 2/5] relay: do not try to scan xlog if exiting Vladimir Davydov
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

In case the force_recovery flag is set, recover_xlog() ignores any
errors returned by xstream_write(), even SocketError or
FiberIsCancelled. This may result in a permanent replication breakdown,
as described in the next paragraph.

Suppose there's a master and a replica, and the master has the
force_recovery flag set. The replica gets stalled on WAL while applying
a row fetched from the master. As a result, it stops sending ACKs. In
the meantime, the master writes a lot of new rows to its WAL so that
the relay thread sending changes to the replica fills up all the space
available in the network buffer and blocks on the replication socket.
Note, at this moment it may occur that a packet fragment has been
written to the socket. The WAL delay on the replica takes long enough
for replication to break on timeout: the relay reader fiber on the
master doesn't receive an ACK from the replica in time and cancels the
relay writer fiber. The relay writer fiber wakes up and returns to
recover_xlog(), which happily continues to scan the xlog, attempting to
send more rows (force_recovery is set), failing, and complaining to the
log. While the relay thread is still scanning the log, the replica
finishes the long WAL write and reads more data from the socket,
freeing up some space in the network buffer for the relay to write more
rows. The relay thread, which happens to be still in recover_xlog(),
writes a new row to the socket after the packet fragment it had written
when it was cancelled, effectively corrupting the stream and breaking
replication with an unrecoverable error, e.g.

  xrow.c:99 E> ER_INVALID_MSGPACK: Invalid MsgPack - packet header

Actually, it's pointless to continue scanning an xlog if
xstream_write() returned any error different from ClientError - this
means that the xlog is scanned by a relay thread (not local recovery)
and the connection is broken, in which case there isn't much we can do
but stop the relay and wait for the replica to reconnect. So let's fix
this issue by ignoring the force_recovery option for any error that
doesn't have type ClientError.

It's difficult to write a test for this case, since too many conditions
have to be satisfied simultaneously for the issue to occur. Injecting
errors doesn't really help here and would look artificial, because it'd
rely too much on the implementation. So I'm committing this one without
a test case.

Part of #3910
---
 src/box/recovery.cc | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 64d50989..c3cc7454 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -279,7 +279,17 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		} else {
 			say_error("can't apply row: ");
 			diag_log();
-			if (!r->wal_dir.force_recovery)
+			/*
+			 * Stop recovery if a system error occurred,
+			 * no matter if force_recovery is set or not,
+			 * because in this case we could have written
+			 * a packet fragment to the stream so that
+			 * the next write would corrupt data at the
+			 * receiving end.
+			 */
+			struct error *e = diag_last_error(diag_get());
+			if (!r->wal_dir.force_recovery ||
+			    !type_assignable(&type_ClientError, e->type))
 				diag_raise();
 		}
 	}
-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
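(The corruption mechanism described in the patch above boils down to a
partial write on a length-framed stream. A minimal standalone sketch of
that failure mode follows; send_packet() and the 4-byte framing are
invented for illustration and are not Tarantool code:)

#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

/*
 * Each packet is framed as a 4-byte length followed by the payload.
 * If a write is cut short (non-blocking socket, fiber cancellation,
 * ...) and the writer later starts a *new* packet instead of resuming
 * the old one, the receiver parses payload bytes as a length header.
 */
static ssize_t
send_packet(int fd, const void *payload, uint32_t len)
{
	char buf[4 + 128];

	assert(len <= 128);	/* keep the sketch simple */
	memcpy(buf, &len, 4);
	memcpy(buf + 4, payload, len);
	/*
	 * write() may return a short count: only a fragment of the
	 * packet has reached the socket. The caller must either
	 * resume from the returned offset or abandon the connection;
	 * starting a fresh packet here corrupts the stream - which is
	 * exactly what recover_xlog() kept doing under force_recovery.
	 */
	return write(fd, buf, 4 + len);
}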
* [tarantool-patches] Re: [PATCH 1/5] recovery: stop writing to xstream on system error
  2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
@ 2018-12-29  9:09   ` Konstantin Osipov
  2018-12-29  9:50     ` Vladimir Davydov
  0 siblings, 1 reply; 13+ messages in thread
From: Konstantin Osipov @ 2018-12-29 9:09 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:

force_recovery as an option should only affect local recovery, not
relays. Why is it set for relay xlog?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 13+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 1/5] recovery: stop writing to xstream on system error
  2018-12-29  9:09 ` [tarantool-patches] " Konstantin Osipov
@ 2018-12-29  9:50   ` Vladimir Davydov
  2018-12-29 10:57     ` Vladimir Davydov
  0 siblings, 1 reply; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-29 9:50 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Sat, Dec 29, 2018 at 12:09:09PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:
> 
> force_recovery as an option should only affect local recovery, not
> relays. Why is it set for relay xlog?

For the record. Discussed f2f and agreed that it seems weird to set
force_recovery for relay threads; however, changing this behavior now
may break existing customers. So we should push this patch as is to
1.10, and for 2.1 stop setting the force_recovery flag instead.

^ permalink raw reply	[flat|nested] 13+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 1/5] recovery: stop writing to xstream on system error
  2018-12-29  9:50 ` Vladimir Davydov
@ 2018-12-29 10:57   ` Vladimir Davydov
  2018-12-29 12:08     ` Konstantin Osipov
  0 siblings, 1 reply; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-29 10:57 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Sat, Dec 29, 2018 at 12:50:52PM +0300, Vladimir Davydov wrote:
> On Sat, Dec 29, 2018 at 12:09:09PM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:
> > 
> > force_recovery as an option should only affect local recovery, not
> > relays. Why is it set for relay xlog?
> 
> For the record. Discussed f2f and agreed that it seems weird to set
> force_recovery for relay threads; however, changing this behavior now
> may break existing customers. So we should push this patch as is to
> 1.10, and for 2.1 stop setting the force_recovery flag instead.

Here's the alternative fix that will go to 2.1:

https://github.com/tarantool/tarantool/commits/dv/gh-3910-fix-replication-crash-2.1

From b641dd89d650b02af3f1adee3c3f0579893a1296 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Sat, 29 Dec 2018 13:49:43 +0300
Subject: [PATCH] recovery: ignore box.cfg.force_recovery in relay threads

In case the force_recovery flag is set, recover_xlog() ignores any
errors returned by xstream_write(), even SocketError or
FiberIsCancelled. This may result in a permanent replication breakdown,
as described in the next paragraph.

Suppose there's a master and a replica, and the master has the
force_recovery flag set. The replica gets stalled on WAL while applying
a row fetched from the master. As a result, it stops sending ACKs. In
the meantime, the master writes a lot of new rows to its WAL so that
the relay thread sending changes to the replica fills up all the space
available in the network buffer and blocks on the replication socket.
Note, at this moment it may occur that a packet fragment has been
written to the socket. The WAL delay on the replica takes long enough
for replication to break on timeout: the relay reader fiber on the
master doesn't receive an ACK from the replica in time and cancels the
relay writer fiber. The relay writer fiber wakes up and returns to
recover_xlog(), which happily continues to scan the xlog, attempting to
send more rows (force_recovery is set), failing, and complaining to the
log. While the relay thread is still scanning the log, the replica
finishes the long WAL write and reads more data from the socket,
freeing up some space in the network buffer for the relay to write more
rows. The relay thread, which happens to be still in recover_xlog(),
writes a new row to the socket after the packet fragment it had written
when it was cancelled, effectively corrupting the stream and breaking
replication with an unrecoverable error, e.g.

  xrow.c:99 E> ER_INVALID_MSGPACK: Invalid MsgPack - packet header

Actually, taking force_recovery into account in relay threads looks
dubious - after all, this option was implemented to allow a tarantool
instance to start when local data are corrupted, not to force
replication from a corrupted data set. The latter is dangerous anyway -
it's better to rebootstrap replicas in case of master data corruption.
That being said, let's ignore the force_recovery option in relay
threads.

It's difficult to write a test for the original breakdown scenario,
since too many conditions have to be satisfied simultaneously for the
issue to occur. Injecting errors doesn't really help here and would
look artificial, because it'd rely too much on the implementation.
Instead, the test simply checks that box.cfg.force_recovery is ignored
by relay threads.

Part of #3910

diff --git a/src/box/relay.cc b/src/box/relay.cc
index a01c2a2e..a799f23d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -302,8 +302,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		relay_delete(relay);
 	});
 
-	relay->r = recovery_new(cfg_gets("wal_dir"),
-				cfg_geti("force_recovery"),
+	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 				start_vclock);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
@@ -610,8 +609,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	relay_start(relay, fd, sync, relay_send_row);
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
 
-	relay->r = recovery_new(cfg_gets("wal_dir"),
-				cfg_geti("force_recovery"),
+	relay->r = recovery_new(cfg_gets("wal_dir"), false,
 				replica_clock);
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
new file mode 100644
index 00000000..f5045285
--- /dev/null
+++ b/test/replication/force_recovery.result
@@ -0,0 +1,110 @@
+test_run = require('test_run').new()
+---
+...
+fio = require('fio')
+---
+...
+--
+-- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
+--
+_ = box.schema.space.create('test')
+---
+...
+_ = box.space.test:create_index('primary')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+-- Deploy a replica.
+test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server test")
+---
+- true
+...
+-- Stop the replica and wait for the relay thread to exit.
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
+---
+- true
+...
+-- Delete an xlog file that is needed by the replica.
+box.snapshot()
+---
+- ok
+...
+xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
+---
+...
+box.space.test:replace{1}
+---
+- [1]
+...
+box.snapshot()
+---
+- ok
+...
+box.space.test:replace{2}
+---
+- [2]
+...
+fio.unlink(xlog)
+---
+- true
+...
+-- Check that even though box.cfg.force_recovery is set,
+-- replication will still fail due to LSN gap.
+box.cfg{force_recovery = true}
+---
+...
+test_run:cmd("start server test")
+---
+- true
+...
+test_run:cmd("switch test")
+---
+- true
+...
+box.space.test:select()
+---
+- []
+...
+box.info.replication[1].upstream.status == 'stopped' or box.info
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.cfg{force_recovery = false}
+---
+...
+-- Cleanup.
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.space.test:drop()
+---
+...
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
new file mode 100644
index 00000000..54307814
--- /dev/null
+++ b/test/replication/force_recovery.test.lua
@@ -0,0 +1,43 @@
+test_run = require('test_run').new()
+fio = require('fio')
+
+--
+-- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
+--
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('primary')
+box.schema.user.grant('guest', 'replication')
+
+-- Deploy a replica.
+test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server test")
+
+-- Stop the replica and wait for the relay thread to exit.
+test_run:cmd("stop server test")
+test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
+
+-- Delete an xlog file that is needed by the replica.
+box.snapshot()
+xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
+box.space.test:replace{1}
+box.snapshot()
+box.space.test:replace{2}
+fio.unlink(xlog)
+
+-- Check that even though box.cfg.force_recovery is set,
+-- replication will still fail due to LSN gap.
+box.cfg{force_recovery = true}
+test_run:cmd("start server test")
+test_run:cmd("switch test")
+box.space.test:select()
+box.info.replication[1].upstream.status == 'stopped' or box.info
+test_run:cmd("switch default")
+box.cfg{force_recovery = false}
+
+-- Cleanup.
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 984d2e81..fc7c0c46 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -7,6 +7,7 @@
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
     "wal_rw_stress.test.lua": {},
+    "force_recovery.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}

^ permalink raw reply	[flat|nested] 13+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 1/5] recovery: stop writing to xstream on system error
  2018-12-29 10:57 ` Vladimir Davydov
@ 2018-12-29 12:08   ` Konstantin Osipov
  0 siblings, 0 replies; 13+ messages in thread
From: Konstantin Osipov @ 2018-12-29 12:08 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 15:07]:
> On Sat, Dec 29, 2018 at 12:50:52PM +0300, Vladimir Davydov wrote:
> > On Sat, Dec 29, 2018 at 12:09:09PM +0300, Konstantin Osipov wrote:
> > > * Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:
> > > 
> > > force_recovery as an option should only affect local recovery, not
> > > relays. Why is it set for relay xlog?
> > 
> > For the record. Discussed f2f and agreed that it seems weird to set
> > force_recovery for relay threads; however, changing this behavior now
> > may break existing customers. So we should push this patch as is to
> > 1.10, and for 2.1 stop setting the force_recovery flag instead.
> 
> Here's the alternative fix that will go to 2.1:

Thanks.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 13+ messages in thread
* [PATCH 2/5] relay: do not try to scan xlog if exiting
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
@ 2018-12-28 21:21 ` Vladimir Davydov
  2018-12-29  9:14   ` [tarantool-patches] " Konstantin Osipov
  2018-12-28 21:21 ` [PATCH 3/5] relay: cleanup error handling Vladimir Davydov
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

relay_process_wal_event() may be called if the relay fiber is already
exiting, e.g. by wal_clear_watcher(). We must not try to scan xlogs in
this case, because we could have written an incomplete packet fragment
to the replication socket, as described in the previous commit message,
so that writing another row would lead to a corrupted replication
stream and, as a result, permanent replication breakdown.

Actually, there was a check for this case in relay_process_wal_event(),
but it was broken by commit adc28591f77f ("replication: do not delete
relay on applier disconnect"), which replaced it with a relay->status
check, which is completely wrong, because relay->status is reset only
after the relay thread exits.

Part of #3910
---
 src/box/relay.cc | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index a01c2a2e..3d9703ea 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -409,10 +409,15 @@ static void
 relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 {
 	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (relay->state != RELAY_FOLLOW) {
+	if (fiber_is_cancelled()) {
 		/*
-		 * Do not try to send anything to the replica
-		 * if it already closed its socket.
+		 * The relay is exiting. Rescanning the WAL at this
+		 * point would be pointless and even dangerous,
+		 * because the relay could have written a packet
+		 * fragment to the socket before being cancelled
+		 * so that writing another row to the socket would
+		 * lead to corrupted replication stream and, as
+		 * a result, permanent replication breakdown.
 		 */
 		return;
 	}
-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
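(The race fixed above - an event callback running on a fiber that has
already been cancelled - can be sketched in standalone C. The struct
and names below are hypothetical stand-ins, not the real Tarantool
internals:)

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the relay's state; illustration only. */
struct watcher {
	bool fiber_cancelled;	/* set the moment the fiber is cancelled */
	int status;		/* reset only AFTER the thread has exited */
};

/*
 * Events can already be queued when cancellation arrives, so the
 * callback may legitimately run on a dying fiber. Gating on the
 * fiber's own cancellation flag closes the race; gating on `status`
 * does not, because `status` changes too late - this is the window
 * reintroduced by commit adc28591f77f.
 */
static void
on_wal_event(struct watcher *w)
{
	if (w->fiber_cancelled)
		return;	/* a partial packet may already be on the socket */
	puts("rescanning xlogs and writing rows to the replica");
}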
* [tarantool-patches] Re: [PATCH 2/5] relay: do not try to scan xlog if exiting
  2018-12-28 21:21 ` [PATCH 2/5] relay: do not try to scan xlog if exiting Vladimir Davydov
@ 2018-12-29  9:14   ` Konstantin Osipov
  2018-12-29  9:53     ` Vladimir Davydov
  0 siblings, 1 reply; 13+ messages in thread
From: Konstantin Osipov @ 2018-12-29 9:14 UTC (permalink / raw)
  To: tarantool-patches

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:
> relay_process_wal_event() may be called if the relay fiber is already
> exiting, e.g. by wal_clear_watcher(). We must not try to scan xlogs in
> this case, because we could have written an incomplete packet fragment
> to the replication socket, as described in the previous commit message,
> so that writing another row would lead to a corrupted replication
> stream and, as a result, permanent replication breakdown.
> 
> Actually, there was a check for this case in relay_process_wal_event(),
> but it was broken by commit adc28591f77f ("replication: do not delete
> relay on applier disconnect"), which replaced it with a relay->status
> check, which is completely wrong, because relay->status is reset only
> after the relay thread exits.
> 
> Part of #3910
> ---
>  src/box/relay.cc | 11 ++++++++---
>  1 file changed, 8 insertions(+), 3 deletions(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a01c2a2e..3d9703ea 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -409,10 +409,15 @@ static void
>  relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  {
>  	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
> -	if (relay->state != RELAY_FOLLOW) {
> +	if (fiber_is_cancelled()) {

When a relay is exiting, its state is changed. Why would you
need to look at fiber_is_cancelled() *instead of* a more explicit
RELAY_FOLLOW state change? Why not fix the invariant that
whenever the relay is exiting its state is not RELAY_FOLLOW?

>  		/*
> -		 * Do not try to send anything to the replica
> -		 * if it already closed its socket.
> +		 * The relay is exiting. Rescanning the WAL at this
> +		 * point would be pointless and even dangerous,
> +		 * because the relay could have written a packet
> +		 * fragment to the socket before being cancelled
> +		 * so that writing another row to the socket would
> +		 * lead to corrupted replication stream and, as
> +		 * a result, permanent replication breakdown.
>  		 */
>  		return;
>  	}
> -- 
> 2.11.0

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 13+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 2/5] relay: do not try to scan xlog if exiting
  2018-12-29  9:14 ` [tarantool-patches] " Konstantin Osipov
@ 2018-12-29  9:53   ` Vladimir Davydov
  0 siblings, 0 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-29 9:53 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

On Sat, Dec 29, 2018 at 12:14:50PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/12/29 10:00]:
> > relay_process_wal_event() may be called if the relay fiber is already
> > exiting, e.g. by wal_clear_watcher(). We must not try to scan xlogs in
> > this case, because we could have written an incomplete packet fragment
> > to the replication socket, as described in the previous commit message,
> > so that writing another row would lead to a corrupted replication
> > stream and, as a result, permanent replication breakdown.
> > 
> > Actually, there was a check for this case in relay_process_wal_event(),
> > but it was broken by commit adc28591f77f ("replication: do not delete
> > relay on applier disconnect"), which replaced it with a relay->status
> > check, which is completely wrong, because relay->status is reset only
> > after the relay thread exits.
> > 
> > Part of #3910
> > ---
> >  src/box/relay.cc | 11 ++++++++---
> >  1 file changed, 8 insertions(+), 3 deletions(-)
> > 
> > diff --git a/src/box/relay.cc b/src/box/relay.cc
> > index a01c2a2e..3d9703ea 100644
> > --- a/src/box/relay.cc
> > +++ b/src/box/relay.cc
> > @@ -409,10 +409,15 @@ static void
> >  relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
> >  {
> >  	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
> > -	if (relay->state != RELAY_FOLLOW) {
> > +	if (fiber_is_cancelled()) {
> 
> When a relay is exiting, its state is changed. Why would you
> need to look at fiber_is_cancelled() *instead of* a more explicit
> RELAY_FOLLOW state change? Why not fix the invariant that
> whenever the relay is exiting its state is not RELAY_FOLLOW?

For the record. Discussed f2f. relay->state isn't used by the relay
thread, only by the tx thread for reporting box.info. The relay thread
uses fiber_is_cancelled() instead. This looks ugly, but this particular
fix doesn't make things worse, so it's OK to push it as is for now.
In the future we should rework the relay machinery to make it more
straightforward and use fewer callbacks.

> 
> >  		/*
> > -		 * Do not try to send anything to the replica
> > -		 * if it already closed its socket.
> > +		 * The relay is exiting. Rescanning the WAL at this
> > +		 * point would be pointless and even dangerous,
> > +		 * because the relay could have written a packet
> > +		 * fragment to the socket before being cancelled
> > +		 * so that writing another row to the socket would
> > +		 * lead to corrupted replication stream and, as
> > +		 * a result, permanent replication breakdown.
> >  		 */
> >  		return;
> >  	}

^ permalink raw reply	[flat|nested] 13+ messages in thread
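(The ownership split agreed on in the exchange above can be sketched as
a commented struct. The layout below is hypothetical; the real struct
relay in src/box/relay.cc is larger and organized differently:)

/* Hypothetical sketch of which thread owns which relay field. */
struct relay_sketch {
	/*
	 * tx thread only: mirrors the relay state for box.info
	 * reporting. It is reset only after the relay thread has
	 * exited, so the relay thread must never use it for its own
	 * control flow.
	 */
	int tx_state;
	/*
	 * relay thread only: control flow is driven by the fiber's
	 * own cancellation flag, i.e. fiber_is_cancelled(), rather
	 * than by tx_state.
	 */
	/* ... sockets, vclocks, cbus pipes ... */
};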
* [PATCH 3/5] relay: cleanup error handling
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 2/5] relay: do not try to scan xlog if exiting Vladimir Davydov
@ 2018-12-28 21:21 ` Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 4/5] relay: close xlog cursor in relay thread Vladimir Davydov
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

A few changes intended to make error messages more clear, remove
duplicates, etc:

 - Don't log an error when xstream_write() fails in recover_xlog() -
   it's the caller's responsibility. Logging it there results in the
   same error occurring twice in the log.

 - If recover_xlog() fails to apply a row and continues due to the
   force_recovery flag, log the row's LSN - it might be useful for
   problem analysis.

 - Don't override the relay error in relay_process_wal_event();
   otherwise we can get a 'fiber is cancelled' error in the status,
   which is meaningless.

 - Break replication if we fail to send an ack, as it's pointless to
   continue then.

 - Log a relay error only once - when the relay thread is exiting.
   Don't log subsequent errors - they don't make much sense.

 - Set the relay cord name before setting the WAL watcher: the WAL
   watcher sends an event as soon as it's installed, which starts xlog
   recovery, which is logged by the relay, so we want the relay name to
   be valid. Note, there's a catch here: we used the original cord name
   as the cbus endpoint name, so now we have to pass the endpoint name
   explicitly - this looks better anyway.

While we are at it, let's also add some comments to relay_subscribe_f()
and remove the diag_is_empty() check, as the diag is always set when
the relay exits.

Part of #3910
---
 src/box/recovery.cc |  6 +++--
 src/box/relay.cc    | 75 ++++++++++++++++++++++++++++++++---------------------
 2 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index c3cc7454..e95b03e2 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -277,8 +277,6 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 				say_info("%.1fM rows processed",
 					 row_count / 1000000.);
 		} else {
-			say_error("can't apply row: ");
-			diag_log();
 			/*
 			 * Stop recovery if a system error occurred,
 			 * no matter if force_recovery is set or not,
@@ -291,6 +289,10 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 			if (!r->wal_dir.force_recovery ||
 			    !type_assignable(&type_ClientError, e->type))
 				diag_raise();
+
+			say_error("skipping row {%u: %lld}",
+				  (unsigned)row.replica_id,
+				  (long long)row.lsn);
+			diag_log();
 		}
 	}
 }
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 3d9703ea..988c01d3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -406,6 +406,14 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 }
 
 static void
+relay_set_error(struct relay *relay, struct error *e)
+{
+	/* Don't override existing error. */
+	if (diag_is_empty(&relay->diag))
+		diag_add_error(&relay->diag, e);
+}
+
+static void
 relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 {
 	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
@@ -425,8 +433,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
 	} catch (Exception *e) {
-		e->log();
-		diag_move(diag_get(), &relay->diag);
+		relay_set_error(relay, e);
 		fiber_cancel(fiber());
 	}
 }
@@ -456,17 +463,8 @@ relay_reader_f(va_list ap)
 			fiber_cond_signal(&relay->reader_cond);
 		}
 	} catch (Exception *e) {
-		if (diag_is_empty(&relay->diag)) {
-			/* Don't override existing error. */
-			diag_move(diag_get(), &relay->diag);
-			fiber_cancel(relay_f);
-		} else if (!fiber_is_cancelled()) {
-			/*
-			 * There is an relay error and this fiber
-			 * fiber has another, log it.
-			 */
-			e->log();
-		}
+		relay_set_error(relay, e);
+		fiber_cancel(relay_f);
 	}
 	ibuf_destroy(&ibuf);
 	return 0;
@@ -483,7 +481,8 @@ relay_send_heartbeat(struct relay *relay)
 	try {
 		relay_send(relay, &row);
 	} catch (Exception *e) {
-		e->log();
+		relay_set_error(relay, e);
+		fiber_cancel(fiber());
 	}
 }
 
@@ -499,20 +498,25 @@ relay_subscribe_f(va_list ap)
 	struct recovery *r = relay->r;
 
 	coio_enable();
-	cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
+	relay_set_cord_name(relay->io.fd);
+
+	/* Create cpipe to tx for propagating vclock. */
+	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
 			     fiber_schedule_cb, fiber());
-	cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
-		  NULL, NULL, cbus_process);
+	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
+		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
+	/* Setup garbage collection trigger. */
 	struct trigger on_close_log = {
 		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
 
-	wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
-			relay_process_wal_event, cbus_process);
-	relay_set_cord_name(relay->io.fd);
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
+	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
 	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
@@ -527,6 +531,10 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
+	/*
+	 * Run the event loop until the connection is broken
+	 * or an error occurs.
+	 */
 	while (!fiber_is_cancelled()) {
 		double timeout = replication_timeout;
 		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
@@ -571,26 +579,33 @@ relay_subscribe_f(va_list ap)
 		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
+	/*
+	 * Log the error that caused the relay to break the loop.
+	 * Don't clear the error for status reporting.
+	 */
+	assert(!diag_is_empty(&relay->diag));
+	diag_add_error(diag_get(), diag_last_error(&relay->diag));
+	diag_log();
 	say_crit("exiting the relay loop");
+
+	/* Clear garbage collector trigger and WAL watcher. */
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-	if (!fiber_is_dead(reader))
-		fiber_cancel(reader);
+
+	/* Join ack reader fiber. */
+	fiber_cancel(reader);
 	fiber_join(reader);
+
+	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
-	if (!diag_is_empty(&relay->diag)) {
-		/* An error has occurred while reading ACKs of xlog. */
-		diag_move(&relay->diag, diag_get());
-		/* Reference the diag in the status. */
-		diag_add_error(&relay->diag, diag_last_error(diag_get()));
-	}
+
 	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
-	return diag_is_empty(diag_get()) ? 0: -1;
+	return -1;
 }
 
 /** Replication acceptor fiber handler. */
@@ -621,7 +636,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay),
+	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
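(The relay_set_error() helper introduced above implements a
"first error wins" policy: several fibers - the WAL watcher, the ack
reader, the heartbeat sender - can fail concurrently, but only the
error that actually broke the relay should surface in box.info. A toy
standalone version of the pattern, with an invented diag type standing
in for Tarantool's:)

#include <stddef.h>

/* Hypothetical single-slot diagnostics area; NULL means empty. */
struct diag {
	const char *last;
};

static void
diag_set_once(struct diag *d, const char *e)
{
	/*
	 * Later errors (e.g. "fiber is cancelled") are usually
	 * secondary effects of the first one, so keep the original
	 * for status reporting and drop the rest.
	 */
	if (d->last == NULL)
		d->last = e;
}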
* [PATCH 4/5] relay: close xlog cursor in relay thread
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
                   ` (2 preceding siblings ...)
  2018-12-28 21:21 ` [PATCH 3/5] relay: cleanup error handling Vladimir Davydov
@ 2018-12-28 21:21 ` Vladimir Davydov
  2018-12-28 21:21 ` [PATCH 5/5] xlog: assure xlog is opened and closed in the same thread Vladimir Davydov
  2018-12-29 11:40 ` [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
  5 siblings, 0 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

An xlog_cursor created and used by a relay via the recovery context is
destroyed by the main thread once the relay thread has exited. This is
incorrect, because xlog_cursor uses cord's slab allocator and therefore
must be destroyed in the same thread it was created by; otherwise we
risk getting a use-after-free bug. So this patch moves the
recovery_delete() invocation to the end of the relay thread routine.

No test is added, because our existing tests already cover this case -
crashes don't usually happen, because we are lucky. The next patch will
add some assertions to make the bug 100% reproducible.

Closes #3910
---
 src/box/relay.cc | 27 +++++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 988c01d3..8f5355ae 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -210,6 +210,26 @@ relay_cancel(struct relay *relay)
 	}
 }
 
+/**
+ * Called by a relay thread right before termination.
+ */
+static void
+relay_exit(struct relay *relay)
+{
+	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0)
+		fiber_sleep(inj->dparam);
+
+	/*
+	 * Destroy the recovery context. We MUST do it in
+	 * the relay thread, because it contains an xlog
+	 * cursor, which must be closed in the same thread
+	 * that opened it (it uses cord's slab allocator).
+	 */
+	recovery_delete(relay->r);
+	relay->r = NULL;
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -277,6 +297,8 @@ int
 relay_final_join_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
+	auto guard = make_scoped_guard([=] { relay_exit(relay); });
+
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
@@ -601,10 +623,7 @@ relay_subscribe_f(va_list ap)
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
 
-	struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE);
-	if (inj != NULL && inj->dparam > 0)
-		fiber_sleep(inj->dparam);
-
+	relay_exit(relay);
 	return -1;
 }
 
-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
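(The scoped guard above makes relay_exit() run on every exit path of
the thread routine, so the thread-affine cleanup can never be skipped.
A plain-C approximation of the same idea using pthread cleanup handlers
- hypothetical; the real code uses a C++ scope guard and Tarantool's
cord machinery:)

#include <pthread.h>
#include <stdio.h>

static void
relay_exit_stub(void *arg)
{
	/*
	 * Must run in THIS thread: the xlog cursor inside the
	 * recovery context was allocated from this cord's slab
	 * allocator, so no other thread may free it.
	 */
	printf("destroying recovery context of relay %p\n", arg);
}

static void *
relay_thread_f(void *arg)
{
	pthread_cleanup_push(relay_exit_stub, arg);
	/* ... serve the replica until cancelled or on error ... */
	pthread_cleanup_pop(1);	/* run the cleanup on normal exit too */
	return NULL;
}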
* [PATCH 5/5] xlog: assure xlog is opened and closed in the same thread
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
                   ` (3 preceding siblings ...)
  2018-12-28 21:21 ` [PATCH 4/5] relay: close xlog cursor in relay thread Vladimir Davydov
@ 2018-12-28 21:21 ` Vladimir Davydov
  2018-12-29 11:40 ` [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
  5 siblings, 0 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-28 21:21 UTC (permalink / raw)
  To: tarantool-patches

xlog and xlog_cursor must be opened and closed in the same thread,
because they use cord's slab allocator.

Follow-up #3910
---
 src/box/xlog.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/box/xlog.c b/src/box/xlog.c
index 71b9b7cd..881dcd3b 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -791,6 +791,8 @@ xlog_clear(struct xlog *l)
 static void
 xlog_destroy(struct xlog *xlog)
 {
+	assert(xlog->obuf.slabc == &cord()->slabc);
+	assert(xlog->zbuf.slabc == &cord()->slabc);
 	obuf_destroy(&xlog->obuf);
 	obuf_destroy(&xlog->zbuf);
 	ZSTD_freeCCtx(xlog->zctx);
@@ -1816,6 +1818,7 @@ xlog_tx_cursor_next_row(struct xlog_tx_cursor *tx_cursor,
 int
 xlog_tx_cursor_destroy(struct xlog_tx_cursor *tx_cursor)
 {
+	assert(tx_cursor->rows.slabc == &cord()->slabc);
 	ibuf_destroy(&tx_cursor->rows);
 	return 0;
 }
@@ -2049,6 +2052,7 @@ xlog_cursor_close(struct xlog_cursor *i, bool reuse_fd)
 	assert(xlog_cursor_is_open(i));
 	if (i->fd >= 0 && !reuse_fd)
 		close(i->fd);
+	assert(i->rbuf.slabc == &cord()->slabc);
 	ibuf_destroy(&i->rbuf);
 	if (i->state == XLOG_CURSOR_TX)
 		xlog_tx_cursor_destroy(&i->tx_cursor);
-- 
2.11.0

^ permalink raw reply	[flat|nested] 13+ messages in thread
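(The asserts above check allocator affinity: the buffer's slab cache
must belong to the current cord. The same idea can be shown with plain
pthreads - a hypothetical sketch, not Tarantool's cord/slab code:)

#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* A thread-affine resource: remembers the thread that opened it. */
struct cursor {
	pthread_t owner;
	void *buf;	/* stand-in for a thread-local slab allocation */
};

static void
cursor_create(struct cursor *c, size_t size)
{
	c->owner = pthread_self();	/* record the opening thread */
	c->buf = malloc(size);
}

static void
cursor_destroy(struct cursor *c)
{
	/*
	 * Trip an assert instead of silently corrupting a
	 * thread-local allocator - this mirrors why the asserts
	 * in xlog.c make the bug from the previous patch 100%
	 * reproducible.
	 */
	assert(pthread_equal(c->owner, pthread_self()));
	free(c->buf);
}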
* Re: [PATCH 0/5] Fix a couple of replication breakdown issues
  2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
                   ` (4 preceding siblings ...)
  2018-12-28 21:21 ` [PATCH 5/5] xlog: assure xlog is opened and closed in the same thread Vladimir Davydov
@ 2018-12-29 11:40 ` Vladimir Davydov
  5 siblings, 0 replies; 13+ messages in thread
From: Vladimir Davydov @ 2018-12-29 11:40 UTC (permalink / raw)
  To: tarantool-patches

Pushed to 1.10 and 2.1.

^ permalink raw reply	[flat|nested] 13+ messages in thread
end of thread, other threads:[~2018-12-29 12:08 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-12-28 21:21 [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov
2018-12-28 21:21 ` [PATCH 1/5] recovery: stop writing to xstream on system error Vladimir Davydov
2018-12-29  9:09   ` [tarantool-patches] " Konstantin Osipov
2018-12-29  9:50     ` Vladimir Davydov
2018-12-29 10:57       ` Vladimir Davydov
2018-12-29 12:08         ` Konstantin Osipov
2018-12-28 21:21 ` [PATCH 2/5] relay: do not try to scan xlog if exiting Vladimir Davydov
2018-12-29  9:14   ` [tarantool-patches] " Konstantin Osipov
2018-12-29  9:53     ` Vladimir Davydov
2018-12-28 21:21 ` [PATCH 3/5] relay: cleanup error handling Vladimir Davydov
2018-12-28 21:21 ` [PATCH 4/5] relay: close xlog cursor in relay thread Vladimir Davydov
2018-12-28 21:21 ` [PATCH 5/5] xlog: assure xlog is opened and closed in the same thread Vladimir Davydov
2018-12-29 11:40 ` [PATCH 0/5] Fix a couple of replication breakdown issues Vladimir Davydov