[PATCH 2/2] replication: stop syncing if quorum cannot be formed

Vladimir Davydov vdavydov.dev at gmail.com
Wed Feb 14 20:29:15 MSK 2018


If box.cfg() successfully connects to a number of replicas sufficient to
form a quorum (>= box.cfg.replication_connect_quorum), it won't return
until it syncs with all of them (lag <= box.cfg.replication_sync_lag).
If one of the replicas forming a quorum disconnects permanently while
sync is in progress, box.cfg() will hang forever.

Such behavior is rather unreasonable. After all, syncing with a quorum is
best-effort. It would be much more sensible to return from box.cfg(),
leaving the instance in 'orphan' mode in this case. This patch does
exactly that: now, if we detect while syncing that not enough replicas
are connected to form a quorum, we stop syncing immediately.
---
 src/box/replication.cc           | 92 ++++++++++++++++++++++++++--------------
 src/box/replication.h            | 30 ++++++++-----
 test/replication/errinj.result   | 12 ++----
 test/replication/errinj.test.lua |  9 ++--
 test/replication/replica_ack.lua | 11 -----
 5 files changed, 89 insertions(+), 65 deletions(-)
 delete mode 100644 test/replication/replica_ack.lua

diff --git a/src/box/replication.cc b/src/box/replication.cc
index 319ea57e..fc8f900b 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -137,7 +137,7 @@ replica_new(void)
 	rlist_create(&replica->in_anon);
 	trigger_create(&replica->on_applier_state,
 		       replica_on_applier_state_f, NULL, NULL);
-	replica->is_synced = false;
+	replica->state = REPLICA_DISCONNECTED;
 	return replica;
 }
 
@@ -202,7 +202,6 @@ static void
 replica_set_applier(struct replica *replica, struct applier *applier)
 {
 	assert(replica->applier == NULL);
-	assert(!replica->is_synced);
 	replica->applier = applier;
 	trigger_add(&replica->applier->on_state,
 		    &replica->on_applier_state);
@@ -213,17 +212,15 @@ replica_clear_applier(struct replica *replica)
 {
 	assert(replica->applier != NULL);
 	replica->applier = NULL;
-	replica->is_synced = false;
 	trigger_clear(&replica->on_applier_state);
 }
 
 static void
 replica_on_applier_sync(struct replica *replica)
 {
-	if (replica->is_synced)
-		return;
+	assert(replica->state == REPLICA_CONNECTED);
 
-	replica->is_synced = true;
+	replica->state = REPLICA_SYNCED;
 	replicaset.applier.synced++;
 
 	replicaset_check_quorum();
@@ -236,11 +233,10 @@ replica_on_applier_connect(struct replica *replica)
 
 	assert(tt_uuid_is_nil(&replica->uuid));
 	assert(!tt_uuid_is_nil(&applier->uuid));
+	assert(replica->state == REPLICA_DISCONNECTED);
 
 	replica->uuid = applier->uuid;
 
-	replicaset.applier.connected++;
-
 	struct replica *orig = replica_hash_search(&replicaset.hash, replica);
 	if (orig != NULL && orig->applier != NULL) {
 		say_error("duplicate connection to the same replica: "
@@ -262,10 +258,14 @@ replica_on_applier_connect(struct replica *replica)
 		replica_set_applier(orig, applier);
 		replica_clear_applier(replica);
 		replica_delete(replica);
+		replica = orig;
 	} else {
 		/* Add a new struct replica */
 		replica_hash_insert(&replicaset.hash, replica);
 	}
+
+	replica->state = REPLICA_CONNECTED;
+	replicaset.applier.connected++;
 }
 
 static void
@@ -275,24 +275,49 @@ replica_on_applier_reconnect(struct replica *replica)
 
 	assert(!tt_uuid_is_nil(&replica->uuid));
 	assert(!tt_uuid_is_nil(&applier->uuid));
+	assert(replica->state == REPLICA_DISCONNECTED);
 
-	if (tt_uuid_is_equal(&replica->uuid, &applier->uuid))
-		return;
+	if (!tt_uuid_is_equal(&replica->uuid, &applier->uuid)) {
+		/*
+		 * Master's UUID changed, most likely because it was
+		 * rebootstrapped. Try to look up a replica matching
+		 * the new UUID and reassign the applier to it.
+		 */
+		struct replica *orig = replica_by_uuid(&applier->uuid);
+		if (orig == NULL || orig->applier != NULL) {
+			tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
+				  tt_uuid_str(&replica->uuid),
+				  tt_uuid_str(&applier->uuid));
+		}
 
-	/*
-	 * Master's UUID changed, most likely because it was
-	 * rebootstrapped. Try to look up a replica matching
-	 * the new UUID and reassign the applier to it.
-	 */
-	struct replica *new_replica = replica_by_uuid(&applier->uuid);
-	if (new_replica == NULL || new_replica->applier != NULL) {
-		tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
-			  tt_uuid_str(&replica->uuid),
-			  tt_uuid_str(&applier->uuid));
+		replica_set_applier(orig, applier);
+		replica_clear_applier(replica);
+		replica->state = REPLICA_DISCONNECTED;
+		replica = orig;
 	}
 
-	replica_set_applier(new_replica, applier);
-	replica_clear_applier(replica);
+	replica->state = REPLICA_CONNECTED;
+	replicaset.applier.connected++;
+}
+
+static void
+replica_on_applier_disconnect(struct replica *replica)
+{
+	switch (replica->state) {
+	case REPLICA_SYNCED:
+		assert(replicaset.applier.synced > 0);
+		replicaset.applier.synced--;
+		FALLTHROUGH;
+	case REPLICA_CONNECTED:
+		assert(replicaset.applier.connected > 0);
+		replicaset.applier.connected--;
+		break;
+	case REPLICA_DISCONNECTED:
+		break;
+	default:
+		unreachable();
+	}
+	replica->state = REPLICA_DISCONNECTED;
 }
 
 static void
