[server 2/5] replication: gather all replicaset variables in struct

Vladimir Davydov vdavydov.dev at gmail.com
Wed Jan 24 20:44:51 MSK 2018


There is already a handful of global variables describing the replica
set state, and there are going to be more, so let's consolidate them
in a singleton struct:

  replicaset => replicaset.hash
  replica_pool => replicaset.pool
  anon_replicas => replicaset.anon
  replicaset_vclock => replicaset.vclock

While we are at it, let's also move the INSTANCE_UUID definition from
xrow.c to replication.cc, where it truly belongs. The only reason
I see for it to be defined in xrow.c is to compile vinyl unit tests
without linking replication.o, but we can easily circumvent this by
defining INSTANCE_UUID in test/unit/vy_iterators_helper.c.

Suggested by @kostja
---
 src/box/applier.cc              |  14 +++---
 src/box/box.cc                  |  10 ++--
 src/box/lua/info.c              |   8 ++--
 src/box/relay.cc                |   2 +-
 src/box/replication.cc          | 101 +++++++++++++++++-----------------------
 src/box/replication.h           |  42 +++++++++++++----
 src/box/wal.cc                  |   6 +--
 src/box/xrow.c                  |   5 --
 test/unit/vy_iterators_helper.c |   3 ++
 9 files changed, 100 insertions(+), 91 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index f0073bad..f8f4e7e7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -114,7 +114,7 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset_vclock);
+			xrow_encode_vclock(&xrow, &replicaset.vclock);
 			coio_write_xrow(&io, &xrow);
 		} catch (SocketError *e) {
 			/*
@@ -261,7 +261,7 @@ applier_join(struct applier *applier)
 		 * Used to initialize the replica's initial
 		 * vclock in bootstrap_from_master()
 		 */
-		xrow_decode_vclock_xc(&row, &replicaset_vclock);
+		xrow_decode_vclock_xc(&row, &replicaset.vclock);
 	}
 
 	applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -284,7 +284,7 @@ applier_join(struct applier *applier)
 				 * vclock yet, do it now. In 1.7+
 				 * this vlcock is not used.
 				 */
-				xrow_decode_vclock_xc(&row, &replicaset_vclock);
+				xrow_decode_vclock_xc(&row, &replicaset.vclock);
 			}
 			break; /* end of stream */
 		} else if (iproto_type_is_error(row.type)) {
@@ -313,7 +313,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			vclock_follow(&replicaset_vclock, row.replica_id,
+			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
 			xstream_write_xc(applier->subscribe_stream, &row);
 		} else if (row.type == IPROTO_OK) {
@@ -349,7 +349,7 @@ applier_subscribe(struct applier *applier)
 	struct xrow_header row;
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &replicaset_vclock);
+				 &replicaset.vclock);
 	coio_write_xrow(coio, &row);
 
 	if (applier->state == APPLIER_READY) {
@@ -437,7 +437,7 @@ applier_subscribe(struct applier *applier)
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
 
-		if (vclock_get(&replicaset_vclock, row.replica_id) < row.lsn) {
+		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
 			/**
 			 * Promote the replica set vclock before
 			 * applying the row. If there is an
@@ -445,7 +445,7 @@ applier_subscribe(struct applier *applier)
 			 * the row is skipped when the replication
 			 * is resumed.
 			 */
-			vclock_follow(&replicaset_vclock, row.replica_id,
+			vclock_follow(&replicaset.vclock, row.replica_id,
 				      row.lsn);
 			xstream_write_xc(applier->subscribe_stream, &row);
 		}
diff --git a/src/box/box.cc b/src/box/box.cc
index c7eb22af..b80ad4a1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1531,7 +1531,7 @@ bootstrap_from_master(struct replica *master)
 	 */
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset_vclock);
+	recovery_journal_create(&journal, &replicaset.vclock);
 	journal_set(&journal.base);
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
@@ -1574,7 +1574,7 @@ bootstrap(const struct tt_uuid *replicaset_uuid)
 		bootstrap_master(replicaset_uuid);
 	}
 	if (engine_begin_checkpoint() ||
-	    engine_commit_checkpoint(&replicaset_vclock))
+	    engine_commit_checkpoint(&replicaset.vclock))
 		panic("failed to create a checkpoint");
 }
 
