From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5E7862DA7C for ; Sat, 13 Oct 2018 12:54:59 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id R7M71KwzXSio for ; Sat, 13 Oct 2018 12:54:59 -0400 (EDT) Received: from smtp35.i.mail.ru (smtp35.i.mail.ru [94.100.177.95]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id A59F72DA57 for ; Sat, 13 Oct 2018 12:54:58 -0400 (EDT) From: Serge Petrenko Subject: [tarantool-patches] [PATCH] [replication] introduce anonymous replicas. Date: Sat, 13 Oct 2018 19:54:49 +0300 Message-Id: <20181013165449.29849-1-sergepetrenko@tarantool.org> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: georgy@tarantool.org Cc: tarantool-patches@freelists.org, Serge Petrenko This patch makes it possible for a replica to join/subscribe anonymously, i.e. without being added to the _cluster table. This allows to have more than VCLOCK_MAX (currently 32) replicas in a replicaset, with a condition that some of them must be read only. This is achieved by introducing a new field to JOIN/SUBSCRIBE requests: IPROTO_REPLICA_ANON. Upon recieving a request with the option set to true, master adds the newly connected replica to a replica hash, but does not register it in _cluster table and does not assign an id to it. The replica still appears in `box.info.replication` with 'anonymous` parameter set to true. To make replica send 'anonymous' JOIN/SUBSCRIBE requests, a new config option is introduced: `replica_anon`. It may only be set to true together with `read_only`. Closes #3186 --- https://github.com/tarantool/tarantool/issues/3186 https://github.com/tarantool/tarantool/tree/sp/gh-3186-anon-replica src/box/alter.cc | 2 +- src/box/applier.cc | 4 +- src/box/applier.h | 1 + src/box/box.cc | 55 +++++++++++-- src/box/box.h | 2 + src/box/iproto_constants.h | 1 + src/box/lua/info.c | 35 +++++++-- src/box/lua/load_cfg.lua | 2 + src/box/relay.cc | 2 +- src/box/replication.cc | 16 +++- src/box/replication.h | 11 ++- src/box/xrow.c | 26 +++++-- src/box/xrow.h | 35 +++++---- test/app-tap/init_script.result | 51 ++++++------ test/box/admin.result | 2 + test/box/cfg.result | 4 + test/replication/replica_anon.lua | 9 +++ test/replication/replica_anon.result | 103 +++++++++++++++++++++++++ test/replication/replica_anon.test.lua | 41 ++++++++++ 19 files changed, 337 insertions(+), 65 deletions(-) create mode 100644 test/replication/replica_anon.lua create mode 100644 test/replication/replica_anon.result create mode 100644 test/replication/replica_anon.test.lua diff --git a/src/box/alter.cc b/src/box/alter.cc index 4ac44c240..a26bd7edf 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -2916,7 +2916,7 @@ on_commit_dd_cluster(struct trigger *trigger, void *event) replica_set_id(replica, id); } else { try { - replica = replicaset_add(id, &uuid); + replica = replicaset_add(id, &uuid, false); /* Can't throw exceptions from on_commit trigger */ } catch(Exception *e) { panic("Can't register replica: %s", e->errmsg); diff --git a/src/box/applier.cc b/src/box/applier.cc index 7da278e68..5ac0134aa 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -278,7 +278,7 @@ applier_join(struct applier *applier) struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; - xrow_encode_join_xc(&row, &INSTANCE_UUID); + xrow_encode_join_xc(&row, &INSTANCE_UUID, replica_anon); coio_write_xrow(coio, &row); /** @@ -388,7 +388,7 @@ applier_subscribe(struct applier *applier) struct vclock remote_vclock_at_subscribe; xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &replicaset.vclock); + &replicaset.vclock, replica_anon); coio_write_xrow(coio, &row); /* Read SUBSCRIBE response */ diff --git a/src/box/applier.h b/src/box/applier.h index 5a9c40fc8..5d5cbf978 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -42,6 +42,7 @@ #include "trivia/util.h" #include "tt_uuid.h" #include "uri.h" +#include "box.h" /* replica_anon */ #include "xrow.h" diff --git a/src/box/box.cc b/src/box/box.cc index 7e32b9fc7..38d8b5db4 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -109,6 +109,9 @@ static struct gc_checkpoint_ref backup_gc; */ static bool is_box_configured = false; static bool is_ro = true; + +bool replica_anon = false; + static fiber_cond ro_cond; /** @@ -217,6 +220,10 @@ process_nop(struct request *request) void box_set_ro(bool ro) { + if(replica_anon && !ro) { + tnt_raise(ClientError, ER_CFG, "read_only", + "instance is anonymous and cannot become writeable."); + } is_ro = ro; fiber_cond_broadcast(&ro_cond); } @@ -465,6 +472,17 @@ box_check_replication_sync_timeout(void) return timeout; } +static bool +box_check_replica_anon(void) +{ + bool is_anon = cfg_geti("replica_anon") != 0; + if (is_anon && !is_ro) { + tnt_raise(ClientError, ER_CFG, "replica_anon", + "only a read-only instance may be anonymous."); + } + return is_anon; +} + static void box_check_instance_uuid(struct tt_uuid *uuid) { @@ -745,6 +763,12 @@ box_set_replication_skip_conflict(void) replication_skip_conflict = cfg_geti("replication_skip_conflict"); } +void +box_set_replica_anon(void) +{ + replica_anon = box_check_replica_anon(); +} + void box_listen(void) { @@ -1419,7 +1443,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Decode JOIN request */ struct tt_uuid instance_uuid = uuid_nil; - xrow_decode_join_xc(header, &instance_uuid); + bool anon; + xrow_decode_join_xc(header, &instance_uuid, &anon); /* Check that bootstrap has been finished */ if (!is_box_configured) @@ -1439,7 +1464,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * appropriate access privileges. */ struct replica *replica = replica_by_uuid(&instance_uuid); - if (replica == NULL || replica->id == REPLICA_ID_NIL) { + if ((replica == NULL || replica->id == REPLICA_ID_NIL) && !anon) { box_check_writable_xc(); struct space *space = space_cache_find_xc(BOX_CLUSTER_ID); access_check_space_xc(space, PRIV_W); @@ -1492,7 +1517,15 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * sending OK - if the hook fails, the error reaches the * client. */ - box_on_join(&instance_uuid); + if (!anon) + box_on_join(&instance_uuid); + else { + /* + * In case of an anonymous join manually add a + * replica to replica hash. + */ + replicaset_add(REPLICA_ID_NIL, &instance_uuid, true); + } replica = replica_by_uuid(&instance_uuid); assert(replica != NULL); @@ -1543,8 +1576,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) struct vclock replica_clock; uint32_t replica_version_id; vclock_create(&replica_clock); + bool anon; xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid, - &replica_clock, &replica_version_id); + &replica_clock, &replica_version_id, &anon); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1567,11 +1601,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) /* Check replica uuid */ struct replica *replica = replica_by_uuid(&replica_uuid); - if (replica == NULL || replica->id == REPLICA_ID_NIL) { + if ((replica == NULL || replica->id == REPLICA_ID_NIL) && !anon) { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&replica_uuid), tt_uuid_str(&REPLICASET_UUID)); } + /* + * This is an anonymous subscribe. We have to manually add a + * replica to replica hash. + */ + if (replica == NULL) { + replicaset_add(REPLICA_ID_NIL, &replica_uuid, true); + replica = replica_by_uuid(&replica_uuid); + } /* Don't allow multiple relays for the same replica */ if (relay_get_state(replica->relay) == RELAY_FOLLOW) { @@ -2051,6 +2093,7 @@ box_cfg_xc(void) box_set_replication_sync_lag(); box_set_replication_sync_timeout(); box_set_replication_skip_conflict(); + box_set_replica_anon(); xstream_create(&join_stream, apply_initial_join_row); xstream_create(&subscribe_stream, apply_row); @@ -2086,7 +2129,7 @@ box_cfg_xc(void) /* Check for correct registration of the instance in _cluster */ { struct replica *self = replica_by_uuid(&INSTANCE_UUID); - if (self == NULL || self->id == REPLICA_ID_NIL) { + if ((self == NULL || self->id == REPLICA_ID_NIL) && !replica_anon) { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&INSTANCE_UUID), tt_uuid_str(&REPLICASET_UUID)); diff --git a/src/box/box.h b/src/box/box.h index 9930d4a1a..42fe7845f 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -116,6 +116,8 @@ extern bool box_checkpoint_is_in_progress; /** Incremented with each next snapshot. */ extern uint32_t snapshot_version; +extern bool replica_anon; + /** * Iterate over all spaces and save them to the * snapshot file. diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 404f97a2e..ccb1599a2 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -79,6 +79,7 @@ enum iproto_key { IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */ IPROTO_BALLOT = 0x29, IPROTO_TUPLE_META = 0x2a, + IPROTO_REPLICA_ANON = 0x2b, /* Leave a gap between request keys and response keys */ IPROTO_DATA = 0x30, IPROTO_ERROR = 0x31, diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 655768ec4..693b4674d 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -132,18 +132,27 @@ lbox_pushreplica(lua_State *L, struct replica *replica) /* 16 is used to get the best visual experience in YAML output */ lua_createtable(L, 0, 16); + /* An anonymous replica has a zero id. */ + if (!replica->is_anon) + { + lua_pushstring(L, "id"); + lua_pushinteger(L, replica->id); + lua_settable(L, -3); + } - lua_pushstring(L, "id"); - lua_pushinteger(L, replica->id); + lua_pushstring(L, "anonymous"); + lua_pushboolean(L, replica->is_anon); lua_settable(L, -3); lua_pushstring(L, "uuid"); lua_pushstring(L, tt_uuid_str(&replica->uuid)); lua_settable(L, -3); - - lua_pushstring(L, "lsn"); - luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id)); - lua_settable(L, -3); + /* Anonymous replica's lsn isn't added to the vclock. */ + if (!replica->is_anon) { + lua_pushstring(L, "lsn"); + luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id)); + lua_settable(L, -3); + } if (applier != NULL && applier->state != APPLIER_OFF) { lua_pushstring(L, "upstream"); @@ -185,6 +194,7 @@ lbox_info_replication(struct lua_State *L) lua_setfield(L, -2, "__serialize"); lua_setmetatable(L, -2); + uint32_t anon_ctr = 0; replicaset_foreach(replica) { /* Applier hasn't received replica id yet */ if (replica->id == REPLICA_ID_NIL) @@ -193,6 +203,19 @@ lbox_info_replication(struct lua_State *L) lbox_pushreplica(L, replica); lua_rawseti(L, -2, replica->id); + /* + * Needed to start enumerating anonymous replicas + * after the usual ones. + */ + anon_ctr = MAX(anon_ctr, replica->id); + } + replicaset_foreach(replica) { + if (!replica->is_anon) + continue; + + lbox_pushreplica(L, replica); + + lua_rawseti(L, -2, ++anon_ctr); } return 1; diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index f62f4dc1e..ddf0c0c20 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -76,6 +76,7 @@ local default_cfg = { replication_connect_timeout = 30, replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, + replica_anon = false, feedback_enabled = true, feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, @@ -138,6 +139,7 @@ local template_cfg = { replication_connect_timeout = 'number', replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', + replica_anon = 'boolean', feedback_enabled = 'boolean', feedback_host = 'string', feedback_interval = 'number', diff --git a/src/box/relay.cc b/src/box/relay.cc index d5df487eb..0182cb833 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -583,7 +583,7 @@ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, struct vclock *replica_clock, uint32_t replica_version_id) { - assert(replica->id != REPLICA_ID_NIL); + assert(replica->id != REPLICA_ID_NIL || replica->is_anon); struct relay *relay = replica->relay; assert(relay->state != RELAY_FOLLOW); /* diff --git a/src/box/replication.cc b/src/box/replication.cc index 2cb4ec0f8..864f03482 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -150,6 +150,7 @@ replica_new(void) replica->uuid = uuid_nil; replica->applier = NULL; replica->gc = NULL; + replica->is_anon = false; rlist_create(&replica->in_anon); trigger_create(&replica->on_applier_state, replica_on_applier_state_f, NULL, NULL); @@ -170,17 +171,26 @@ replica_delete(struct replica *replica) free(replica); } +static void +replica_set_anon(struct replica *replica, bool is_anon) +{ + replica->is_anon = is_anon; +} + struct replica * -replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) +replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid, bool is_anon) { assert(!tt_uuid_is_nil(replica_uuid)); - assert(replica_id != REPLICA_ID_NIL && replica_id < VCLOCK_MAX); + assert(replica_id != REPLICA_ID_NIL && replica_id < VCLOCK_MAX || + is_anon && replica_id == REPLICA_ID_NIL); assert(replica_by_uuid(replica_uuid) == NULL); struct replica *replica = replica_new(); + replica_set_anon(replica, is_anon); replica->uuid = *replica_uuid; replica_hash_insert(&replicaset.hash, replica); - replica_set_id(replica, replica_id); + if (!is_anon) + replica_set_id(replica, replica_id); return replica; } diff --git a/src/box/replication.h b/src/box/replication.h index 2ac620d86..4cf2edc03 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -274,6 +274,12 @@ struct replica { enum applier_state applier_sync_state; /* The latch is used to order replication requests. */ struct latch order_latch; + /** + * A flag indicating that the replica is anonymous, + * i.e. is joined/subscribed without being added + * to the _cluster table. + */ + bool is_anon; }; enum { @@ -343,10 +349,11 @@ replica_check_id(uint32_t replica_id); * Register the universally unique identifier of a remote replica and * a matching replica-set-local identifier in the _cluster registry. * Called from on_replace_dd_cluster() when a remote master joins the - * replica set. + * replica set or from box_process_join() upon an anonymous join. */ struct replica * -replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid); +replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid, + bool is_anon); /** * Try to connect appliers to remote peers and receive UUID. diff --git a/src/box/xrow.c b/src/box/xrow.c index f12b27ace..8d0698412 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -968,7 +968,8 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, + bool is_anon) { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); @@ -978,7 +979,7 @@ xrow_encode_subscribe(struct xrow_header *row, return -1; } char *data = buf; - data = mp_encode_map(data, 4); + data = mp_encode_map(data, 5); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); data = xrow_encode_uuid(data, replicaset_uuid); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -987,6 +988,8 @@ xrow_encode_subscribe(struct xrow_header *row, data = mp_encode_vclock(data, vclock); data = mp_encode_uint(data, IPROTO_SERVER_VERSION); data = mp_encode_uint(data, tarantool_version_id()); + data = mp_encode_uint(data, IPROTO_REPLICA_ANON); + data = mp_encode_bool(data, is_anon); assert(data <= buf + size); row->body[0].iov_base = buf; row->body[0].iov_len = (data - buf); @@ -998,7 +1001,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id) + uint32_t *version_id, bool *is_anon) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1054,6 +1057,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *version_id = mp_decode_uint(&d); break; + case IPROTO_REPLICA_ANON: + if (is_anon == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "invalid ANON"); + return -1; + } + *is_anon = mp_decode_bool(&d); + break; default: skip: mp_next(&d); /* value */ } @@ -1062,7 +1075,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } int -xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) +xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid, + bool is_anon) { memset(row, 0, sizeof(*row)); @@ -1073,10 +1087,12 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) return -1; } char *data = buf; - data = mp_encode_map(data, 1); + data = mp_encode_map(data, 2); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); /* Greet the remote replica with our replica UUID */ data = xrow_encode_uuid(data, instance_uuid); + data = mp_encode_uint(data, IPROTO_REPLICA_ANON); + data = mp_encode_bool(data, is_anon); assert(data <= buf + size); row->body[0].iov_base = buf; diff --git a/src/box/xrow.h b/src/box/xrow.h index 3fc007a8d..2e9bb0130 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -257,6 +257,7 @@ xrow_encode_vote(struct xrow_header *row); * @param replicaset_uuid Replica set uuid. * @param instance_uuid Instance uuid. * @param vclock Replication clock. + * @param is_anon Whether to encode an anonymous SUBSCRIBE. * * @retval 0 Success. * @retval -1 Memory error. @@ -265,7 +266,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock); + const struct vclock *vclock, bool is_anon); /** * Decode SUBSCRIBE command. @@ -274,6 +275,7 @@ xrow_encode_subscribe(struct xrow_header *row, * @param[out] instance_uuid. * @param[out] vclock. * @param[out] version_id. + * @param[out] is_anon. * * @retval 0 Success. * @retval -1 Memory or format error. @@ -281,31 +283,35 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id); + uint32_t *version_id, bool *is_anon); /** * Encode JOIN command. * @param[out] row Row to encode into. * @param instance_uuid. + * @param is_anon Whether to encode an anonymous JOIN. * * @retval 0 Success. * @retval -1 Memory error. */ int -xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); +xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid, + bool is_anon); /** * Decode JOIN command. * @param row Row to decode. * @param[out] instance_uuid. + * @param[out] is_anon. * * @retval 0 Success. * @retval -1 Memory or format error. */ static inline int -xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) +xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid, + bool *is_anon) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, is_anon); } /** @@ -330,7 +336,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); } /** @@ -644,10 +650,10 @@ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, bool is_anon) { if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, - vclock) != 0) + vclock, is_anon) != 0) diag_raise(); } @@ -656,27 +662,28 @@ static inline void xrow_decode_subscribe_xc(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *replica_version_id) + uint32_t *replica_version_id, bool *is_anon) { if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id) != 0) + vclock, replica_version_id, is_anon) != 0) diag_raise(); } /** @copydoc xrow_encode_join. */ static inline void xrow_encode_join_xc(struct xrow_header *row, - const struct tt_uuid *instance_uuid) + const struct tt_uuid *instance_uuid, bool is_anon) { - if (xrow_encode_join(row, instance_uuid) != 0) + if (xrow_encode_join(row, instance_uuid, is_anon) != 0) diag_raise(); } /** @copydoc xrow_decode_join. */ static inline void -xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid) +xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid, + bool *is_anon) { - if (xrow_decode_join(row, instance_uuid) != 0) + if (xrow_decode_join(row, instance_uuid, is_anon) != 0) diag_raise(); } diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index a2191d15b..a0c802504 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -25,31 +25,32 @@ box.cfg 20 pid_file:box.pid 21 read_only:false 22 readahead:16320 -23 replication_connect_timeout:30 -24 replication_skip_conflict:false -25 replication_sync_lag:10 -26 replication_sync_timeout:300 -27 replication_timeout:1 -28 rows_per_wal:500000 -29 slab_alloc_factor:1.05 -30 too_long_threshold:0.5 -31 vinyl_bloom_fpr:0.05 -32 vinyl_cache:134217728 -33 vinyl_dir:. -34 vinyl_max_tuple_size:1048576 -35 vinyl_memory:134217728 -36 vinyl_page_size:8192 -37 vinyl_range_size:1073741824 -38 vinyl_read_threads:1 -39 vinyl_run_count_per_level:2 -40 vinyl_run_size_ratio:3.5 -41 vinyl_timeout:60 -42 vinyl_write_threads:4 -43 wal_dir:. -44 wal_dir_rescan_delay:2 -45 wal_max_size:268435456 -46 wal_mode:write -47 worker_pool_threads:4 +23 replica_anon:false +24 replication_connect_timeout:30 +25 replication_skip_conflict:false +26 replication_sync_lag:10 +27 replication_sync_timeout:300 +28 replication_timeout:1 +29 rows_per_wal:500000 +30 slab_alloc_factor:1.05 +31 too_long_threshold:0.5 +32 vinyl_bloom_fpr:0.05 +33 vinyl_cache:134217728 +34 vinyl_dir:. +35 vinyl_max_tuple_size:1048576 +36 vinyl_memory:134217728 +37 vinyl_page_size:8192 +38 vinyl_range_size:1073741824 +39 vinyl_read_threads:1 +40 vinyl_run_count_per_level:2 +41 vinyl_run_size_ratio:3.5 +42 vinyl_timeout:60 +43 vinyl_write_threads:4 +44 wal_dir:. +45 wal_dir_rescan_delay:2 +46 wal_max_size:268435456 +47 wal_mode:write +48 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box/admin.result b/test/box/admin.result index 8048460a1..d84df7a56 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -62,6 +62,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replica_anon + - false - - replication_connect_timeout - 30 - - replication_skip_conflict diff --git a/test/box/cfg.result b/test/box/cfg.result index 515033754..3078f7fad 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -58,6 +58,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replica_anon + - false - - replication_connect_timeout - 30 - - replication_skip_conflict @@ -159,6 +161,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replica_anon + - false - - replication_connect_timeout - 30 - - replication_skip_conflict diff --git a/test/replication/replica_anon.lua b/test/replication/replica_anon.lua new file mode 100644 index 000000000..0e5b314f3 --- /dev/null +++ b/test/replication/replica_anon.lua @@ -0,0 +1,9 @@ +#!/usr/bin/env tarantool + +require("console").listen(os.getenv("ADMIN")) + +box.cfg{ + read_only=true, + replica_anon=true, + replication=os.getenv("MASTER") +} diff --git a/test/replication/replica_anon.result b/test/replication/replica_anon.result new file mode 100644 index 000000000..58a892961 --- /dev/null +++ b/test/replication/replica_anon.result @@ -0,0 +1,103 @@ +test_run = require('test_run').new() +--- +... +box.schema.user.grant("guest", "replication") +--- +... +_ = box.schema.space.create("test") +--- +... +_ = box.space.test:create_index("pk") +--- +... +for i = 1, 100 do box.space.test:auto_increment{} end +--- +... +test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'") +--- +- true +... +test_run:cmd('start server replica_anon') +--- +- true +... +-- Check anonymous join + subscribe. +test_run:cmd("switch replica_anon") +--- +- true +... +box.space.test:count() +--- +- 100 +... +box.info.replication[1].upstream.status +--- +- follow +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica_anon") +--- +- true +... +for i = 1, 100 do box.space.test:auto_increment{} end +--- +... +-- Check anonymous subscribe without join. +test_run:cmd("start server replica_anon") +--- +- true +... +test_run:cmd("switch replica_anon") +--- +- true +... +box.space.test:count() +--- +- 200 +... +test_run:cmd("switch default") +--- +- true +... +for i = 1, 100 do box.space.test:auto_increment{} end +--- +... +-- Check following updates. +test_run:cmd("switch replica_anon") +--- +- true +... +while box.space.test:count() < 300 do fiber.sleep(0.01) end +--- +... +box.space.test:count() +--- +- 300 +... +test_run:cmd("switch default") +--- +- true +... +-- Check display of anonymous replicas in box.info. +box.info.replication[#box.info.replication].anonymous +--- +- true +... +-- Cleanup. +test_run:cmd("stop server replica_anon") +--- +- true +... +test_run:cmd("cleanup server replica_anon") +--- +- true +... +box.schema.user.revoke("guest", "replication") +--- +... +box.space.test:drop() +--- +... diff --git a/test/replication/replica_anon.test.lua b/test/replication/replica_anon.test.lua new file mode 100644 index 000000000..a105f4d45 --- /dev/null +++ b/test/replication/replica_anon.test.lua @@ -0,0 +1,41 @@ +test_run = require('test_run').new() + +box.schema.user.grant("guest", "replication") +_ = box.schema.space.create("test") +_ = box.space.test:create_index("pk") + +for i = 1, 100 do box.space.test:auto_increment{} end + +test_run:cmd("create server replica_anon with rpl_master=default, script='replication/replica_anon.lua'") +test_run:cmd('start server replica_anon') +-- Check anonymous join + subscribe. +test_run:cmd("switch replica_anon") +box.space.test:count() +box.info.replication[1].upstream.status +test_run:cmd("switch default") + +test_run:cmd("stop server replica_anon") + +for i = 1, 100 do box.space.test:auto_increment{} end +-- Check anonymous subscribe without join. +test_run:cmd("start server replica_anon") +test_run:cmd("switch replica_anon") +box.space.test:count() +test_run:cmd("switch default") + +for i = 1, 100 do box.space.test:auto_increment{} end + +-- Check following updates. +test_run:cmd("switch replica_anon") +while box.space.test:count() < 300 do fiber.sleep(0.01) end +box.space.test:count() +test_run:cmd("switch default") + +-- Check display of anonymous replicas in box.info. +box.info.replication[#box.info.replication].anonymous + +-- Cleanup. +test_run:cmd("stop server replica_anon") +test_run:cmd("cleanup server replica_anon") +box.schema.user.revoke("guest", "replication") +box.space.test:drop() -- 2.17.1 (Apple Git-112)