[tarantool-patches] [PATCH v7 3/3] box: adds replication sync after cfg. update

Vladimir Davydov vdavydov.dev at gmail.com
Fri Aug 31 16:01:50 MSK 2018


I reworked this patch, added a proper test, and force-pushed it
to the branch:

https://github.com/tarantool/tarantool/commits/OKriw/gh-3427-replication-no-sync-1.9

Kostja, please take a look. The new patch is below:

From 43839be2feb3aad444b44b84076d5f0f6b374b55 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev at gmail.com>
Date: Fri, 31 Aug 2018 13:11:58 +0300
Subject: [PATCH] box: sync on replication configuration update

Now box.cfg() doesn't return until 'quorum' appliers are in sync not
only on initial configuration, but also on replication configuration
update. If it fails to synchronize within replication_sync_timeout,
box.cfg() returns without an error, but the instance enters 'orphan'
state, which is basically read-only mode. In the meantime, appliers
will keep trying to synchronize in the background, and the instance
will leave 'orphan' state as soon as enough appliers are in sync.
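
For illustration only, the rule described above can be modeled in a
few lines of C. This is a simplified sketch, not code from the patch:
struct sync_model, model_sync() and model_applier_synced() are
hypothetical names, and the real code waits on appliers instead of
taking counters as arguments.

```c
#include <stdbool.h>

/* Hypothetical model of an instance's replication sync state. */
struct sync_model {
	int quorum;     /* appliers that must be in sync */
	int synced;     /* appliers currently in sync */
	bool is_orphan; /* true while the instance is read-only */
};

/*
 * Model of the moment box.cfg() stops waiting: the call never
 * fails, but the instance becomes an orphan if fewer than
 * 'quorum' appliers were in sync when the timeout expired.
 */
void
model_sync(struct sync_model *m, int synced_at_timeout)
{
	m->synced = synced_at_timeout;
	m->is_orphan = m->synced < m->quorum;
}

/*
 * Model of background progress: one more applier catches up,
 * and the instance leaves 'orphan' once the quorum is reached.
 */
void
model_applier_synced(struct sync_model *m)
{
	m->synced++;
	if (m->synced >= m->quorum)
		m->is_orphan = false;
}
```

With a quorum of 2, model_sync(&m, 1) leaves the model in 'orphan'
state, and a following model_applier_synced(&m) clears it. With a
quorum of 0 the model never becomes an orphan.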

Note, this patch also changes logging a bit:
 - 'ready to accept requests' is printed on startup before syncing
   with the replica set, because although the instance is read-only
   at that time, it can indeed accept all sorts of ro requests.
 - For 'connecting', 'connected', 'synchronizing' messages, we now
   use 'info' logging level, not 'verbose' as they used to be, because
   those messages are important as they give the admin an idea of
   what's going on with the instance, and they can't flood logs.
 - 'sync complete' message is also printed as 'info', not 'crit',
   because there's nothing critical about it (it's not an error).

Also note that we only enter 'orphan' state if we fail to synchronize.
In particular, if the instance manages to synchronize with all replicas
within a timeout, it will jump from 'loading' straight into 'running'
bypassing 'orphan' state. This is done for the sake of consistency
between initial configuration and reconfiguration.
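
A minimal sketch of why a successful sync never passes through
'orphan': the state setter is a no-op when the requested state equals
the current one. The names below (struct status_model,
model_set_orphan()) are hypothetical and only mimic the shape of
box_set_orphan() in this patch.

```c
#include <stdbool.h>

/* Hypothetical model: current title and number of status changes. */
struct status_model {
	bool is_orphan;
	const char *title;
	int transitions;
};

/*
 * Change the state only when it actually differs, so asking to
 * clear 'orphan' on an instance that never entered it changes
 * neither the title nor the transition counter.
 */
void
model_set_orphan(struct status_model *s, bool orphan)
{
	if (s->is_orphan == orphan)
		return; /* nothing to do */
	s->is_orphan = orphan;
	s->title = orphan ? "orphan" : "running";
	s->transitions++;
}
```

Starting from { false, "running", 0 }, a call with 'false' leaves the
counter at 0. Only a failed sync (a call with 'true') followed by a
later recovery (a call with 'false') produces two transitions.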

Closes #3427

@TarantoolBot document
Title: Sync on replication configuration update
The behavior of box.cfg() on replication configuration update is
now consistent with initial configuration, that is, box.cfg() will
not return until it synchronizes with as many masters as specified
by replication_connect_quorum configuration option or the timeout
specified by replication_sync_timeout occurs. On timeout, it will
return without an error, but the instance will enter 'orphan' state.
It will leave 'orphan' state as soon as enough appliers have synced.

diff --git a/src/box/box.cc b/src/box/box.cc
index dcedfd00..63b2974f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -110,7 +110,7 @@ static fiber_cond ro_cond;
  * synchronize to a sufficient number of replicas to form
  * a quorum and so was forced to switch to read-only mode.
  */
-static bool is_orphan = true;
+static bool is_orphan;
 
 /* Use the shared instance of xstream for all appliers */
 static struct xstream join_stream;
@@ -219,16 +219,22 @@ box_wait_ro(bool ro, double timeout)
 }
 
 void
-box_clear_orphan(void)
+box_set_orphan(bool orphan)
 {
-	if (!is_orphan)
+	if (is_orphan == orphan)
 		return; /* nothing to do */
 
-	is_orphan = false;
+	is_orphan = orphan;
 	fiber_cond_broadcast(&ro_cond);
 
 	/* Update the title to reflect the new status. */
-	title("running");
+	if (is_orphan) {
+		say_crit("entering orphan mode");
+		title("orphan");
+	} else {
+		say_crit("leaving orphan mode");
+		title("running");
+	}
 }
 
 struct wal_stream {
@@ -646,6 +652,8 @@ box_set_replication(void)
 	box_sync_replication(true);
 	/* Follow replica */
 	replicaset_follow();
+	/* Wait until appliers are in sync */
+	replicaset_sync();
 }
 
 void
@@ -1893,8 +1901,6 @@ box_cfg_xc(void)
 		/** Begin listening only when the local recovery is complete. */
 		box_listen();
 
-		title("orphan");
-
 		/*
 		 * In case of recovering from a checkpoint we
 		 * don't need to wait for 'quorum' masters, since
@@ -1913,8 +1919,6 @@ box_cfg_xc(void)
 		 */
 		box_listen();
 
-		title("orphan");
-
 		/*
 		 * Wait for the cluster to start up.
 		 *
@@ -1951,25 +1955,17 @@ box_cfg_xc(void)
 
 	rmean_cleanup(rmean_box);
 
-	/*
-	 * If this instance is a leader of a newly bootstrapped
-	 * cluster, it is uptodate by definition so leave the
-	 * 'orphan' mode right away to let it initialize cluster
-	 * schema.
-	 */
-	if (is_bootstrap_leader)
-		box_clear_orphan();
-
 	/* Follow replica */
 	replicaset_follow();
 
 	fiber_gc();
 	is_box_configured = true;
 
+	title("running");
+	say_info("ready to accept requests");
+
 	if (!is_bootstrap_leader)
 		replicaset_sync();
-
-	say_info("ready to accept requests");
 }
 
 void
diff --git a/src/box/box.h b/src/box/box.h
index 6e1c13f5..5e5ec7d3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -100,12 +100,16 @@ int
 box_wait_ro(bool ro, double timeout);
 
 /**
- * Switch this instance from 'orphan' to 'running' state.
- * Called on initial configuration as soon as this instance
- * synchronizes with enough replicas to form a quorum.
+ * Switch this instance from 'orphan' to 'running' state or
+ * vice versa depending on the value of the function argument.
+ *
+ * An instance enters 'orphan' state on returning from box.cfg()
+ * if it failed to synchronize with 'quorum' replicas within a
+ * specified timeout. It will keep trying to synchronize in the
+ * background and leave 'orphan' state once it's done.
  */
 void
-box_clear_orphan(void);
+box_set_orphan(bool orphan);
 
 /** True if snapshot is in progress. */
 extern bool box_checkpoint_is_in_progress;
diff --git a/src/box/replication.cc b/src/box/replication.cc
index ff56f442..b0952740 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -548,7 +548,8 @@ replicaset_connect(struct applier **appliers, int count,
 		replicaset_update(appliers, count);
 		return;
 	}
-	say_verbose("connecting to %d replicas", count);
+
+	say_info("connecting to %d replicas", count);
 
 	/*
 	 * Simultaneously connect to remote peers to receive their UUIDs
@@ -602,7 +603,7 @@ replicaset_connect(struct applier **appliers, int count,
 		if (connect_quorum && state.connected < quorum)
 			goto error;
 	} else {
-		say_verbose("connected to %d replicas", state.connected);
+		say_info("connected to %d replicas", state.connected);
 	}
 
 	for (int i = 0; i < count; i++) {
@@ -636,13 +637,6 @@ error:
 void
 replicaset_follow(void)
 {
-	if (replicaset.applier.total == 0) {
-		/*
-		 * Replication is not configured.
-		 */
-		box_clear_orphan();
-		return;
-	}
 	struct replica *replica;
 	replicaset_foreach(replica) {
 		/* Resume connected appliers. */
@@ -653,13 +647,6 @@ replicaset_follow(void)
 		/* Restart appliers that failed to connect. */
 		applier_start(replica->applier);
 	}
-	if (replicaset_quorum() == 0) {
-		/*
-		 * Leaving orphan mode immediately since
-		 * replication_connect_quorum is set to 0.
-		 */
-		box_clear_orphan();
-	}
 }
 
 void
@@ -667,10 +654,16 @@ replicaset_sync(void)
 {
 	int quorum = replicaset_quorum();
 
-	if (quorum == 0)
+	if (quorum == 0) {
+		/*
+		 * Quorum is 0 or replication is not configured.
+		 * Leaving 'orphan' state immediately.
+		 */
+		box_set_orphan(false);
 		return;
+	}
 
-	say_verbose("synchronizing with %d replicas", quorum);
+	say_info("synchronizing with %d replicas", quorum);
 
 	/*
 	 * Wait until all connected replicas synchronize up to
@@ -691,22 +684,21 @@ replicaset_sync(void)
 		 * Do not stall configuration, leave the instance
 		 * in 'orphan' state.
 		 */
-		say_crit("entering orphan mode");
-		return;
+		say_crit("failed to synchronize with %d out of %d replicas",
+			 replicaset.applier.total - replicaset.applier.synced,
+			 replicaset.applier.total);
+		box_set_orphan(true);
+	} else {
+		say_info("replica set sync complete");
+		box_set_orphan(false);
 	}
-
-	say_crit("replica set sync complete, quorum of %d "
-		 "replicas formed", quorum);
 }
 
 void
 replicaset_check_quorum(void)
 {
-	if (replicaset.applier.synced >= replicaset_quorum()) {
-		if (replicaset_quorum() > 0)
-			say_crit("leaving orphan mode");
-		box_clear_orphan();
-	}
+	if (replicaset.applier.synced >= replicaset_quorum())
+		box_set_orphan(false);
 }
 
 void
diff --git a/test/replication/sync.result b/test/replication/sync.result
new file mode 100644
index 00000000..8994aa3e
--- /dev/null
+++ b/test/replication/sync.result
@@ -0,0 +1,236 @@
+fiber = require('fiber')
+---
+...
+test_run = require('test_run').new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+-- Slow down replication a little to test replication_sync_lag.
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001)
+---
+- ok
+...
+-- Helper that adds some records to the space and then starts
+-- a fiber to add more records in the background.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+count = 0;
+---
+...
+function fill()
+    for i = count + 1, count + 100 do
+        box.space.test:replace{i}
+    end
+    fiber.create(function()
+        for i = count + 101, count + 200 do
+            fiber.sleep(0.0001)
+            box.space.test:replace{i}
+        end
+    end)
+    count = count + 200
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Deploy a replica.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+-- Stop replication.
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+-- Fill the space.
+test_run:cmd("switch default")
+---
+- true
+...
+fill()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- Resume replication.
+--
+-- Since max allowed lag is small, all records should arrive
+-- by the time box.cfg() returns.
+--
+box.cfg{replication_sync_lag = 0.001}
+---
+...
+box.cfg{replication = replication}
+---
+...
+box.space.test:count() == 200
+---
+- true
+...
+box.info.status -- running
+---
+- running
+...
+box.info.ro -- false
+---
+- false
+...
+-- Stop replication.
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+-- Fill the space.
+test_run:cmd("switch default")
+---
+- true
+...
+fill()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- Resume replication
+--
+-- Since max allowed lag is big, not all records will arrive
+-- upon returning from box.cfg() but the instance won't enter
+-- orphan state.
+--
+box.cfg{replication_sync_lag = 1}
+---
+...
+box.cfg{replication = replication}
+---
+...
+box.space.test:count() < 400
+---
+- true
+...
+box.info.status -- running
+---
+- running
+...
+box.info.ro -- false
+---
+- false
+...
+-- Wait for remaining rows to arrive.
+repeat fiber.sleep(0.01) until box.space.test:count() == 400
+---
+...
+-- Stop replication.
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+-- Fill the space.
+test_run:cmd("switch default")
+---
+- true
+...
+fill()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- Resume replication
+--
+-- Since the sync timeout is tiny, not all records will arrive
+-- upon returning from box.cfg() and the instance will enter
+-- orphan state.
+--
+box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001}
+---
+...
+box.cfg{replication = replication}
+---
+...
+box.space.test:count() < 600
+---
+- true
+...
+box.info.status -- orphan
+---
+- orphan
+...
+box.info.ro -- true
+---
+- true
+...
+-- Wait for remaining rows to arrive.
+repeat fiber.sleep(0.01) until box.space.test:count() == 600
+---
+...
+-- Make sure replica leaves orphan state.
+repeat fiber.sleep(0.01) until box.info.status ~= 'orphan'
+---
+...
+box.info.status -- running
+---
+- running
+...
+box.info.ro -- false
+---
+- false
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+---
+- ok
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/sync.test.lua b/test/replication/sync.test.lua
new file mode 100644
index 00000000..3cef825d
--- /dev/null
+++ b/test/replication/sync.test.lua
@@ -0,0 +1,116 @@
+fiber = require('fiber')
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', {engine = engine})
+_ = box.space.test:create_index('pk')
+
+-- Slow down replication a little to test replication_sync_lag.
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001)
+
+-- Helper that adds some records to the space and then starts
+-- a fiber to add more records in the background.
+test_run:cmd("setopt delimiter ';'")
+count = 0;
+function fill()
+    for i = count + 1, count + 100 do
+        box.space.test:replace{i}
+    end
+    fiber.create(function()
+        for i = count + 101, count + 200 do
+            fiber.sleep(0.0001)
+            box.space.test:replace{i}
+        end
+    end)
+    count = count + 200
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Deploy a replica.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+fiber = require('fiber')
+
+-- Stop replication.
+replication = box.cfg.replication
+box.cfg{replication = {}}
+
+-- Fill the space.
+test_run:cmd("switch default")
+fill()
+test_run:cmd("switch replica")
+
+-- Resume replication.
+--
+-- Since max allowed lag is small, all records should arrive
+-- by the time box.cfg() returns.
+--
+box.cfg{replication_sync_lag = 0.001}
+box.cfg{replication = replication}
+box.space.test:count() == 200
+box.info.status -- running
+box.info.ro -- false
+
+-- Stop replication.
+replication = box.cfg.replication
+box.cfg{replication = {}}
+
+-- Fill the space.
+test_run:cmd("switch default")
+fill()
+test_run:cmd("switch replica")
+
+-- Resume replication
+--
+-- Since max allowed lag is big, not all records will arrive
+-- upon returning from box.cfg() but the instance won't enter
+-- orphan state.
+--
+box.cfg{replication_sync_lag = 1}
+box.cfg{replication = replication}
+box.space.test:count() < 400
+box.info.status -- running
+box.info.ro -- false
+
+-- Wait for remaining rows to arrive.
+repeat fiber.sleep(0.01) until box.space.test:count() == 400
+
+-- Stop replication.
+replication = box.cfg.replication
+box.cfg{replication = {}}
+
+-- Fill the space.
+test_run:cmd("switch default")
+fill()
+test_run:cmd("switch replica")
+
+-- Resume replication
+--
+-- Since the sync timeout is tiny, not all records will arrive
+-- upon returning from box.cfg() and the instance will enter
+-- orphan state.
+--
+box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001}
+box.cfg{replication = replication}
+box.space.test:count() < 600
+box.info.status -- orphan
+box.info.ro -- true
+
+-- Wait for remaining rows to arrive.
+repeat fiber.sleep(0.01) until box.space.test:count() == 600
+
+-- Make sure replica leaves orphan state.
+repeat fiber.sleep(0.01) until box.info.status ~= 'orphan'
+box.info.status -- running
+box.info.ro -- false
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')