@@ -308,6 +333,9 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
 		else
 			replica_on_applier_reconnect(replica);
 		break;
+	case APPLIER_DISCONNECTED:
+		replica_on_applier_disconnect(replica);
+		break;
 	case APPLIER_FOLLOW:
 		replica_on_applier_sync(replica);
 		break;
@@ -321,7 +349,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
 		break;
 	case APPLIER_STOPPED:
 		/* Unrecoverable error. */
-		replicaset.applier.failed++;
+		replica_on_applier_disconnect(replica);
 		break;
 	default:
 		break;
@@ -342,7 +370,6 @@ replicaset_update(struct applier **appliers, int count)
 	RLIST_HEAD(anon_replicas);
 	struct replica *replica, *next;
 	struct applier *applier;
-	int connected = 0;
 
 	auto uniq_guard = make_scoped_guard([&]{
 		replica_hash_foreach_safe(&uniq, replica, next) {
@@ -378,7 +405,6 @@ replicaset_update(struct applier **appliers, int count)
 				  "duplicate connection to the same replica");
 		}
 		replica_hash_insert(&uniq, replica);
-		connected++;
 	}
 
 	/*
@@ -392,6 +418,7 @@ replicaset_update(struct applier **appliers, int count)
 			continue;
 		applier = replica->applier;
 		replica_clear_applier(replica);
+		replica->state = REPLICA_DISCONNECTED;
 		applier_stop(applier);
 		applier_delete(applier);
 	}
@@ -405,6 +432,10 @@ replicaset_update(struct applier **appliers, int count)
 	rlist_create(&replicaset.anon);
 
 	/* Save new appliers */
+	replicaset.applier.total = count;
+	replicaset.applier.connected = 0;
+	replicaset.applier.synced = 0;
+
 	replica_hash_foreach_safe(&uniq, replica, next) {
 		replica_hash_remove(&uniq, replica);
 
@@ -415,18 +446,17 @@ replicaset_update(struct applier **appliers, int count)
 			replica_set_applier(orig, replica->applier);
 			replica_clear_applier(replica);
 			replica_delete(replica);
+			replica = orig;
 		} else {
 			/* Add a new struct replica */
 			replica_hash_insert(&replicaset.hash, replica);
 		}
+
+		replica->state = REPLICA_CONNECTED;
+		replicaset.applier.connected++;
 	}
 	rlist_swap(&replicaset.anon, &anon_replicas);
 
-	replicaset.applier.total = count;
-	replicaset.applier.connected = connected;
-	replicaset.applier.synced = 0;
-	replicaset.applier.failed = 0;
-
 	assert(replica_hash_first(&uniq) == NULL);
 	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
 		if (replica_is_orphan(replica)) {
@@ -601,7 +631,7 @@ replicaset_sync(void)
 	 * a quorum cannot be formed because of errors.
 	 */
 	while (replicaset.applier.synced < quorum &&
-	       replicaset.applier.failed <= replicaset.applier.total - quorum)
+	       replicaset.applier.connected >= quorum)
 		fiber_cond_wait(&replicaset.applier.cond);
 }
 
