From mboxrd@z Thu Jan 1 00:00:00 1970
From: Vladimir Davydov
Subject: [PATCH] replication: fix rebootstrap crash in case master has replica's rows
Date: Sat, 13 Oct 2018 18:36:56 +0300
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org

During SUBSCRIBE the master sends only those rows originating from the
subscribed replica that aren't present on the replica. Such rows may
appear after a sudden power loss if the replica doesn't issue
fdatasync() after each WAL write, which is the default behavior. This
means that a replica can write some rows to WAL, relay them to another
replica, then stop without syncing the WAL file. If this happens, we
expect the replica to read its own rows from other members of the
cluster upon restart. For more details, see commit eae84efbfbf9
("replication: recover missing local data from replica").

Obviously, this feature only makes sense for SUBSCRIBE. During JOIN we
must relay all rows. This is how it initially worked, but commit
adc28591f77f ("replication: do not delete relay on applier disconnect")
inadvertently removed the corresponding check from relay_send_row() so
that now we don't send any rows originating from the joined replica:

@@ -595,8 +630,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
	 * it). In the latter case packet's LSN is less than or equal to
	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
	 */
-	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id ||
+	if (packet->replica_id != relay->replica->id ||
	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
				      packet->replica_id)) {
		relay_send(relay, packet);

(relay->local_vclock_at_subscribe is initialized to 0 on JOIN; see the
sketch at the end of this message for a summary of this rule)

This only affects the case of rebootstrap, automatic or manual, because
when a new replica joins a cluster there can't be any rows on the
master originating from it. On manual rebootstrap, i.e. when the
replica files are deleted by the user and the replica is restarted from
an empty directory with the same UUID (set via box.cfg.instance_uuid),
this isn't critical - the replica will still receive the rows it should
have received during JOIN once it subscribes. However, in case of
automatic rebootstrap this can result in broken order of xlog/snap
files, because the replica directory still contains old xlog/snap files
created before rebootstrap. The rebootstrap logic expects them to have
strictly lower vclocks than the new files, but if JOIN stops
prematurely, this condition may not hold, leading to a crash when the
vclock of a new xlog/snap is inserted into the corresponding xdir.

This patch fixes the issue by restoring the pre-eae84efbfbf9 behavior:
we now create a new relay for FINAL JOIN instead of reusing the one
attached to the joined replica, so that relay_send_row() can detect the
JOIN phase and relay all rows in this case. It also adds a comment so
that we don't make such a mistake in the future.

Apart from fixing the issue, this patch also fixes a relay leak in
relay_initial_join() in case engine_join_xc() fails, which was also
introduced by the above-mentioned commit.

A note about the xlog/panic_on_broken_lsn test: the relay status is no
longer reported by box.info.replication if FINAL JOIN failed and the
replica never subscribed (this is how it worked before commit
eae84efbfbf9), so we need to tweak the test a bit to handle this.
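
To make the relay_send_row() rule above easier to follow, here is a
minimal, compilable sketch of it in plain C. This is not the actual
Tarantool code; the helper name relay_should_send_own_row and its
parameters (row_lsn, master_lsn_at_subscribe) are illustrative only.
During FINAL JOIN every row is relayed; during SUBSCRIBE a row authored
by the subscribed replica is relayed back only if the replica appears
to have lost it.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Decide whether the master relays a row that originates from the very
 * replica it is feeding (hypothetical helper, for illustration only).
 */
static bool
relay_should_send_own_row(bool is_final_join, int64_t row_lsn,
			  int64_t master_lsn_at_subscribe)
{
	/* FINAL JOIN: send everything, including the replica's own rows. */
	if (is_final_join)
		return true;
	/*
	 * SUBSCRIBE: echo the replica's own row back only if the replica
	 * lost it, i.e. the row's LSN does not exceed the LSN the master
	 * had recorded for this replica when it received SUBSCRIBE.
	 */
	return row_lsn <= master_lsn_at_subscribe;
}

int
main(void)
{
	/* A row the replica wrote but lost before syncing its WAL: sent. */
	printf("%d\n", relay_should_send_own_row(false, 5, 10));
	/* A row the replica still has: not sent back. */
	printf("%d\n", relay_should_send_own_row(false, 15, 10));
	return 0;
}

In these terms, the bug fixed by this patch is that after commit
adc28591f77f the JOIN case fell through to the SUBSCRIBE branch with a
zero vclock, so none of the replica's own rows were sent.
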
Closes #3740
---
https://github.com/tarantool/tarantool/issues/3740
https://github.com/tarantool/tarantool/commits/dv/gh-3740-fix-rebootstrap-crash

 src/box/box.cc                           |   3 +-
 src/box/relay.cc                         |  36 +++++---
 src/box/relay.h                          |   4 +-
 test/replication/replica_rejoin.result   | 137 +++++++++++++++++++++++++++++++
 test/replication/replica_rejoin.test.lua |  51 ++++++++++++
 test/xlog/panic_on_broken_lsn.result     |   5 +-
 test/xlog/panic_on_broken_lsn.test.lua   |   3 +-
 7 files changed, 218 insertions(+), 21 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 7e32b9fc..79a818ec 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1518,8 +1518,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
	 * Final stage: feed replica with WALs in range
	 * (start_vclock, stop_vclock).
	 */
-	relay_final_join(replica, io->fd, header->sync,
-			 &start_vclock, &stop_vclock);
+	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
	say_info("final data sent.");

	/* Send end of WAL stream marker */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5df487e..0a1e95af 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -32,6 +32,7 @@

 #include "trivia/config.h"
 #include "trivia/util.h"
+#include "scoped_guard.h"
 #include "cbus.h"
 #include "cfg.h"
 #include "errinj.h"
@@ -262,10 +263,14 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
	struct relay *relay = relay_new(NULL);
	if (relay == NULL)
		diag_raise();
+
	relay_start(relay, fd, sync, relay_send_initial_join_row);
+	auto relay_guard = make_scoped_guard([=] {
+		relay_stop(relay);
+		relay_delete(relay);
+	});
+
	engine_join_xc(vclock, &relay->stream);
-	relay_stop(relay);
-	relay_delete(relay);
 }

 int
@@ -284,11 +289,19 @@ relay_final_join_f(va_list ap)
 }

 void
-relay_final_join(struct replica *replica, int fd, uint64_t sync,
-		 struct vclock *start_vclock, struct vclock *stop_vclock)
+relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
+		 struct vclock *stop_vclock)
 {
-	struct relay *relay = replica->relay;
+	struct relay *relay = relay_new(NULL);
+	if (relay == NULL)
+		diag_raise();
+
	relay_start(relay, fd, sync, relay_send_row);
+	auto relay_guard = make_scoped_guard([=] {
+		relay_stop(relay);
+		relay_delete(relay);
+	});
+
	relay->r = recovery_new(cfg_gets("wal_dir"),
				cfg_geti("force_recovery"),
				start_vclock);
@@ -298,9 +311,6 @@ relay_final_join(struct replica *replica, int fd, uint64_t sync,
			 relay_final_join_f, relay);
	if (rc == 0)
		rc = cord_cojoin(&relay->cord);
-
-	relay_stop(relay);
-
	if (rc != 0)
		diag_raise();

@@ -660,15 +670,19 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
		packet->bodycnt = 0;
	}
	/*
-	 * We're feeding a WAL, thus responding to SUBSCRIBE request.
-	 * In that case, only send a row if it is not from the same replica
+	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
+	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
+	 * we must relay all rows, even those originating from the replica
+	 * itself (there may be such rows if this is rebootstrap). If this
+	 * is SUBSCRIBE, only send a row if it is not from the same replica
	 * (i.e. don't send replica's own rows back) or if this row is
	 * missing on the other side (i.e. in case of sudden power-loss,
	 * data was not written to WAL, so remote master can't recover
	 * it). In the latter case packet's LSN is less than or equal to
	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
	 */
-	if (packet->replica_id != relay->replica->id ||
+	if (relay->replica == NULL ||
+	    packet->replica_id != relay->replica->id ||
	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
				      packet->replica_id)) {
		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
diff --git a/src/box/relay.h b/src/box/relay.h
index 53bf68eb..c0848899 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -106,8 +106,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
  * @param sync sync from incoming JOIN request
  */
 void
-relay_final_join(struct replica *replica, int fd, uint64_t sync,
-		 struct vclock *start_vclock, struct vclock *stop_vclock);
+relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
+		 struct vclock *stop_vclock);

 /**
  * Subscribe a replica to updates.
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index 4370fae4..df1057d1 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -229,11 +229,148 @@ box.space.test:select()
   - [20, 20]
   - [30, 30]
 ...
+--
+-- gh-3740: rebootstrap crashes if the master has rows originating
+-- from the replica.
+--
+-- Bootstrap a new replica.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.space.test:truncate()
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+-- Subscribe the master to the replica.
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+---
+...
+replica_listen ~= nil
+---
+- true
+...
+box.cfg{replication = replica_listen}
+---
+...
+-- Unsubscribe the replica from the master.
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication = ''}
+---
+...
+-- Bump vclock on the master.
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:replace{1}
+---
+- [1]
+...
+-- Bump vclock on the replica.
+test_run:cmd("switch replica")
+---
+- true
+...
+for i = 1, 10 do box.space.test:replace{2} end
+---
+...
+vclock = test_run:get_vclock('replica')
+---
+...
+_ = test_run:wait_vclock('default', vclock)
+---
+...
+-- Restart the master and force garbage collection.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server default")
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+---
+...
+replica_listen ~= nil
+---
+- true
+...
+box.cfg{replication = replica_listen}
+---
+...
+default_checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+box.cfg{checkpoint_count = default_checkpoint_count}
+---
+...
+fio = require('fio')
+---
+...
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1
+---
+- true
+...
+-- Bump vclock on the replica again.
+test_run:cmd("switch replica")
+---
+- true
+...
+for i = 1, 10 do box.space.test:replace{2} end
+---
+...
+vclock = test_run:get_vclock('replica')
+---
+...
+_ = test_run:wait_vclock('default', vclock)
+---
+...
+-- Restart the replica. It should successfully rebootstrap.
+test_run:cmd("restart server replica")
+box.space.test:select()
+---
+- - [1]
+  - [2]
+...
+box.snapshot()
+---
+- ok
+...
+box.space.test:replace{2}
+---
+- [2]
+...
 -- Cleanup.
 test_run:cmd("switch default")
 ---
 - true
 ...
+box.cfg{replication = ''}
+---
+...
test_run:cmd("stop server replica") --- - true diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua index f998f60d..40094a1a 100644 --- a/test/replication/replica_rejoin.test.lua +++ b/test/replication/replica_rejoin.test.lua @@ -83,8 +83,59 @@ test_run:cmd("switch replica") box.info.status -- orphan box.space.test:select() +-- +-- gh-3740: rebootstrap crashes if the master has rows originating +-- from the replica. +-- + +-- Bootstrap a new replica. +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +test_run:cleanup_cluster() +box.space.test:truncate() +test_run:cmd("start server replica") +-- Subscribe the master to the replica. +replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'") +replica_listen ~= nil +box.cfg{replication = replica_listen} +-- Unsubscribe the replica from the master. +test_run:cmd("switch replica") +box.cfg{replication = ''} +-- Bump vclock on the master. +test_run:cmd("switch default") +box.space.test:replace{1} +-- Bump vclock on the replica. +test_run:cmd("switch replica") +for i = 1, 10 do box.space.test:replace{2} end +vclock = test_run:get_vclock('replica') +_ = test_run:wait_vclock('default', vclock) +-- Restart the master and force garbage collection. +test_run:cmd("switch default") +test_run:cmd("restart server default") +replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'") +replica_listen ~= nil +box.cfg{replication = replica_listen} +default_checkpoint_count = box.cfg.checkpoint_count +box.cfg{checkpoint_count = 1} +box.snapshot() +box.cfg{checkpoint_count = default_checkpoint_count} +fio = require('fio') +#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1 +-- Bump vclock on the replica again. +test_run:cmd("switch replica") +for i = 1, 10 do box.space.test:replace{2} end +vclock = test_run:get_vclock('replica') +_ = test_run:wait_vclock('default', vclock) +-- Restart the replica. It should successfully rebootstrap. +test_run:cmd("restart server replica") +box.space.test:select() +box.snapshot() +box.space.test:replace{2} + -- Cleanup. test_run:cmd("switch default") +box.cfg{replication = ''} test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") box.space.test:drop() diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result index ad10217b..b8583def 100644 --- a/test/xlog/panic_on_broken_lsn.result +++ b/test/xlog/panic_on_broken_lsn.result @@ -140,10 +140,7 @@ test_run:cmd('start server replica with crash_expected=True') fiber = require('fiber') --- ... -while box.info.replication == nil do fiber.sleep(0.001) end ---- -... -while box.info.replication[2].downstream.status ~= "stopped" do fiber.sleep(0.001) end +while box.info.replication[2] == nil do fiber.sleep(0.001) end --- ... 
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index dd450d15..717919d5 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -59,8 +59,7 @@ box.space.test:auto_increment{'v1'}
 test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
 test_run:cmd('start server replica with crash_expected=True')
 fiber = require('fiber')
-while box.info.replication == nil do fiber.sleep(0.001) end
-while box.info.replication[2].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+while box.info.replication[2] == nil do fiber.sleep(0.001) end
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)

 logpath = fio.pathjoin(fio.cwd(), 'replica.log')
-- 
2.11.0