[server 5/5] replication: introduce orphan mode

Vladimir Davydov vdavydov.dev at gmail.com
Wed Jan 24 20:44:54 MSK 2018


This patch modifies the replication configuration procedure so that it
fully conforms to the specification presented in #2958. In a nutshell,
box.cfg() now tries to synchronize with all connected replicas before
returning. If it fails to connect to enough replicas to form a quorum,
it leaves the server in a degraded 'orphan' mode, which is essentially
read-only. More details below.

First of all, it is worth mentioning that Tarantool already has an
'orphan' status (between 'loading' and 'hot_standby'), but it has
nothing to do with replication. It is unclear why it was introduced in
the first place, so we agreed to silently drop it.

We assume that a replica is synchronized if its lag is not greater than
the value of the new configuration option box.cfg.replication_sync_lag.
Otherwise the replica is considered to be syncing and has the "sync"
status. If replication_sync_lag is unset (nil) or set to
TIMEOUT_INFINITY, the replica skips the "sync" state and switches to
"follow" immediately. For the sake of backward compatibility, the new
option is unset by default.
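
For illustration, a minimal sketch of how the new option might be set
(the URIs and the 0.1-second lag are placeholders, not recommended
values):

    box.cfg{
        replication = {
            'replica1.sock',  -- placeholder URIs
            'replica2.sock',
        },
        -- consider a replica synced once its lag is 0.1s or less;
        -- leave the option unset (nil) to skip the "sync" state
        replication_sync_lag = 0.1,
    }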

If box.cfg() is called for the very first time (bootstrap) for a given
instance (a sample configuration follows the list), then

 1. It tries to connect to all configured replicas for as long as it
    takes (replication_timeout isn't taken into account). If any of the
    configured replicas cannot be connected, bootstrap is aborted.

 2. If this is a cluster bootstrap and the current instance turns out
    to be the new cluster leader, it performs a local bootstrap,
    switches to the 'running' state, and returns from box.cfg()
    immediately.

 3. Otherwise (i.e. if this is the bootstrap of a replica), it
    bootstraps from a remote master and stays in the 'orphan' state
    until it has synchronized with all replicas, after which it switches
    to the 'running' state and returns from box.cfg().
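
For reference, the configuration used by the new test script
(test/replication/quorum.lua in this patch) exercises this bootstrap
path: every instance lists all three peers, including itself, so the
full mesh bootstraps only once all of them are connected (INSTANCE_ID
and instance_uri() are helpers defined in the script):

    box.cfg({
        listen = instance_uri(INSTANCE_ID);
        replication_timeout = 0.05;
        replication_sync_lag = 0.01;
        replication_connect_quorum = 3;
        replication = {
            instance_uri(1);
            instance_uri(2);
            instance_uri(3);
        };
    })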

If box.cfg() is called after bootstrap, in order to recover from local
storage, then

 1. It recovers the last snapshot and xlogs stored in the local
    directory.

 2. Then it switches to 'orphan' mode and tries to connect to at least
    as many replicas as specified by box.cfg.replication_connect_quorum,
    for a time period that is a multiple (4x) of
    box.cfg.replication_timeout. If it fails, it doesn't abort, but
    returns from box.cfg() still in 'orphan' mode. The state will switch
    to 'running' asynchronously as soon as the instance has synced with
    'replication_connect_quorum' replicas (see the example after this
    list).

 3. If it managed to connect to enough replicas to form a quorum at step
    2, it synchronizes with them: box.cfg() doesn't return until at
    least 'replication_connect_quorum' replicas have been synchronized.
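
The new test case (test/replication/quorum.test.lua below) shows how a
restarted replica that came up in 'orphan' mode can be unblocked, for
example by lowering the quorum at runtime:

    -- after a restart with one of the three masters down:
    box.info.status                          -- 'orphan'
    box.space.test:replace{100}              -- error: read-only mode
    box.cfg{replication_connect_quorum = 2}  -- lower the quorum
    fiber = require('fiber')
    while box.info.status == 'orphan' do fiber.sleep(0.001) end
    box.info.status                          -- 'running'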

If box.cfg() is called after recovery, in order to reconfigure
replication, it tries to connect to all specified replicas within a time
period that is a multiple (4x) of box.cfg.replication_timeout. Neither
box.cfg.replication_connect_quorum nor box.cfg.replication_sync_lag is
taken into account in this case: box.cfg() returns as soon as all
configured replicas have been connected.
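
For example (the URIs are placeholders), reconfiguring the replica set
at runtime blocks until every listed peer is connected, while clearing
the list makes the instance leave the 'orphan' state right away, since
replication is no longer configured:

    -- box.cfg() returns only once both peers are connected
    box.cfg{replication = {'master1.sock', 'master2.sock'}}

    -- detach from the replica set: 'orphan' is cleared immediately
    box.cfg{replication = {}}
    box.info.status  -- 'running'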

Just like any other status, the new one is reflected in box.info.status.

Suggested by @kostja

Follow-up #2958
Closes #999
---
 src/box/applier.cc               |  17 ++++-
 src/box/applier.h                |   7 +-
 src/box/box.cc                   |  81 ++++++++++++++++------
 src/box/box.h                    |   8 +++
 src/box/lua/load_cfg.lua         |   4 +-
 src/box/replication.cc           | 145 +++++++++++++++++++++++++++++++++------
 src/box/replication.h            |  82 +++++++++++++++++++---
 src/cfg.c                        |  10 +++
 src/cfg.h                        |   3 +
 test/replication/quorum.lua      |  21 ++++--
 test/replication/quorum.result   | 135 ++++++++++++++++++------------------
 test/replication/quorum.test.lua | 100 +++++++++++++--------------
 test/replication/quorum1.lua     |   1 +
 test/replication/quorum2.lua     |   1 +
 test/replication/quorum3.lua     |   1 +
 test/replication/suite.ini       |   2 +-
 16 files changed, 435 insertions(+), 183 deletions(-)
 create mode 120000 test/replication/quorum1.lua
 create mode 120000 test/replication/quorum2.lua
 create mode 120000 test/replication/quorum3.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index f8f4e7e7..60d4446e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -80,6 +80,7 @@ applier_log_error(struct applier *applier, struct error *e)
 	case APPLIER_AUTH:
 		say_info("failed to authenticate");
 		break;
+	case APPLIER_SYNC:
 	case APPLIER_FOLLOW:
 	case APPLIER_INITIAL_JOIN:
 	case APPLIER_FINAL_JOIN:
@@ -110,7 +111,8 @@ applier_writer_f(va_list ap)
 		fiber_cond_wait_timeout(&applier->writer_cond,
 					replication_timeout);
 		/* Send ACKs only when in FOLLOW mode ,*/
-		if (applier->state != APPLIER_FOLLOW)
+		if (applier->state != APPLIER_SYNC &&
+		    applier->state != APPLIER_FOLLOW)
 			continue;
 		try {
 			struct xrow_header xrow;
@@ -353,7 +355,7 @@ applier_subscribe(struct applier *applier)
 	coio_write_xrow(coio, &row);
 
 	if (applier->state == APPLIER_READY) {
-		applier_set_state(applier, APPLIER_FOLLOW);
+		applier_set_state(applier, APPLIER_SYNC);
 	} else {
 		/*
 		 * Tarantool < 1.7.0 sends replica id during
@@ -406,6 +408,8 @@ applier_subscribe(struct applier *applier)
 		fiber_start(applier->writer, applier);
 	}
 
+	applier->lag = TIMEOUT_INFINITY;
+
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -415,6 +419,12 @@ applier_subscribe(struct applier *applier)
 			say_info("final data received");
 			applier_set_state(applier, APPLIER_JOINED);
 			applier_set_state(applier, APPLIER_READY);
+			applier_set_state(applier, APPLIER_SYNC);
+		}
+
+		if (applier->state == APPLIER_SYNC &&
+		    applier->lag <= replication_sync_lag) {
+			/* Applier is synced, switch to "follow". */
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
@@ -449,7 +459,8 @@ applier_subscribe(struct applier *applier)
 				      row.lsn);
 			xstream_write_xc(applier->subscribe_stream, &row);
 		}
-		if (applier->state == APPLIER_FOLLOW)
+		if (applier->state == APPLIER_SYNC ||
+		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/applier.h b/src/box/applier.h
index ea66825d..c035b6d4 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -58,9 +58,10 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
 	_(APPLIER_INITIAL_JOIN, 5)                                   \
 	_(APPLIER_FINAL_JOIN, 6)                                     \
 	_(APPLIER_JOINED, 7)                                         \
-	_(APPLIER_FOLLOW, 8)                                         \
-	_(APPLIER_STOPPED, 9)                                        \
-	_(APPLIER_DISCONNECTED, 10)                                  \
+	_(APPLIER_SYNC, 8)                                           \
+	_(APPLIER_FOLLOW, 9)                                         \
+	_(APPLIER_STOPPED, 10)                                       \
+	_(APPLIER_DISCONNECTED, 11)                                  \
 
 /** States for the applier */
 ENUM(applier_state, applier_STATE);
diff --git a/src/box/box.cc b/src/box/box.cc
index b80ad4a1..1dbca083 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -104,13 +104,12 @@ static struct gc_consumer *backup_gc;
 static bool is_box_configured = false;
 static bool is_ro = true;
 
-static const int REPLICATION_CONNECT_QUORUM_ALL = INT_MAX;
-
 /**
- * Min number of masters to connect for configuration to succeed.
- * If set to REPLICATION_CONNECT_QUORUM_ALL, wait for all masters.
+ * The following flag is set if the instance failed to
+ * synchronize to a sufficient number of replicas to form
+ * a quorum and so was forced to switch to read-only mode.
  */
-static int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
+static bool is_orphan = true;
 
 /* Use the shared instance of xstream for all appliers */
 static struct xstream join_stream;
@@ -135,7 +134,7 @@ static int
 box_check_writable(void)
 {
 	/* box is only writable if box.cfg.read_only == false and */
-	if (is_ro) {
+	if (is_ro || is_orphan) {
 		diag_set(ClientError, ER_READONLY);
 		diag_log();
 		return -1;
@@ -222,6 +221,18 @@ box_is_ro(void)
 	return is_ro;
 }
 
+void
+box_clear_orphan(void)
+{
+	if (!is_orphan)
+		return; /* nothing to do */
+
+	is_orphan = false;
+
+	/* Update the title to reflect the new status. */
+	title("running");
+}
+
 struct wal_stream {
 	struct xstream base;
 	/** How many rows have been recovered so far. */
@@ -389,6 +400,17 @@ box_check_replication_connect_quorum(void)
 	return quorum;
 }
 
+static double
+box_check_replication_sync_lag(void)
+{
+	double lag = cfg_getd_default("replication_sync_lag", TIMEOUT_INFINITY);
+	if (lag < 0) {
+		tnt_raise(ClientError, ER_CFG, "replication_sync_lag",
+			  "the value must be greater or equal to 0");
+	}
+	return lag;
+}
+
 static void
 box_check_instance_uuid(struct tt_uuid *uuid)
 {
@@ -471,6 +493,7 @@ box_check_config()
 	box_check_replication();
 	box_check_replication_timeout();
 	box_check_replication_connect_quorum();
+	box_check_replication_sync_lag();
 	box_check_readahead(cfg_geti("readahead"));
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
 	box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
@@ -528,7 +551,7 @@ cfg_get_replication(int *p_count)
  * don't start appliers.
  */
 static void
-box_sync_replication(double timeout, int quorum)
+box_sync_replication(double timeout, bool connect_all)
 {
 	int count = 0;
 	struct applier **appliers = cfg_get_replication(&count);
@@ -540,7 +563,7 @@ box_sync_replication(double timeout, int quorum)
 			applier_delete(appliers[i]); /* doesn't affect diag */
 	});
 
-	replicaset_connect(appliers, count, quorum, timeout);
+	replicaset_connect(appliers, count, timeout, connect_all);
 
 	guard.is_active = false;
 }
@@ -559,8 +582,7 @@ box_set_replication(void)
 
 	box_check_replication();
 	/* Try to connect to all replicas within the timeout period */
-	box_sync_replication(replication_connect_quorum_timeout(),
-			     replication_connect_quorum);
+	box_sync_replication(replication_connect_quorum_timeout(), true);
 	/* Follow replica */
 	replicaset_follow();
 }
@@ -575,6 +597,8 @@ void
 box_set_replication_connect_quorum(void)
 {
 	replication_connect_quorum = box_check_replication_connect_quorum();
+	if (is_box_configured)
+		replicaset_check_quorum();
 }
 
 void
@@ -1551,11 +1575,11 @@ bootstrap_from_master(struct replica *master)
  * Bootstrap a new instance either as the first master in a
  * replica set or as a replica of an existing master.
  *
- * @param[out] start_vclock  the start vector time of the new
- * instance
+ * @param[out] is_bootstrap_leader  set if this instance is
+ *                                  the leader of a new cluster
  */
 static void
-bootstrap(const struct tt_uuid *replicaset_uuid)
+bootstrap(const struct tt_uuid *replicaset_uuid, bool *is_bootstrap_leader)
 {
 	/* Use the first replica by URI as a bootstrap leader */
 	struct replica *master = replicaset_first();
@@ -1572,6 +1596,7 @@ bootstrap(const struct tt_uuid *replicaset_uuid)
 		}
 	} else {
 		bootstrap_master(replicaset_uuid);
+		*is_bootstrap_leader = true;
 	}
 	if (engine_begin_checkpoint() ||
 	    engine_commit_checkpoint(&replicaset.vclock))
@@ -1643,6 +1668,7 @@ box_cfg_xc(void)
 	box_set_too_long_threshold();
 	box_set_replication_timeout();
 	box_set_replication_connect_quorum();
+	replication_sync_lag = box_check_replication_sync_lag();
 	xstream_create(&join_stream, apply_initial_join_row);
 	xstream_create(&subscribe_stream, apply_row);
 
@@ -1671,6 +1697,7 @@ box_cfg_xc(void)
 		 */
 		box_bind();
 	}
+	bool is_bootstrap_leader = false;
 	if (last_checkpoint_lsn >= 0) {
 		struct wal_stream wal_stream;
 		wal_stream_create(&wal_stream, cfg_geti64("rows_per_wal"));
@@ -1711,7 +1738,6 @@ box_cfg_xc(void)
 				&last_checkpoint_vclock);
 
 		engine_begin_final_recovery_xc();
-		title("orphan");
 		recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
 				      cfg_getd("wal_dir_rescan_delay"));
 		title("hot_standby");
@@ -1762,9 +1788,11 @@ box_cfg_xc(void)
 
 		/** Begin listening only when the local recovery is complete. */
 		box_listen();
+
+		title("orphan");
+
 		/* Wait for the cluster to start up */
-		box_sync_replication(TIMEOUT_INFINITY,
-				     replication_connect_quorum);
+		box_sync_replication(replication_connect_quorum_timeout(), false);
 	} else {
 		if (!tt_uuid_is_nil(&instance_uuid))
 			INSTANCE_UUID = instance_uuid;
@@ -1776,6 +1804,8 @@ box_cfg_xc(void)
 		 */
 		box_listen();
 
+		title("orphan");
+
 		/*
 		 * Wait for the cluster to start up.
 		 *
@@ -1784,11 +1814,9 @@ box_cfg_xc(void)
 		 * receive the same replica set UUID when a new cluster
 		 * is deployed.
 		 */
-		box_sync_replication(TIMEOUT_INFINITY,
-				     REPLICATION_CONNECT_QUORUM_ALL);
-
+		box_sync_replication(TIMEOUT_INFINITY, true);
 		/* Bootstrap a new master */
-		bootstrap(&replicaset_uuid);
+		bootstrap(&replicaset_uuid, &is_bootstrap_leader);
 	}
 	fiber_gc();
 
@@ -1811,14 +1839,25 @@ box_cfg_xc(void)
 
 	rmean_cleanup(rmean_box);
 
+	/*
+	 * If this instance is a leader of a newly bootstrapped
+	 * cluster, it is up to date by definition, so leave the
+	 * 'orphan' mode right away to let it initialize cluster
+	 * schema.
+	 */
+	if (is_bootstrap_leader)
+		box_clear_orphan();
+
 	/* Follow replica */
 	replicaset_follow();
 
-	title("running");
 	say_info("ready to accept requests");
 
 	fiber_gc();
 	is_box_configured = true;
+
+	if (!is_bootstrap_leader)
+		replicaset_sync();
 }
 
 void
diff --git a/src/box/box.h b/src/box/box.h
index dc5d8867..730cf572 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -89,6 +89,14 @@ box_set_ro(bool ro);
 bool
 box_is_ro(void);
 
+/**
+ * Switch this instance from 'orphan' to 'running' state.
+ * Called on initial configuration as soon as this instance
+ * synchronizes with enough replicas to form a quorum.
+ */
+void
+box_clear_orphan(void);
+
 /** True if snapshot is in progress. */
 extern bool box_checkpoint_is_in_progress;
 /** Incremented with each next snapshot. */
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 1f42bef8..6b0ca9ce 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -56,7 +56,8 @@ local default_cfg = {
     checkpoint_count    = 2,
     worker_pool_threads = 4,
     replication_timeout = 1,
-    replication_connect_quorum = nil
+    replication_sync_lag = nil, -- no sync
+    replication_connect_quorum = nil, -- connect all
 }
 
 -- types of available options
@@ -110,6 +111,7 @@ local template_cfg = {
     hot_standby         = 'boolean',
     worker_pool_threads = 'number',
     replication_timeout = 'number',
+    replication_sync_lag = 'number',
     replication_connect_quorum = 'number',
 }
 
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 3100b679..2e106a7c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -47,6 +47,8 @@ struct tt_uuid INSTANCE_UUID;
 struct tt_uuid REPLICASET_UUID;
 
 double replication_timeout = 1.0; /* seconds */
+int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
+double replication_sync_lag = TIMEOUT_INFINITY;
 
 struct replicaset replicaset;
 
@@ -64,6 +66,16 @@ rb_gen(static MAYBE_UNUSED, replica_hash_, replica_hash_t,
 	     item != NULL && ((next = replica_hash_next(hash, item)) || 1); \
 	     item = next)
 
+/**
+ * Return the number of replicas that have to be synchronized
+ * in order to form a quorum in the replica set.
+ */
+static inline int
+replicaset_quorum(void)
+{
+	return MIN(replication_connect_quorum, replicaset.applier.total);
+}
+
 void
 replication_init(void)
 {
@@ -73,12 +85,14 @@ replication_init(void)
 	replica_hash_new(&replicaset.hash);
 	rlist_create(&replicaset.anon);
 	vclock_create(&replicaset.vclock);
+	fiber_cond_create(&replicaset.applier.cond);
 }
 
 void
 replication_free(void)
 {
 	mempool_destroy(&replicaset.pool);
+	fiber_cond_destroy(&replicaset.applier.cond);
 }
 
 void
@@ -103,6 +117,9 @@ replica_is_orphan(struct replica *replica)
 	       replica->relay == NULL;
 }
 
+static void
+replica_on_applier_state_f(struct trigger *trigger, void *event);
+
 static struct replica *
 replica_new(void)
 {
@@ -117,7 +134,9 @@ replica_new(void)
 	replica->relay = NULL;
 	replica->gc = NULL;
 	rlist_create(&replica->in_anon);
-	trigger_create(&replica->on_connect, NULL, NULL, NULL);
+	trigger_create(&replica->on_applier_state,
+		       replica_on_applier_state_f, NULL, NULL);
+	replica->is_synced = false;
 	return replica;
 }
 
@@ -182,7 +201,10 @@ static void
 replica_set_applier(struct replica *replica, struct applier *applier)
 {
 	assert(replica->applier == NULL);
+	assert(!replica->is_synced);
 	replica->applier = applier;
+	trigger_add(&replica->applier->on_state,
+		    &replica->on_applier_state);
 }
 
 static void
@@ -190,27 +212,37 @@ replica_clear_applier(struct replica *replica)
 {
 	assert(replica->applier != NULL);
 	replica->applier = NULL;
-	trigger_clear(&replica->on_connect);
+	replica->is_synced = false;
+	trigger_clear(&replica->on_applier_state);
 }
 
 static void
-replica_on_receive_uuid(struct trigger *trigger, void *event)
+replica_on_applier_sync(struct replica *replica)
 {
-	struct replica *replica = container_of(trigger,
-				struct replica, on_connect);
-	struct applier *applier = (struct applier *)event;
+	if (replica->is_synced)
+		return;
 
-	assert(tt_uuid_is_nil(&replica->uuid));
-	assert(replica->applier == applier);
+	replica->is_synced = true;
+	replicaset.applier.synced++;
 
-	if (applier->state != APPLIER_CONNECTED)
-		return;
+	replicaset_check_quorum();
+}
+
+static void
+replica_on_applier_connect(struct replica *replica)
+{
+	struct applier *applier = replica->applier;
 
-	trigger_clear(trigger);
+	if (!tt_uuid_is_nil(&replica->uuid)) {
+		assert(tt_uuid_is_equal(&replica->uuid, &applier->uuid));
+		return;
+	}
 
 	assert(!tt_uuid_is_nil(&applier->uuid));
 	replica->uuid = applier->uuid;
 
+	replicaset.applier.connected++;
+
 	struct replica *orig = replica_hash_search(&replicaset.hash, replica);
 	if (orig != NULL && orig->applier != NULL) {
 		say_error("duplicate connection to the same replica: "
@@ -238,6 +270,37 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
 	}
 }
 
+static void
+replica_on_applier_state_f(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct replica *replica = container_of(trigger,
+			struct replica, on_applier_state);
+	switch (replica->applier->state) {
+	case APPLIER_CONNECTED:
+		replica_on_applier_connect(replica);
+		break;
+	case APPLIER_FOLLOW:
+		replica_on_applier_sync(replica);
+		break;
+	case APPLIER_OFF:
+		/*
+		 * Connection to self, duplicate connection
+		 * to the same master, or the applier fiber
+		 * has been cancelled. Assume synced.
+		 */
+		replica_on_applier_sync(replica);
+		break;
+	case APPLIER_STOPPED:
+		/* Unrecoverable error. */
+		replicaset.applier.failed++;
+		break;
+	default:
+		break;
+	}
+	fiber_cond_signal(&replicaset.applier.cond);
+}
+
 /**
  * Update the replica set with new "applier" objects
  * upon reconfiguration of box.cfg.replication.
@@ -251,6 +314,7 @@ replicaset_update(struct applier **appliers, int count)
 	RLIST_HEAD(anon_replicas);
 	struct replica *replica, *next;
 	struct applier *applier;
+	int connected = 0;
 
 	auto uniq_guard = make_scoped_guard([&]{
 		replica_hash_foreach_safe(&uniq, replica, next) {
@@ -275,9 +339,6 @@ replicaset_update(struct applier **appliers, int count)
 			 * when it is finally connected.
 			 */
 			rlist_add_entry(&anon_replicas, replica, in_anon);
-			trigger_create(&replica->on_connect,
-				       replica_on_receive_uuid, NULL, NULL);
-			trigger_add(&applier->on_state, &replica->on_connect);
 			continue;
 		}
 
@@ -289,6 +350,7 @@ replicaset_update(struct applier **appliers, int count)
 				  "duplicate connection to the same replica");
 		}
 		replica_hash_insert(&uniq, replica);
+		connected++;
 	}
 
 	/*
@@ -332,6 +394,11 @@ replicaset_update(struct applier **appliers, int count)
 	}
 	rlist_swap(&replicaset.anon, &anon_replicas);
 
+	replicaset.applier.total = count;
+	replicaset.applier.connected = connected;
+	replicaset.applier.synced = 0;
+	replicaset.applier.failed = 0;
+
 	assert(replica_hash_first(&uniq) == NULL);
 	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
 		if (replica_is_orphan(replica)) {
@@ -382,8 +449,8 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 }
 
 void
-replicaset_connect(struct applier **appliers, int count, int quorum,
-		   double timeout)
+replicaset_connect(struct applier **appliers, int count,
+		   double timeout, bool connect_all)
 {
 	if (count == 0) {
 		/* Cleanup the replica set. */
@@ -391,8 +458,6 @@ replicaset_connect(struct applier **appliers, int count, int quorum,
 		return;
 	}
 
-	quorum = MIN(quorum, count);
-
 	/*
 	 * Simultaneously connect to remote peers to receive their UUIDs
 	 * and fill the resulting set:
@@ -427,15 +492,18 @@ replicaset_connect(struct applier **appliers, int count, int quorum,
 		applier_start(applier);
 	}
 
-	while (state.connected < quorum && count - state.failed >= quorum) {
+	while (state.connected < count) {
 		double wait_start = ev_monotonic_now(loop());
 		if (fiber_cond_wait_timeout(&state.wakeup, timeout) != 0)
 			break;
+		if (state.failed > 0 && connect_all)
+			break;
 		timeout -= ev_monotonic_now(loop()) - wait_start;
 	}
-	if (state.connected < quorum) {
+	if (state.connected < count) {
 		/* Timeout or connection failure. */
-		goto error;
+		if (connect_all)
+			goto error;
 	}
 
 	for (int i = 0; i < count; i++) {
@@ -469,6 +537,11 @@ error:
 void
 replicaset_follow(void)
 {
+	if (replicaset.applier.total == 0) {
+		/* Replication is not configured. */
+		box_clear_orphan();
+		return;
+	}
 	struct replica *replica;
 	replicaset_foreach(replica) {
 		/* Resume connected appliers. */
@@ -482,6 +555,36 @@ replicaset_follow(void)
 }
 
 void
+replicaset_sync(void)
+{
+	int quorum = replicaset_quorum();
+
+	if (replicaset.applier.connected < quorum) {
+		/*
+		 * Not enough replicas connected to form a quorum.
+		 * Do not stall configuration, leave the instance
+		 * in 'orphan' state.
+		 */
+		return;
+	}
+
+	/*
+	 * Wait until a quorum is formed. Abort waiting if
+	 * a quorum cannot be formed because of errors.
+	 */
+	while (replicaset.applier.synced < quorum &&
+	       replicaset.applier.failed <= replicaset.applier.total - quorum)
+		fiber_cond_wait(&replicaset.applier.cond);
+}
+
+void
+replicaset_check_quorum(void)
+{
+	if (replicaset.applier.synced >= replicaset_quorum())
+		box_clear_orphan();
+}
+
+void
 replica_set_relay(struct replica *replica, struct relay *relay)
 {
 	assert(replica->id != REPLICA_ID_NIL);
diff --git a/src/box/replication.h b/src/box/replication.h
index c81b6739..54f809b1 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -37,6 +37,7 @@
 #include <small/rb.h> /* replicaset_t */
 #include <small/rlist.h>
 #include <small/mempool.h>
+#include "fiber_cond.h"
 #include "vclock.h"
 
 /**
@@ -86,12 +87,16 @@
  * and is implemented in @file vclock.h
  */
 
+#include <limits.h>
+
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct gc_consumer;
 
+static const int REPLICATION_CONNECT_QUORUM_ALL = INT_MAX;
+
 /**
  * Network timeout. Determines how often master and slave exchange
  * heartbeat messages. Set by box.cfg.replication_timeout.
@@ -99,6 +104,19 @@ struct gc_consumer;
 extern double replication_timeout;
 
 /**
+ * Minimal number of replicas to sync for this instance to switch
+ * to the write mode. If set to REPLICATION_CONNECT_QUORUM_ALL,
+ * wait for all configured masters.
+ */
+extern int replication_connect_quorum;
+
+/**
+ * Switch applier from "sync" to "follow" as soon as the replication
+ * lag is less than the value of the following variable.
+ */
+extern double replication_sync_lag;
+
+/**
  * Wait for the given period of time before trying to reconnect
  * to a master.
  */
@@ -165,7 +183,35 @@ struct replicaset {
 	 * of the cluster as maintained by appliers.
 	 */
 	struct vclock vclock;
-
+	/** Applier state. */
+	struct {
+		/**
+		 * Total number of replicas with attached
+		 * appliers.
+		 */
+		int total;
+		/**
+		 * Number of appliers that have successfully
+		 * connected and received their UUIDs.
+		 */
+		int connected;
+		/**
+		 * Number of appliers that have successfully
+		 * synchronized and hence contribute to the
+		 * quorum.
+		 */
+		int synced;
+		/**
+		 * Number of appliers that have been stopped
+		 * due to unrecoverable errors.
+		 */
+		int failed;
+		/**
+		 * Signaled whenever an applier changes its
+		 * state.
+		 */
+		struct fiber_cond cond;
+	} applier;
 };
 extern struct replicaset replicaset;
 
@@ -194,11 +240,14 @@ struct replica {
 	/** Link in the anon_replicas list. */
 	struct rlist in_anon;
 	/**
-	 * Trigger invoked when the applier connects to the
-	 * remote master for the first time. Used to insert
-	 * the replica into the replica set.
+	 * Trigger invoked when the applier changes its state.
 	 */
-	struct trigger on_connect;
+	struct trigger on_applier_state;
+	/**
+	 * Set if the applier has successfully synchronized with
+	 * the master (left "sync" and entered "follow" state).
+	 */
+	bool is_synced;
 };
 
 enum {
@@ -270,7 +319,7 @@ struct replica *
 replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
 
 /**
- * Connect \a quorum appliers to remote peers and receive UUID.
+ * Try to connect appliers to remote peers and receive UUID.
  * Appliers that did not connect will connect asynchronously.
  * On success, update the replica set with new appliers.
  * \post appliers are connected to remote hosts and paused.
@@ -279,10 +328,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
  * \param appliers the array of appliers
  * \param count size of appliers array
  * \param timeout connection timeout
+ * \param connect_all if this flag is set, fail unless all
+ *                    appliers have successfully connected
  */
 void
-replicaset_connect(struct applier **appliers, int count, int quorum,
-		   double timeout);
+replicaset_connect(struct applier **appliers, int count,
+		   double timeout, bool connect_all);
 
 /**
  * Resume all appliers registered with the replica set.
@@ -290,6 +341,21 @@ replicaset_connect(struct applier **appliers, int count, int quorum,
 void
 replicaset_follow(void);
 
+/**
+ * Wait until a replication quorum is formed.
+ * Return immediately if a quorum cannot be
+ * formed because of errors.
+ */
+void
+replicaset_sync(void);
+
+/**
+ * Check if a replication quorum has been formed and
+ * switch the server to the write mode if so.
+ */
+void
+replicaset_check_quorum(void);
+
 #endif /* defined(__cplusplus) */
 
 #endif
diff --git a/src/cfg.c b/src/cfg.c
index eb9b98ce..7c7d6e79 100644
--- a/src/cfg.c
+++ b/src/cfg.c
@@ -107,6 +107,16 @@ cfg_getd(const char *param)
 	return val;
 }
 
+double
+cfg_getd_default(const char *param, double default_val)
+{
+	cfg_get(param);
+	int ok;
+	double val = lua_tonumberx(tarantool_L, -1, &ok);
+	return ok ? val : default_val;
+}
+
+
 int
 cfg_getarr_size(const char *name)
 {
diff --git a/src/cfg.h b/src/cfg.h
index 018d26c1..8499388b 100644
--- a/src/cfg.h
+++ b/src/cfg.h
@@ -52,6 +52,9 @@ cfg_gets(const char *param);
 double
 cfg_getd(const char *param);
 
+double
+cfg_getd_default(const char *param, double default_val);
+
 int
 cfg_getarr_size(const char *name);
 
diff --git a/test/replication/quorum.lua b/test/replication/quorum.lua
index 7c226fa7..5138425a 100644
--- a/test/replication/quorum.lua
+++ b/test/replication/quorum.lua
@@ -1,21 +1,32 @@
 #!/usr/bin/env tarantool
 
--- see autobootstrap_guest.lua
-local USER = 'cluster'
-local PASSWORD = 'somepassword'
+-- get instance name from filename (quorum1.lua => quorum1)
+local INSTANCE_ID = string.match(arg[0], "%d")
+
 local SOCKET_DIR = require('fio').cwd()
 local function instance_uri(instance_id)
-    return SOCKET_DIR..'/autobootstrap_guest'..instance_id..'.sock';
+    --return 'localhost:'..(3310 + instance_id)
+    return SOCKET_DIR..'/quorum'..instance_id..'.sock';
 end
 
+-- start console first
 require('console').listen(os.getenv('ADMIN'))
 
 box.cfg({
-    replication_connect_quorum = 2;
+    listen = instance_uri(INSTANCE_ID);
     replication_timeout = 0.05;
+    replication_sync_lag = 0.01;
+    replication_connect_quorum = 3;
     replication = {
         instance_uri(1);
         instance_uri(2);
         instance_uri(3);
     };
 })
+
+box.once("bootstrap", function()
+    local test_run = require('test_run').new()
+    box.schema.user.grant("guest", 'replication')
+    box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+    box.space.test:create_index('primary')
+end)
diff --git a/test/replication/quorum.result b/test/replication/quorum.result
index 63fbefe6..f6e60aee 100644
--- a/test/replication/quorum.result
+++ b/test/replication/quorum.result
@@ -1,10 +1,7 @@
 test_run = require('test_run').new()
 ---
 ...
-engine = test_run:get_cfg('engine')
----
-...
-SERVERS = {'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3'}
+SERVERS = {'quorum1', 'quorum2', 'quorum3'}
 ---
 ...
 -- Deploy a cluster.
@@ -14,139 +11,141 @@ test_run:create_cluster(SERVERS)
 test_run:wait_fullmesh(SERVERS)
 ---
 ...
--- Create a new replica and switch to it.
-test_run:cmd('create server test with script "replication/quorum.lua"')
+-- Stop one replica and try to restart another one.
+-- It should successfully restart, but stay in the
+-- 'orphan' mode, which disables write accesses.
+-- There are three ways for the replica to leave the
+-- 'orphan' mode:
+-- * reconfigure replication
+-- * reset box.cfg.replication_connect_quorum
+-- * wait until a quorum is formed asynchronously
+test_run:cmd('stop server quorum1')
 ---
 - true
 ...
-test_run:cmd('start server test')
+test_run:cmd('switch quorum2')
 ---
 - true
 ...
-test_run:cmd('switch test')
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
 ---
-- true
+- orphan
 ...
--- Stop one master and try to restart the replica.
--- It should successfully restart because it has
--- replication_connect_quorum set to 2 (see quorum.lua)
--- and two other masters are still running.
-test_run:cmd('stop server autobootstrap_guest1')
+box.space.test:replace{100} -- error
 ---
-- true
+- error: Can't modify data because this instance is in read-only mode.
 ...
-test_run:cmd('restart server test')
-fio = require('fio')
+box.cfg{replication={}}
 ---
 ...
-fiber = require('fiber')
+box.info.status -- running
 ---
+- running
 ...
-SERVERS = {'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3'}
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
 ---
+- orphan
 ...
-SOCKET_DIR = fio.cwd()
+box.space.test:replace{100} -- error
 ---
+- error: Can't modify data because this instance is in read-only mode.
 ...
-test_run:cmd("setopt delimiter ';'")
+box.cfg{replication_connect_quorum = 2}
 ---
-- true
 ...
-function instance_uri(name)
-    return SOCKET_DIR .. '/' .. name .. '.sock'
-end;
+fiber = require('fiber')
 ---
 ...
-function cfg_replication(servers)
-    local replication = {}
-    for _, srv in ipairs(servers) do
-        table.insert(replication, instance_uri(srv))
-    end
-    box.cfg{replication = replication}
-end;
+while box.info.status == 'orphan' do fiber.sleep(0.001) end
 ---
 ...
-test_run:cmd("setopt delimiter ''");
+box.info.status -- running
 ---
-- true
+- running
+...
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
+---
+- orphan
 ...
--- Set a stricter value for replication_connect_quorum and
--- check that replication configuration fails.
-box.cfg{replication_connect_quorum = 3}
+box.space.test:replace{100} -- error
 ---
+- error: Can't modify data because this instance is in read-only mode.
 ...
-cfg_replication(SERVERS) -- fail
+test_run:cmd('start server quorum1')
 ---
-- error: 'Incorrect value for option ''replication'': failed to connect to one or
-    more replicas'
+- true
 ...
-box.cfg{replication_connect_quorum = nil} -- default: wait for all
+fiber = require('fiber')
 ---
 ...
-cfg_replication(SERVERS) -- fail
+while box.info.status == 'orphan' do fiber.sleep(0.001) end
 ---
-- error: 'Incorrect value for option ''replication'': failed to connect to one or
-    more replicas'
 ...
--- Lower replication quorum and check that replication
--- configuration succeeds.
-box.cfg{replication_connect_quorum = 2}
+box.info.status -- running
 ---
+- running
 ...
-cfg_replication(SERVERS) -- success
+-- Check that the replica follows all masters.
+box.info.id == 1 or box.info.replication[1].upstream.status == 'follow'
 ---
+- true
 ...
--- Start the master that was down and check that
--- the replica follows it. To do that, we need to
--- stop other masters.
-test_run:cmd('start server autobootstrap_guest1')
+box.info.id == 2 or box.info.replication[2].upstream.status == 'follow'
 ---
 - true
 ...
-test_run:cmd('stop server autobootstrap_guest2')
+box.info.id == 3 or box.info.replication[3].upstream.status == 'follow'
 ---
 - true
 ...
-test_run:cmd('stop server autobootstrap_guest3')
+-- Check that box.cfg() doesn't return until the instance
+-- catches up with all configured replicas.
+test_run:cmd('switch quorum3')
 ---
 - true
 ...
-test_run:cmd('switch autobootstrap_guest1')
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.01)
+---
+- ok
+...
+test_run:cmd('switch quorum2')
 ---
 - true
 ...
-box.space.test:auto_increment{'test'}
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.01)
 ---
-- [1, 'test']
+- ok
 ...
-test_run:cmd('switch test')
+test_run:cmd('stop server quorum1')
 ---
 - true
 ...
-while box.space.test:count() < 1 do fiber.sleep(0.001) end
+for i = 1, 10 do box.space.test:insert{i} end
 ---
 ...
-box.space.test:select()
+fiber.sleep(0.1)
 ---
-- - [1, 'test']
 ...
--- Cleanup.
-test_run:cmd('switch default')
+test_run:cmd('start server quorum1')
 ---
 - true
 ...
-test_run:cmd('stop server test')
+test_run:cmd('switch quorum1')
 ---
 - true
 ...
-test_run:cmd('cleanup server test')
+box.space.test:count() -- 10
 ---
-- true
+- 10
 ...
-test_run:cmd('stop server autobootstrap_guest1')
+-- Cleanup.
+test_run:cmd('switch default')
 ---
 - true
 ...
-for _, srv in ipairs(SERVERS) do test_run:cmd(string.format('cleanup server %s', srv)) end
+test_run:drop_cluster(SERVERS)
 ---
 ...
diff --git a/test/replication/quorum.test.lua b/test/replication/quorum.test.lua
index 499e4188..44c98b7b 100644
--- a/test/replication/quorum.test.lua
+++ b/test/replication/quorum.test.lua
@@ -1,69 +1,65 @@
 test_run = require('test_run').new()
-engine = test_run:get_cfg('engine')
 
-SERVERS = {'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3'}
+SERVERS = {'quorum1', 'quorum2', 'quorum3'}
 
 -- Deploy a cluster.
 test_run:create_cluster(SERVERS)
 test_run:wait_fullmesh(SERVERS)
 
--- Create a new replica and switch to it.
-test_run:cmd('create server test with script "replication/quorum.lua"')
-test_run:cmd('start server test')
-test_run:cmd('switch test')
+-- Stop one replica and try to restart another one.
+-- It should successfully restart, but stay in the
+-- 'orphan' mode, which disables write accesses.
+-- There are three ways for the replica to leave the
+-- 'orphan' mode:
+-- * reconfigure replication
+-- * reset box.cfg.replication_connect_quorum
+-- * wait until a quorum is formed asynchronously
+test_run:cmd('stop server quorum1')
 
--- Stop one master and try to restart the replica.
--- It should successfully restart because it has
--- replication_connect_quorum set to 2 (see quorum.lua)
--- and two other masters are still running.
-test_run:cmd('stop server autobootstrap_guest1')
-test_run:cmd('restart server test')
+test_run:cmd('switch quorum2')
 
-fio = require('fio')
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
+box.space.test:replace{100} -- error
+box.cfg{replication={}}
+box.info.status -- running
+
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
+box.space.test:replace{100} -- error
+box.cfg{replication_connect_quorum = 2}
 fiber = require('fiber')
+while box.info.status == 'orphan' do fiber.sleep(0.001) end
+box.info.status -- running
 
-SERVERS = {'autobootstrap_guest1', 'autobootstrap_guest2', 'autobootstrap_guest3'}
-SOCKET_DIR = fio.cwd()
-test_run:cmd("setopt delimiter ';'")
-function instance_uri(name)
-    return SOCKET_DIR .. '/' .. name .. '.sock'
-end;
-function cfg_replication(servers)
-    local replication = {}
-    for _, srv in ipairs(servers) do
-        table.insert(replication, instance_uri(srv))
-    end
-    box.cfg{replication = replication}
-end;
-test_run:cmd("setopt delimiter ''");
+test_run:cmd('restart server quorum2')
+box.info.status -- orphan
+box.space.test:replace{100} -- error
+test_run:cmd('start server quorum1')
+fiber = require('fiber')
+while box.info.status == 'orphan' do fiber.sleep(0.001) end
+box.info.status -- running
 
--- Set a stricter value for replication_connect_quorum and
--- check that replication configuration fails.
-box.cfg{replication_connect_quorum = 3}
-cfg_replication(SERVERS) -- fail
-box.cfg{replication_connect_quorum = nil} -- default: wait for all
-cfg_replication(SERVERS) -- fail
+-- Check that the replica follows all masters.
+box.info.id == 1 or box.info.replication[1].upstream.status == 'follow'
+box.info.id == 2 or box.info.replication[2].upstream.status == 'follow'
+box.info.id == 3 or box.info.replication[3].upstream.status == 'follow'
 
--- Lower replication quorum and check that replication
--- configuration succeeds.
-box.cfg{replication_connect_quorum = 2}
-cfg_replication(SERVERS) -- success
+-- Check that box.cfg() doesn't return until the instance
+-- catches up with all configured replicas.
+test_run:cmd('switch quorum3')
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.01)
+test_run:cmd('switch quorum2')
+box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.01)
+test_run:cmd('stop server quorum1')
+
+for i = 1, 10 do box.space.test:insert{i} end
+fiber.sleep(0.1)
 
--- Start the master that was down and check that
--- the replica follows it. To do that, we need to
--- stop other masters.
-test_run:cmd('start server autobootstrap_guest1')
-test_run:cmd('stop server autobootstrap_guest2')
-test_run:cmd('stop server autobootstrap_guest3')
-test_run:cmd('switch autobootstrap_guest1')
-box.space.test:auto_increment{'test'}
-test_run:cmd('switch test')
-while box.space.test:count() < 1 do fiber.sleep(0.001) end
-box.space.test:select()
+test_run:cmd('start server quorum1')
+test_run:cmd('switch quorum1')
+box.space.test:count() -- 10
 
 -- Cleanup.
 test_run:cmd('switch default')
-test_run:cmd('stop server test')
-test_run:cmd('cleanup server test')
-test_run:cmd('stop server autobootstrap_guest1')
-for _, srv in ipairs(SERVERS) do test_run:cmd(string.format('cleanup server %s', srv)) end
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/quorum1.lua b/test/replication/quorum1.lua
new file mode 120000
index 00000000..a827d73a
--- /dev/null
+++ b/test/replication/quorum1.lua
@@ -0,0 +1 @@
+quorum.lua
\ No newline at end of file
diff --git a/test/replication/quorum2.lua b/test/replication/quorum2.lua
new file mode 120000
index 00000000..a827d73a
--- /dev/null
+++ b/test/replication/quorum2.lua
@@ -0,0 +1 @@
+quorum.lua
\ No newline at end of file
diff --git a/test/replication/quorum3.lua b/test/replication/quorum3.lua
new file mode 120000
index 00000000..a827d73a
--- /dev/null
+++ b/test/replication/quorum3.lua
@@ -0,0 +1 @@
+quorum.lua
\ No newline at end of file
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index b8933346..113bcdbf 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua quorum.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua
 long_run = prune.test.lua
-- 
2.11.0