[server 2/5] replication: gather all replicaset variables in struct
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Jan 24 20:44:51 MSK 2018
There is already a handful of global variables describing the replica
set state and there is going to be more so let's consolidate them in
a singleton struct:
replicaset => replicaset.hash
replica_pool => replicaset.pool
anon_replicas => replicaset.anon
replicaset_vclock => replicaset.vclock
While we are at it, let's also move INSTANCE_UUID definition from
xrow.c to replication.cc, where it truly belongs. The only reason
I see for it to be defined in xrow.c is to compile vinyl unit tests
without linking replication.o, but we can easily circumvent this by
defining INSTANCE_UUID in vy_iterators_helpers.c.
Suggested by @kostja
---
src/box/applier.cc | 14 +++---
src/box/box.cc | 10 ++--
src/box/lua/info.c | 8 ++--
src/box/relay.cc | 2 +-
src/box/replication.cc | 101 +++++++++++++++++-----------------------
src/box/replication.h | 42 +++++++++++++----
src/box/wal.cc | 6 +--
src/box/xrow.c | 5 --
test/unit/vy_iterators_helper.c | 3 ++
9 files changed, 100 insertions(+), 91 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index f0073bad..f8f4e7e7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -114,7 +114,7 @@ applier_writer_f(va_list ap)
continue;
try {
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset_vclock);
+ xrow_encode_vclock(&xrow, &replicaset.vclock);
coio_write_xrow(&io, &xrow);
} catch (SocketError *e) {
/*
@@ -261,7 +261,7 @@ applier_join(struct applier *applier)
* Used to initialize the replica's initial
* vclock in bootstrap_from_master()
*/
- xrow_decode_vclock_xc(&row, &replicaset_vclock);
+ xrow_decode_vclock_xc(&row, &replicaset.vclock);
}
applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -284,7 +284,7 @@ applier_join(struct applier *applier)
* vclock yet, do it now. In 1.7+
* this vlcock is not used.
*/
- xrow_decode_vclock_xc(&row, &replicaset_vclock);
+ xrow_decode_vclock_xc(&row, &replicaset.vclock);
}
break; /* end of stream */
} else if (iproto_type_is_error(row.type)) {
@@ -313,7 +313,7 @@ applier_join(struct applier *applier)
coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
- vclock_follow(&replicaset_vclock, row.replica_id,
+ vclock_follow(&replicaset.vclock, row.replica_id,
row.lsn);
xstream_write_xc(applier->subscribe_stream, &row);
} else if (row.type == IPROTO_OK) {
@@ -349,7 +349,7 @@ applier_subscribe(struct applier *applier)
struct xrow_header row;
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &replicaset_vclock);
+ &replicaset.vclock);
coio_write_xrow(coio, &row);
if (applier->state == APPLIER_READY) {
@@ -437,7 +437,7 @@ applier_subscribe(struct applier *applier)
applier->lag = ev_now(loop()) - row.tm;
applier->last_row_time = ev_monotonic_now(loop());
- if (vclock_get(&replicaset_vclock, row.replica_id) < row.lsn) {
+ if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
/**
* Promote the replica set vclock before
* applying the row. If there is an
@@ -445,7 +445,7 @@ applier_subscribe(struct applier *applier)
* the row is skipped when the replication
* is resumed.
*/
- vclock_follow(&replicaset_vclock, row.replica_id,
+ vclock_follow(&replicaset.vclock, row.replica_id,
row.lsn);
xstream_write_xc(applier->subscribe_stream, &row);
}
diff --git a/src/box/box.cc b/src/box/box.cc
index c7eb22af..b80ad4a1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1531,7 +1531,7 @@ bootstrap_from_master(struct replica *master)
*/
engine_begin_final_recovery_xc();
struct recovery_journal journal;
- recovery_journal_create(&journal, &replicaset_vclock);
+ recovery_journal_create(&journal, &replicaset.vclock);
journal_set(&journal.base);
applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
@@ -1574,7 +1574,7 @@ bootstrap(const struct tt_uuid *replicaset_uuid)
bootstrap_master(replicaset_uuid);
}
if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset_vclock))
+ engine_commit_checkpoint(&replicaset.vclock))
panic("failed to create a checkpoint");
}
@@ -1755,10 +1755,10 @@ box_cfg_xc(void)
/*
* Initialize the replica set vclock from recovery.
* The local WAL may contain rows from remote masters,
- * so we must reflect this in replicaset_vclock to
+ * so we must reflect this in replicaset vclock to
* not attempt to apply these rows twice.
*/
- vclock_copy(&replicaset_vclock, &recovery->vclock);
+ vclock_copy(&replicaset.vclock, &recovery->vclock);
/** Begin listening only when the local recovery is complete. */
box_listen();
@@ -1807,7 +1807,7 @@ box_cfg_xc(void)
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
wal_init(wal_mode, cfg_gets("wal_dir"), &INSTANCE_UUID,
- &replicaset_vclock, wal_max_rows, wal_max_size);
+ &replicaset.vclock, wal_max_rows, wal_max_size);
rmean_cleanup(rmean_box);
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index f5efbb27..8e8fd9d9 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -136,7 +136,7 @@ lbox_pushreplica(lua_State *L, struct replica *replica)
lua_settable(L, -3);
lua_pushstring(L, "lsn");
- luaL_pushuint64(L, vclock_get(&replicaset_vclock, replica->id));
+ luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id));
lua_settable(L, -3);
if (applier != NULL && applier->state != APPLIER_OFF) {
@@ -207,7 +207,7 @@ lbox_info_lsn(struct lua_State *L)
/* See comments in lbox_info_id */
struct replica *self = replica_by_uuid(&INSTANCE_UUID);
if (self != NULL && self->id != REPLICA_ID_NIL) {
- luaL_pushint64(L, vclock_get(&replicaset_vclock, self->id));
+ luaL_pushint64(L, vclock_get(&replicaset.vclock, self->id));
} else {
luaL_pushint64(L, -1);
}
@@ -217,7 +217,7 @@ lbox_info_lsn(struct lua_State *L)
static int
lbox_info_signature(struct lua_State *L)
{
- luaL_pushint64(L, vclock_sum(&replicaset_vclock));
+ luaL_pushint64(L, vclock_sum(&replicaset.vclock));
return 1;
}
@@ -253,7 +253,7 @@ lbox_info_server(struct lua_State *L)
static int
lbox_info_vclock(struct lua_State *L)
{
- lbox_pushvclock(L, &replicaset_vclock);
+ lbox_pushvclock(L, &replicaset.vclock);
return 1;
}
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7102a573..967874e7 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -236,7 +236,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
diag_raise();
ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, {
- while (vclock_compare(stop_vclock, &replicaset_vclock) == 0)
+ while (vclock_compare(stop_vclock, &replicaset.vclock) == 0)
fiber_sleep(0.001);
});
}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 498b9269..6e35a396 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -42,19 +42,13 @@
#include "error.h"
#include "vclock.h" /* VCLOCK_MAX */
-struct vclock replicaset_vclock;
uint32_t instance_id = REPLICA_ID_NIL;
-/**
- * Globally unique identifier of this replica set.
- * A replica set is a set of appliers and their matching
- * relays, usually connected in full mesh.
- */
+struct tt_uuid INSTANCE_UUID;
struct tt_uuid REPLICASET_UUID;
double replication_timeout = 1.0; /* seconds */
-typedef rb_tree(struct replica) replicaset_t;
-rb_proto(, replicaset_, replicaset_t, struct replica)
+struct replicaset replicaset;
static int
replica_compare_by_uuid(const struct replica *a, const struct replica *b)
@@ -62,38 +56,29 @@ replica_compare_by_uuid(const struct replica *a, const struct replica *b)
return tt_uuid_compare(&a->uuid, &b->uuid);
}
-rb_gen(, replicaset_, replicaset_t, struct replica, link,
- replica_compare_by_uuid);
+rb_gen(static MAYBE_UNUSED, replica_hash_, replica_hash_t,
+ struct replica, in_hash, replica_compare_by_uuid);
-#define replicaset_foreach_safe(set, item, next) \
- for (item = replicaset_first(set); \
- item != NULL && ((next = replicaset_next(set, item)) || 1); \
+#define replica_hash_foreach_safe(hash, item, next) \
+ for (item = replica_hash_first(hash); \
+ item != NULL && ((next = replica_hash_next(hash, item)) || 1); \
item = next)
-static struct mempool replica_pool;
-static replicaset_t replicaset;
-
-/**
- * List of replicas that haven't received a UUID.
- * It contains both replicas that are still trying
- * to connect and those that failed to connect.
- */
-static RLIST_HEAD(anon_replicas);
-
void
replication_init(void)
{
- mempool_create(&replica_pool, &cord()->slabc,
- sizeof(struct replica));
memset(&replicaset, 0, sizeof(replicaset));
- replicaset_new(&replicaset);
- vclock_create(&replicaset_vclock);
+ mempool_create(&replicaset.pool, &cord()->slabc,
+ sizeof(struct replica));
+ replica_hash_new(&replicaset.hash);
+ rlist_create(&replicaset.anon);
+ vclock_create(&replicaset.vclock);
}
void
replication_free(void)
{
- mempool_destroy(&replica_pool);
+ mempool_destroy(&replicaset.pool);
}
void
@@ -121,7 +106,8 @@ replica_is_orphan(struct replica *replica)
static struct replica *
replica_new(void)
{
- struct replica *replica = (struct replica *) mempool_alloc(&replica_pool);
+ struct replica *replica = (struct replica *)
+ mempool_alloc(&replicaset.pool);
if (replica == NULL)
tnt_raise(OutOfMemory, sizeof(*replica), "malloc",
"struct replica");
@@ -141,7 +127,7 @@ replica_delete(struct replica *replica)
assert(replica_is_orphan(replica));
if (replica->gc != NULL)
gc_consumer_unregister(replica->gc);
- mempool_free(&replica_pool, replica);
+ mempool_free(&replicaset.pool, replica);
}
struct replica *
@@ -153,7 +139,7 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
assert(replica_by_uuid(replica_uuid) == NULL);
struct replica *replica = replica_new();
replica->uuid = *replica_uuid;
- replicaset_insert(&replicaset, replica);
+ replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
return replica;
}
@@ -187,7 +173,7 @@ replica_clear_id(struct replica *replica)
*/
replica->id = REPLICA_ID_NIL;
if (replica_is_orphan(replica)) {
- replicaset_remove(&replicaset, replica);
+ replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
}
@@ -210,7 +196,7 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
assert(!tt_uuid_is_nil(&applier->uuid));
replica->uuid = applier->uuid;
- struct replica *orig = replicaset_search(&replicaset, replica);
+ struct replica *orig = replica_hash_search(&replicaset.hash, replica);
if (orig != NULL && orig->applier != NULL) {
say_error("duplicate connection to the same replica: "
"instance uuid %s, addr1 %s, addr2 %s",
@@ -233,7 +219,7 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
replica_delete(replica);
} else {
/* Add a new struct replica */
- replicaset_insert(&replicaset, replica);
+ replica_hash_insert(&replicaset.hash, replica);
}
}
@@ -244,15 +230,15 @@ replica_on_receive_uuid(struct trigger *trigger, void *event)
static void
replicaset_update(struct applier **appliers, int count)
{
- replicaset_t uniq;
+ replica_hash_t uniq;
memset(&uniq, 0, sizeof(uniq));
- replicaset_new(&uniq);
- RLIST_HEAD(anon_replicas_new);
+ replica_hash_new(&uniq);
+ RLIST_HEAD(anon_replicas);
struct replica *replica, *next;
auto uniq_guard = make_scoped_guard([&]{
- replicaset_foreach_safe(&uniq, replica, next) {
- replicaset_remove(&uniq, replica);
+ replica_hash_foreach_safe(&uniq, replica, next) {
+ replica_hash_remove(&uniq, replica);
replica_delete(replica);
}
});
@@ -272,7 +258,7 @@ replicaset_update(struct applier **appliers, int count)
* that will insert it into the replica set
* when it is finally connected.
*/
- rlist_add_entry(&anon_replicas_new, replica, in_anon);
+ rlist_add_entry(&anon_replicas, replica, in_anon);
trigger_create(&replica->on_connect,
replica_on_receive_uuid, NULL, NULL);
trigger_add(&applier->on_state, &replica->on_connect);
@@ -282,11 +268,11 @@ replicaset_update(struct applier **appliers, int count)
assert(!tt_uuid_is_nil(&applier->uuid));
replica->uuid = applier->uuid;
- if (replicaset_search(&uniq, replica) != NULL) {
+ if (replica_hash_search(&uniq, replica) != NULL) {
tnt_raise(ClientError, ER_CFG, "replication",
"duplicate connection to the same replica");
}
- replicaset_insert(&uniq, replica);
+ replica_hash_insert(&uniq, replica);
}
/*
@@ -302,20 +288,21 @@ replicaset_update(struct applier **appliers, int count)
applier_delete(replica->applier);
replica->applier = NULL;
}
- rlist_foreach_entry_safe(replica, &anon_replicas, in_anon, next) {
+ rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
assert(replica->applier != NULL);
applier_stop(replica->applier);
applier_delete(replica->applier);
replica->applier = NULL;
replica_delete(replica);
}
- rlist_create(&anon_replicas);
+ rlist_create(&replicaset.anon);
/* Save new appliers */
- replicaset_foreach_safe(&uniq, replica, next) {
- replicaset_remove(&uniq, replica);
+ replica_hash_foreach_safe(&uniq, replica, next) {
+ replica_hash_remove(&uniq, replica);
- struct replica *orig = replicaset_search(&replicaset, replica);
+ struct replica *orig = replica_hash_search(&replicaset.hash,
+ replica);
if (orig != NULL) {
/* Use existing struct replica */
orig->applier = replica->applier;
@@ -326,15 +313,15 @@ replicaset_update(struct applier **appliers, int count)
replica = orig;
} else {
/* Add a new struct replica */
- replicaset_insert(&replicaset, replica);
+ replica_hash_insert(&replicaset.hash, replica);
}
}
- rlist_swap(&anon_replicas, &anon_replicas_new);
+ rlist_swap(&replicaset.anon, &anon_replicas);
- assert(replicaset_first(&uniq) == NULL);
- replicaset_foreach_safe(&replicaset, replica, next) {
+ assert(replica_hash_first(&uniq) == NULL);
+ replica_hash_foreach_safe(&replicaset.hash, replica, next) {
if (replica_is_orphan(replica)) {
- replicaset_remove(&replicaset, replica);
+ replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
}
@@ -474,7 +461,7 @@ replicaset_follow(void)
if (replica->applier != NULL)
applier_resume(replica->applier);
}
- rlist_foreach_entry(replica, &anon_replicas, in_anon) {
+ rlist_foreach_entry(replica, &replicaset.anon, in_anon) {
/* Restart appliers that failed to connect. */
applier_start(replica->applier);
}
@@ -494,7 +481,7 @@ replica_clear_relay(struct replica *replica)
assert(replica->relay != NULL);
replica->relay = NULL;
if (replica_is_orphan(replica)) {
- replicaset_remove(&replicaset, replica);
+ replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
}
@@ -502,13 +489,13 @@ replica_clear_relay(struct replica *replica)
struct replica *
replicaset_first(void)
{
- return replicaset_first(&replicaset);
+ return replica_hash_first(&replicaset.hash);
}
struct replica *
replicaset_next(struct replica *replica)
{
- return replicaset_next(&replicaset, replica);
+ return replica_hash_next(&replicaset.hash, replica);
}
struct replica *
@@ -516,5 +503,5 @@ replica_by_uuid(const struct tt_uuid *uuid)
{
struct replica key;
key.uuid = *uuid;
- return replicaset_search(&replicaset, &key);
+ return replica_hash_search(&replicaset.hash, &key);
}
diff --git a/src/box/replication.h b/src/box/replication.h
index 5f192b20..c81b6739 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -36,6 +36,8 @@
#define RB_COMPACT 1
#include <small/rb.h> /* replicaset_t */
#include <small/rlist.h>
+#include <small/mempool.h>
+#include "vclock.h"
/**
* @module replication - global state of multi-master
@@ -132,25 +134,47 @@ replication_init(void);
void
replication_free(void);
-/** Instance id vclock identifier */
+/** Instance id vclock identifier. */
extern uint32_t instance_id;
-/**
- * tx-thread local vclock reflecting the
- * state of the cluster, as maintained by the appliers.
- */
-extern struct vclock replicaset_vclock;
-
/** UUID of the instance. */
extern struct tt_uuid INSTANCE_UUID;
/** UUID of the replica set. */
extern struct tt_uuid REPLICASET_UUID;
+typedef rb_tree(struct replica) replica_hash_t;
+
+/**
+ * Replica set state.
+ *
+ * A replica set is a set of appliers and their matching
+ * relays, usually connected in full mesh.
+ */
+struct replicaset {
+ /** Memory pool for struct replica allocations. */
+ struct mempool pool;
+ /** Hash of replicas indexed by UUID. */
+ replica_hash_t hash;
+ /**
+ * List of replicas that haven't received a UUID.
+ * It contains both replicas that are still trying
+ * to connect and those that failed to connect.
+ */
+ struct rlist anon;
+ /**
+ * TX thread local vclock reflecting the state
+ * of the cluster as maintained by appliers.
+ */
+ struct vclock vclock;
+
+};
+extern struct replicaset replicaset;
+
/**
* Summary information about a replica in the replica set.
*/
struct replica {
- /** Link in a replica set. */
- rb_node(struct replica) link;
+ /** Link in replicaset::hash. */
+ rb_node(struct replica) in_hash;
/**
* Replica UUID or nil if the replica or nil if the
* applier has not received from the master yet.
diff --git a/src/box/wal.cc b/src/box/wal.cc
index c1238c84..4576cfe0 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -770,7 +770,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
* and promote vclock.
*/
if ((*last)->replica_id == instance_id) {
- vclock_follow(&replicaset_vclock, instance_id,
+ vclock_follow(&replicaset.vclock, instance_id,
(*last)->lsn);
break;
}
@@ -786,11 +786,11 @@ wal_write_in_wal_mode_none(struct journal *journal,
{
struct wal_writer *writer = (struct wal_writer *) journal;
wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
- int64_t old_lsn = vclock_get(&replicaset_vclock, instance_id);
+ int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
if (new_lsn > old_lsn) {
/* There were local writes, promote vclock. */
- vclock_follow(&replicaset_vclock, instance_id, new_lsn);
+ vclock_follow(&replicaset.vclock, instance_id, new_lsn);
}
return vclock_sum(&writer->vclock);
}
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 9bc6f4c1..cdc26ebc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,11 +43,6 @@
#include "scramble.h"
#include "iproto_constants.h"
-/**
- * Globally unique identifier of this instance.
- */
-struct tt_uuid INSTANCE_UUID;
-
int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end)
diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c
index 8e94d773..a35f4ef9 100644
--- a/test/unit/vy_iterators_helper.c
+++ b/test/unit/vy_iterators_helper.c
@@ -1,8 +1,11 @@
#include "vy_iterators_helper.h"
#include "memory.h"
#include "fiber.h"
+#include "tt_uuid.h"
#include "say.h"
+struct tt_uuid INSTANCE_UUID;
+
struct tuple_format *vy_key_format = NULL;
struct vy_mem_env mem_env;
struct vy_cache_env cache_env;
--
2.11.0
More information about the Tarantool-patches
mailing list