* [tarantool-patches] [replication 1/1] replication: Introduce anonymous replicas
2018-05-04 6:45 ` [tarantool-patches] " Georgy Kirichenko
@ 2018-05-04 10:45 ` Ilya Markov
0 siblings, 0 replies; 3+ messages in thread
From: Ilya Markov @ 2018-05-04 10:45 UTC (permalink / raw)
To: georgy; +Cc: tarantool-patches
Introduce anonymous replicas: replicas that are not registered in the
_cluster space, so information about them is not replicated.
Such replicas are tracked only in a runtime data structure on the
master node. Their replica ids are allocated from a range separate
from the ids of ordinary replicas.
Closes #3340
---
branch: gh-3340-anonymous-replicas
src/box/alter.cc | 2 +-
src/box/applier.cc | 9 +-
src/box/box.cc | 82 +++++++++++----
src/box/box.h | 1 +
src/box/iproto_constants.h | 2 +
src/box/lua/cfg.cc | 14 +++
src/box/lua/info.c | 4 +
src/box/lua/load_cfg.lua | 3 +
src/box/replication.cc | 25 ++++-
src/box/replication.h | 20 +++-
src/box/xrow.c | 27 ++++-
src/box/xrow.h | 37 ++++---
test/app-tap/init_script.result | 47 +++++----
test/box-tap/cfg.test.lua | 3 +-
test/box/admin.result | 2 +
test/box/cfg.result | 4 +
test/replication/replica_anon.lua | 11 ++
test/replication/status.result | 216 ++++++++++++++++++++++++++++++++++++++
test/replication/status.test.lua | 69 ++++++++++++
19 files changed, 507 insertions(+), 71 deletions(-)
create mode 100644 test/replication/replica_anon.lua
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 1e97953..3ae6fcf 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -2705,7 +2705,7 @@ on_commit_dd_cluster(struct trigger *trigger, void *event)
replica_set_id(replica, id);
} else {
try {
- replica = replicaset_add(id, &uuid);
+ replica = replicaset_add(id, &uuid, false);
/* Can't throw exceptions from on_commit trigger */
} catch(Exception *e) {
panic("Can't register replica: %s", e->errmsg);
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9aa951c..e61ab98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -47,6 +47,7 @@
#include "xrow_io.h"
#include "error.h"
#include "session.h"
+#include "box.h"
STRS(applier_state, applier_STATE);
@@ -262,7 +263,11 @@ applier_join(struct applier *applier)
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
- xrow_encode_join_xc(&row, &INSTANCE_UUID);
+ if (ANONYMOUS_REPLICA && !box_is_ro()) {
+ tnt_raise(ClientError, ER_CFG, "replication_anon",
+ "Only read_only replica can be anonymous");
+ }
+ xrow_encode_join_xc(&row, &INSTANCE_UUID, ANONYMOUS_REPLICA);
coio_write_xrow(coio, &row);
/**
@@ -373,7 +378,7 @@ applier_subscribe(struct applier *applier)
struct vclock remote_vclock_at_subscribe;
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &replicaset.vclock);
+ &replicaset.vclock, ANONYMOUS_REPLICA);
coio_write_xrow(coio, &row);
if (applier->state == APPLIER_READY) {
diff --git a/src/box/box.cc b/src/box/box.cc
index 8f80af1..7490953 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -399,6 +399,9 @@ box_check_uri(const char *source, const char *option_name)
static void
box_check_replication(void)
{
+ if (cfg_getb("replication_anon") && !cfg_getb("read_only"))
+ tnt_raise(ClientError, ER_CFG, "replication_anon",
+ "Only read_only replica can be anonymous");
int count = cfg_getarr_size("replication");
for (int i = 0; i < count; i++) {
const char *source = cfg_getarr_elem("replication", i);
@@ -682,6 +685,12 @@ box_set_replication_connect_quorum(void)
}
void
+box_set_replication_anon(void)
+{
+ replica_set_anonymous(cfg_getb("replication_anon"));
+}
+
+void
box_bind(void)
{
const char *uri = cfg_gets("listen");
@@ -1229,14 +1238,34 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid)
}
/**
+ * Register an anonymous replica in the runtime replicaset only (no
+ * _cluster row), under an id above every anonymous id already in use.
+ */
+static inline struct replica *
+add_anonymous_replica(const tt_uuid *instance_uuid)
+{
+	uint32_t replica_id = REPLICA_ID_ANON_INIT;
+	replicaset_foreach(replica) {
+		/* Step past ids already taken so that ids stay unique. */
+		if (replica != NULL && replica->anonymous
+		    && replica->id >= replica_id)
+			replica_id = replica->id + 1;
+	}
+	return replicaset_add(replica_id, instance_uuid, true);
+}
+
+/**
* @brief Called when recovery/replication wants to add a new
* replica to the replica set.
* replica_set_id() is called as a commit trigger on _cluster
* space and actually adds the replica to the replica set.
+ * For an anonymous replica the function adds it to the replicaset
+ * directly, without writing to the _cluster space.
* @param instance_uuid
+ * @param anonymous
*/
static void
-box_on_join(const tt_uuid *instance_uuid)
+box_on_join(const tt_uuid *instance_uuid, bool anonymous)
{
struct replica *replica = replica_by_uuid(instance_uuid);
if (replica != NULL && replica->id != REPLICA_ID_NIL)
@@ -1244,22 +1273,27 @@ box_on_join(const tt_uuid *instance_uuid)
box_check_writable_xc();
- /** Find the largest existing replica id. */
- struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
- struct index *index = index_find_system_xc(space, 0);
- struct iterator *it = index_create_iterator_xc(index, ITER_ALL,
- NULL, 0);
- IteratorGuard iter_guard(it);
- struct tuple *tuple;
/** Assign a new replica id. */
- uint32_t replica_id = 1;
- while ((tuple = iterator_next_xc(it)) != NULL) {
- if (tuple_field_u32_xc(tuple,
- BOX_CLUSTER_FIELD_ID) != replica_id)
- break;
- replica_id++;
+
+ if (!anonymous) {
+ /** Find the lowest unused replica id. */
+ struct tuple *tuple;
+ uint32_t replica_id = 1;
+ struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
+ struct index *index = index_find_system_xc(space, 0);
+ struct iterator *it = index_create_iterator_xc(index, ITER_ALL,
+ NULL, 0);
+ IteratorGuard iter_guard(it);
+ while ((tuple = iterator_next_xc(it)) != NULL) {
+ if (tuple_field_u32_xc(tuple,
+ BOX_CLUSTER_FIELD_ID) != replica_id)
+ break;
+ replica_id++;
+ }
+ box_register_replica(replica_id, instance_uuid);
+ } else {
+ add_anonymous_replica(instance_uuid);
}
- box_register_replica(replica_id, instance_uuid);
}
void
@@ -1292,7 +1326,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
*
* <= INSERT
* ...
- * Initial data: a stream of engine-specifc rows, e.g. snapshot
+ * Initial data: a stream of engine-specific rows, e.g. snapshot
* rows for memtx or dirty cursor data for Vinyl. Engine can
* use REPLICA_ID, LSN and other fields for internal purposes.
* ...
@@ -1324,7 +1358,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
/* Decode JOIN request */
struct tt_uuid instance_uuid = uuid_nil;
- xrow_decode_join_xc(header, &instance_uuid);
+ bool anonymous = false;
+ xrow_decode_join_xc(header, &instance_uuid, &anonymous);
/* Check that bootstrap has been finished */
if (!is_box_configured)
@@ -1395,7 +1430,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
* sending OK - if the hook fails, the error reaches the
* client.
*/
- box_on_join(&instance_uuid);
+ box_on_join(&instance_uuid, anonymous);
replica = replica_by_uuid(&instance_uuid);
assert(replica != NULL);
@@ -1438,9 +1473,10 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
struct tt_uuid replicaset_uuid = uuid_nil, replica_uuid = uuid_nil;
struct vclock replica_clock;
uint32_t replica_version_id;
+ bool anonymous = false;
vclock_create(&replica_clock);
xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid,
- &replica_clock, &replica_version_id);
+ &replica_clock, &replica_version_id, &anonymous);
/* Forbid connection to itself */
if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1464,9 +1500,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
/* Check replica uuid */
struct replica *replica = replica_by_uuid(&replica_uuid);
if (replica == NULL || replica->id == REPLICA_ID_NIL) {
+ if (anonymous) {
+ replica = add_anonymous_replica(&replica_uuid);
+ } else {
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
tt_uuid_str(&replica_uuid),
tt_uuid_str(&REPLICASET_UUID));
+ }
}
/* Forbid replication with disabled WAL */
@@ -1772,6 +1812,7 @@ box_cfg_xc(void)
box_set_replication_timeout();
box_set_replication_connect_timeout();
box_set_replication_connect_quorum();
+ box_set_replication_anon();
replication_sync_lag = box_check_replication_sync_lag();
xstream_create(&join_stream, apply_initial_join_row);
xstream_create(&subscribe_stream, apply_row);
@@ -1927,7 +1968,8 @@ box_cfg_xc(void)
/* Check for correct registration of the instance in _cluster */
{
struct replica *self = replica_by_uuid(&INSTANCE_UUID);
- if (self == NULL || self->id == REPLICA_ID_NIL) {
+ if (!replica_is_anonymous() &&
+ (self == NULL || self->id == REPLICA_ID_NIL)) {
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
tt_uuid_str(&INSTANCE_UUID),
tt_uuid_str(&REPLICASET_UUID));
diff --git a/src/box/box.h b/src/box/box.h
index e337cb0..ba15055 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -182,6 +182,7 @@ void box_set_vinyl_cache(void);
void box_set_vinyl_timeout(void);
void box_set_replication_timeout(void);
void box_set_replication_connect_quorum(void);
+void box_set_replication_anon(void);
extern "C" {
#endif /* defined(__cplusplus) */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 34b39e1..ff12211 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -100,6 +100,8 @@ enum iproto_key {
* ]
*/
IPROTO_METADATA = 0x32,
+ /* Request key identifying flag of anonymous replicas */
+ IPROTO_ANON = 0x33,
/* Leave a gap between response keys and SQL keys. */
IPROTO_SQL_TEXT = 0x40,
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5e88ca3..c43a1f3 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -251,6 +251,18 @@ lbox_cfg_set_replication_connect_quorum(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_replication_anon(struct lua_State *L)
+{
+ try {
+ box_set_replication_anon();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
+
void
box_lua_cfg_init(struct lua_State *L)
{
@@ -275,6 +287,8 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
{"cfg_set_replication_connect_quorum",
lbox_cfg_set_replication_connect_quorum},
+ {"cfg_set_replication_anon",
+ lbox_cfg_set_replication_anon},
{NULL, NULL}
};
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 8e8fd9d..c24e7be 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -139,6 +139,10 @@ lbox_pushreplica(lua_State *L, struct replica *replica)
luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id));
lua_settable(L, -3);
+ lua_pushstring(L, "anonymous");
+ lua_pushboolean(L, replica->anonymous);
+ lua_settable(L, -3);
+
if (applier != NULL && applier->state != APPLIER_OFF) {
lua_pushstring(L, "upstream");
lbox_pushapplier(L, applier);
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 636eef6..cf201c0 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -59,6 +59,7 @@ local default_cfg = {
replication_sync_lag = 10,
replication_connect_timeout = 4,
replication_connect_quorum = nil, -- connect all
+ replication_anon = false,
feedback_enabled = true,
feedback_host = "https://feedback.tarantool.io",
feedback_interval = 3600,
@@ -118,6 +119,7 @@ local template_cfg = {
replication_sync_lag = 'number',
replication_connect_timeout = 'number',
replication_connect_quorum = 'number',
+ replication_anon = 'boolean',
feedback_enabled = 'boolean',
feedback_host = 'string',
feedback_interval = 'number',
@@ -191,6 +193,7 @@ local dynamic_cfg = {
end,
force_recovery = function() end,
replication_timeout = private.cfg_set_replication_timeout,
+ replication_anon = private.cfg_set_replication_anon,
replication_connect_quorum = private.cfg_set_replication_connect_quorum,
}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 760f837..1c1abab 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -45,6 +45,7 @@
uint32_t instance_id = REPLICA_ID_NIL;
struct tt_uuid INSTANCE_UUID;
struct tt_uuid REPLICASET_UUID;
+bool ANONYMOUS_REPLICA = false;
double replication_timeout = 1.0; /* seconds */
double replication_connect_timeout = 4.0; /* seconds */
@@ -134,6 +135,7 @@ replica_new(void)
replica->applier = NULL;
replica->relay = NULL;
replica->gc = NULL;
+ replica->anonymous = false;
rlist_create(&replica->in_anon);
trigger_create(&replica->on_applier_state,
replica_on_applier_state_f, NULL, NULL);
@@ -151,14 +153,16 @@ replica_delete(struct replica *replica)
}
struct replica *
-replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
+replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid, bool anon)
{
assert(!tt_uuid_is_nil(replica_uuid));
- assert(replica_id != REPLICA_ID_NIL && replica_id < VCLOCK_MAX);
+ assert(replica_id != REPLICA_ID_NIL && (replica_id < VCLOCK_MAX
+ || (anon && replica_id < INT32_MAX)));
assert(replica_by_uuid(replica_uuid) == NULL);
struct replica *replica = replica_new();
replica->uuid = *replica_uuid;
+ replica->anonymous = anon;
replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
return replica;
@@ -167,7 +171,7 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
void
replica_set_id(struct replica *replica, uint32_t replica_id)
{
- assert(replica_id < VCLOCK_MAX);
+ assert(replica_id < VCLOCK_MAX || (replica->anonymous && replica_id < INT32_MAX));
assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */
replica->id = replica_id;
@@ -718,3 +722,18 @@ replica_by_uuid(const struct tt_uuid *uuid)
key.uuid = *uuid;
return replica_hash_search(&replicaset.hash, &key);
}
+
+void
+replica_set_anonymous(bool anonymous)
+{
+ if (anonymous && !box_is_ro())
+ tnt_raise(ClientError, ER_CFG, "replication_anon",
+ "Only read_only replica can be anonymous");
+ ANONYMOUS_REPLICA = anonymous;
+}
+
+bool
+replica_is_anonymous()
+{
+ return ANONYMOUS_REPLICA;
+}
diff --git a/src/box/replication.h b/src/box/replication.h
index 8a9d575..a87622d 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -156,6 +156,8 @@ extern uint32_t instance_id;
extern struct tt_uuid INSTANCE_UUID;
/** UUID of the replica set. */
extern struct tt_uuid REPLICASET_UUID;
+/** Flag of replica's anonymity */
+extern bool ANONYMOUS_REPLICA;
typedef rb_tree(struct replica) replica_hash_t;
@@ -256,6 +258,8 @@ struct replica {
struct trigger on_applier_state;
/** Replica sync state. */
enum replica_state state;
+ /** Flag signaling that the replica is anonymous. */
+ bool anonymous;
};
enum {
@@ -264,6 +268,10 @@ enum {
* and in cases where id is unknown.
*/
REPLICA_ID_NIL = 0,
+ /**
+ * The first id anonymous replicas start to count from.
+ */
+ REPLICA_ID_ANON_INIT = INT32_MAX / 2,
};
/**
@@ -317,6 +325,14 @@ replica_set_relay(struct replica *replica, struct relay *relay);
void
replica_clear_relay(struct replica *replica);
+/** Set the instance's anonymous flag; fails unless it is read-only. */
+void
+replica_set_anonymous(bool anonymous);
+
+/** Return the instance's anonymous flag. */
+bool
+replica_is_anonymous(void);
+
#if defined(__cplusplus)
} /* extern "C" */
@@ -327,10 +343,10 @@ replica_check_id(uint32_t replica_id);
* Register the universally unique identifier of a remote replica and
* a matching replica-set-local identifier in the _cluster registry.
* Called from on_replace_dd_cluster() when a remote master joins the
- * replica set.
+ * replica set, and from box_on_join() to register an anonymous replica.
*/
struct replica *
-replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
+replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid, bool anon);
/**
* Try to connect appliers to remote peers and receive UUID.
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 3ef3d82..de9172f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -864,7 +864,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock)
+ const struct vclock *vclock, bool anonymous)
{
memset(row, 0, sizeof(*row));
uint32_t replicaset_size = vclock_size(vclock);
@@ -876,7 +876,7 @@ xrow_encode_subscribe(struct xrow_header *row,
return -1;
}
char *data = buf;
- data = mp_encode_map(data, 4);
+ data = mp_encode_map(data, 5);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -891,6 +891,9 @@ xrow_encode_subscribe(struct xrow_header *row,
}
data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
data = mp_encode_uint(data, tarantool_version_id());
+ data = mp_encode_uint(data, IPROTO_ANON);
+ data = mp_encode_bool(data, anonymous);
+
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
@@ -902,7 +905,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id)
+ uint32_t *version_id, bool *anonymous)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -961,6 +964,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*version_id = mp_decode_uint(&d);
break;
+ case IPROTO_ANON:
+ if (anonymous == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_BOOL) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "invalid ANONYMOUS");
+ return -1;
+ }
+ *anonymous = mp_decode_bool(&d);
+ break;
default: skip:
mp_next(&d); /* value */
}
@@ -989,7 +1002,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
int
-xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid)
+xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid,
+ bool anonymous)
{
memset(row, 0, sizeof(*row));
@@ -1000,10 +1014,13 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid)
return -1;
}
char *data = buf;
- data = mp_encode_map(data, 1);
+ data = mp_encode_map(data, 2);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
/* Greet the remote replica with our replica UUID */
data = xrow_encode_uuid(data, instance_uuid);
+ /* Add flag signaling about our wish to be anonymous. */
+ data = mp_encode_uint(data, IPROTO_ANON);
+ data = mp_encode_bool(data, anonymous);
assert(data <= buf + size);
row->body[0].iov_base = buf;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0ca7152..71a0ff9 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -248,6 +248,7 @@ xrow_encode_request_vote(struct xrow_header *row);
* @param replicaset_uuid Replica set uuid.
* @param instance_uuid Instance uuid.
* @param vclock Replication clock.
+ * @param anonymous anonymous flag.
*
* @retval 0 Success.
* @retval -1 Memory error.
@@ -256,7 +257,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock);
+ const struct vclock *vclock, bool anonymous);
/**
* Decode SUBSCRIBE command.
@@ -264,6 +265,8 @@ xrow_encode_subscribe(struct xrow_header *row,
* @param[out] replicaset_uuid.
* @param[out] instance_uuid.
* @param[out] vclock.
+ * @param[out] version_id.
+ * @param[out] anonymous.
*
* @retval 0 Success.
* @retval -1 Memory or format error.
@@ -271,31 +274,36 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id);
+ uint32_t *version_id, bool *anonymous);
/**
* Encode JOIN command.
* @param[out] row Row to encode into.
* @param instance_uuid.
+ * @param anonymous - flag signaling if joining replica wants to be anonymous.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
int
-xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
+xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid,
+ bool anonymous);
/**
* Decode JOIN command.
* @param row Row to decode.
* @param[out] instance_uuid.
+ * @param[out] anonymous flag.
*
* @retval 0 Success.
* @retval -1 Memory or format error.
*/
static inline int
-xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
+xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid,
+ bool *anonymous)
{
- return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL);
+ return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL,
+ anonymous);
}
/**
@@ -320,7 +328,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
static inline int
xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
{
- return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL);
+ return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
}
/**
@@ -613,10 +621,10 @@ static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock)
+ const struct vclock *vclock, bool anonymous)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock) != 0)
+ vclock, anonymous) != 0)
diag_raise();
}
@@ -625,27 +633,28 @@ static inline void
xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *replica_version_id)
+ uint32_t *replica_version_id, bool *anonymous)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock, replica_version_id) != 0)
+ vclock, replica_version_id, anonymous) != 0)
diag_raise();
}
/** @copydoc xrow_encode_join. */
static inline void
xrow_encode_join_xc(struct xrow_header *row,
- const struct tt_uuid *instance_uuid)
+ const struct tt_uuid *instance_uuid, bool anonymous)
{
- if (xrow_encode_join(row, instance_uuid) != 0)
+ if (xrow_encode_join(row, instance_uuid, anonymous) != 0)
diag_raise();
}
/** @copydoc xrow_decode_join. */
static inline void
-xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid)
+xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid,
+ bool *anonymous)
{
- if (xrow_decode_join(row, instance_uuid) != 0)
+ if (xrow_decode_join(row, instance_uuid, anonymous) != 0)
diag_raise();
}
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 9d3bc85..1160d63 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -23,29 +23,30 @@ box.cfg
18 pid_file:box.pid
19 read_only:false
20 readahead:16320
-21 replication_connect_timeout:4
-22 replication_sync_lag:10
-23 replication_timeout:1
-24 rows_per_wal:500000
-25 slab_alloc_factor:1.05
-26 too_long_threshold:0.5
-27 vinyl_bloom_fpr:0.05
-28 vinyl_cache:134217728
-29 vinyl_dir:.
-30 vinyl_max_tuple_size:1048576
-31 vinyl_memory:134217728
-32 vinyl_page_size:8192
-33 vinyl_range_size:1073741824
-34 vinyl_read_threads:1
-35 vinyl_run_count_per_level:2
-36 vinyl_run_size_ratio:3.5
-37 vinyl_timeout:60
-38 vinyl_write_threads:2
-39 wal_dir:.
-40 wal_dir_rescan_delay:2
-41 wal_max_size:268435456
-42 wal_mode:write
-43 worker_pool_threads:4
+21 replication_anon:false
+22 replication_connect_timeout:4
+23 replication_sync_lag:10
+24 replication_timeout:1
+25 rows_per_wal:500000
+26 slab_alloc_factor:1.05
+27 too_long_threshold:0.5
+28 vinyl_bloom_fpr:0.05
+29 vinyl_cache:134217728
+30 vinyl_dir:.
+31 vinyl_max_tuple_size:1048576
+32 vinyl_memory:134217728
+33 vinyl_page_size:8192
+34 vinyl_range_size:1073741824
+35 vinyl_read_threads:1
+36 vinyl_run_count_per_level:2
+37 vinyl_run_size_ratio:3.5
+38 vinyl_timeout:60
+39 vinyl_write_threads:2
+40 wal_dir:.
+41 wal_dir_rescan_delay:2
+42 wal_max_size:268435456
+43 wal_mode:write
+44 worker_pool_threads:4
--
-- Test insert from detached fiber
--
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index 8496929..116f27e 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
local fio = require('fio')
local uuid = require('uuid')
local msgpack = require('msgpack')
-test:plan(91)
+test:plan(92)
--------------------------------------------------------------------------------
-- Invalid values
@@ -46,6 +46,7 @@ invalid('vinyl_run_count_per_level', 0)
invalid('vinyl_run_size_ratio', 1)
invalid('vinyl_bloom_fpr', 0)
invalid('vinyl_bloom_fpr', 1.1)
+invalid("replication_anon", true)
local function invalid_combinations(name, val)
local status, result = pcall(box.cfg, val)
diff --git a/test/box/admin.result b/test/box/admin.result
index ff5a9ef..9eb74a7 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -66,6 +66,8 @@ cfg_filter(box.cfg)
- false
- - readahead
- 16320
+ - - replication_anon
+ - false
- - replication_connect_timeout
- 4
- - replication_sync_lag
diff --git a/test/box/cfg.result b/test/box/cfg.result
index b11346d..8b5dd66 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -54,6 +54,8 @@ cfg_filter(box.cfg)
- false
- - readahead
- 16320
+ - - replication_anon
+ - false
- - replication_connect_timeout
- 4
- - replication_sync_lag
@@ -147,6 +149,8 @@ cfg_filter(box.cfg)
- false
- - readahead
- 16320
+ - - replication_anon
+ - false
- - replication_connect_timeout
- 4
- - replication_sync_lag
diff --git a/test/replication/replica_anon.lua b/test/replication/replica_anon.lua
new file mode 100644
index 0000000..2846441
--- /dev/null
+++ b/test/replication/replica_anon.lua
@@ -0,0 +1,11 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ replication = os.getenv("MASTER"),
+ memtx_memory = 107374182,
+ read_only = true,
+ replication_anon = true
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/status.result b/test/replication/status.result
index 8394b98..268b5fc 100644
--- a/test/replication/status.result
+++ b/test/replication/status.result
@@ -86,6 +86,10 @@ test_run:cmd("create server replica with rpl_master=default, script='replication
---
- true
...
+test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'")
+---
+- true
+...
test_run:cmd("start server replica")
---
- true
@@ -144,6 +148,10 @@ master.downstream == nil
---
- true
...
+master.anonymous == false
+---
+- true
+...
-- replica's status
replica_id = test_run:get_server_id('replica')
---
@@ -238,6 +246,10 @@ master.downstream == nil
---
- true
...
+master.anonymous == false
+---
+- true
+...
-- replica's status
replica_id = box.info.id
---
@@ -275,6 +287,181 @@ replica.downstream == nil
---
- true
...
+replica.anonymous == false
+---
+- true
+...
+test_run:cmd('switch default')
+---
+- true
+...
+-- Start anonymous replica
+test_run:cmd("start server replica_anon")
+---
+- true
+...
+anon_init_id = 1073741823
+---
+...
+#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil
+---
+- true
+...
+box.info.replication[anon_init_id] ~= nil
+---
+- true
+...
+box.space._cluster:count() == 2
+---
+- true
+...
+-- master's status
+box.info.vclock[master_id] == 2 -- grant + registration == 2
+---
+- true
+...
+box.info.lsn == box.info.vclock[master_id]
+---
+- true
+...
+master = box.info.replication[master_id]
+---
+...
+master.id == master_id
+---
+- true
+...
+master.uuid == box.space._cluster:get(master_id)[2]
+---
+- true
+...
+master.lsn == box.info.vclock[master_id]
+---
+- true
+...
+master.upstream == nil
+---
+- true
+...
+master.downstream == nil
+---
+- true
+...
+master.anonymous == false
+---
+- true
+...
+-- anon replica's status
+box.info.vclock[anon_init_id] == nil
+---
+- true
+...
+replica = box.info.replication[anon_init_id]
+---
+...
+replica.id == anon_init_id
+---
+- true
+...
+box.space._cluster:get(anon_init_id)
+---
+...
+-- replica.lsn == box.info.vclock[anon_init_id]
+replica.lsn == 0
+---
+- true
+...
+replica.upstream == nil
+---
+- true
+...
+replica.downstream.vclock[master_id] == box.info.vclock[master_id]
+---
+- true
+...
+replica.downstream.vclock[anon_init_id] == box.info.vclock[anon_init_id]
+---
+- true
+...
+replica.anonymous == true
+---
+- true
+...
+test_run:cmd('switch replica_anon')
+---
+- true
+...
+#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil
+---
+- true
+...
+#box.info.replication == 2
+---
+- true
+...
+box.space._cluster:count() == 2
+---
+- true
+...
+-- master's status
+master_id = test_run:get_server_id('default')
+---
+...
+box.info.vclock[master_id] == 2
+---
+- true
+...
+master = box.info.replication[master_id]
+---
+...
+master.id == master_id
+---
+- true
+...
+master.uuid == box.space._cluster:get(master_id)[2]
+---
+- true
+...
+master.upstream.status == "follow"
+---
+- true
+...
+master.upstream.lag < 1
+---
+- true
+...
+master.upstream.idle < 1
+---
+- true
+...
+master.upstream.peer:match("localhost")
+---
+- localhost
+...
+master.downstream == nil
+---
+- true
+...
+master.anonymous == false
+---
+- true
+...
+replica_id = box.info.id
+---
+...
+box.info.vclock[replica_id] == nil
+---
+- true
+...
+box.info.lsn == 0
+---
+- false
+...
+-- there is no information about itself on anon replica
+box.info.replication[replica_id] == nil
+---
+- true
+...
--
-- ClientError during replication
--
@@ -377,6 +564,27 @@ test_run:cmd('switch default')
---
- true
...
+-- check subsribe from anonymous after master restart
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd("stop server replica_anon")
+---
+- true
+...
+test_run:cmd("restart server default")
+test_run:cmd("start server replica_anon")
+---
+- true
+...
+anon_init_id = 1073741823
+---
+...
+box.info.replication[anon_init_id] ~= nil
+---
+- true
+...
--
-- Cleanup
--
@@ -387,7 +595,15 @@ test_run:cmd("stop server replica")
---
- true
...
+test_run:cmd("stop server replica_anon")
+---
+- true
+...
test_run:cmd("cleanup server replica")
---
- true
...
+test_run:cmd("cleanup server replica_anon")
+---
+- true
+...
diff --git a/test/replication/status.test.lua b/test/replication/status.test.lua
index 8bb25e0..690f0aa 100644
--- a/test/replication/status.test.lua
+++ b/test/replication/status.test.lua
@@ -33,6 +33,7 @@ master.downstream == nil
-- Start Master -> Slave replication
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'")
test_run:cmd("start server replica")
--
@@ -54,6 +55,7 @@ master.uuid == box.space._cluster:get(master_id)[2]
master.lsn == box.info.vclock[master_id]
master.upstream == nil
master.downstream == nil
+master.anonymous == false
-- replica's status
replica_id = test_run:get_server_id('replica')
@@ -87,6 +89,7 @@ master.upstream.lag < 1
master.upstream.idle < 1
master.upstream.peer:match("localhost")
master.downstream == nil
+master.anonymous == false
-- replica's status
replica_id = box.info.id
@@ -100,6 +103,63 @@ replica.uuid == box.space._cluster:get(replica_id)[2]
replica.lsn == 0
replica.upstream == nil
replica.downstream == nil
+replica.anonymous == false
+
+test_run:cmd('switch default')
+-- Start anonymous replica
+test_run:cmd("start server replica_anon")
+anon_init_id = 1073741823
+#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil
+box.info.replication[anon_init_id] ~= nil
+box.space._cluster:count() == 2
+
+-- master's status
+box.info.vclock[master_id] == 2 -- grant + registration == 2
+box.info.lsn == box.info.vclock[master_id]
+master = box.info.replication[master_id]
+master.id == master_id
+master.uuid == box.space._cluster:get(master_id)[2]
+master.lsn == box.info.vclock[master_id]
+master.upstream == nil
+master.downstream == nil
+master.anonymous == false
+
+-- anon replica's status
+
+box.info.vclock[anon_init_id] == nil
+replica = box.info.replication[anon_init_id]
+replica.id == anon_init_id
+box.space._cluster:get(anon_init_id)
+-- replica.lsn == box.info.vclock[anon_init_id]
+replica.lsn == 0
+replica.upstream == nil
+replica.downstream.vclock[master_id] == box.info.vclock[master_id]
+replica.downstream.vclock[anon_init_id] == box.info.vclock[anon_init_id]
+replica.anonymous == true
+
+test_run:cmd('switch replica_anon')
+#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil
+#box.info.replication == 2
+box.space._cluster:count() == 2
+
+-- master's status
+master_id = test_run:get_server_id('default')
+box.info.vclock[master_id] == 2
+master = box.info.replication[master_id]
+master.id == master_id
+master.uuid == box.space._cluster:get(master_id)[2]
+master.upstream.status == "follow"
+master.upstream.lag < 1
+master.upstream.idle < 1
+master.upstream.peer:match("localhost")
+master.downstream == nil
+master.anonymous == false
+
+replica_id = box.info.id
+box.info.vclock[replica_id] == nil
+box.info.lsn == 0
+-- there is no information about itself on anon replica
+box.info.replication[replica_id] == nil
--
-- ClientError during replication
@@ -135,10 +195,19 @@ master.upstream.peer:match("localhost")
master.downstream == nil
test_run:cmd('switch default')
+-- check subsribe from anonymous after master restart
+test_run:cmd('switch default')
+test_run:cmd("stop server replica_anon")
+test_run:cmd("restart server default")
+test_run:cmd("start server replica_anon")
+anon_init_id = 1073741823
+box.info.replication[anon_init_id] ~= nil
--
-- Cleanup
--
box.schema.user.revoke('guest', 'replication')
test_run:cmd("stop server replica")
+test_run:cmd("stop server replica_anon")
test_run:cmd("cleanup server replica")
+test_run:cmd("cleanup server replica_anon")
--
2.7.4
^ permalink raw reply [flat|nested] 3+ messages in thread