@@ -1755,10 +1755,10 @@ box_cfg_xc(void)
 		/*
 		 * Initialize the replica set vclock from recovery.
 		 * The local WAL may contain rows from remote masters,
-		 * so we must reflect this in replicaset_vclock to
+		 * so we must reflect this in replicaset vclock to
 		 * not attempt to apply these rows twice.
 		 */
-		vclock_copy(&replicaset_vclock, &recovery->vclock);
+		vclock_copy(&replicaset.vclock, &recovery->vclock);
 
 		/** Begin listening only when the local recovery is complete. */
 		box_listen();
@@ -1807,7 +1807,7 @@ box_cfg_xc(void)
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	wal_init(wal_mode, cfg_gets("wal_dir"), &INSTANCE_UUID,
-		 &replicaset_vclock, wal_max_rows, wal_max_size);
+		 &replicaset.vclock, wal_max_rows, wal_max_size);
 
 	rmean_cleanup(rmean_box);
 
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index f5efbb27..8e8fd9d9 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -136,7 +136,7 @@ lbox_pushreplica(lua_State *L, struct replica *replica)
 	lua_settable(L, -3);
 
 	lua_pushstring(L, "lsn");
-	luaL_pushuint64(L, vclock_get(&replicaset_vclock, replica->id));
+	luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id));
 	lua_settable(L, -3);
 
 	if (applier != NULL && applier->state != APPLIER_OFF) {
@@ -207,7 +207,7 @@ lbox_info_lsn(struct lua_State *L)
 	/* See comments in lbox_info_id */
 	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
 	if (self != NULL && self->id != REPLICA_ID_NIL) {
-		luaL_pushint64(L, vclock_get(&replicaset_vclock, self->id));
+		luaL_pushint64(L, vclock_get(&replicaset.vclock, self->id));
 	} else {
 		luaL_pushint64(L, -1);
 	}
@@ -217,7 +217,7 @@ lbox_info_lsn(struct lua_State *L)
 static int
 lbox_info_signature(struct lua_State *L)
 {
-	luaL_pushint64(L, vclock_sum(&replicaset_vclock));
+	luaL_pushint64(L, vclock_sum(&replicaset.vclock));
 	return 1;
 }
 
@@ -253,7 +253,7 @@ lbox_info_server(struct lua_State *L)
 static int
 lbox_info_vclock(struct lua_State *L)
 {
-	lbox_pushvclock(L, &replicaset_vclock);
+	lbox_pushvclock(L, &replicaset.vclock);
 	return 1;
 }
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7102a573..967874e7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -236,7 +236,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		diag_raise();
 
 	ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, {
-		while (vclock_compare(stop_vclock, &replicaset_vclock) == 0)
+		while (vclock_compare(stop_vclock, &replicaset.vclock) == 0)
 			fiber_sleep(0.001);
 	});
 }
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 498b9269..6e35a396 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -42,19 +42,13 @@
 #include "error.h"
 #include "vclock.h" /* VCLOCK_MAX */
 
-struct vclock replicaset_vclock;
 uint32_t instance_id = REPLICA_ID_NIL;
-/**
- * Globally unique identifier of this replica set.
- * A replica set is a set of appliers and their matching
- * relays, usually connected in full mesh.
- */
+struct tt_uuid INSTANCE_UUID;
 struct tt_uuid REPLICASET_UUID;
 
 double replication_timeout = 1.0; /* seconds */
 
-typedef rb_tree(struct replica) replicaset_t;
-rb_proto(, replicaset_, replicaset_t, struct replica)
+struct replicaset replicaset;
 
 static int
 replica_compare_by_uuid(const struct replica *a, const struct replica *b)
@@ -62,38 +56,29 @@ replica_compare_by_uuid(const struct replica *a, const struct replica *b)
 	return tt_uuid_compare(&a->uuid, &b->uuid);
 }
 
-rb_gen(, replicaset_, replicaset_t, struct replica, link,
-       replica_compare_by_uuid);
+rb_gen(static MAYBE_UNUSED, replica_hash_, replica_hash_t,
+       struct replica, in_hash, replica_compare_by_uuid);
 
