[PATCH] replication: fix rebootstrap crash in case master has replica's rows

Vladimir Davydov <vdavydov.dev@gmail.com>
Sat Oct 13 18:36:56 MSK 2018


During SUBSCRIBE the master sends only those rows originating from the
subscribed replica that aren't present on the replica. Such rows may
appear after a sudden power loss if the replica doesn't issue
fdatasync() after each WAL write, which is the default behavior. This
means that a replica can write some rows to its WAL, relay them to
another replica, then stop before syncing the WAL file. If this
happens, we expect the replica to recover its own rows from other
members of the cluster upon restart. For more details see commit
eae84efbfbf9 ("replication: recover missing local data from replica").

Obviously, this feature only makes sense for SUBSCRIBE. During JOIN we
must relay all rows. This is how it initially worked, but commit
adc28591f77f ("replication: do not delete relay on applier disconnect")
inadvertently removed the corresponding check from relay_send_row(), so
now we don't relay any rows originating from the joined replica:

  @@ -595,8 +630,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
           * it). In the latter case packet's LSN is less than or equal to
           * local master's LSN at the moment it received 'SUBSCRIBE' request.
           */
  -       if (relay->replica == NULL ||
  -           packet->replica_id != relay->replica->id ||
  +       if (packet->replica_id != relay->replica->id ||
              packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
                                        packet->replica_id)) {
                  relay_send(relay, packet);

(relay->local_vclock_at_subscribe is initialized to 0 on JOIN)
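
In other words, during JOIN the right-hand side of the LSN comparison
is always 0, so for any row authored by the joined replica the check
degenerates as follows (a simplified restatement of the condition
above, not the literal code):

  /* During JOIN (after adc28591f77f) the check effectively becomes: */
  if (packet->replica_id != relay->replica->id || /* false: row is the replica's own */
      packet->lsn <= 0)                           /* false: LSNs start from 1 */
          relay_send(relay, packet);              /* never reached for such rows */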

This only affects the case of rebootstrap, automatic or manual,
because when a new replica joins a cluster, there can't be any rows on
the master originating from it. On manual rebootstrap, i.e. when the
replica's files are deleted by the user and the replica is restarted
from an empty directory with the same UUID (set via
box.cfg.instance_uuid), this isn't critical - the replica will still
receive the rows it should have received during JOIN once it
subscribes. However, in case of automatic rebootstrap this can result
in a broken order of xlog/snap files, because the replica directory
still contains old xlog/snap files created before rebootstrap. The
rebootstrap logic expects them to have strictly smaller vclocks than
the new files, but if JOIN fails to deliver all of the replica's own
rows, this condition may not hold, leading to a crash when the vclock
of a new xlog/snap file is inserted into the corresponding xdir.
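
For illustration (the numbers are invented): if the old directory
contains an xlog created at vclock {1: 25}, where component 1 belongs
to the replica itself, and FINAL JOIN silently drops the replica's own
rows, the first snap/xlog written after rebootstrap may end up at, say,
vclock {1: 10, 2: 5}. That vclock is not strictly greater than {1: 25},
so the ordering the rebootstrap logic relies on is violated (a
hypothetical sketch, not the exact code that crashes):

  /* Every file created after rebootstrap must strictly follow
   * every file left over from before it. */
  assert(vclock_compare(&old_file_vclock, &new_file_vclock) < 0);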

This patch fixes the issue by restoring the pre-eae84efbfbf9 behavior:
we now create a new relay for FINAL JOIN instead of reusing the one
attached to the joined replica, so that relay_send_row() can detect the
JOIN phase and relay all rows in this case. It also adds a comment so
that we don't make such a mistake in the future.

Apart from that, this patch also fixes a relay leak in
relay_initial_join() in case engine_join_xc() fails; the leak was also
introduced by the above-mentioned commit.
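
For context, the pre-patch flow in relay_initial_join() was roughly the
following (reconstructed from the hunk below), which is why an
exception thrown by engine_join_xc() leaked the relay:

  struct relay *relay = relay_new(NULL);
  if (relay == NULL)
          diag_raise();
  relay_start(relay, fd, sync, relay_send_initial_join_row);
  engine_join_xc(vclock, &relay->stream); /* may throw on failure */
  relay_stop(relay);                      /* skipped if it throws ... */
  relay_delete(relay);                    /* ... so the relay is never freed */

The scoped guard registered right after relay_start() now runs on every
exit path, normal or exceptional, so the cleanup can no longer be
skipped.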

A note about the xlog/panic_on_broken_lsn test: the relay status is no
longer reported by box.info.replication if FINAL JOIN failed and the
replica never subscribed (this is how it worked before commit
eae84efbfbf9), so the test has to be tweaked a bit to handle this.

Closes #3740
---
https://github.com/tarantool/tarantool/issues/3740
https://github.com/tarantool/tarantool/commits/dv/gh-3740-fix-rebootstrap-crash

 src/box/box.cc                           |   3 +-
 src/box/relay.cc                         |  36 +++++---
 src/box/relay.h                          |   4 +-
 test/replication/replica_rejoin.result   | 137 +++++++++++++++++++++++++++++++
 test/replication/replica_rejoin.test.lua |  51 ++++++++++++
 test/xlog/panic_on_broken_lsn.result     |   5 +-
 test/xlog/panic_on_broken_lsn.test.lua   |   3 +-
 7 files changed, 218 insertions(+), 21 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 7e32b9fc..79a818ec 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1518,8 +1518,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	 * Final stage: feed replica with WALs in range
 	 * (start_vclock, stop_vclock).
 	 */
-	relay_final_join(replica, io->fd, header->sync,
-			 &start_vclock, &stop_vclock);
+	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
 	say_info("final data sent.");
 
 	/* Send end of WAL stream marker */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5df487e..0a1e95af 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -32,6 +32,7 @@
 
 #include "trivia/config.h"
 #include "trivia/util.h"
+#include "scoped_guard.h"
 #include "cbus.h"
 #include "cfg.h"
 #include "errinj.h"
@@ -262,10 +263,14 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 	struct relay *relay = relay_new(NULL);
 	if (relay == NULL)
 		diag_raise();
+
 	relay_start(relay, fd, sync, relay_send_initial_join_row);
+	auto relay_guard = make_scoped_guard([=] {
+		relay_stop(relay);
+		relay_delete(relay);
+	});
+
 	engine_join_xc(vclock, &relay->stream);
-	relay_stop(relay);
-	relay_delete(relay);
 }
 
 int
@@ -284,11 +289,19 @@ relay_final_join_f(va_list ap)
 }
 
 void
-relay_final_join(struct replica *replica, int fd, uint64_t sync,
-		 struct vclock *start_vclock, struct vclock *stop_vclock)
+relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
+		 struct vclock *stop_vclock)
 {
-	struct relay *relay = replica->relay;
+	struct relay *relay = relay_new(NULL);
+	if (relay == NULL)
+		diag_raise();
+
 	relay_start(relay, fd, sync, relay_send_row);
+	auto relay_guard = make_scoped_guard([=] {
+		relay_stop(relay);
+		relay_delete(relay);
+	});
+
 	relay->r = recovery_new(cfg_gets("wal_dir"),
 			       cfg_geti("force_recovery"),
 			       start_vclock);
@@ -298,9 +311,6 @@ relay_final_join(struct replica *replica, int fd, uint64_t sync,
 			      relay_final_join_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
-
-	relay_stop(relay);
-
 	if (rc != 0)
 		diag_raise();
 
@@ -660,15 +670,19 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/*
-	 * We're feeding a WAL, thus responding to SUBSCRIBE request.
-	 * In that case, only send a row if it is not from the same replica
+	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
+	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
+	 * we must relay all rows, even those originating from the replica
+	 * itself (there may be such rows if this is rebootstrap). If this
+	 * is SUBSCRIBE, only send a row if it is not from the same replica
 	 * (i.e. don't send replica's own rows back) or if this row is
 	 * missing on the other side (i.e. in case of sudden power-loss,
 	 * data was not written to WAL, so remote master can't recover
 	 * it). In the latter case packet's LSN is less than or equal to
 	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
 	 */
-	if (packet->replica_id != relay->replica->id ||
+	if (relay->replica == NULL ||
+	    packet->replica_id != relay->replica->id ||
 	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
 				      packet->replica_id)) {
 		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
diff --git a/src/box/relay.h b/src/box/relay.h
index 53bf68eb..c0848899 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -106,8 +106,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
  * @param sync      sync from incoming JOIN request
  */
 void
-relay_final_join(struct replica *replica, int fd, uint64_t sync,
-		 struct vclock *start_vclock, struct vclock *stop_vclock);
+relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
+		 struct vclock *stop_vclock);
 
 /**
  * Subscribe a replica to updates.
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index 4370fae4..df1057d1 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -229,11 +229,148 @@ box.space.test:select()
   - [20, 20]
   - [30, 30]
 ...
+--
+-- gh-3740: rebootstrap crashes if the master has rows originating
+-- from the replica.
+--
+-- Bootstrap a new replica.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.space.test:truncate()
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+-- Subscribe the master to the replica.
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+---
+...
+replica_listen ~= nil
+---
+- true
+...
+box.cfg{replication = replica_listen}
+---
+...
+-- Unsubscribe the replica from the master.
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication = ''}
+---
+...
+-- Bump vclock on the master.
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:replace{1}
+---
+- [1]
+...
+-- Bump vclock on the replica.
+test_run:cmd("switch replica")
+---
+- true
+...
+for i = 1, 10 do box.space.test:replace{2} end
+---
+...
+vclock = test_run:get_vclock('replica')
+---
+...
+_ = test_run:wait_vclock('default', vclock)
+---
+...
+-- Restart the master and force garbage collection.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server default")
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+---
+...
+replica_listen ~= nil
+---
+- true
+...
+box.cfg{replication = replica_listen}
+---
+...
+default_checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+box.cfg{checkpoint_count = default_checkpoint_count}
+---
+...
+fio = require('fio')
+---
+...
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1
+---
+- true
+...
+-- Bump vclock on the replica again.
+test_run:cmd("switch replica")
+---
+- true
+...
+for i = 1, 10 do box.space.test:replace{2} end
+---
+...
+vclock = test_run:get_vclock('replica')
+---
+...
+_ = test_run:wait_vclock('default', vclock)
+---
+...
+-- Restart the replica. It should successfully rebootstrap.
+test_run:cmd("restart server replica")
+box.space.test:select()
+---
+- - [1]
+  - [2]
+...
+box.snapshot()
+---
+- ok
+...
+box.space.test:replace{2}
+---
+- [2]
+...
 -- Cleanup.
 test_run:cmd("switch default")
 ---
 - true
 ...
+box.cfg{replication = ''}
+---
+...
 test_run:cmd("stop server replica")
 ---
 - true
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index f998f60d..40094a1a 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -83,8 +83,59 @@ test_run:cmd("switch replica")
 box.info.status -- orphan
 box.space.test:select()
 
+--
+-- gh-3740: rebootstrap crashes if the master has rows originating
+-- from the replica.
+--
+
+-- Bootstrap a new replica.
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cleanup_cluster()
+box.space.test:truncate()
+test_run:cmd("start server replica")
+-- Subscribe the master to the replica.
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+replica_listen ~= nil
+box.cfg{replication = replica_listen}
+-- Unsubscribe the replica from the master.
+test_run:cmd("switch replica")
+box.cfg{replication = ''}
+-- Bump vclock on the master.
+test_run:cmd("switch default")
+box.space.test:replace{1}
+-- Bump vclock on the replica.
+test_run:cmd("switch replica")
+for i = 1, 10 do box.space.test:replace{2} end
+vclock = test_run:get_vclock('replica')
+_ = test_run:wait_vclock('default', vclock)
+-- Restart the master and force garbage collection.
+test_run:cmd("switch default")
+test_run:cmd("restart server default")
+replica_listen = test_run:cmd("eval replica 'return box.cfg.listen'")
+replica_listen ~= nil
+box.cfg{replication = replica_listen}
+default_checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+box.snapshot()
+box.cfg{checkpoint_count = default_checkpoint_count}
+fio = require('fio')
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1
+-- Bump vclock on the replica again.
+test_run:cmd("switch replica")
+for i = 1, 10 do box.space.test:replace{2} end
+vclock = test_run:get_vclock('replica')
+_ = test_run:wait_vclock('default', vclock)
+-- Restart the replica. It should successfully rebootstrap.
+test_run:cmd("restart server replica")
+box.space.test:select()
+box.snapshot()
+box.space.test:replace{2}
+
 -- Cleanup.
 test_run:cmd("switch default")
+box.cfg{replication = ''}
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 box.space.test:drop()
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index ad10217b..b8583def 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -140,10 +140,7 @@ test_run:cmd('start server replica with crash_expected=True')
 fiber = require('fiber')
 ---
 ...
-while box.info.replication == nil do fiber.sleep(0.001) end
----
-...
-while box.info.replication[2].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+while box.info.replication[2] == nil do fiber.sleep(0.001) end
 ---
 ...
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index dd450d15..717919d5 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -59,8 +59,7 @@ box.space.test:auto_increment{'v1'}
 test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
 test_run:cmd('start server replica with crash_expected=True')
 fiber = require('fiber')
-while box.info.replication == nil do fiber.sleep(0.001) end
-while box.info.replication[2].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+while box.info.replication[2] == nil do fiber.sleep(0.001) end
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 
 logpath = fio.pathjoin(fio.cwd(), 'replica.log')
-- 
2.11.0