diff --git a/src/box/replication.h b/src/box/replication.h
index f964eed0..8a9d5754 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -200,11 +200,6 @@ struct replicaset {
 		 */
 		int synced;
 		/**
-		 * Number of appliers that have been stopped
-		 * due to unrecoverable errors.
-		 */
-		int failed;
-		/**
 		 * Signaled whenever an applier changes its
 		 * state.
 		 */
@@ -213,6 +208,24 @@ struct replicaset {
 };
 extern struct replicaset replicaset;
 
+enum replica_state {
+	/**
+	 * Applier has not connected to the master yet
+	 * or has disconnected.
+	 */
+	REPLICA_DISCONNECTED,
+	/**
+	 * Applier has connected to the master and
+	 * received UUID.
+	 */
+	REPLICA_CONNECTED,
+	/**
+	 * Applier has synchronized with the master
+	 * (left "sync" and entered "follow" state).
+	 */
+	REPLICA_SYNCED,
+};
+
 /**
  * Summary information about a replica in the replica set.
  */
@@ -241,11 +254,8 @@ struct replica {
 	 * Trigger invoked when the applier changes its state.
 	 */
 	struct trigger on_applier_state;
-	/**
-	 * Set if the applier has successfully synchornized to
-	 * the master (left "sync" and entered "follow" state).
-	 */
-	bool is_synced;
+	/** Replica sync state. */
+	enum replica_state state;
 };
 
 enum {
diff --git a/test/replication/errinj.result b/test/replication/errinj.result
index 2f4b2a84..bc8d059f 100644
--- a/test/replication/errinj.result
+++ b/test/replication/errinj.result
@@ -483,15 +483,11 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
 -- during the join stage, i.e. a replica with a minuscule
 -- timeout successfully bootstraps and breaks connection only
 -- after subscribe.
-test_run:cmd("create server replica_ack with rpl_master=default, script='replication/replica_ack.lua'")
+test_run:cmd("start server replica_timeout with args='0.00001'")
 ---
 - true
 ...
-test_run:cmd("start server replica_ack")
----
-- true
-...
-test_run:cmd("switch replica_ack")
+test_run:cmd("switch replica_timeout")
 ---
 - true
 ...
@@ -517,11 +513,11 @@ test_run:cmd("switch default")
 ---
 - true
 ...
-test_run:cmd("stop server replica_ack")
+test_run:cmd("stop server replica_timeout")
 ---
 - true
 ...
-test_run:cmd("cleanup server replica_ack")
+test_run:cmd("cleanup server replica_timeout")
 ---
 - true
 ...
diff --git a/test/replication/errinj.test.lua b/test/replication/errinj.test.lua
index 84a81161..697fdfa9 100644
--- a/test/replication/errinj.test.lua
+++ b/test/replication/errinj.test.lua
@@ -203,9 +203,8 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
 -- during the join stage, i.e. a replica with a minuscule
 -- timeout successfully bootstraps and breaks connection only
 -- after subscribe.
-test_run:cmd("create server replica_ack with rpl_master=default, script='replication/replica_ack.lua'")
-test_run:cmd("start server replica_ack")
-test_run:cmd("switch replica_ack")
+test_run:cmd("start server replica_timeout with args='0.00001'")
+test_run:cmd("switch replica_timeout")
 fiber = require('fiber')
 while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
 
@@ -213,5 +212,5 @@ test_run:cmd("stop server default")
 test_run:cmd("deploy server default")
 test_run:cmd("start server default")
 test_run:cmd("switch default")
-test_run:cmd("stop server replica_ack")
-test_run:cmd("cleanup server replica_ack")
+test_run:cmd("stop server replica_timeout")
+test_run:cmd("cleanup server replica_timeout")
diff --git a/test/replication/replica_ack.lua b/test/replication/replica_ack.lua
deleted file mode 100644
index fcff3484..00000000
--- a/test/replication/replica_ack.lua
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env tarantool
-
-box.cfg({
-    listen              = os.getenv("LISTEN"),
-    replication         = os.getenv("MASTER"),
-    memtx_memory        = 107374182,
-    replication_timeout = 0.00001,
-    replication_connect_quorum = 0,
-})
-
-require('console').listen(os.getenv('ADMIN'))
-- 
2.11.0




More information about the Tarantool-patches mailing list