-#define replicaset_foreach_safe(set, item, next) \
-	for (item = replicaset_first(set); \
-	     item != NULL && ((next = replicaset_next(set, item)) || 1); \
+#define replica_hash_foreach_safe(hash, item, next) \
+	for (item = replica_hash_first(hash); \
+	     item != NULL && ((next = replica_hash_next(hash, item)) || 1); \
 	     item = next)
 
-static struct mempool replica_pool;
-static replicaset_t replicaset;
-
-/**
- * List of replicas that haven't received a UUID.
- * It contains both replicas that are still trying
- * to connect and those that failed to connect.
- */
-static RLIST_HEAD(anon_replicas);
-
 void
 replication_init(void)
 {
-	mempool_create(&replica_pool, &cord()->slabc,
-		       sizeof(struct replica));
 	memset(&replicaset, 0, sizeof(replicaset));
-	replicaset_new(&replicaset);
-	vclock_create(&replicaset_vclock);
+	mempool_create(&replicaset.pool, &cord()->slabc,
+		       sizeof(struct replica));
+	replica_hash_new(&replicaset.hash);
+	rlist_create(&replicaset.anon);
+	vclock_create(&replicaset.vclock);
 }
 
 void
 replication_free(void)
 {
-	mempool_destroy(&replica_pool);
+	mempool_destroy(&replicaset.pool);
 }
 
 void
@@ -121,7 +106,8 @@ replica_is_orphan(struct replica *replica)
 static struct replica *
 replica_new(void)
 {
-	struct replica *replica = (struct replica *) mempool_alloc(&replica_pool);
+	struct replica *replica = (struct replica *)
+			mempool_alloc(&replicaset.pool);
 	if (replica == NULL)
 		tnt_raise(OutOfMemory, sizeof(*replica), "malloc",
 			  "struct replica");
@@ -141,7 +127,7 @@ replica_delete(struct replica *replica)
 	assert(replica_is_orphan(replica));
 	if (replica->gc != NULL)
 		gc_consumer_unregister(replica->gc);
-	mempool_free(&replica_pool, replica);
+	mempool_free(&replicaset.pool, replica);
 }
 
 struct replica *
@@ -153,7 +139,7 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
 	assert(replica_by_uuid(replica_uuid) == NULL);
 	struct replica *replica = replica_new();
 	replica->uuid = *replica_uuid;
-	replicaset_insert(&replicaset, replica);
+	replica_hash_insert(&replicaset.hash, replica);
 	replica_set_id(replica, replica_id);
 	return replica;
 }
@@ -187,7 +173,7 @@ replica_clear_id(struct replica *replica)
 	 */
 	replica->id = REPLICA_ID_NIL;
 	if (replica_is_orphan(replica)) {
-		replicaset_remove(&replicaset, replica);
+		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
 	}
 }
@@ -210,7 +196,7 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
 	assert(!tt_uuid_is_nil(&applier->uuid));
 	replica->uuid = applier->uuid;
 
-	struct replica *orig = replicaset_search(&replicaset, replica);
+	struct replica *orig = replica_hash_search(&replicaset.hash, replica);
 	if (orig != NULL && orig->applier != NULL) {
 		say_error("duplicate connection to the same replica: "
 			  "instance uuid %s, addr1 %s, addr2 %s",
@@ -233,7 +219,7 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
 		replica_delete(replica);
 	} else {
 		/* Add a new struct replica */
-		replicaset_insert(&replicaset, replica);
+		replica_hash_insert(&replicaset.hash, replica);
 	}
 }
 
