[PATCH 2/2] replication: stop syncing if quorum cannot be formed
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Feb 14 20:29:15 MSK 2018
If box.cfg() successfully connects to a number of replicas sufficient to
form a quorum (>= box.cfg.replication_connect_quorum), it won't return
until it syncs with all of them (lag <= box.cfg.replication_sync_lag).
If one of the replicas forming a quorum disconnects permanently while
sync is in progress, box.cfg() will hang forever.
Such behavior is rather unreasonable. After all, syncing a quorum is
best-effort. It would be much more sensible to return from box.cfg()
leaving the instance in the 'orphan' mode in this case. This patch does
exactly that: now, if we detect while syncing that not enough replicas
are connected to form a quorum, we stop syncing immediately.
---
src/box/replication.cc | 92 ++++++++++++++++++++++++++--------------
src/box/replication.h | 30 ++++++++-----
test/replication/errinj.result | 12 ++----
test/replication/errinj.test.lua | 9 ++--
test/replication/replica_ack.lua | 11 -----
5 files changed, 89 insertions(+), 65 deletions(-)
delete mode 100644 test/replication/replica_ack.lua
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 319ea57e..fc8f900b 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -137,7 +137,7 @@ replica_new(void)
rlist_create(&replica->in_anon);
trigger_create(&replica->on_applier_state,
replica_on_applier_state_f, NULL, NULL);
- replica->is_synced = false;
+ replica->state = REPLICA_DISCONNECTED;
return replica;
}
@@ -202,7 +202,6 @@ static void
replica_set_applier(struct replica *replica, struct applier *applier)
{
assert(replica->applier == NULL);
- assert(!replica->is_synced);
replica->applier = applier;
trigger_add(&replica->applier->on_state,
&replica->on_applier_state);
@@ -213,17 +212,15 @@ replica_clear_applier(struct replica *replica)
{
assert(replica->applier != NULL);
replica->applier = NULL;
- replica->is_synced = false;
trigger_clear(&replica->on_applier_state);
}
static void
replica_on_applier_sync(struct replica *replica)
{
- if (replica->is_synced)
- return;
+ assert(replica->state == REPLICA_CONNECTED);
- replica->is_synced = true;
+ replica->state = REPLICA_SYNCED;
replicaset.applier.synced++;
replicaset_check_quorum();
@@ -236,11 +233,10 @@ replica_on_applier_connect(struct replica *replica)
assert(tt_uuid_is_nil(&replica->uuid));
assert(!tt_uuid_is_nil(&applier->uuid));
+ assert(replica->state == REPLICA_DISCONNECTED);
replica->uuid = applier->uuid;
- replicaset.applier.connected++;
-
struct replica *orig = replica_hash_search(&replicaset.hash, replica);
if (orig != NULL && orig->applier != NULL) {
say_error("duplicate connection to the same replica: "
@@ -262,10 +258,14 @@ replica_on_applier_connect(struct replica *replica)
replica_set_applier(orig, applier);
replica_clear_applier(replica);
replica_delete(replica);
+ replica = orig;
} else {
/* Add a new struct replica */
replica_hash_insert(&replicaset.hash, replica);
}
+
+ replica->state = REPLICA_CONNECTED;
+ replicaset.applier.connected++;
}
static void
@@ -275,24 +275,49 @@ replica_on_applier_reconnect(struct replica *replica)
assert(!tt_uuid_is_nil(&replica->uuid));
assert(!tt_uuid_is_nil(&applier->uuid));
+ assert(replica->state == REPLICA_DISCONNECTED);
- if (tt_uuid_is_equal(&replica->uuid, &applier->uuid))
- return;
+ if (!tt_uuid_is_equal(&replica->uuid, &applier->uuid)) {
+ /*
+ * Master's UUID changed, most likely because it was
+ * rebootstrapped. Try to look up a replica matching
+ * the new UUID and reassign the applier to it.
+ */
+ struct replica *orig = replica_by_uuid(&applier->uuid);
+ if (orig == NULL || orig->applier != NULL) {
+ tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
+ tt_uuid_str(&replica->uuid),
+ tt_uuid_str(&applier->uuid));
+ }
- /*
- * Master's UUID changed, most likely because it was
- * rebootstrapped. Try to look up a replica matching
- * the new UUID and reassign the applier to it.
- */
- struct replica *new_replica = replica_by_uuid(&applier->uuid);
- if (new_replica == NULL || new_replica->applier != NULL) {
- tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
- tt_uuid_str(&replica->uuid),
- tt_uuid_str(&applier->uuid));
+ replica_set_applier(orig, applier);
+ replica_clear_applier(replica);
+ replica->state = REPLICA_DISCONNECTED;
+ replica = orig;
}
- replica_set_applier(new_replica, applier);
- replica_clear_applier(replica);
+ replica->state = REPLICA_CONNECTED;
+ replicaset.applier.connected++;
+}
+
+static void
+replica_on_applier_disconnect(struct replica *replica)
+{
+ switch (replica->state) {
+ case REPLICA_SYNCED:
+ assert(replicaset.applier.synced > 0);
+ replicaset.applier.synced--;
+ FALLTHROUGH;
+ case REPLICA_CONNECTED:
+ assert(replicaset.applier.connected > 0);
+ replicaset.applier.connected--;
+ break;
+ case REPLICA_DISCONNECTED:
+ break;
+ default:
+ unreachable();
+ }
+ replica->state = REPLICA_DISCONNECTED;
}
static void
@@ -308,6 +333,9 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
else
replica_on_applier_reconnect(replica);
break;
+ case APPLIER_DISCONNECTED:
+ replica_on_applier_disconnect(replica);
+ break;
case APPLIER_FOLLOW:
replica_on_applier_sync(replica);
break;
@@ -321,7 +349,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
break;
case APPLIER_STOPPED:
/* Unrecoverable error. */
- replicaset.applier.failed++;
+ replica_on_applier_disconnect(replica);
break;
default:
break;
@@ -342,7 +370,6 @@ replicaset_update(struct applier **appliers, int count)
RLIST_HEAD(anon_replicas);
struct replica *replica, *next;
struct applier *applier;
- int connected = 0;
auto uniq_guard = make_scoped_guard([&]{
replica_hash_foreach_safe(&uniq, replica, next) {
@@ -378,7 +405,6 @@ replicaset_update(struct applier **appliers, int count)
"duplicate connection to the same replica");
}
replica_hash_insert(&uniq, replica);
- connected++;
}
/*
@@ -392,6 +418,7 @@ replicaset_update(struct applier **appliers, int count)
continue;
applier = replica->applier;
replica_clear_applier(replica);
+ replica->state = REPLICA_DISCONNECTED;
applier_stop(applier);
applier_delete(applier);
}
@@ -405,6 +432,10 @@ replicaset_update(struct applier **appliers, int count)
rlist_create(&replicaset.anon);
/* Save new appliers */
+ replicaset.applier.total = count;
+ replicaset.applier.connected = 0;
+ replicaset.applier.synced = 0;
+
replica_hash_foreach_safe(&uniq, replica, next) {
replica_hash_remove(&uniq, replica);
@@ -415,18 +446,17 @@ replicaset_update(struct applier **appliers, int count)
replica_set_applier(orig, replica->applier);
replica_clear_applier(replica);
replica_delete(replica);
+ replica = orig;
} else {
/* Add a new struct replica */
replica_hash_insert(&replicaset.hash, replica);
}
+
+ replica->state = REPLICA_CONNECTED;
+ replicaset.applier.connected++;
}
rlist_swap(&replicaset.anon, &anon_replicas);
- replicaset.applier.total = count;
- replicaset.applier.connected = connected;
- replicaset.applier.synced = 0;
- replicaset.applier.failed = 0;
-
assert(replica_hash_first(&uniq) == NULL);
replica_hash_foreach_safe(&replicaset.hash, replica, next) {
if (replica_is_orphan(replica)) {
@@ -601,7 +631,7 @@ replicaset_sync(void)
* a quorum cannot be formed because of errors.
*/
while (replicaset.applier.synced < quorum &&
- replicaset.applier.failed <= replicaset.applier.total - quorum)
+ replicaset.applier.connected >= quorum)
fiber_cond_wait(&replicaset.applier.cond);
}
diff --git a/src/box/replication.h b/src/box/replication.h
index f964eed0..8a9d5754 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -200,11 +200,6 @@ struct replicaset {
*/
int synced;
/**
- * Number of appliers that have been stopped
- * due to unrecoverable errors.
- */
- int failed;
- /**
* Signaled whenever an applier changes its
* state.
*/
@@ -213,6 +208,24 @@ struct replicaset {
};
extern struct replicaset replicaset;
+enum replica_state {
+ /**
+ * Applier has not connected to the master yet
+ * or has disconnected.
+ */
+ REPLICA_DISCONNECTED,
+ /**
+ * Applier has connected to the master and
+ * received UUID.
+ */
+ REPLICA_CONNECTED,
+ /**
+ * Applier has synchronized with the master
+ * (left "sync" and entered "follow" state).
+ */
+ REPLICA_SYNCED,
+};
+
/**
* Summary information about a replica in the replica set.
*/
@@ -241,11 +254,8 @@ struct replica {
* Trigger invoked when the applier changes its state.
*/
struct trigger on_applier_state;
- /**
- * Set if the applier has successfully synchornized to
- * the master (left "sync" and entered "follow" state).
- */
- bool is_synced;
+ /** Replica sync state. */
+ enum replica_state state;
};
enum {
diff --git a/test/replication/errinj.result b/test/replication/errinj.result
index 2f4b2a84..bc8d059f 100644
--- a/test/replication/errinj.result
+++ b/test/replication/errinj.result
@@ -483,15 +483,11 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
-- during the join stage, i.e. a replica with a minuscule
-- timeout successfully bootstraps and breaks connection only
-- after subscribe.
-test_run:cmd("create server replica_ack with rpl_master=default, script='replication/replica_ack.lua'")
+test_run:cmd("start server replica_timeout with args='0.00001'")
---
- true
...
-test_run:cmd("start server replica_ack")
----
-- true
-...
-test_run:cmd("switch replica_ack")
+test_run:cmd("switch replica_timeout")
---
- true
...
@@ -517,11 +513,11 @@ test_run:cmd("switch default")
---
- true
...
-test_run:cmd("stop server replica_ack")
+test_run:cmd("stop server replica_timeout")
---
- true
...
-test_run:cmd("cleanup server replica_ack")
+test_run:cmd("cleanup server replica_timeout")
---
- true
...
diff --git a/test/replication/errinj.test.lua b/test/replication/errinj.test.lua
index 84a81161..697fdfa9 100644
--- a/test/replication/errinj.test.lua
+++ b/test/replication/errinj.test.lua
@@ -203,9 +203,8 @@ for i = 0, 9999 do box.space.test:replace({i, 4, 5, 'test'}) end
-- during the join stage, i.e. a replica with a minuscule
-- timeout successfully bootstraps and breaks connection only
-- after subscribe.
-test_run:cmd("create server replica_ack with rpl_master=default, script='replication/replica_ack.lua'")
-test_run:cmd("start server replica_ack")
-test_run:cmd("switch replica_ack")
+test_run:cmd("start server replica_timeout with args='0.00001'")
+test_run:cmd("switch replica_timeout")
fiber = require('fiber')
while box.info.replication[1].upstream.message ~= 'timed out' do fiber.sleep(0.0001) end
@@ -213,5 +212,5 @@ test_run:cmd("stop server default")
test_run:cmd("deploy server default")
test_run:cmd("start server default")
test_run:cmd("switch default")
-test_run:cmd("stop server replica_ack")
-test_run:cmd("cleanup server replica_ack")
+test_run:cmd("stop server replica_timeout")
+test_run:cmd("cleanup server replica_timeout")
diff --git a/test/replication/replica_ack.lua b/test/replication/replica_ack.lua
deleted file mode 100644
index fcff3484..00000000
--- a/test/replication/replica_ack.lua
+++ /dev/null
@@ -1,11 +0,0 @@
-#!/usr/bin/env tarantool
-
-box.cfg({
- listen = os.getenv("LISTEN"),
- replication = os.getenv("MASTER"),
- memtx_memory = 107374182,
- replication_timeout = 0.00001,
- replication_connect_quorum = 0,
-})
-
-require('console').listen(os.getenv('ADMIN'))
--
2.11.0
More information about the Tarantool-patches
mailing list