From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id E3C4A234D6 for ; Fri, 4 May 2018 06:46:06 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id ZThH00dKNSmh for ; Fri, 4 May 2018 06:46:06 -0400 (EDT) Received: from smtp61.i.mail.ru (smtp61.i.mail.ru [217.69.128.41]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 5818420AFD for ; Fri, 4 May 2018 06:46:06 -0400 (EDT) From: Ilya Markov Subject: [tarantool-patches] [replication 1/1] replication: Introduce anonymous replicas Date: Fri, 4 May 2018 13:45:53 +0300 Message-Id: In-Reply-To: <2933481.OYFXySxWCs@home.lan> References: <2933481.OYFXySxWCs@home.lan> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: georgy@tarantool.org Cc: tarantool-patches@freelists.org Introduce anonymous replicas: replicas whose information is not stored in the _cluster space and is therefore not replicated. Information about these replicas is kept only in a runtime data structure on the master node. Their replica ids are allocated from a range separate from the ids of ordinary replicas. 
Closes #3340 --- branch: gh-3340-anonymous-replicas src/box/alter.cc | 2 +- src/box/applier.cc | 9 +- src/box/box.cc | 82 +++++++++++---- src/box/box.h | 1 + src/box/iproto_constants.h | 2 + src/box/lua/cfg.cc | 14 +++ src/box/lua/info.c | 4 + src/box/lua/load_cfg.lua | 3 + src/box/replication.cc | 25 ++++- src/box/replication.h | 20 +++- src/box/xrow.c | 27 ++++- src/box/xrow.h | 37 ++++--- test/app-tap/init_script.result | 47 +++++---- test/box-tap/cfg.test.lua | 3 +- test/box/admin.result | 2 + test/box/cfg.result | 4 + test/replication/replica_anon.lua | 11 ++ test/replication/status.result | 216 ++++++++++++++++++++++++++++++++++++++ test/replication/status.test.lua | 69 ++++++++++++ 19 files changed, 507 insertions(+), 71 deletions(-) create mode 100644 test/replication/replica_anon.lua diff --git a/src/box/alter.cc b/src/box/alter.cc index 1e97953..3ae6fcf 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -2705,7 +2705,7 @@ on_commit_dd_cluster(struct trigger *trigger, void *event) replica_set_id(replica, id); } else { try { - replica = replicaset_add(id, &uuid); + replica = replicaset_add(id, &uuid, false); /* Can't throw exceptions from on_commit trigger */ } catch(Exception *e) { panic("Can't register replica: %s", e->errmsg); diff --git a/src/box/applier.cc b/src/box/applier.cc index 9aa951c..e61ab98 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -47,6 +47,7 @@ #include "xrow_io.h" #include "error.h" #include "session.h" +#include "box.h" STRS(applier_state, applier_STATE); @@ -262,7 +263,11 @@ applier_join(struct applier *applier) struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; - xrow_encode_join_xc(&row, &INSTANCE_UUID); + if (ANONYMOUS_REPLICA && !box_is_ro()) { + tnt_raise(ClientError, ER_CFG, "replication_anon", + "Only read_only replica can be anonymous"); + } + xrow_encode_join_xc(&row, &INSTANCE_UUID, ANONYMOUS_REPLICA); coio_write_xrow(coio, &row); /** @@ -373,7 +378,7 @@ 
applier_subscribe(struct applier *applier) struct vclock remote_vclock_at_subscribe; xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &replicaset.vclock); + &replicaset.vclock, ANONYMOUS_REPLICA); coio_write_xrow(coio, &row); if (applier->state == APPLIER_READY) { diff --git a/src/box/box.cc b/src/box/box.cc index 8f80af1..7490953 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -399,6 +399,9 @@ box_check_uri(const char *source, const char *option_name) static void box_check_replication(void) { + if (cfg_getb("replication_anon") && !cfg_getb("read_only")) + tnt_raise(ClientError, ER_CFG, "replication_anon", + "Only read_only replica can be anonymous"); int count = cfg_getarr_size("replication"); for (int i = 0; i < count; i++) { const char *source = cfg_getarr_elem("replication", i); @@ -682,6 +685,12 @@ box_set_replication_connect_quorum(void) } void +box_set_replication_anon(void) +{ + replica_set_anonymous(cfg_getb("replication_anon")); +} + +void box_bind(void) { const char *uri = cfg_gets("listen"); @@ -1229,14 +1238,34 @@ box_register_replica(uint32_t id, const struct tt_uuid *uuid) } /** + * Don't write info about anonymous replicas to _cluster, + * so simply store them in cache. + */ +static inline struct replica * +add_anonymous_replica(const tt_uuid *instance_uuid) +{ + uint32_t replica_id = REPLICA_ID_ANON_INIT; + replicaset_foreach(replica) { + if (replica != NULL && replica->anonymous + && replica->id > replica_id) { + replica_id = replica->id; + } + } + return replicaset_add(replica_id, instance_uuid, true); +} + +/** * @brief Called when recovery/replication wants to add a new * replica to the replica set. * replica_set_id() is called as a commit trigger on _cluster * space and actually adds the replica to the replica set. + * For anonymous replicas function adds them to replicaset explicitly + * without writing to _cluster space. 
* @param instance_uuid + * @param anonymous */ static void -box_on_join(const tt_uuid *instance_uuid) +box_on_join(const tt_uuid *instance_uuid, bool anonymous) { struct replica *replica = replica_by_uuid(instance_uuid); if (replica != NULL && replica->id != REPLICA_ID_NIL) @@ -1244,22 +1273,27 @@ box_on_join(const tt_uuid *instance_uuid) box_check_writable_xc(); - /** Find the largest existing replica id. */ - struct space *space = space_cache_find_xc(BOX_CLUSTER_ID); - struct index *index = index_find_system_xc(space, 0); - struct iterator *it = index_create_iterator_xc(index, ITER_ALL, - NULL, 0); - IteratorGuard iter_guard(it); - struct tuple *tuple; /** Assign a new replica id. */ - uint32_t replica_id = 1; - while ((tuple = iterator_next_xc(it)) != NULL) { - if (tuple_field_u32_xc(tuple, - BOX_CLUSTER_FIELD_ID) != replica_id) - break; - replica_id++; + + if (!anonymous) { + /** Find the largest existing replica id. */ + struct tuple *tuple; + uint32_t replica_id = 1; + struct space *space = space_cache_find_xc(BOX_CLUSTER_ID); + struct index *index = index_find_system_xc(space, 0); + struct iterator *it = index_create_iterator_xc(index, ITER_ALL, + NULL, 0); + IteratorGuard iter_guard(it); + while ((tuple = iterator_next_xc(it)) != NULL) { + if (tuple_field_u32_xc(tuple, + BOX_CLUSTER_FIELD_ID) != replica_id) + break; + replica_id++; + } + box_register_replica(replica_id, instance_uuid); + } else { + add_anonymous_replica(instance_uuid); } - box_register_replica(replica_id, instance_uuid); } void @@ -1292,7 +1326,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * * <= INSERT * ... - * Initial data: a stream of engine-specifc rows, e.g. snapshot + * Initial data: a stream of engine-specific rows, e.g. snapshot * rows for memtx or dirty cursor data for Vinyl. Engine can * use REPLICA_ID, LSN and other fields for internal purposes. * ... 
@@ -1324,7 +1358,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Decode JOIN request */ struct tt_uuid instance_uuid = uuid_nil; - xrow_decode_join_xc(header, &instance_uuid); + bool anonymous = false; + xrow_decode_join_xc(header, &instance_uuid, &anonymous); /* Check that bootstrap has been finished */ if (!is_box_configured) @@ -1395,7 +1430,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * sending OK - if the hook fails, the error reaches the * client. */ - box_on_join(&instance_uuid); + box_on_join(&instance_uuid, anonymous); replica = replica_by_uuid(&instance_uuid); assert(replica != NULL); @@ -1438,9 +1473,10 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) struct tt_uuid replicaset_uuid = uuid_nil, replica_uuid = uuid_nil; struct vclock replica_clock; uint32_t replica_version_id; + bool anonymous = false; vclock_create(&replica_clock); xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid, - &replica_clock, &replica_version_id); + &replica_clock, &replica_version_id, &anonymous); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1464,9 +1500,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) /* Check replica uuid */ struct replica *replica = replica_by_uuid(&replica_uuid); if (replica == NULL || replica->id == REPLICA_ID_NIL) { + if (anonymous) { + replica = add_anonymous_replica(&replica_uuid); + } else { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&replica_uuid), tt_uuid_str(&REPLICASET_UUID)); + } } /* Forbid replication with disabled WAL */ @@ -1772,6 +1812,7 @@ box_cfg_xc(void) box_set_replication_timeout(); box_set_replication_connect_timeout(); box_set_replication_connect_quorum(); + box_set_replication_anon(); replication_sync_lag = box_check_replication_sync_lag(); xstream_create(&join_stream, apply_initial_join_row); xstream_create(&subscribe_stream, apply_row); @@ -1927,7 +1968,8 @@ 
box_cfg_xc(void) /* Check for correct registration of the instance in _cluster */ { struct replica *self = replica_by_uuid(&INSTANCE_UUID); - if (self == NULL || self->id == REPLICA_ID_NIL) { + if (!replica_is_anonymous() && + (self == NULL || self->id == REPLICA_ID_NIL)) { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&INSTANCE_UUID), tt_uuid_str(&REPLICASET_UUID)); diff --git a/src/box/box.h b/src/box/box.h index e337cb0..ba15055 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -182,6 +182,7 @@ void box_set_vinyl_cache(void); void box_set_vinyl_timeout(void); void box_set_replication_timeout(void); void box_set_replication_connect_quorum(void); +void box_set_replication_anon(void); extern "C" { #endif /* defined(__cplusplus) */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 34b39e1..ff12211 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -100,6 +100,8 @@ enum iproto_key { * ] */ IPROTO_METADATA = 0x32, + /* Request key identifying flag of anonymous replicas */ + IPROTO_ANON = 0x33, /* Leave a gap between response keys and SQL keys. 
*/ IPROTO_SQL_TEXT = 0x40, diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 5e88ca3..c43a1f3 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -251,6 +251,18 @@ lbox_cfg_set_replication_connect_quorum(struct lua_State *L) return 0; } +static int +lbox_cfg_set_replication_anon(struct lua_State *L) +{ + try { + box_set_replication_anon(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + + void box_lua_cfg_init(struct lua_State *L) { @@ -275,6 +287,8 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout}, {"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum}, + {"cfg_set_replication_anon", + lbox_cfg_set_replication_anon}, {NULL, NULL} }; diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 8e8fd9d..c24e7be 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -139,6 +139,10 @@ lbox_pushreplica(lua_State *L, struct replica *replica) luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id)); lua_settable(L, -3); + lua_pushstring(L, "anonymous"); + lua_pushboolean(L, replica->anonymous); + lua_settable(L, -3); + if (applier != NULL && applier->state != APPLIER_OFF) { lua_pushstring(L, "upstream"); lbox_pushapplier(L, applier); diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 636eef6..cf201c0 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -59,6 +59,7 @@ local default_cfg = { replication_sync_lag = 10, replication_connect_timeout = 4, replication_connect_quorum = nil, -- connect all + replication_anon = false, feedback_enabled = true, feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, @@ -118,6 +119,7 @@ local template_cfg = { replication_sync_lag = 'number', replication_connect_timeout = 'number', replication_connect_quorum = 'number', + replication_anon = 'boolean', feedback_enabled = 'boolean', feedback_host = 'string', feedback_interval = 'number', @@ -191,6 +193,7 @@ 
local dynamic_cfg = { end, force_recovery = function() end, replication_timeout = private.cfg_set_replication_timeout, + replication_anon = private.cfg_set_replication_anon, replication_connect_quorum = private.cfg_set_replication_connect_quorum, } diff --git a/src/box/replication.cc b/src/box/replication.cc index 760f837..1c1abab 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -45,6 +45,7 @@ uint32_t instance_id = REPLICA_ID_NIL; struct tt_uuid INSTANCE_UUID; struct tt_uuid REPLICASET_UUID; +bool ANONYMOUS_REPLICA = false; double replication_timeout = 1.0; /* seconds */ double replication_connect_timeout = 4.0; /* seconds */ @@ -134,6 +135,7 @@ replica_new(void) replica->applier = NULL; replica->relay = NULL; replica->gc = NULL; + replica->anonymous = false; rlist_create(&replica->in_anon); trigger_create(&replica->on_applier_state, replica_on_applier_state_f, NULL, NULL); @@ -151,14 +153,16 @@ replica_delete(struct replica *replica) } struct replica * -replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) +replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid, bool anon) { assert(!tt_uuid_is_nil(replica_uuid)); - assert(replica_id != REPLICA_ID_NIL && replica_id < VCLOCK_MAX); + assert(replica_id != REPLICA_ID_NIL && (replica_id < VCLOCK_MAX + || (anon && replica_id < INT32_MAX))); assert(replica_by_uuid(replica_uuid) == NULL); struct replica *replica = replica_new(); replica->uuid = *replica_uuid; + replica->anonymous = anon; replica_hash_insert(&replicaset.hash, replica); replica_set_id(replica, replica_id); return replica; @@ -167,7 +171,7 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) void replica_set_id(struct replica *replica, uint32_t replica_id) { - assert(replica_id < VCLOCK_MAX); + assert(replica_id < VCLOCK_MAX || (replica->anonymous && replica_id < INT32_MAX)); assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */ replica->id = replica_id; @@ -718,3 
+722,18 @@ replica_by_uuid(const struct tt_uuid *uuid) key.uuid = *uuid; return replica_hash_search(&replicaset.hash, &key); } + +void +replica_set_anonymous(bool anonymous) +{ + if (anonymous && !box_is_ro()) + tnt_raise(ClientError, ER_CFG, "replication_anon", + "Only read_only replica can be anonymous"); + ANONYMOUS_REPLICA = anonymous; +} + +bool +replica_is_anonymous() +{ + return ANONYMOUS_REPLICA; +} diff --git a/src/box/replication.h b/src/box/replication.h index 8a9d575..a87622d 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -156,6 +156,8 @@ extern uint32_t instance_id; extern struct tt_uuid INSTANCE_UUID; /** UUID of the replica set. */ extern struct tt_uuid REPLICASET_UUID; +/** Flag of replica's anonymity */ +extern bool ANONYMOUS_REPLICA; typedef rb_tree(struct replica) replica_hash_t; @@ -256,6 +258,8 @@ struct replica { struct trigger on_applier_state; /** Replica sync state. */ enum replica_state state; + /** Flag signaling that the replica is anonymous. */ + bool anonymous; }; enum { @@ -264,6 +268,10 @@ enum { * and in cases where id is unknown. */ REPLICA_ID_NIL = 0, + /** + * The first id anonymous replicas start to count from. + */ + REPLICA_ID_ANON_INIT = INT32_MAX / 2, }; /** @@ -317,6 +325,14 @@ replica_set_relay(struct replica *replica, struct relay *relay); void replica_clear_relay(struct replica *replica); +/** + * Internal setter of anonymous flag. + */ +void +replica_set_anonymous(bool anonymous); + +bool +replica_is_anonymous(); #if defined(__cplusplus) } /* extern "C" */ @@ -327,10 +343,10 @@ replica_check_id(uint32_t replica_id); * Register the universally unique identifier of a remote replica and * a matching replica-set-local identifier in the _cluster registry. * Called from on_replace_dd_cluster() when a remote master joins the - * replica set. + * replica set and box_on_join for registering anonymous replica. 
*/ struct replica * -replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid); +replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid, bool anon); /** * Try to connect appliers to remote peers and receive UUID. diff --git a/src/box/xrow.c b/src/box/xrow.c index 3ef3d82..de9172f 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -864,7 +864,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, bool anonymous) { memset(row, 0, sizeof(*row)); uint32_t replicaset_size = vclock_size(vclock); @@ -876,7 +876,7 @@ xrow_encode_subscribe(struct xrow_header *row, return -1; } char *data = buf; - data = mp_encode_map(data, 4); + data = mp_encode_map(data, 5); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); data = xrow_encode_uuid(data, replicaset_uuid); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -891,6 +891,9 @@ xrow_encode_subscribe(struct xrow_header *row, } data = mp_encode_uint(data, IPROTO_SERVER_VERSION); data = mp_encode_uint(data, tarantool_version_id()); + data = mp_encode_uint(data, IPROTO_ANON); + data = mp_encode_bool(data, anonymous); + assert(data <= buf + size); row->body[0].iov_base = buf; row->body[0].iov_len = (data - buf); @@ -902,7 +905,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id) + uint32_t *version_id, bool *anonymous) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -961,6 +964,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *version_id = mp_decode_uint(&d); break; + case IPROTO_ANON: + if (anonymous == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "invalid ANONYMOUS"); + 
return -1; + } + *anonymous = mp_decode_bool(&d); + break; default: skip: mp_next(&d); /* value */ } @@ -989,7 +1002,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } int -xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) +xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid, + bool anonymous) { memset(row, 0, sizeof(*row)); @@ -1000,10 +1014,13 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) return -1; } char *data = buf; - data = mp_encode_map(data, 1); + data = mp_encode_map(data, 2); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); /* Greet the remote replica with our replica UUID */ data = xrow_encode_uuid(data, instance_uuid); + /* Add flag signaling about our wish to be anonymous. */ + data = mp_encode_uint(data, IPROTO_ANON); + data = mp_encode_bool(data, anonymous); assert(data <= buf + size); row->body[0].iov_base = buf; diff --git a/src/box/xrow.h b/src/box/xrow.h index 0ca7152..71a0ff9 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -248,6 +248,7 @@ xrow_encode_request_vote(struct xrow_header *row); * @param replicaset_uuid Replica set uuid. * @param instance_uuid Instance uuid. * @param vclock Replication clock. + * @param anonymous anonymous flag. * * @retval 0 Success. * @retval -1 Memory error. @@ -256,7 +257,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock); + const struct vclock *vclock, bool anonymous); /** * Decode SUBSCRIBE command. @@ -264,6 +265,8 @@ xrow_encode_subscribe(struct xrow_header *row, * @param[out] replicaset_uuid. * @param[out] instance_uuid. * @param[out] vclock. + * @param[out] version_id. + * @param[out] anonymous. * * @retval 0 Success. * @retval -1 Memory or format error. 
@@ -271,31 +274,36 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id); + uint32_t *version_id, bool *anonymous); /** * Encode JOIN command. * @param[out] row Row to encode into. * @param instance_uuid. + * @param anonymous - flag signaling if joining replica wants to be anonymous. * * @retval 0 Success. * @retval -1 Memory error. */ int -xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); +xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid, + bool anonymous); /** * Decode JOIN command. * @param row Row to decode. * @param[out] instance_uuid. + * @param[out] anonymous flag. * * @retval 0 Success. * @retval -1 Memory or format error. */ static inline int -xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) +xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid, + bool *anonymous) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, + anonymous); } /** @@ -320,7 +328,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); } /** @@ -613,10 +621,10 @@ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, bool anonymous) { if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, - vclock) != 0) + vclock, anonymous) != 0) diag_raise(); } @@ -625,27 +633,28 @@ static inline void xrow_decode_subscribe_xc(struct xrow_header *row, struct tt_uuid 
*replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *replica_version_id) + uint32_t *replica_version_id, bool *anonymous) { if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id) != 0) + vclock, replica_version_id, anonymous) != 0) diag_raise(); } /** @copydoc xrow_encode_join. */ static inline void xrow_encode_join_xc(struct xrow_header *row, - const struct tt_uuid *instance_uuid) + const struct tt_uuid *instance_uuid, bool anonymous) { - if (xrow_encode_join(row, instance_uuid) != 0) + if (xrow_encode_join(row, instance_uuid, anonymous) != 0) diag_raise(); } /** @copydoc xrow_decode_join. */ static inline void -xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid) +xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid, + bool *anonymous) { - if (xrow_decode_join(row, instance_uuid) != 0) + if (xrow_decode_join(row, instance_uuid, anonymous) != 0) diag_raise(); } diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 9d3bc85..1160d63 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -23,29 +23,30 @@ box.cfg 18 pid_file:box.pid 19 read_only:false 20 readahead:16320 -21 replication_connect_timeout:4 -22 replication_sync_lag:10 -23 replication_timeout:1 -24 rows_per_wal:500000 -25 slab_alloc_factor:1.05 -26 too_long_threshold:0.5 -27 vinyl_bloom_fpr:0.05 -28 vinyl_cache:134217728 -29 vinyl_dir:. -30 vinyl_max_tuple_size:1048576 -31 vinyl_memory:134217728 -32 vinyl_page_size:8192 -33 vinyl_range_size:1073741824 -34 vinyl_read_threads:1 -35 vinyl_run_count_per_level:2 -36 vinyl_run_size_ratio:3.5 -37 vinyl_timeout:60 -38 vinyl_write_threads:2 -39 wal_dir:. 
-40 wal_dir_rescan_delay:2 -41 wal_max_size:268435456 -42 wal_mode:write -43 worker_pool_threads:4 +21 replication_anon:false +22 replication_connect_timeout:4 +23 replication_sync_lag:10 +24 replication_timeout:1 +25 rows_per_wal:500000 +26 slab_alloc_factor:1.05 +27 too_long_threshold:0.5 +28 vinyl_bloom_fpr:0.05 +29 vinyl_cache:134217728 +30 vinyl_dir:. +31 vinyl_max_tuple_size:1048576 +32 vinyl_memory:134217728 +33 vinyl_page_size:8192 +34 vinyl_range_size:1073741824 +35 vinyl_read_threads:1 +36 vinyl_run_count_per_level:2 +37 vinyl_run_size_ratio:3.5 +38 vinyl_timeout:60 +39 vinyl_write_threads:2 +40 wal_dir:. +41 wal_dir_rescan_delay:2 +42 wal_max_size:268435456 +43 wal_mode:write +44 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua index 8496929..116f27e 100755 --- a/test/box-tap/cfg.test.lua +++ b/test/box-tap/cfg.test.lua @@ -6,7 +6,7 @@ local socket = require('socket') local fio = require('fio') local uuid = require('uuid') local msgpack = require('msgpack') -test:plan(91) +test:plan(92) -------------------------------------------------------------------------------- -- Invalid values @@ -46,6 +46,7 @@ invalid('vinyl_run_count_per_level', 0) invalid('vinyl_run_size_ratio', 1) invalid('vinyl_bloom_fpr', 0) invalid('vinyl_bloom_fpr', 1.1) +invalid("replication_anon", true) local function invalid_combinations(name, val) local status, result = pcall(box.cfg, val) diff --git a/test/box/admin.result b/test/box/admin.result index ff5a9ef..9eb74a7 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -66,6 +66,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replication_anon + - false - - replication_connect_timeout - 4 - - replication_sync_lag diff --git a/test/box/cfg.result b/test/box/cfg.result index b11346d..8b5dd66 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -54,6 +54,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - 
replication_anon + - false - - replication_connect_timeout - 4 - - replication_sync_lag @@ -147,6 +149,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replication_anon + - false - - replication_connect_timeout - 4 - - replication_sync_lag diff --git a/test/replication/replica_anon.lua b/test/replication/replica_anon.lua new file mode 100644 index 0000000..2846441 --- /dev/null +++ b/test/replication/replica_anon.lua @@ -0,0 +1,11 @@ +#!/usr/bin/env tarantool + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + read_only = true, + replication_anon = true +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/status.result b/test/replication/status.result index 8394b98..268b5fc 100644 --- a/test/replication/status.result +++ b/test/replication/status.result @@ -86,6 +86,10 @@ test_run:cmd("create server replica with rpl_master=default, script='replication --- - true ... +test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'") +--- +- true +... test_run:cmd("start server replica") --- - true @@ -144,6 +148,10 @@ master.downstream == nil --- - true ... +master.anonymous == false +--- +- true +... -- replica's status replica_id = test_run:get_server_id('replica') --- @@ -238,6 +246,10 @@ master.downstream == nil --- - true ... +master.anonymous == false +--- +- true +... -- replica's status replica_id = box.info.id --- @@ -275,6 +287,181 @@ replica.downstream == nil --- - true ... +replica.anonymous == false +--- +- true +... +test_run:cmd('switch default') +--- +- true +... +-- Start anonymous replica +test_run:cmd("start server replica_anon") +--- +- true +... +anon_init_id = 1073741823 +--- +... +#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil +--- +- true +... +box.info.replication[anon_init_id] ~= nil +--- +- true +... +box.space._cluster:count() == 2 +--- +- true +... 
+-- master's status +box.info.vclock[master_id] == 2 -- grant + registration == 2 +--- +- true +... +box.info.lsn == box.info.vclock[master_id] +--- +- true +... +master = box.info.replication[master_id] +--- +... +master.id == master_id +--- +- true +... +master.uuid == box.space._cluster:get(master_id)[2] +--- +- true +... +master.lsn == box.info.vclock[master_id] +--- +- true +... +master.upstream == nil +--- +- true +... +master.downstream == nil +--- +- true +... +master.anonymous == false +--- +- true +... +-- anon replica's status +box.info.vclock[anon_init_id] == nil +--- +- true +... +replica = box.info.replication[anon_init_id] +--- +... +replica.id == anon_init_id +--- +- true +... +box.space._cluster:get(anon_init_id) +--- +... +-- replica.lsn == box.info.vclock[anon_init_id] +replica.lsn == 0 +--- +- true +... +replica.upstream == nil +--- +- true +... +replica.downstream.vclock[master_id] == box.info.vclock[master_id] +--- +- true +... +replica.downstream.vclock[anon_init_id] == box.info.vclock[anon_init_id] +--- +- true +... +replica.anonymous == true +--- +- true +... +test_run:cmd('switch replica_anon') +--- +- true +... +#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil +--- +- true +... +#box.info.replication == 2 +--- +- true +... +box.space._cluster:count() == 2 +--- +- true +... +-- master's status +master_id = test_run:get_server_id('default') +--- +... +box.info.vclock[master_id] == 2 +--- +- true +... +master = box.info.replication[master_id] +--- +... +master.id == master_id +--- +- true +... +master.uuid == box.space._cluster:get(master_id)[2] +--- +- true +... +master.upstream.status == "follow" +--- +- true +... +master.upstream.lag < 1 +--- +- true +... +master.upstream.idle < 1 +--- +- true +... +master.upstream.peer:match("localhost") +--- +- localhost +... +master.downstream == nil +--- +- true +... +master.anonymous == false +--- +- true +... +replica_id = box.info.id +--- +... 
+box.info.vclock[replica_id] == nil +--- +- true +... +box.info.lsn == 0 +--- +- false +... +-- there is no information about itself on anon replica +box.info.replication[replica_id] == nil +--- +- true +... -- -- ClientError during replication -- @@ -377,6 +564,27 @@ test_run:cmd('switch default') --- - true ... +-- check subsribe from anonymous after master restart +test_run:cmd('switch default') +--- +- true +... +test_run:cmd("stop server replica_anon") +--- +- true +... +test_run:cmd("restart server default") +test_run:cmd("start server replica_anon") +--- +- true +... +anon_init_id = 1073741823 +--- +... +box.info.replication[anon_init_id] ~= nil +--- +- true +... -- -- Cleanup -- @@ -387,7 +595,15 @@ test_run:cmd("stop server replica") --- - true ... +test_run:cmd("stop server replica_anon") +--- +- true +... test_run:cmd("cleanup server replica") --- - true ... +test_run:cmd("cleanup server replica_anon") +--- +- true +... diff --git a/test/replication/status.test.lua b/test/replication/status.test.lua index 8bb25e0..690f0aa 100644 --- a/test/replication/status.test.lua +++ b/test/replication/status.test.lua @@ -33,6 +33,7 @@ master.downstream == nil -- Start Master -> Slave replication test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'") test_run:cmd("start server replica") -- @@ -54,6 +55,7 @@ master.uuid == box.space._cluster:get(master_id)[2] master.lsn == box.info.vclock[master_id] master.upstream == nil master.downstream == nil +master.anonymous == false -- replica's status replica_id = test_run:get_server_id('replica') @@ -87,6 +89,7 @@ master.upstream.lag < 1 master.upstream.idle < 1 master.upstream.peer:match("localhost") master.downstream == nil +master.anonymous == false -- replica's status replica_id = box.info.id @@ -100,6 +103,63 @@ replica.uuid == box.space._cluster:get(replica_id)[2] 
replica.lsn == 0 replica.upstream == nil replica.downstream == nil +replica.anonymous == false + +test_run:cmd('switch default') +-- Start anonymous replica +test_run:cmd("start server replica_anon") +anon_init_id = 1073741823 +#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil +box.info.replication[anon_init_id] ~= nil +box.space._cluster:count() == 2 + +-- master's status +box.info.vclock[master_id] == 2 -- grant + registration == 2 +box.info.lsn == box.info.vclock[master_id] +master = box.info.replication[master_id] +master.id == master_id +master.uuid == box.space._cluster:get(master_id)[2] +master.lsn == box.info.vclock[master_id] +master.upstream == nil +master.downstream == nil +master.anonymous == false + +-- anon replica's status + +box.info.vclock[anon_init_id] == nil +replica = box.info.replication[anon_init_id] +replica.id == anon_init_id +box.space._cluster:get(anon_init_id) +-- replica.lsn == box.info.vclock[anon_init_id] +replica.lsn == 0 +replica.upstream == nil +replica.downstream.vclock[master_id] == box.info.vclock[master_id] +replica.downstream.vclock[anon_init_id] == box.info.vclock[anon_init_id] +replica.anonymous == true + +test_run:cmd('switch replica_anon') +#box.info.vclock == 1 -- box.info.vclock[replica_id] is nil +#box.info.replication == 2 +box.space._cluster:count() == 2 + +-- master's status +master_id = test_run:get_server_id('default') +box.info.vclock[master_id] == 2 +master = box.info.replication[master_id] +master.id == master_id +master.uuid == box.space._cluster:get(master_id)[2] +master.upstream.status == "follow" +master.upstream.lag < 1 +master.upstream.idle < 1 +master.upstream.peer:match("localhost") +master.downstream == nil +master.anonymous == false + +replica_id = box.info.id +box.info.vclock[replica_id] == nil +box.info.lsn == 0 +-- there is no information about itself on anon replica +box.info.replication[replica_id] == nil -- -- ClientError during replication @@ -135,10 +195,19 @@ 
master.upstream.peer:match("localhost") master.downstream == nil test_run:cmd('switch default') +-- check subsribe from anonymous after master restart +test_run:cmd('switch default') +test_run:cmd("stop server replica_anon") +test_run:cmd("restart server default") +test_run:cmd("start server replica_anon") +anon_init_id = 1073741823 +box.info.replication[anon_init_id] ~= nil -- -- Cleanup -- box.schema.user.revoke('guest', 'replication') test_run:cmd("stop server replica") +test_run:cmd("stop server replica_anon") test_run:cmd("cleanup server replica") +test_run:cmd("cleanup server replica_anon") -- 2.7.4