@@ -244,15 +230,15 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
 static void
 replicaset_update(struct applier **appliers, int count)
 {
-	replicaset_t uniq;
+	replica_hash_t uniq;
 	memset(&uniq, 0, sizeof(uniq));
-	replicaset_new(&uniq);
-	RLIST_HEAD(anon_replicas_new);
+	replica_hash_new(&uniq);
+	RLIST_HEAD(anon_replicas);
 	struct replica *replica, *next;
 
 	auto uniq_guard = make_scoped_guard([&]{
-		replicaset_foreach_safe(&uniq, replica, next) {
-			replicaset_remove(&uniq, replica);
+		replica_hash_foreach_safe(&uniq, replica, next) {
+			replica_hash_remove(&uniq, replica);
 			replica_delete(replica);
 		}
 	});
@@ -272,7 +258,7 @@ replicaset_update(struct applier **appliers, int count)
 			 * that will insert it into the replica set
 			 * when it is finally connected.
 			 */
-			rlist_add_entry(&anon_replicas_new, replica, in_anon);
+			rlist_add_entry(&anon_replicas, replica, in_anon);
 			trigger_create(&replica->on_connect,
 				       replica_on_receive_uuid, NULL, NULL);
 			trigger_add(&applier->on_state, &replica->on_connect);
@@ -282,11 +268,11 @@ replicaset_update(struct applier **appliers, int count)
 		assert(!tt_uuid_is_nil(&applier->uuid));
 		replica->uuid = applier->uuid;
 
-		if (replicaset_search(&uniq, replica) != NULL) {
+		if (replica_hash_search(&uniq, replica) != NULL) {
 			tnt_raise(ClientError, ER_CFG, "replication",
 				  "duplicate connection to the same replica");
 		}
-		replicaset_insert(&uniq, replica);
+		replica_hash_insert(&uniq, replica);
 	}
 
 	/*
@@ -302,20 +288,21 @@ replicaset_update(struct applier **appliers, int count)
 		applier_delete(replica->applier);
 		replica->applier = NULL;
 	}
-	rlist_foreach_entry_safe(replica, &anon_replicas, in_anon, next) {
+	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
 		assert(replica->applier != NULL);
 		applier_stop(replica->applier);
 		applier_delete(replica->applier);
 		replica->applier = NULL;
 		replica_delete(replica);
 	}
-	rlist_create(&anon_replicas);
+	rlist_create(&replicaset.anon);
 
 	/* Save new appliers */
-	replicaset_foreach_safe(&uniq, replica, next) {
-		replicaset_remove(&uniq, replica);
+	replica_hash_foreach_safe(&uniq, replica, next) {
+		replica_hash_remove(&uniq, replica);
 
-		struct replica *orig = replicaset_search(&replicaset, replica);
+		struct replica *orig = replica_hash_search(&replicaset.hash,
+							   replica);
 		if (orig != NULL) {
 			/* Use existing struct replica */
 			orig->applier = replica->applier;
@@ -326,15 +313,15 @@ replicaset_update(struct applier **appliers, int count)
 			replica = orig;
 		} else {
 			/* Add a new struct replica */
-			replicaset_insert(&replicaset, replica);
+			replica_hash_insert(&replicaset.hash, replica);
 		}
 	}
-	rlist_swap(&anon_replicas, &anon_replicas_new);
+	rlist_swap(&replicaset.anon, &anon_replicas);
 
-	assert(replicaset_first(&uniq) == NULL);
-	replicaset_foreach_safe(&replicaset, replica, next) {
+	assert(replica_hash_first(&uniq) == NULL);
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
 		if (replica_is_orphan(replica)) {
-			replicaset_remove(&replicaset, replica);
+			replica_hash_remove(&replicaset.hash, replica);
 			replica_delete(replica);
 		}
 	}
@@ -474,7 +461,7 @@ replicaset_follow(void)
 		if (replica->applier != NULL)
 			applier_resume(replica->applier);
 	}
-	rlist_foreach_entry(replica, &anon_replicas, in_anon) {
+	rlist_foreach_entry(replica, &replicaset.anon, in_anon) {
 		/* Restart appliers that failed to connect. */
 		applier_start(replica->applier);
 	}
@@ -494,7 +481,7 @@ replica_clear_relay(struct replica *replica)
 	assert(replica->relay != NULL);
 	replica->relay = NULL;
 	if (replica_is_orphan(replica)) {
-		replicaset_remove(&replicaset, replica);
+		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
 	}
 }
@@ -502,13 +489,13 @@ replica_clear_relay(struct replica *replica)
 struct replica *
 replicaset_first(void)
 {
-	return replicaset_first(&replicaset);
+	return replica_hash_first(&replicaset.hash);
 }
 
 struct replica *
 replicaset_next(struct replica *replica)
 {
-	return replicaset_next(&replicaset, replica);
+	return replica_hash_next(&replicaset.hash, replica);
 }
 
 struct replica *
@@ -516,5 +503,5 @@ replica_by_uuid(const struct tt_uuid *uuid)
 {
 	struct replica key;
 	key.uuid = *uuid;
-	return replicaset_search(&replicaset, &key);
+	return replica_hash_search(&replicaset.hash, &key);
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index 5f192b20..c81b6739 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -36,6 +36,8 @@
 #define RB_COMPACT 1
 #include <small/rb.h> /* replicaset_t */
 #include <small/rlist.h>
+#include <small/mempool.h>
+#include "vclock.h"
 
 /**
  * @module replication - global state of multi-master
@@ -132,25 +134,47 @@ replication_init(void);
 void
 replication_free(void);
 
-/** Instance id vclock identifier */
+/** Instance id vclock identifier. */
 extern uint32_t instance_id;
-/**
- * tx-thread local vclock reflecting the
- * state of the cluster, as maintained by the appliers.
- */
-extern struct vclock replicaset_vclock;
-
 /** UUID of the instance. */
 extern struct tt_uuid INSTANCE_UUID;
 /** UUID of the replica set. */
 extern struct tt_uuid REPLICASET_UUID;
 
+typedef rb_tree(struct replica) replica_hash_t;
+
+/**
+ * Replica set state.
+ *
+ * A replica set is a set of appliers and their matching
+ * relays, usually connected in full mesh.
+ */
+struct replicaset {
+	/** Memory pool for struct replica allocations. */
+	struct mempool pool;
+	/** Hash of replicas indexed by UUID. */
+	replica_hash_t hash;
+	/**
+	 * List of replicas that haven't received a UUID.
+	 * It contains both replicas that are still trying
+	 * to connect and those that failed to connect.
+	 */
+	struct rlist anon;
+	/**
+	 * TX thread local vclock reflecting the state
+	 * of the cluster as maintained by appliers.
+	 */
+	struct vclock vclock;
+
+};
+extern struct replicaset replicaset;
+
 /**
  * Summary information about a replica in the replica set.
  */
 struct replica {
-	/** Link in a replica set. */
-	rb_node(struct replica) link;
+	/** Link in replicaset::hash. */
+	rb_node(struct replica) in_hash;
 	/**
 	 * Replica UUID or nil if the replica or nil if the
 	 * applier has not received from the master yet.
diff --git a/src/box/wal.cc b/src/box/wal.cc
index c1238c84..4576cfe0 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -770,7 +770,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 			 * and promote vclock.
 			 */
 			if ((*last)->replica_id == instance_id) {
-				vclock_follow(&replicaset_vclock, instance_id,
+				vclock_follow(&replicaset.vclock, instance_id,
 					      (*last)->lsn);
 				break;
 			}
@@ -786,11 +786,11 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset_vclock, instance_id);
+	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
 	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
 	if (new_lsn > old_lsn) {
 		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset_vclock, instance_id, new_lsn);
+		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
 	}
 	return vclock_sum(&writer->vclock);
 }
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 9bc6f4c1..cdc26ebc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,11 +43,6 @@
 #include "scramble.h"
 #include "iproto_constants.h"
 
-/**
- * Globally unique identifier of this instance.
- */
-struct tt_uuid INSTANCE_UUID;
-
 int
 xrow_header_decode(struct xrow_header *header, const char **pos,
 		   const char *end)
diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c
index 8e94d773..a35f4ef9 100644
--- a/test/unit/vy_iterators_helper.c
+++ b/test/unit/vy_iterators_helper.c
@@ -1,8 +1,11 @@
 #include "vy_iterators_helper.h"
 #include "memory.h"
 #include "fiber.h"
+#include "tt_uuid.h"
 #include "say.h"
 
+struct tt_uuid INSTANCE_UUID;
+
 struct tuple_format *vy_key_format = NULL;
 struct vy_mem_env mem_env;
 struct vy_cache_env cache_env;
-- 
2.11.0




More information about the Tarantool-patches mailing list