From: sergepetrenko
Date: Sun, 15 Dec 2019 23:58:45 +0300
Subject: [Tarantool-patches] [PATCH 5/5] replication: introduce anonymous replica.
List-Id: Tarantool development patches
To: georgy@tarantool.org
Cc: tarantool-patches@dev.tarantool.org

This commit introduces anonymous replicas. Such replicas do not pollute
the _cluster table; in return, they can only be read-only and have a
zero id. An anonymous replica can be promoted to a normal one if needed.

Closes #3186

@TarantoolBot document
Title: Document anonymous replica

There is a new type of replica in Tarantool: the anonymous replica.
An anonymous replica is read-only (although you can still write to
temporary and replica-local spaces), and it isn't present in the
_cluster table.

Since an anonymous replica isn't registered in the _cluster table, there
is no limit on the number of anonymous replicas in a replicaset. You can
have as many of them as you want.

In order to make a replica anonymous, you have to pass the option
`replication_anon=true` to `box.cfg`. You also have to set `read_only`
to true.

Let's go through anonymous replica bootstrap.
Suppose we have a master configured with
```
box.cfg{listen=3301}
```
and a local space called "loc" created on it:
```
box.schema.space.create('loc', {is_local=true})
box.space.loc:create_index("pk")
```
Now, to configure an anonymous replica, we have to issue `box.cfg`,
as usual:
```
box.cfg{replication_anon=true, read_only=true, replication=3301}
```
As mentioned above, `replication_anon` may be set to true only together
with `read_only`.
The instance will fetch the master's snapshot and proceed to following
its changes. It will not receive an id, so its id will remain zero.
```
tarantool> box.info.id
---
- 0
...
```
```
tarantool> box.info.replication
---
- 1:
    id: 1
    uuid: 3c84f8d9-e34d-4651-969c-3d0ed214c60f
    lsn: 4
    upstream:
      status: follow
      idle: 0.6912029999985
      peer:
      lag: 0.00014615058898926
...
```
Now we can use the replica.
For example, we may do inserts into the local space:
```
tarantool> for i = 1,10 do
         > box.space.loc:insert{i}
         > end
---
...
```
Note that while the instance is anonymous, it will increase the 0-th
component of its vclock:
```
tarantool> box.info.vclock
---
- {0: 10, 1: 4}
...
```
Let's now promote the replica to a normal one:
```
tarantool> box.cfg{replication_anon=false}
2019-12-13 20:34:37.423 [71329] main I> assigned id 2 to replica 6a9c2ed2-b9e1-4c57-a0e8-51a46def7661
2019-12-13 20:34:37.424 [71329] main/102/interactive I> set 'replication_anon' configuration option to false
---
...

tarantool> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> subscribed
2019-12-13 20:34:37.424 [71329] main/117/applier/ I> remote vclock {1: 5} local vclock {0: 10, 1: 5}
2019-12-13 20:34:37.425 [71329] main/118/applierw/ C> leaving orphan mode
```
The replica has just received id 2. We can make it read-write now.
```
box.cfg{read_only=false}
2019-12-13 20:35:46.392 [71329] main/102/interactive I> set 'read_only' configuration option to false
---
...

tarantool> box.schema.space.create('test')
---
- engine: memtx
  before_replace: 'function: 0x01109f9dc8'
  on_replace: 'function: 0x01109f9d90'
  ck_constraint: []
  field_count: 0
  temporary: false
  index: []
  is_local: false
  enabled: false
  name: test
  id: 513
- created
...

tarantool> box.info.vclock
---
- {0: 10, 1: 5, 2: 2}
...
```
Now the replica tracks its changes in the 2nd vclock component, as
expected.
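The vclock bookkeeping in the session above can be illustrated with a
toy model. This is only a sketch and not Tarantool's `struct vclock`:
the names `toy_vclock`, `toy_vclock_create` and `toy_vclock_sign_local`
are hypothetical, and the only rule it assumes is the one stated above,
namely that a locally generated row bumps the vclock component indexed
by the instance's current id (0 while the instance is anonymous).

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Toy model only: NOT Tarantool's struct vclock. */
enum { TOY_VCLOCK_MAX = 32 };

struct toy_vclock {
	/* lsn[i] is the last LSN seen from the instance with id i. */
	int64_t lsn[TOY_VCLOCK_MAX];
};

/* Reset every component to zero. */
static void
toy_vclock_create(struct toy_vclock *vc)
{
	memset(vc, 0, sizeof(*vc));
}

/*
 * Sign one locally generated row: bump the component that
 * belongs to the writer's current id and return the new LSN.
 * An anonymous instance has id 0, so it bumps component 0.
 */
static int64_t
toy_vclock_sign_local(struct toy_vclock *vc, uint32_t instance_id)
{
	assert(instance_id < TOY_VCLOCK_MAX);
	return ++vc->lsn[instance_id];
}
```

In this model, ten rows signed with id 0 on top of `{1: 4}` give
`{0: 10, 1: 4}`. Once id 2 is assigned, further locally signed rows
only touch component 2 and leave component 0 at 10.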
It can also become a replication master from now on.

Side notes:
  * You cannot replicate from an anonymous instance.
  * To promote an anonymous instance to a regular one, you first have
    to start it as anonymous, and only then issue
    `box.cfg{replication_anon=false}`.
  * In order for the deanonymization to succeed, the instance must
    replicate from some read-write instance, otherwise no one will be
    able to add it to the _cluster table.
---
 src/box/applier.cc              |  58 ++++++-
 src/box/applier.h               |   4 +
 src/box/box.cc                  | 267 ++++++++++++++++++++++++++++++--
 src/box/box.h                   |  11 +-
 src/box/iproto.cc               |  16 +-
 src/box/iproto_constants.h      |   6 +
 src/box/lua/cfg.cc              |  14 +-
 src/box/lua/info.c              |   4 +-
 src/box/lua/load_cfg.lua        |   4 +
 src/box/recovery.cc             |   7 +-
 src/box/relay.cc                |  32 +++-
 src/box/replication.cc          |  41 ++++-
 src/box/replication.h           |  24 +++
 src/box/wal.c                   |   4 +
 src/box/xrow.c                  |  47 +++++-
 src/box/xrow.h                  |  68 ++++++--
 test/app-tap/init_script.result |  49 +++---
 test/box/admin.result           |   2 +
 test/box/cfg.result             |   4 +
 test/replication/anon.lua       |  13 ++
 test/replication/anon.result    | 259 +++++++++++++++++++++++++++++++
 test/replication/anon.test.lua  |  89 +++++++++++
 test/replication/suite.cfg      |   1 +
 23 files changed, 957 insertions(+), 67 deletions(-)
 create mode 100644 test/replication/anon.lua
 create mode 100644 test/replication/anon.result
 create mode 100644 test/replication/anon.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 357369025..1445dd4d1 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -452,6 +452,23 @@ applier_do_fetch_snapshot(struct applier *applier)
 	return row_count;
 }
 
+static void
+applier_fetch_snapshot(struct applier *applier)
+{
+	/* Send FETCH SNAPSHOT request */
+	struct ev_io *coio = &applier->io;
+	struct xrow_header row;
+
+	memset(&row, 0, sizeof(row));
+	row.type = IPROTO_FETCH_SNAPSHOT;
+	coio_write_xrow(coio, &row);
+
+	applier_set_state(applier, APPLIER_FETCH_SNAPSHOT);
+	applier_do_fetch_snapshot(applier);
+	applier_set_state(applier,
APPLIER_FETCHED_SNAPSHOT); + applier_set_state(applier, APPLIER_READY); +} + static uint64_t applier_do_register(struct applier *applier, uint64_t row_count) { @@ -497,6 +514,28 @@ applier_do_register(struct applier *applier, uint64_t row_count) return row_count; } +static void +applier_register(struct applier *applier) +{ + /* Send REGISTER request */ + struct ev_io *coio = &applier->io; + struct xrow_header row; + + memset(&row, 0, sizeof(row)); + /* + * Send this instance's current vclock together + * with REGISTER request. + */ + xrow_encode_register(&row, &INSTANCE_UUID, box_vclock); + row.type = IPROTO_REGISTER; + coio_write_xrow(coio, &row); + + applier_set_state(applier, APPLIER_REGISTER); + applier_do_register(applier, 0); + applier_set_state(applier, APPLIER_REGISTERED); + applier_set_state(applier, APPLIER_READY); +} + /** * Execute and process JOIN request (bootstrap the instance). */ @@ -828,7 +867,7 @@ applier_subscribe(struct applier *applier) vclock_create(&vclock); vclock_copy(&vclock, &replicaset.vclock); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &vclock); + &vclock, replication_anon); coio_write_xrow(coio, &row); /* Read SUBSCRIBE response */ @@ -996,10 +1035,25 @@ applier_f(va_list ap) if (tt_uuid_is_nil(&REPLICASET_UUID)) { /* * Execute JOIN if this is a bootstrap. + * In case of anonymous replication, don't + * join but just fetch master's snapshot. + * * The join will pause the applier * until WAL is created. */ - applier_join(applier); + if (replication_anon) { + applier_fetch_snapshot(applier); + } else { + applier_join(applier); + } + } + if (applier->version_id >= version_id(1, 7, 0) && + !replication_anon && instance_id == REPLICA_ID_NIL) { + /* anonymity was turned off while we were + * fetching a snapshot or following master. + * Register the replica now. 
+ */ + applier_register(applier); } applier_subscribe(applier); /* diff --git a/src/box/applier.h b/src/box/applier.h index b406e6aaf..c9fdc2955 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -61,6 +61,10 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ _(APPLIER_STOPPED, 10) \ _(APPLIER_DISCONNECTED, 11) \ _(APPLIER_LOADING, 12) \ + _(APPLIER_FETCH_SNAPSHOT, 13) \ + _(APPLIER_FETCHED_SNAPSHOT, 14) \ + _(APPLIER_REGISTER, 15) \ + _(APPLIER_REGISTERED, 16) \ /** States for the applier */ ENUM(applier_state, applier_STATE); diff --git a/src/box/box.cc b/src/box/box.cc index 981a5bac1..4c39e4971 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -223,9 +223,13 @@ error: return -1; } +static bool +box_check_ro(void); + void -box_set_ro(bool ro) +box_set_ro() { + bool ro = box_check_ro(); if (ro == is_ro) return; /* nothing to do */ if (ro) @@ -486,6 +490,32 @@ box_check_uuid(struct tt_uuid *uuid, const char *name) } } +static bool +box_check_ro() +{ + bool ro = cfg_geti("read_only") != 0; + bool anon = cfg_geti("replication_anon") != 0; + if (anon && !ro) { + tnt_raise(ClientError, ER_CFG, "read_only", + "the value may be set to false only when " + "replication_anon is false"); + } + return ro; +} + +static bool +box_check_replication_anon(void) +{ + bool anon = cfg_geti("replication_anon") != 0; + bool ro = cfg_geti("read_only") != 0; + if (anon && !ro) { + tnt_raise(ClientError, ER_CFG, "replication_anon", + "the value may be set to true only when " + "the instance is read-only"); + } + return anon; +} + static void box_check_instance_uuid(struct tt_uuid *uuid) { @@ -740,6 +770,65 @@ box_set_replication_skip_conflict(void) replication_skip_conflict = cfg_geti("replication_skip_conflict"); } +void +box_set_replication_anon(void) +{ + bool anon = box_check_replication_anon(); + if (anon == replication_anon) + return; + + if (!anon) { + /* Turn anonymous instance into a normal one. 
*/ + replication_anon = anon; + /* + * Reset all appliers. This will interrupt + * anonymous follow they're in and also update + * corresponding instance ballots so that we can + * use the latest info when choosing a replica to + * register on. + */ + replicaset_foreach(replica) { + struct applier *applier = replica->applier; + if (applier == NULL) + continue; + replica_clear_applier(replica); + replica->applier_sync_state = APPLIER_DISCONNECTED; + applier_stop(applier); + applier_start(applier); + replica_set_applier(replica, applier); + applier_resume_to_state(applier, APPLIER_CONNECTED, TIMEOUT_INFINITY); + } + /* Choose a master to send register request to. */ + struct replica *master = replicaset_leader(); + assert(master != NULL && master->applier != NULL); + struct applier *master_applier = master->applier; + applier_resume_to_state(master_applier, APPLIER_REGISTER, TIMEOUT_INFINITY); + applier_resume_to_state(master_applier, APPLIER_REGISTERED, TIMEOUT_INFINITY); + applier_resume_to_state(master_applier, APPLIER_READY, TIMEOUT_INFINITY); + applier_resume(master_applier); + /** + * Restart other appliers to + * resend non-anonymous subscribe. + */ + replicaset_foreach(replica) { + if (replica == master || replica->applier == NULL) + continue; + applier_resume(replica->applier); + } + } else if (!is_box_configured) { + replication_anon = anon; + } else { + /* + * It is forbidden to turn a normal replica into + * an anonymous one. 
+ */ + tnt_raise(ClientError, ER_CFG, "replication_anon", + "cannot be turned on after bootstrap" + " has finished"); + } + +} + void box_listen(void) { @@ -1379,6 +1468,132 @@ box_process_auth(struct auth_request *request, const char *salt) authenticate(user, len, salt, request->scramble); } +void +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header) +{ + + assert(header->type == IPROTO_FETCH_SNAPSHOT); + + /* Check that bootstrap has been finished */ + if (!is_box_configured) + tnt_raise(ClientError, ER_LOADING); + + /* Check permissions */ + access_check_universe_xc(PRIV_R); + + /* Forbid replication with disabled WAL */ + if (wal_mode() == WAL_NONE) { + tnt_raise(ClientError, ER_UNSUPPORTED, "Replication", + "wal_mode = 'none'"); + } + + say_info("sending current read-view to replica at %s", sio_socketname(io->fd)); + + /* Send the snapshot data to the instance. */ + struct vclock start_vclock; + relay_initial_join(io->fd, header->sync, &start_vclock); + say_info("read-view sent."); + + /* Remember master's vclock after the last request */ + struct vclock stop_vclock; + vclock_copy(&stop_vclock, &replicaset.vclock); + + /* Send end of snapshot data marker */ + struct xrow_header row; + xrow_encode_vclock_xc(&row, &stop_vclock); + row.sync = header->sync; + coio_write_xrow(io, &row); +} + +void +box_process_register(struct ev_io *io, struct xrow_header *header) +{ + assert(header->type == IPROTO_REGISTER); + + struct tt_uuid instance_uuid = uuid_nil; + struct vclock vclock; + xrow_decode_register_xc(header, &instance_uuid, &vclock); + + if (!is_box_configured) + tnt_raise(ClientError, ER_LOADING); + + if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) + tnt_raise(ClientError, ER_CONNECTION_TO_SELF); + + /* Forbid replication from an anonymous instance. 
*/ + if (replication_anon) { + tnt_raise(ClientError, ER_UNSUPPORTED, "Replication", + "replicating from an anonymous instance."); + } + + access_check_universe_xc(PRIV_R); + /* We only get register requests from anonymous instances. */ + struct replica *replica = replica_by_uuid(&instance_uuid); + assert(replica == NULL || replica->id == REPLICA_ID_NIL); + /* See box_process_join() */ + box_check_writable_xc(); + struct space *space = space_cache_find_xc(BOX_CLUSTER_ID); + access_check_space_xc(space, PRIV_W); + + /* Forbid replication with disabled WAL */ + if (wal_mode() == WAL_NONE) { + tnt_raise(ClientError, ER_UNSUPPORTED, "Replication", + "wal_mode = 'none'"); + } + + /* + * Register the replica as a WAL consumer so that + * it can resume FINAL JOIN where INITIAL JOIN ends. + */ + struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock, + "replica %s", tt_uuid_str(&instance_uuid)); + if (gc == NULL) + diag_raise(); + auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); }); + + say_info("registering replica %s at %s", + tt_uuid_str(&instance_uuid), sio_socketname(io->fd)); + + struct vclock start_vclock; + vclock_copy(&start_vclock, &replicaset.vclock); + + /** + * Call the server-side hook which stores the replica uuid + * in _cluster space. + */ + box_on_join(&instance_uuid); + + ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY); + + /* Remember master's vclock after the last request */ + struct vclock stop_vclock; + vclock_copy(&stop_vclock, &replicaset.vclock); + + /* + * Feed replica with WALs in range (start_vclock, stop_vclock) + * so that it gets its registration. 
+ */ + relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock); + say_info("final data sent."); + + struct xrow_header row; + /* Send end of WAL stream marker */ + xrow_encode_vclock_xc(&row, &replicaset.vclock); + row.sync = header->sync; + coio_write_xrow(io, &row); + + /* + * Advance the WAL consumer state to the position where + * FINAL JOIN ended and assign it to the replica. + */ + gc_consumer_advance(gc, &stop_vclock); + replica = replica_by_uuid(&instance_uuid); + if (replica->gc != NULL) + gc_consumer_unregister(replica->gc); + replica->gc = gc; + gc_guard.is_active = false; +} + void box_process_join(struct ev_io *io, struct xrow_header *header) { @@ -1438,6 +1653,12 @@ box_process_join(struct ev_io *io, struct xrow_header *header) if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) tnt_raise(ClientError, ER_CONNECTION_TO_SELF); + /* Forbid replication from an anonymous instance. */ + if (replication_anon) { + tnt_raise(ClientError, ER_UNSUPPORTED, "Replication", + "replicating from an anonymous instance."); + } + /* Check permissions */ access_check_universe_xc(PRIV_R); @@ -1533,27 +1754,39 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) if (!is_box_configured) tnt_raise(ClientError, ER_LOADING); + struct tt_uuid replica_uuid = uuid_nil; struct vclock replica_clock; uint32_t replica_version_id; vclock_create(&replica_clock); + bool anon; xrow_decode_subscribe_xc(header, NULL, &replica_uuid, - &replica_clock, &replica_version_id); + &replica_clock, &replica_version_id, &anon); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) tnt_raise(ClientError, ER_CONNECTION_TO_SELF); + /* Forbid replication from an anonymous instance. 
*/ + if (replication_anon) { + tnt_raise(ClientError, ER_UNSUPPORTED, "Replication", + "replicating from an anonymous instance."); + } + /* Check permissions */ access_check_universe_xc(PRIV_R); /* Check replica uuid */ struct replica *replica = replica_by_uuid(&replica_uuid); - if (replica == NULL || replica->id == REPLICA_ID_NIL) { + + if (!anon && (replica == NULL || replica->id == REPLICA_ID_NIL)) { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&replica_uuid), tt_uuid_str(&REPLICASET_UUID)); } + if (replica == NULL) { + replica = replicaset_add_anon(&replica_uuid); + } /* Don't allow multiple relays for the same replica */ if (relay_get_state(replica->relay) == RELAY_FOLLOW) { @@ -1774,13 +2007,16 @@ bootstrap_from_master(struct replica *master) */ assert(!tt_uuid_is_nil(&INSTANCE_UUID)); - applier_resume_to_state(applier, APPLIER_INITIAL_JOIN, TIMEOUT_INFINITY); - + enum applier_state wait_state = replication_anon ? APPLIER_FETCH_SNAPSHOT : + APPLIER_INITIAL_JOIN; + applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY); /* * Process initial data (snapshot or dirty disk data). */ engine_begin_initial_recovery_xc(NULL); - applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY); + wait_state = replication_anon ? APPLIER_FETCHED_SNAPSHOT : + APPLIER_FINAL_JOIN; + applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY); /* * Process final data (WALs). 
@@ -1790,8 +2026,10 @@ bootstrap_from_master(struct replica *master) recovery_journal_create(&journal, &replicaset.vclock); journal_set(&journal.base); - applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY); - + if (!replication_anon) { + applier_resume_to_state(applier, APPLIER_JOINED, + TIMEOUT_INFINITY); + } /* Finalize the new replica */ engine_end_recovery_xc(); @@ -2106,6 +2344,7 @@ box_cfg_xc(void) box_set_replication_sync_lag(); box_set_replication_sync_timeout(); box_set_replication_skip_conflict(); + box_set_replication_anon(); struct gc_checkpoint *checkpoint = gc_last_checkpoint(); @@ -2136,14 +2375,20 @@ box_cfg_xc(void) } fiber_gc(); - /* Check for correct registration of the instance in _cluster */ - { - struct replica *self = replica_by_uuid(&INSTANCE_UUID); + /* + * Check for correct registration of the instance in _cluster + * The instance won't exist in _cluster space if it is an + * anonymous replica, add it manually. + */ + struct replica *self = replica_by_uuid(&INSTANCE_UUID); + if (!replication_anon) { if (self == NULL || self->id == REPLICA_ID_NIL) { tnt_raise(ClientError, ER_UNKNOWN_REPLICA, tt_uuid_str(&INSTANCE_UUID), tt_uuid_str(&REPLICASET_UUID)); } + } else if (self == NULL) { + replicaset_add_anon(&INSTANCE_UUID); } rmean_cleanup(rmean_box); diff --git a/src/box/box.h b/src/box/box.h index ccd527bd5..e4088d6b6 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -100,7 +100,7 @@ void box_atfork(void); void -box_set_ro(bool ro); +box_set_ro(); bool box_is_ro(void); @@ -179,6 +179,14 @@ box_reset_stat(void); void box_process_auth(struct auth_request *request, const char *salt); +/** Send current read view to the replica. */ +void +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header); + +/** Register a replica */ +void +box_process_register(struct ev_io *io, struct xrow_header *header); + /** * Join a replica. 
* @@ -234,6 +242,7 @@ void box_set_replication_connect_quorum(void); void box_set_replication_sync_lag(void); void box_set_replication_sync_timeout(void); void box_set_replication_skip_conflict(void); +void box_set_replication_anon(void); void box_set_net_msg_max(void); extern "C" { diff --git a/src/box/iproto.cc b/src/box/iproto.cc index c39b8e7bf..9e6bd2dd7 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1162,7 +1162,7 @@ static void net_send_error(struct cmsg *msg); static void -tx_process_join_subscribe(struct cmsg *msg); +tx_process_replication(struct cmsg *msg); static void net_end_join(struct cmsg *msg); @@ -1212,12 +1212,12 @@ static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = { }; static const struct cmsg_hop join_route[] = { - { tx_process_join_subscribe, &net_pipe }, + { tx_process_replication, &net_pipe }, { net_end_join, NULL }, }; static const struct cmsg_hop subscribe_route[] = { - { tx_process_join_subscribe, &net_pipe }, + { tx_process_replication, &net_pipe }, { net_end_subscribe, NULL }, }; @@ -1272,6 +1272,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, cmsg_init(&msg->base, misc_route); break; case IPROTO_JOIN: + case IPROTO_FETCH_SNAPSHOT: + case IPROTO_REGISTER: cmsg_init(&msg->base, join_route); *stop_input = true; break; @@ -1752,7 +1754,7 @@ error: } static void -tx_process_join_subscribe(struct cmsg *m) +tx_process_replication(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); struct iproto_connection *con = msg->connection; @@ -1768,6 +1770,12 @@ tx_process_join_subscribe(struct cmsg *m) */ box_process_join(&io, &msg->header); break; + case IPROTO_FETCH_SNAPSHOT: + box_process_fetch_snapshot(&io, &msg->header); + break; + case IPROTO_REGISTER: + box_process_register(&io, &msg->header); + break; case IPROTO_SUBSCRIBE: /* * Subscribe never returns - unless there diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 5e8a7d483..cc8dd7cd7 100644 --- 
a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -120,6 +120,8 @@ enum iproto_key { * } */ IPROTO_SQL_INFO = 0x42, + /* Leave a gap between SQL keys and additional request keys */ + IPROTO_REPLICA_ANON = 0x50, IPROTO_KEY_MAX }; @@ -216,6 +218,10 @@ enum iproto_type { IPROTO_VOTE_DEPRECATED = 67, /** Vote request command for master election */ IPROTO_VOTE = 68, + /** Anonymous replication FETCH SNAPSHOT */ + IPROTO_FETCH_SNAPSHOT = 69, + /** REGISTER request to leave anonymous replication */ + IPROTO_REGISTER = 70, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 4884ce013..f59470774 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -190,7 +190,7 @@ static int lbox_cfg_set_read_only(struct lua_State *L) { try { - box_set_ro(cfg_geti("read_only") != 0); + box_set_ro(); } catch (Exception *) { luaT_error(L); } @@ -338,6 +338,17 @@ lbox_cfg_set_replication_sync_timeout(struct lua_State *L) return 0; } +static int +lbox_cfg_set_replication_anon(struct lua_State *L) +{ + try { + box_set_replication_anon(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + static int lbox_cfg_set_replication_skip_conflict(struct lua_State *L) { @@ -377,6 +388,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag}, {"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout}, {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict}, + {"cfg_set_replication_anon", lbox_cfg_set_replication_anon}, {"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max}, {NULL, NULL} }; diff --git a/src/box/lua/info.c b/src/box/lua/info.c index e029e0e17..b5909a878 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -223,7 +223,7 @@ lbox_info_id(struct lua_State *L) * at box.info.status. 
*/ struct replica *self = replica_by_uuid(&INSTANCE_UUID); - if (self != NULL && self->id != REPLICA_ID_NIL) { + if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) { lua_pushinteger(L, self->id); } else { luaL_pushnull(L); @@ -243,7 +243,7 @@ lbox_info_lsn(struct lua_State *L) { /* See comments in lbox_info_id */ struct replica *self = replica_by_uuid(&INSTANCE_UUID); - if (self != NULL && self->id != REPLICA_ID_NIL) { + if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) { luaL_pushint64(L, vclock_get(box_vclock, self->id)); } else { luaL_pushint64(L, -1); diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 85617c8f0..9dee75b7d 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -77,6 +77,7 @@ local default_cfg = { replication_connect_timeout = 30, replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, + replication_anon = false, feedback_enabled = true, feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, @@ -140,6 +141,7 @@ local template_cfg = { replication_connect_timeout = 'number', replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', + replication_anon = 'boolean', feedback_enabled = 'boolean', feedback_host = 'string', feedback_interval = 'number', @@ -247,6 +249,7 @@ local dynamic_cfg = { replication_sync_lag = private.cfg_set_replication_sync_lag, replication_sync_timeout = private.cfg_set_replication_sync_timeout, replication_skip_conflict = private.cfg_set_replication_skip_conflict, + replication_anon = private.cfg_set_replication_anon, instance_uuid = check_instance_uuid, replicaset_uuid = check_replicaset_uuid, net_msg_max = private.cfg_set_net_msg_max, @@ -301,6 +304,7 @@ local dynamic_cfg_skip_at_load = { replication_sync_lag = true, replication_sync_timeout = true, replication_skip_conflict = true, + replication_anon = true, wal_dir_rescan_delay = true, custom_proc_title = true, force_recovery = 
true, diff --git a/src/box/recovery.cc b/src/box/recovery.cc index d122d618a..64aa467b1 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -262,9 +262,12 @@ recover_xlog(struct recovery *r, struct xstream *stream, /* * All rows in xlog files have an assigned - * replica id. + * replica id. The only exception is anonymous + * replica, which has a zero instance id. + * In this case the only rows from such an instance + * can be for the local spaces. */ - assert(row.replica_id != 0); + assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL); /* * We can promote the vclock either before or * after xstream_write(): it only makes any impact diff --git a/src/box/relay.cc b/src/box/relay.cc index e849fcf4f..14644716d 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -569,11 +569,17 @@ relay_subscribe_f(va_list ap) cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); - /* Setup garbage collection trigger. */ + /* + * Setup garbage collection trigger. + * Not needed for anonymous replicas, since they + * aren't registered with gc at all. + */ struct trigger on_close_log = { RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL }; - trigger_add(&r->on_close_log, &on_close_log); + if (!relay->replica->anon) { + trigger_add(&r->on_close_log, &on_close_log); + } /* Setup WAL watcher for sending new rows to the replica. */ wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, @@ -652,7 +658,9 @@ relay_subscribe_f(va_list ap) say_crit("exiting the relay loop"); /* Clear garbage collector trigger and WAL watcher. */ - trigger_clear(&on_close_log); + if (!relay->replica->anon) { + trigger_clear(&on_close_log); + } wal_clear_watcher(&relay->wal_watcher, cbus_process); /* Join ack reader fiber. 
*/ @@ -673,7 +681,7 @@ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, struct vclock *replica_clock, uint32_t replica_version_id) { - assert(replica->id != REPLICA_ID_NIL); + assert(replica->anon || replica->id != REPLICA_ID_NIL); struct relay *relay = replica->relay; assert(relay->state != RELAY_FOLLOW); /* @@ -681,7 +689,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, * unless it has already been registered by initial * join. */ - if (replica->gc == NULL) { + if (replica->gc == NULL && !replica->anon) { replica->gc = gc_consumer_register(replica_clock, "replica %s", tt_uuid_str(&replica->uuid)); if (replica->gc == NULL) @@ -691,7 +699,11 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, relay_start(relay, fd, sync, relay_send_row); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); - replica_on_relay_stop(replica); + if (replica->anon) { + replica_anon_delete(replica); + } else { + replica_on_relay_stop(replica); + } }); vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); @@ -741,6 +753,14 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); assert(iproto_type_is_dml(packet->type)); + /* + * Replica-local requests generated while replica was + * anonymous have a zero instance id. Just skip all + * these rows. 
+ */ + if (packet->replica_id == REPLICA_ID_NIL) { + return; + } /* * Transform replica local requests to IPROTO_NOP so as to * promote vclock on the replica without actually modifying diff --git a/src/box/replication.cc b/src/box/replication.cc index 81f19aa07..ce707811a 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -53,6 +53,7 @@ int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL; double replication_sync_lag = 10.0; /* seconds */ double replication_sync_timeout = 300.0; /* seconds */ bool replication_skip_conflict = false; +bool replication_anon = false; struct replicaset replicaset; @@ -172,6 +173,7 @@ replica_new(void) diag_raise(); } replica->id = 0; + replica->anon = false; replica->uuid = uuid_nil; replica->applier = NULL; replica->gc = NULL; @@ -209,6 +211,19 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) return replica; } +struct replica * +replicaset_add_anon(const struct tt_uuid *replica_uuid) +{ + assert(!tt_uuid_is_nil(replica_uuid)); + assert(replica_by_uuid(replica_uuid) == NULL); + + struct replica *replica = replica_new(); + replica->uuid = *replica_uuid; + replica_hash_insert(&replicaset.hash, replica); + replica->anon = true; + return replica; +} + void replica_set_id(struct replica *replica, uint32_t replica_id) { @@ -220,11 +235,21 @@ replica_set_id(struct replica *replica, uint32_t replica_id) /* Assign local replica id */ assert(instance_id == REPLICA_ID_NIL); instance_id = replica_id; + } else if (replica->anon) { + /* + * Set replica gc on its transition from + * anonymous to a normal one. 
+ */ + assert(replica->gc == NULL); + replica->gc = gc_consumer_register(&replicaset.vclock, + "replica %s", + tt_uuid_str(&replica->uuid)); } replicaset.replica_by_id[replica_id] = replica; say_info("assigned id %d to replica %s", replica->id, tt_uuid_str(&replica->uuid)); + replica->anon = false; } void @@ -268,7 +293,7 @@ replica_clear_id(struct replica *replica) } } -static void +void replica_set_applier(struct replica *replica, struct applier *applier) { assert(replica->applier == NULL); @@ -277,7 +302,7 @@ replica_set_applier(struct replica *replica, struct applier *applier) &replica->on_applier_state); } -static void +void replica_clear_applier(struct replica *replica) { assert(replica->applier != NULL); @@ -880,6 +905,18 @@ replica_on_relay_stop(struct replica *replica) } } +void +replica_anon_delete(struct replica *replica) +{ + assert(replica->gc == NULL); + assert(replica->id == REPLICA_ID_NIL); + /* We do not replicate from anonymous replicas */ + assert(replica->applier == NULL); + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); +} + + struct replica * replicaset_first(void) { diff --git a/src/box/replication.h b/src/box/replication.h index 470420592..978a09d41 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -137,6 +137,12 @@ extern double replication_sync_timeout; */ extern bool replication_skip_conflict; +/** + * Whether this replica is anonymous, i.e. it is not present + * in the _cluster table and has a zero id. + */ +extern bool replication_anon; + /** * Wait for the given period of time before trying to reconnect * to a master. @@ -265,6 +271,12 @@ struct replica { * registered in the _cluster space yet. */ uint32_t id; + /** + * Whether this is an anonymous replica, i.e. a read-only + * replica that doesn't have an id and isn't present in + * the _cluster table. + */ + bool anon; /** Applier fiber. */ struct applier *applier; /** Relay thread.
*/ @@ -343,12 +355,21 @@ replica_set_id(struct replica *replica, uint32_t id); void replica_clear_id(struct replica *replica); +void +replica_clear_applier(struct replica *replica); + +void +replica_set_applier(struct replica *replica, struct applier *applier); + /** * Unregister \a relay from the \a replica. */ void replica_on_relay_stop(struct replica *replica); +void +replica_anon_delete(struct replica *replica); + #if defined(__cplusplus) } /* extern "C" */ @@ -364,6 +385,9 @@ replica_check_id(uint32_t replica_id); struct replica * replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid); +struct replica * +replicaset_add_anon(const struct tt_uuid *replica_uuid); + /** * Try to connect appliers to remote peers and receive UUID. * Appliers that did not connect will connect asynchronously. diff --git a/src/box/wal.c b/src/box/wal.c index 5e2c13e0e..2b238b743 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -930,6 +930,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, if ((*row)->replica_id == 0) { (*row)->lsn = vclock_inc(vclock_diff, instance_id) + vclock_get(base, instance_id); + /* + * Note, an anonymous replica signs local + * rows with a zero instance id. + */ (*row)->replica_id = instance_id; /* Use lsn of the first local row as transaction id. */ tsn = tsn == 0 ?
(*row)->lsn : tsn; diff --git a/src/box/xrow.c b/src/box/xrow.c index 18bf08971..37a565bcb 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1148,11 +1148,40 @@ err: return -1; } +int +xrow_encode_register(struct xrow_header *row, + const struct tt_uuid *instance_uuid, + const struct vclock *vclock) +{ + memset(row, 0, sizeof(*row)); + size_t size = mp_sizeof_map(2) + + mp_sizeof_uint(IPROTO_INSTANCE_UUID) + + mp_sizeof_str(UUID_STR_LEN) + + mp_sizeof_uint(IPROTO_VCLOCK) + mp_sizeof_vclock(vclock); + char *buf = (char *) region_alloc(&fiber()->gc, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } + char *data = buf; + data = mp_encode_map(data, 2); + data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); + data = xrow_encode_uuid(data, instance_uuid); + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_vclock(data, vclock); + assert(data <= buf + size); + row->body[0].iov_base = buf; + row->body[0].iov_len = (data - buf); + row->bodycnt = 1; + row->type = IPROTO_REGISTER; + return 0; +} + int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, bool anon) { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); @@ -1162,7 +1191,7 @@ xrow_encode_subscribe(struct xrow_header *row, return -1; } char *data = buf; - data = mp_encode_map(data, 4); + data = mp_encode_map(data, 5); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); data = xrow_encode_uuid(data, replicaset_uuid); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -1171,6 +1200,8 @@ xrow_encode_subscribe(struct xrow_header *row, data = mp_encode_vclock(data, vclock); data = mp_encode_uint(data, IPROTO_SERVER_VERSION); data = mp_encode_uint(data, tarantool_version_id()); + data = mp_encode_uint(data, IPROTO_REPLICA_ANON); + data = mp_encode_bool(data, anon); assert(data <= buf + 
size); row->body[0].iov_base = buf; row->body[0].iov_len = (data - buf); @@ -1182,7 +1213,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id) + uint32_t *version_id, bool *anon) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1245,6 +1276,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *version_id = mp_decode_uint(&d); break; + case IPROTO_REPLICA_ANON: + if (anon == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, + "invalid REPLICA_ANON flag"); + return -1; + } + *anon = mp_decode_bool(&d); + break; default: skip: mp_next(&d); /* value */ } diff --git a/src/box/xrow.h b/src/box/xrow.h index 60def2d3c..b8da3a0d0 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -301,12 +301,27 @@ xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot); void xrow_encode_vote(struct xrow_header *row); +/** + * Encode REGISTER command. + * @param[out] Row. + * @param instance_uuid Instance uuid. + * @param vclock Replication clock. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +xrow_encode_register(struct xrow_header *row, + const struct tt_uuid *instance_uuid, + const struct vclock *vclock); + /** * Encode SUBSCRIBE command. * @param[out] Row. * @param replicaset_uuid Replica set uuid. * @param instance_uuid Instance uuid. * @param vclock Replication clock. + * @param anon Whether it is an anonymous subscribe request or not. * * @retval 0 Success. * @retval -1 Memory error. @@ -315,7 +330,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock); + const struct vclock *vclock, bool anon); /** * Decode SUBSCRIBE command. 
@@ -324,6 +339,7 @@ xrow_encode_subscribe(struct xrow_header *row, * @param[out] instance_uuid. * @param[out] vclock. * @param[out] version_id. + * @param[out] anon Whether it is an anonymous subscribe. * * @retval 0 Success. * @retval -1 Memory or format error. @@ -331,7 +347,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id); + uint32_t *version_id, bool *anon); /** * Encode JOIN command. @@ -355,7 +371,22 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); static inline int xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL); +} + +/** + * Decode REGISTER request. + * @param row Row to decode. + * @param[out] instance_uuid Instance uuid. + * @param[out] vclock Instance vclock. + * @retval 0 Success. + * @retval -1 Memory or format error. 
+ */ +static inline int +xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid, + struct vclock *vclock) +{ + return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL); } /** @@ -380,7 +411,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); } /** @@ -411,7 +442,7 @@ xrow_decode_subscribe_response(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct vclock *vclock) { - return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL); + return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL); } /** @@ -769,15 +800,25 @@ xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot) diag_raise(); } +/** @copydoc xrow_encode_register. */ +static inline void +xrow_encode_register_xc(struct xrow_header *row, + const struct tt_uuid *instance_uuid, + const struct vclock *vclock) +{ + if (xrow_encode_register(row, instance_uuid, vclock) != 0) + diag_raise(); +} + /** @copydoc xrow_encode_subscribe. 
*/ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock) + const struct vclock *vclock, bool anon) { if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, - vclock) != 0) + vclock, anon) != 0) diag_raise(); } @@ -786,10 +827,10 @@ static inline void xrow_decode_subscribe_xc(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *replica_version_id) + uint32_t *replica_version_id, bool *anon) { if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id) != 0) + vclock, replica_version_id, anon) != 0) diag_raise(); } @@ -810,6 +851,15 @@ xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid) diag_raise(); } +/** @copydoc xrow_decode_register. */ +static inline void +xrow_decode_register_xc(struct xrow_header *row, struct tt_uuid *instance_uuid, + struct vclock *vclock) +{ + if (xrow_decode_register(row, instance_uuid, vclock) != 0) + diag_raise(); +} + /** @copydoc xrow_encode_vclock. */ static inline void xrow_encode_vclock_xc(struct xrow_header *row, const struct vclock *vclock) diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 799297ba0..7aec1d715 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -25,30 +25,31 @@ box.cfg 20 pid_file:box.pid 21 read_only:false 22 readahead:16320 -23 replication_connect_timeout:30 -24 replication_skip_conflict:false -25 replication_sync_lag:10 -26 replication_sync_timeout:300 -27 replication_timeout:1 -28 slab_alloc_factor:1.05 -29 strip_core:true -30 too_long_threshold:0.5 -31 vinyl_bloom_fpr:0.05 -32 vinyl_cache:134217728 -33 vinyl_dir:. 
-34 vinyl_max_tuple_size:1048576 -35 vinyl_memory:134217728 -36 vinyl_page_size:8192 -37 vinyl_read_threads:1 -38 vinyl_run_count_per_level:2 -39 vinyl_run_size_ratio:3.5 -40 vinyl_timeout:60 -41 vinyl_write_threads:4 -42 wal_dir:. -43 wal_dir_rescan_delay:2 -44 wal_max_size:268435456 -45 wal_mode:write -46 worker_pool_threads:4 +23 replication_anon:false +24 replication_connect_timeout:30 +25 replication_skip_conflict:false +26 replication_sync_lag:10 +27 replication_sync_timeout:300 +28 replication_timeout:1 +29 slab_alloc_factor:1.05 +30 strip_core:true +31 too_long_threshold:0.5 +32 vinyl_bloom_fpr:0.05 +33 vinyl_cache:134217728 +34 vinyl_dir:. +35 vinyl_max_tuple_size:1048576 +36 vinyl_memory:134217728 +37 vinyl_page_size:8192 +38 vinyl_read_threads:1 +39 vinyl_run_count_per_level:2 +40 vinyl_run_size_ratio:3.5 +41 vinyl_timeout:60 +42 vinyl_write_threads:4 +43 wal_dir:. +44 wal_dir_rescan_delay:2 +45 wal_max_size:268435456 +46 wal_mode:write +47 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box/admin.result b/test/box/admin.result index 6126f3a97..5a03a979a 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -71,6 +71,8 @@ cfg_filter(box.cfg) - false - - readahead - 16320 + - - replication_anon + - false - - replication_connect_timeout - 30 - - replication_skip_conflict diff --git a/test/box/cfg.result b/test/box/cfg.result index 5370bb870..d6ce6b621 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -59,6 +59,8 @@ cfg_filter(box.cfg) | - false | - - readahead | - 16320 + | - - replication_anon + | - false | - - replication_connect_timeout | - 30 | - - replication_skip_conflict @@ -158,6 +160,8 @@ cfg_filter(box.cfg) | - false | - - readahead | - 16320 + | - - replication_anon + | - false | - - replication_connect_timeout | - 30 | - - replication_skip_conflict diff --git a/test/replication/anon.lua b/test/replication/anon.lua new file mode 100644 index 000000000..2e7ee9983 --- /dev/null +++ 
b/test/replication/anon.lua @@ -0,0 +1,13 @@ +#!/usr/bin/env tarantool + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + replication_timeout = 0.1, + replication_connect_timeout = 0.5, + read_only=true, + replication_anon=true, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/anon.result b/test/replication/anon.result new file mode 100644 index 000000000..df84484b2 --- /dev/null +++ b/test/replication/anon.result @@ -0,0 +1,259 @@ +-- test-run result file version 2 +env = require('test_run') + | --- + | ... +vclock_diff = require('fast_replica').vclock_diff + | --- + | ... +test_run = env.new() + | --- + | ... + +-- prepare master +box.schema.user.grant('guest', 'replication') + | --- + | ... +_ = box.schema.space.create('loc', {is_local=true}) + | --- + | ... +_ = box.schema.space.create('temp', {temporary=true}) + | --- + | ... +_ = box.schema.space.create('test') + | --- + | ... +_ = box.space.loc:create_index('pk') + | --- + | ... +_ = box.space.temp:create_index('pk') + | --- + | ... +_ = box.space.test:create_index('pk') + | --- + | ... +box.space.test:insert{1} + | --- + | - [1] + | ... + +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"') + | --- + | - true + | ... +test_run:cmd('start server replica_anon') + | --- + | - true + | ... +test_run:cmd('switch replica_anon') + | --- + | - true + | ... + +box.info.status + | --- + | - running + | ... +box.info.id + | --- + | - 0 + | ... +box.info.lsn + | --- + | - 0 + | ... +test_run:wait_upstream(1, {status='follow'}) + | --- + | - true + | ... + +-- Temporary spaces are accessible as read / write. +for i = 1,10 do box.space.temp:insert{i} end + | --- + | ... +box.space.temp:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | ... + +box.info.lsn + | --- + | - 0 + | ... + +-- Same for local spaces. 
+for i = 1,10 do box.space.loc:insert{i} end + | --- + | ... +box.space.loc:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | ... + +-- Replica-local changes are accounted for in 0 vclock component. +box.info.lsn + | --- + | - 10 + | ... +box.info.vclock[0] + | --- + | - 10 + | ... + +-- Replica is read-only. +box.cfg.read_only + | --- + | - true + | ... +box.cfg{read_only=false} + | --- + | - error: 'Incorrect value for option ''read_only'': the value may be set to false + | only when replication_anon is false' + | ... + +box.space.test:insert{2} + | --- + | - error: Can't modify data because this instance is in read-only mode. + | ... + +box.space.loc:drop() + | --- + | - error: Can't modify data because this instance is in read-only mode. + | ... +box.space.loc:truncate() + | --- + | - error: Can't modify data because this instance is in read-only mode. + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... + +-- Replica isn't visible on master. +#box.info.replication + | --- + | - 1 + | ... + +test_run:cmd('switch replica_anon') + | --- + | - true + | ... + +-- Promote anonymous replica. +box.cfg{replication_anon=false} + | --- + | ... +-- Cannot switch back after becoming "normal". +box.cfg{replication_anon=true} + | --- + | - error: 'Incorrect value for option ''replication_anon'': cannot be turned on after + | bootstrap has finished' + | ... + +box.info.id + | --- + | - 2 + | ... +#box.info.replication + | --- + | - 2 + | ... +test_run:wait_upstream(1, {status='follow'}) + | --- + | - true + | ... +box.info.replication.downstream + | --- + | - null + | ... + +old_lsn = box.info.vclock[2] or 0 + | --- + | ... + +-- Now read_only can be turned off. +box.cfg{read_only=false} + | --- + | ... +box.space.test:insert{2} + | --- + | - [2] + | ... +-- New changes are tracked under freshly assigned id. +box.info.vclock[2] == old_lsn + 1 + | --- + | - true + | ... 
+ +test_run:cmd('switch default') + | --- + | - true + | ... + +-- Other instances may replicate from a previously-anonymous one. +test_run:cmd("set variable repl_source to 'replica_anon.listen'") + | --- + | - true + | ... +box.cfg{replication=repl_source} + | --- + | ... +#box.info.replication + | --- + | - 2 + | ... +test_run:wait_upstream(2, {status='follow'}) + | --- + | - true + | ... +test_run:wait_downstream(2, {status='follow'}) + | --- + | - true + | ... +#box.info.vclock + | --- + | - 2 + | ... + +-- cleanup +box.cfg{replication=""} + | --- + | ... +test_run:cmd('stop server replica_anon with cleanup=1') + | --- + | - true + | ... +box.space.test:drop() + | --- + | ... +box.space.temp:drop() + | --- + | ... +box.space.loc:drop() + | --- + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... +test_run:cleanup_cluster() + | --- + | ... diff --git a/test/replication/anon.test.lua b/test/replication/anon.test.lua new file mode 100644 index 000000000..f151f9e8c --- /dev/null +++ b/test/replication/anon.test.lua @@ -0,0 +1,89 @@ +env = require('test_run') +vclock_diff = require('fast_replica').vclock_diff +test_run = env.new() + +-- prepare master +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('loc', {is_local=true}) +_ = box.schema.space.create('temp', {temporary=true}) +_ = box.schema.space.create('test') +_ = box.space.loc:create_index('pk') +_ = box.space.temp:create_index('pk') +_ = box.space.test:create_index('pk') +box.space.test:insert{1} + +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"') +test_run:cmd('start server replica_anon') +test_run:cmd('switch replica_anon') + +box.info.status +box.info.id +box.info.lsn +test_run:wait_upstream(1, {status='follow'}) + +-- Temporary spaces are accessible as read / write. +for i = 1,10 do box.space.temp:insert{i} end +box.space.temp:select{} + +box.info.lsn + +-- Same for local spaces. 
+for i = 1,10 do box.space.loc:insert{i} end +box.space.loc:select{} + +-- Replica-local changes are accounted for in 0 vclock component. +box.info.lsn +box.info.vclock[0] + +-- Replica is read-only. +box.cfg.read_only +box.cfg{read_only=false} + +box.space.test:insert{2} + +box.space.loc:drop() +box.space.loc:truncate() + +test_run:cmd('switch default') + +-- Replica isn't visible on master. +#box.info.replication + +test_run:cmd('switch replica_anon') + +-- Promote anonymous replica. +box.cfg{replication_anon=false} +-- Cannot switch back after becoming "normal". +box.cfg{replication_anon=true} + +box.info.id +#box.info.replication +test_run:wait_upstream(1, {status='follow'}) +box.info.replication.downstream + +old_lsn = box.info.vclock[2] or 0 + +-- Now read_only can be turned off. +box.cfg{read_only=false} +box.space.test:insert{2} +-- New changes are tracked under freshly assigned id. +box.info.vclock[2] == old_lsn + 1 + +test_run:cmd('switch default') + +-- Other instances may replicate from a previously-anonymous one. +test_run:cmd("set variable repl_source to 'replica_anon.listen'") +box.cfg{replication=repl_source} +#box.info.replication +test_run:wait_upstream(2, {status='follow'}) +test_run:wait_downstream(2, {status='follow'}) +#box.info.vclock + +-- cleanup +box.cfg{replication=""} +test_run:cmd('stop server replica_anon with cleanup=1') +box.space.test:drop() +box.space.temp:drop() +box.space.loc:drop() +box.schema.user.revoke('guest', 'replication') +test_run:cleanup_cluster() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index cd686a0e2..429c64df3 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -1,4 +1,5 @@ { + "anon.test.lua": {}, "misc.test.lua": {}, "once.test.lua": {}, "on_replace.test.lua": {}, -- 2.20.1 (Apple Git-117)