From: Serge Petrenko
To: Georgy Kirichenko
Cc: tarantool-patches@dev.tarantool.org
Date: Mon, 16 Dec 2019 16:28:33 +0300
Message-Id: <02504A05-83EF-4641-8DD4-51C6DD863BC7@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH 5/5] replication: introduce anonymous replica.
List-Id: Tarantool development patches

Hi!

A minor fixup that gets rid of occasional test failures.
Sorry for not noticing this right away.

diff --git a/src/box/box.cc b/src/box/box.cc
index 4c39e4971..efffa654f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -792,11 +792,10 @@ box_set_replication_anon(void)
 			if (applier == NULL)
 				continue;
 			replica_clear_applier(replica);
-			replica->applier_sync_state = APPLIER_DISCONNECTED;
 			applier_stop(applier);
-			applier_start(applier);
+			replica->applier_sync_state = APPLIER_DISCONNECTED;
 			replica_set_applier(replica, applier);
-			applier_resume_to_state(applier, APPLIER_CONNECTED, TIMEOUT_INFINITY);
+			applier_start(applier);
 		}
 		/* Choose a master to send register request to. */
 		struct replica *master = replicaset_leader();

--
Serge Petrenko
sergepetrenko@tarantool.org

> On 15 Dec 2019, at 23:58, sergepetrenko wrote:
>
> This commit introduces anonymous replicas. Such replicas do not pollute
> the _cluster table (they can only be read-only and have a zero id in return).
> An anonymous replica can be promoted to a normal one if needed.
>
> Closes #3186
>
> @TarantoolBot document
> Title: Document anonymous replica
>
> There is a new type of replica in Tarantool: the anonymous replica.
> An anonymous replica is read-only (but you can still write to temporary
> and replica-local spaces), and it isn't present in the _cluster table.
>
> Since an anonymous replica isn't registered in the _cluster table, there
> is no limit on the number of anonymous replicas in a replicaset. You can
> have as many of them as you want.
>
> In order to make a replica anonymous, you have to pass the option
> `replication_anon=true` to `box.cfg`. You also have to set `read_only`
> to true.
>
> Let's go through an anonymous replica bootstrap.
> Suppose we have a master configured with
> ```
> box.cfg{listen=3301}
> ```
> and a local space called "loc" created on it:
> ```
> box.schema.space.create('loc', {is_local=true})
> box.space.loc:create_index("pk")
> ```
> Now, to configure an anonymous replica, we have to issue `box.cfg`,
> as usual.
> ```
> box.cfg{replication_anon=true, read_only=true, replication=3301}
> ```
> As mentioned above, `replication_anon` may be set to true only together
> with `read_only`.
> The instance will fetch the master's snapshot and start following its
> changes. It will not receive an id, so its id will remain zero.
> ```
> tarantool> box.info.id
> ---
> - 0
> ...
> ```
> ```
> tarantool> box.info.replication
> ---
> - 1:
>     id: 1
>     uuid: 3c84f8d9-e34d-4651-969c-3d0ed214c60f
>     lsn: 4
>     upstream:
>       status: follow
>       idle: 0.6912029999985
>       peer:
>       lag: 0.00014615058898926
> ...
> ```
> Now we can use the replica.
> For example, we may do inserts into the local space:
> ```
> tarantool> for i = 1,10 do
>> box.space.loc:insert{i}
>> end
> ---
> ...
> ```
> Note that while the instance is anonymous, it will increase the 0-th
> component of its vclock:
> ```
> tarantool> box.info.vclock
> ---
> - {0: 10, 1: 4}
> ...
> ```
> Let's now promote the replica to a normal one:
> ```
> tarantool> box.cfg{replication_anon=false}
> 2019-12-13 20:34:37.423 [71329] main I> assigned id 2 to replica 6a9c2ed2-b9e1-4c57-a0e8-51a46def7661
> 2019-12-13 20:34:37.424 [71329] main/102/interactive I> set 'replication_anon' configuration option to false
> ---
> ...
>
> tarantool> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> subscribed
> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> remote vclock {1: 5} local vclock {0: 10, 1: 5}
> 2019-12-13 20:34:37.425 [71329] main/118/applierw/ C> leaving orphan mode
> ```
> The replica has just received id 2. We can make it read-write now.
> ```
> box.cfg{read_only=false}
> 2019-12-13 20:35:46.392 [71329] main/102/interactive I> set 'read_only' configuration option to false
> ---
> ...
>
> tarantool> box.schema.space.create('test')
> ---
> - engine: memtx
>   before_replace: 'function: 0x01109f9dc8'
>   on_replace: 'function: 0x01109f9d90'
>   ck_constraint: []
>   field_count: 0
>   temporary: false
>   index: []
>   is_local: false
>   enabled: false
>   name: test
>   id: 513
> - created
> ...
>
> tarantool> box.info.vclock
> ---
> - {0: 10, 1: 5, 2: 2}
> ...
> ```
> Now the replica tracks its changes in the 2nd vclock component, as
> expected. It can also become a replication master from now on.
>
> Side notes:
>   * You cannot replicate from an anonymous instance.
>   * To promote an anonymous instance to a regular one,
>     you first have to start it as anonymous, and only
>     then issue `box.cfg{replication_anon=false}`.
>   * In order for the deanonymization to succeed, the
>     instance must replicate from some read-write instance,
>     otherwise no one will be able to add it to the _cluster table.
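The configuration rules spelled out in the documentation above (enforced in the patch by box_check_ro(), box_check_replication_anon() and box_set_replication_anon()) can be modelled as one small standalone C function. This is only a sketch under stated assumptions: `check_anon_cfg()` and `enum anon_cfg_result` are hypothetical names that do not exist in Tarantool, and the model leaves out the runtime work of deanonymization (resetting appliers and sending REGISTER).

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical standalone model of the replication_anon rules.
 * None of these names exist in Tarantool; they only mirror the
 * checks described in the patch.
 */
enum anon_cfg_result {
	ANON_CFG_OK = 0,
	/* replication_anon = true requested while read_only = false. */
	ANON_CFG_NEEDS_READ_ONLY,
	/* A normal (registered) instance asked to become anonymous. */
	ANON_CFG_CANNOT_TURN_ON,
};

/*
 * is_configured: whether the first box.cfg{} call has completed
 *                (plays the role of is_box_configured).
 * cur_anon:      current value of replication_anon.
 * new_anon:      requested value of replication_anon.
 * read_only:     requested value of read_only.
 */
enum anon_cfg_result
check_anon_cfg(bool is_configured, bool cur_anon, bool new_anon,
	       bool read_only)
{
	/* Checked first, as in box_check_replication_anon(). */
	if (new_anon && !read_only)
		return ANON_CFG_NEEDS_READ_ONLY;
	/* Promotion (true -> false) and no-op updates are accepted. */
	if (!new_anon || new_anon == cur_anon)
		return ANON_CFG_OK;
	/* false -> true is accepted only during initial configuration. */
	return is_configured ? ANON_CFG_CANNOT_TURN_ON : ANON_CFG_OK;
}
```

Under this model, promoting a running anonymous replica with `box.cfg{replication_anon=false}` corresponds to `check_anon_cfg(true, true, false, true)` and is accepted, while turning anonymity back on afterwards (`check_anon_cfg(true, false, true, true)`) is rejected, matching the "cannot be turned on after bootstrap has finished" error. The model does not cover `box.cfg{read_only=false}` on an anonymous instance, which the patch rejects separately in box_check_ro().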
> ---
>  src/box/applier.cc              |  58 ++++++-
>  src/box/applier.h               |   4 +
>  src/box/box.cc                  | 267 ++++++++++++++++++++++++++++++--
>  src/box/box.h                   |  11 +-
>  src/box/iproto.cc               |  16 +-
>  src/box/iproto_constants.h      |   6 +
>  src/box/lua/cfg.cc              |  14 +-
>  src/box/lua/info.c              |   4 +-
>  src/box/lua/load_cfg.lua        |   4 +
>  src/box/recovery.cc             |   7 +-
>  src/box/relay.cc                |  32 +++-
>  src/box/replication.cc          |  41 ++++-
>  src/box/replication.h           |  24 +++
>  src/box/wal.c                   |   4 +
>  src/box/xrow.c                  |  47 +++++-
>  src/box/xrow.h                  |  68 ++++++--
>  test/app-tap/init_script.result |  49 +++---
>  test/box/admin.result           |   2 +
>  test/box/cfg.result             |   4 +
>  test/replication/anon.lua       |  13 ++
>  test/replication/anon.result    | 259 +++++++++++++++++++++++++++++++
>  test/replication/anon.test.lua  |  89 +++++++++++
>  test/replication/suite.cfg      |   1 +
>  23 files changed, 957 insertions(+), 67 deletions(-)
>  create mode 100644 test/replication/anon.lua
>  create mode 100644 test/replication/anon.result
>  create mode 100644 test/replication/anon.test.lua
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 357369025..1445dd4d1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -452,6 +452,23 @@ applier_do_fetch_snapshot(struct applier *applier)
>  	return row_count;
>  }
>
> +static void
> +applier_fetch_snapshot(struct applier *applier)
> +{
> +	/* Send FETCH SNAPSHOT request */
> +	struct ev_io *coio = &applier->io;
> +	struct xrow_header row;
> +
> +	memset(&row, 0, sizeof(row));
> +	row.type = IPROTO_FETCH_SNAPSHOT;
> +	coio_write_xrow(coio, &row);
> +
> +	applier_set_state(applier, APPLIER_FETCH_SNAPSHOT);
> +	applier_do_fetch_snapshot(applier);
> +	applier_set_state(applier, APPLIER_FETCHED_SNAPSHOT);
> +	applier_set_state(applier, APPLIER_READY);
> +}
> +
>  static uint64_t
>  applier_do_register(struct applier *applier, uint64_t row_count)
>  {
> @@ -497,6 +514,28 @@ applier_do_register(struct applier *applier, uint64_t row_count)
>  	return row_count;
>  }
>
> +static void
> +applier_register(struct applier *applier)
> +{
> +	/* Send REGISTER request */
> +	struct ev_io *coio = &applier->io;
> +	struct xrow_header row;
> +
> +	memset(&row, 0, sizeof(row));
> +	/*
> +	 * Send this instance's current vclock together
> +	 * with REGISTER request.
> +	 */
> +	xrow_encode_register(&row, &INSTANCE_UUID, box_vclock);
> +	row.type = IPROTO_REGISTER;
> +	coio_write_xrow(coio, &row);
> +
> +	applier_set_state(applier, APPLIER_REGISTER);
> +	applier_do_register(applier, 0);
> +	applier_set_state(applier, APPLIER_REGISTERED);
> +	applier_set_state(applier, APPLIER_READY);
> +}
> +
>  /**
>   * Execute and process JOIN request (bootstrap the instance).
>   */
> @@ -828,7 +867,7 @@ applier_subscribe(struct applier *applier)
>  	vclock_create(&vclock);
>  	vclock_copy(&vclock, &replicaset.vclock);
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> -				 &vclock);
> +				 &vclock, replication_anon);
>  	coio_write_xrow(coio, &row);
>
>  	/* Read SUBSCRIBE response */
> @@ -996,10 +1035,25 @@ applier_f(va_list ap)
>  			if (tt_uuid_is_nil(&REPLICASET_UUID)) {
>  				/*
>  				 * Execute JOIN if this is a bootstrap.
> +				 * In case of anonymous replication, don't
> +				 * join but just fetch master's snapshot.
> +				 *
>  				 * The join will pause the applier
>  				 * until WAL is created.
>  				 */
> -				applier_join(applier);
> +				if (replication_anon) {
> +					applier_fetch_snapshot(applier);
> +				} else {
> +					applier_join(applier);
> +				}
> +			}
> +			if (applier->version_id >= version_id(1, 7, 0) &&
> +			    !replication_anon && instance_id == REPLICA_ID_NIL) {
> +				/* anonymity was turned off while we were
> +				 * fetching a snapshot or following master.
> +				 * Register the replica now.
> +				 */
> +				applier_register(applier);
>  			}
>  			applier_subscribe(applier);
>  			/*
> diff --git a/src/box/applier.h b/src/box/applier.h
> index b406e6aaf..c9fdc2955 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -61,6 +61,10 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
>  	_(APPLIER_STOPPED, 10) \
>  	_(APPLIER_DISCONNECTED, 11) \
>  	_(APPLIER_LOADING, 12) \
> +	_(APPLIER_FETCH_SNAPSHOT, 13) \
> +	_(APPLIER_FETCHED_SNAPSHOT, 14) \
> +	_(APPLIER_REGISTER, 15) \
> +	_(APPLIER_REGISTERED, 16) \
>
>  /** States for the applier */
>  ENUM(applier_state, applier_STATE);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 981a5bac1..4c39e4971 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -223,9 +223,13 @@ error:
>  	return -1;
>  }
>
> +static bool
> +box_check_ro(void);
> +
>  void
> -box_set_ro(bool ro)
> +box_set_ro()
>  {
> +	bool ro = box_check_ro();
>  	if (ro == is_ro)
>  		return; /* nothing to do */
>  	if (ro)
> @@ -486,6 +490,32 @@ box_check_uuid(struct tt_uuid *uuid, const char *name)
>  	}
>  }
>
> +static bool
> +box_check_ro()
> +{
> +	bool ro = cfg_geti("read_only") != 0;
> +	bool anon = cfg_geti("replication_anon") != 0;
> +	if (anon && !ro) {
> +		tnt_raise(ClientError, ER_CFG, "read_only",
> +			  "the value may be set to false only when "
> +			  "replication_anon is false");
> +	}
> +	return ro;
> +}
> +
> +static bool
> +box_check_replication_anon(void)
> +{
> +	bool anon = cfg_geti("replication_anon") != 0;
> +	bool ro = cfg_geti("read_only") != 0;
> +	if (anon && !ro) {
> +		tnt_raise(ClientError, ER_CFG, "replication_anon",
> +			  "the value may be set to true only when "
> +			  "the instance is read-only");
> +	}
> +	return anon;
> +}
> +
>  static void
>  box_check_instance_uuid(struct tt_uuid *uuid)
>  {
> @@ -740,6 +770,65 @@ box_set_replication_skip_conflict(void)
>  	replication_skip_conflict = cfg_geti("replication_skip_conflict");
>  }
>
> +void
> +box_set_replication_anon(void)
> +{
> +	bool anon = box_check_replication_anon();
> +	if (anon == replication_anon)
> +		return;
> +
> +	if (!anon) {
> +		/* Turn anonymous instance into a normal one. */
> +		replication_anon = anon;
> +		/*
> +		 * Reset all appliers. This will interrupt
> +		 * anonymous follow they're in and also update
> +		 * corresponding instance ballots so that we can
> +		 * use the latest info when choosing a replica to
> +		 * register on.
> +		 */
> +		replicaset_foreach(replica) {
> +			struct applier *applier = replica->applier;
> +			if (applier == NULL)
> +				continue;
> +			replica_clear_applier(replica);
> +			replica->applier_sync_state = APPLIER_DISCONNECTED;
> +			applier_stop(applier);
> +			applier_start(applier);
> +			replica_set_applier(replica, applier);
> +			applier_resume_to_state(applier, APPLIER_CONNECTED, TIMEOUT_INFINITY);
> +		}
> +		/* Choose a master to send register request to. */
> +		struct replica *master = replicaset_leader();
> +		assert(master != NULL && master->applier != NULL);
> +		struct applier *master_applier = master->applier;
> +		applier_resume_to_state(master_applier, APPLIER_REGISTER, TIMEOUT_INFINITY);
> +		applier_resume_to_state(master_applier, APPLIER_REGISTERED, TIMEOUT_INFINITY);
> +		applier_resume_to_state(master_applier, APPLIER_READY, TIMEOUT_INFINITY);
> +		applier_resume(master_applier);
> +		/**
> +		 * Restart other appliers to
> +		 * resend non-anonymous subscribe.
> +		 */
> +		replicaset_foreach(replica) {
> +			if (replica == master || replica->applier == NULL)
> +				continue;
> +			applier_resume(replica->applier);
> +		}
> +	} else if (!is_box_configured) {
> +		replication_anon = anon;
> +	} else {
> +		/*
> +		 * It is forbidden to turn a normal replica into
> +		 * an anonymous one.
> +		 */
> +		tnt_raise(ClientError, ER_CFG, "replication_anon",
> +			  "cannot be turned on after bootstrap"
> +			  " has finished");
> +	}
> +
> +}
> +
>  void
>  box_listen(void)
>  {
> @@ -1379,6 +1468,132 @@ box_process_auth(struct auth_request *request, const char *salt)
>  	authenticate(user, len, salt, request->scramble);
>  }
>
> +void
> +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
> +{
> +
> +	assert(header->type == IPROTO_FETCH_SNAPSHOT);
> +
> +	/* Check that bootstrap has been finished */
> +	if (!is_box_configured)
> +		tnt_raise(ClientError, ER_LOADING);
> +
> +	/* Check permissions */
> +	access_check_universe_xc(PRIV_R);
> +
> +	/* Forbid replication with disabled WAL */
> +	if (wal_mode() == WAL_NONE) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "wal_mode = 'none'");
> +	}
> +
> +	say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
> +
> +	/* Send the snapshot data to the instance. */
> +	struct vclock start_vclock;
> +	relay_initial_join(io->fd, header->sync, &start_vclock);
> +	say_info("read-view sent.");
> +
> +	/* Remember master's vclock after the last request */
> +	struct vclock stop_vclock;
> +	vclock_copy(&stop_vclock, &replicaset.vclock);
> +
> +	/* Send end of snapshot data marker */
> +	struct xrow_header row;
> +	xrow_encode_vclock_xc(&row, &stop_vclock);
> +	row.sync = header->sync;
> +	coio_write_xrow(io, &row);
> +}
> +
> +void
> +box_process_register(struct ev_io *io, struct xrow_header *header)
> +{
> +	assert(header->type == IPROTO_REGISTER);
> +
> +	struct tt_uuid instance_uuid = uuid_nil;
> +	struct vclock vclock;
> +	xrow_decode_register_xc(header, &instance_uuid, &vclock);
> +
> +	if (!is_box_configured)
> +		tnt_raise(ClientError, ER_LOADING);
> +
> +	if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
> +		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
> +
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
> +	access_check_universe_xc(PRIV_R);
> +	/* We only get register requests from anonymous instances. */
> +	struct replica *replica = replica_by_uuid(&instance_uuid);
> +	assert(replica == NULL || replica->id == REPLICA_ID_NIL);
> +	/* See box_process_join() */
> +	box_check_writable_xc();
> +	struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
> +	access_check_space_xc(space, PRIV_W);
> +
> +	/* Forbid replication with disabled WAL */
> +	if (wal_mode() == WAL_NONE) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "wal_mode = 'none'");
> +	}
> +
> +	/*
> +	 * Register the replica as a WAL consumer so that
> +	 * it can resume FINAL JOIN where INITIAL JOIN ends.
> +	 */
> +	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
> +				"replica %s", tt_uuid_str(&instance_uuid));
> +	if (gc == NULL)
> +		diag_raise();
> +	auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
> +
> +	say_info("registering replica %s at %s",
> +		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
> +
> +	struct vclock start_vclock;
> +	vclock_copy(&start_vclock, &replicaset.vclock);
> +
> +	/**
> +	 * Call the server-side hook which stores the replica uuid
> +	 * in _cluster space.
> +	 */
> +	box_on_join(&instance_uuid);
> +
> +	ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
> +
> +	/* Remember master's vclock after the last request */
> +	struct vclock stop_vclock;
> +	vclock_copy(&stop_vclock, &replicaset.vclock);
> +
> +	/*
> +	 * Feed replica with WALs in range (start_vclock, stop_vclock)
> +	 * so that it gets its registration.
> +	 */
> +	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
> +	say_info("final data sent.");
> +
> +	struct xrow_header row;
> +	/* Send end of WAL stream marker */
> +	xrow_encode_vclock_xc(&row, &replicaset.vclock);
> +	row.sync = header->sync;
> +	coio_write_xrow(io, &row);
> +
> +	/*
> +	 * Advance the WAL consumer state to the position where
> +	 * FINAL JOIN ended and assign it to the replica.
> +	 */
> +	gc_consumer_advance(gc, &stop_vclock);
> +	replica = replica_by_uuid(&instance_uuid);
> +	if (replica->gc != NULL)
> +		gc_consumer_unregister(replica->gc);
> +	replica->gc = gc;
> +	gc_guard.is_active = false;
> +}
> +
>  void
>  box_process_join(struct ev_io *io, struct xrow_header *header)
>  {
> @@ -1438,6 +1653,12 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
>  	if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
>  		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
>
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
>  	/* Check permissions */
>  	access_check_universe_xc(PRIV_R);
>
> @@ -1533,27 +1754,39 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
>  	if (!is_box_configured)
>  		tnt_raise(ClientError, ER_LOADING);
>
> +
>  	struct tt_uuid replica_uuid = uuid_nil;
>  	struct vclock replica_clock;
>  	uint32_t replica_version_id;
>  	vclock_create(&replica_clock);
> +	bool anon;
>  	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
> -				 &replica_clock, &replica_version_id);
> +				 &replica_clock, &replica_version_id, &anon);
>
>  	/* Forbid connection to itself */
>  	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
>  		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
>
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
>  	/* Check permissions */
>  	access_check_universe_xc(PRIV_R);
>
>  	/* Check replica uuid */
>  	struct replica *replica = replica_by_uuid(&replica_uuid);
> -	if (replica == NULL || replica->id == REPLICA_ID_NIL) {
> +
> +	if (!anon && (replica == NULL || replica->id == REPLICA_ID_NIL)) {
>  		tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
>  			  tt_uuid_str(&replica_uuid),
>  			  tt_uuid_str(&REPLICASET_UUID));
>  	}
> +	if (replica == NULL) {
> +		replica = replicaset_add_anon(&replica_uuid);
> +	}
>
>  	/* Don't allow multiple relays for the same replica */
>  	if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
> @@ -1774,13 +2007,16 @@ bootstrap_from_master(struct replica *master)
>  	 */
>
>  	assert(!tt_uuid_is_nil(&INSTANCE_UUID));
> -	applier_resume_to_state(applier, APPLIER_INITIAL_JOIN, TIMEOUT_INFINITY);
> -
> +	enum applier_state wait_state = replication_anon ? APPLIER_FETCH_SNAPSHOT :
> +							   APPLIER_INITIAL_JOIN;
> +	applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
>  	/*
>  	 * Process initial data (snapshot or dirty disk data).
>  	 */
>  	engine_begin_initial_recovery_xc(NULL);
> -	applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);
> +	wait_state = replication_anon ? APPLIER_FETCHED_SNAPSHOT :
> +		     APPLIER_FINAL_JOIN;
> +	applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
>
>  	/*
>  	 * Process final data (WALs).
> @@ -1790,8 +2026,10 @@ bootstrap_from_master(struct replica *master)
>  	recovery_journal_create(&journal, &replicaset.vclock);
>  	journal_set(&journal.base);
>
> -	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> -
> +	if (!replication_anon) {
> +		applier_resume_to_state(applier, APPLIER_JOINED,
> +					TIMEOUT_INFINITY);
> +	}
>  	/* Finalize the new replica */
>  	engine_end_recovery_xc();
>
> @@ -2106,6 +2344,7 @@ box_cfg_xc(void)
>  	box_set_replication_sync_lag();
>  	box_set_replication_sync_timeout();
>  	box_set_replication_skip_conflict();
> +	box_set_replication_anon();
>
>  	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
>
> @@ -2136,14 +2375,20 @@ box_cfg_xc(void)
>  	}
>  	fiber_gc();
>
> -	/* Check for correct registration of the instance in _cluster */
> -	{
> -		struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> +	/*
> +	 * Check for correct registration of the instance in _cluster
> +	 * The instance won't exist in _cluster space if it is an
> +	 * anonymous replica, add it manually.
> +	 */
> +	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> +	if (!replication_anon) {
>  		if (self == NULL || self->id == REPLICA_ID_NIL) {
>  			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
>  				  tt_uuid_str(&INSTANCE_UUID),
>  				  tt_uuid_str(&REPLICASET_UUID));
>  		}
> +	} else if (self == NULL) {
> +		replicaset_add_anon(&INSTANCE_UUID);
>  	}
>
>  	rmean_cleanup(rmean_box);
> diff --git a/src/box/box.h b/src/box/box.h
> index ccd527bd5..e4088d6b6 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -100,7 +100,7 @@ void
>  box_atfork(void);
>
>  void
> -box_set_ro(bool ro);
> +box_set_ro();
>
>  bool
>  box_is_ro(void);
> @@ -179,6 +179,14 @@ box_reset_stat(void);
>  void
>  box_process_auth(struct auth_request *request, const char *salt);
>
> +/** Send current read view to the replica. */
> +void
> +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header);
> +
> +/** Register a replica */
> +void
> +box_process_register(struct ev_io *io, struct xrow_header *header);
> +
>  /**
>   * Join a replica.
>   *
> @@ -234,6 +242,7 @@ void box_set_replication_connect_quorum(void);
>  void box_set_replication_sync_lag(void);
>  void box_set_replication_sync_timeout(void);
>  void box_set_replication_skip_conflict(void);
> +void box_set_replication_anon(void);
>  void box_set_net_msg_max(void);
>
>  extern "C" {
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index c39b8e7bf..9e6bd2dd7 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1162,7 +1162,7 @@ static void
>  net_send_error(struct cmsg *msg);
>
>  static void
> -tx_process_join_subscribe(struct cmsg *msg);
> +tx_process_replication(struct cmsg *msg);
>
>  static void
>  net_end_join(struct cmsg *msg);
> @@ -1212,12 +1212,12 @@ static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = {
>  };
>
>  static const struct cmsg_hop join_route[] = {
> -	{ tx_process_join_subscribe, &net_pipe },
> +	{ tx_process_replication, &net_pipe },
>  	{ net_end_join, NULL },
>  };
>
>  static const struct cmsg_hop subscribe_route[] = {
> -	{ tx_process_join_subscribe, &net_pipe },
> +	{ tx_process_replication, &net_pipe },
>  	{ net_end_subscribe, NULL },
>  };
>
> @@ -1272,6 +1272,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		cmsg_init(&msg->base, misc_route);
>  		break;
>  	case IPROTO_JOIN:
> +	case IPROTO_FETCH_SNAPSHOT:
> +	case IPROTO_REGISTER:
>  		cmsg_init(&msg->base, join_route);
>  		*stop_input = true;
>  		break;
> @@ -1752,7 +1754,7 @@ error:
>  }
>
>  static void
> -tx_process_join_subscribe(struct cmsg *m)
> +tx_process_replication(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = tx_accept_msg(m);
>  	struct iproto_connection *con = msg->connection;
> @@ -1768,6 +1770,12 @@ tx_process_join_subscribe(struct cmsg *m)
>  		 */
>  		box_process_join(&io, &msg->header);
>  		break;
> +	case IPROTO_FETCH_SNAPSHOT:
> +		box_process_fetch_snapshot(&io, &msg->header);
> +		break;
> +	case IPROTO_REGISTER:
> +		box_process_register(&io, &msg->header);
> +		break;
>  	case IPROTO_SUBSCRIBE:
>  		/*
>  		 * Subscribe never returns - unless there
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 5e8a7d483..cc8dd7cd7 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -120,6 +120,8 @@ enum iproto_key {
>  	 * }
>  	 */
>  	IPROTO_SQL_INFO = 0x42,
> +	/* Leave a gap between SQL keys and additional request keys */
> +	IPROTO_REPLICA_ANON = 0x50,
>  	IPROTO_KEY_MAX
>  };
>
> @@ -216,6 +218,10 @@ enum iproto_type {
>  	IPROTO_VOTE_DEPRECATED = 67,
>  	/** Vote request command for master election */
>  	IPROTO_VOTE = 68,
> +	/** Anonymous replication FETCH SNAPSHOT */
> +	IPROTO_FETCH_SNAPSHOT = 69,
> +	/** REGISTER request to leave anonymous replication */
> +	IPROTO_REGISTER = 70,
>
>  	/** Vinyl run info stored in .index file */
>  	VY_INDEX_RUN_INFO = 100,
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index 4884ce013..f59470774 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -190,7 +190,7 @@ static int
>  lbox_cfg_set_read_only(struct lua_State *L)
>  {
>  	try {
> -		box_set_ro(cfg_geti("read_only") != 0);
> +		box_set_ro();
>  	} catch (Exception *) {
>  		luaT_error(L);
>  	}
> @@ -338,6 +338,17 @@ lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
>  	return 0;
>  }
>
> +static int
> +lbox_cfg_set_replication_anon(struct lua_State *L)
> +{
> +	try {
> +		box_set_replication_anon();
> +	} catch (Exception *) {
> +		luaT_error(L);
> +	}
> +	return 0;
> +}
> +
>  static int
>  lbox_cfg_set_replication_skip_conflict(struct lua_State *L)
>  {
> @@ -377,6 +388,7 @@ box_lua_cfg_init(struct lua_State *L)
>  		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
>  		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
>  		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
> +		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
>  		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
>  		{NULL, NULL}
>  	};
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index e029e0e17..b5909a878 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -223,7 +223,7 @@ lbox_info_id(struct lua_State *L)
>  	 * at box.info.status.
>  	 */
>  	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> -	if (self != NULL && self->id != REPLICA_ID_NIL) {
> +	if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
>  		lua_pushinteger(L, self->id);
>  	} else {
>  		luaL_pushnull(L);
> @@ -243,7 +243,7 @@ lbox_info_lsn(struct lua_State *L)
>  {
>  	/* See comments in lbox_info_id */
>  	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> -	if (self != NULL && self->id != REPLICA_ID_NIL) {
> +	if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
>  		luaL_pushint64(L, vclock_get(box_vclock, self->id));
>  	} else {
>  		luaL_pushint64(L, -1);
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 85617c8f0..9dee75b7d 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -77,6 +77,7 @@ local default_cfg = {
>      replication_connect_timeout = 30,
>      replication_connect_quorum = nil, -- connect all
>      replication_skip_conflict = false,
> +    replication_anon = false,
>      feedback_enabled = true,
>      feedback_host = "https://feedback.tarantool.io",
>      feedback_interval = 3600,
> @@ -140,6 +141,7 @@ local template_cfg = {
>      replication_connect_timeout = 'number',
>      replication_connect_quorum = 'number',
>      replication_skip_conflict = 'boolean',
> +    replication_anon = 'boolean',
>      feedback_enabled = 'boolean',
>      feedback_host = 'string',
>      feedback_interval = 'number',
> @@ -247,6 +249,7 @@ local dynamic_cfg = {
>      replication_sync_lag = private.cfg_set_replication_sync_lag,
>      replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>      replication_skip_conflict = private.cfg_set_replication_skip_conflict,
> +    replication_anon = private.cfg_set_replication_anon,
>      instance_uuid = check_instance_uuid,
>      replicaset_uuid = check_replicaset_uuid,
>      net_msg_max = private.cfg_set_net_msg_max,
> @@ -301,6 +304,7 @@ local dynamic_cfg_skip_at_load = {
>      replication_sync_lag = true,
>      replication_sync_timeout = true,
>      replication_skip_conflict = true,
> +    replication_anon = true,
>      wal_dir_rescan_delay = true,
>      custom_proc_title = true,
>      force_recovery = true,
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..64aa467b1 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -262,9 +262,12 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>
>  		/*
>  		 * All rows in xlog files have an assigned
> -		 * replica id.
> +		 * replica id. The only exception is anonymous
> +		 * replica, which has a zero instance id.
> +		 * In this case the only rows from such an instance
> +		 * can be for the local spaces.
>  		 */
> -		assert(row.replica_id != 0);
> +		assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL);
>  		/*
>  		 * We can promote the vclock either before or
>  		 * after xstream_write(): it only makes any impact
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index e849fcf4f..14644716d 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -569,11 +569,17 @@ relay_subscribe_f(va_list ap)
>  	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
>  		  &relay->relay_pipe, NULL, NULL, cbus_process);
>
> -	/* Setup garbage collection trigger. */
> +	/*
> +	 * Setup garbage collection trigger.
> +	 * Not needed for anonymous replicas, since they
> +	 * aren't registered with gc at all.
> +	 */
>  	struct trigger on_close_log = {
>  		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
>  	};
> -	trigger_add(&r->on_close_log, &on_close_log);
> +	if (!relay->replica->anon) {
> +		trigger_add(&r->on_close_log, &on_close_log);
> +	}
>
>  	/* Setup WAL watcher for sending new rows to the replica. */
>  	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> @@ -652,7 +658,9 @@ relay_subscribe_f(va_list ap)
>  	say_crit("exiting the relay loop");
>
>  	/* Clear garbage collector trigger and WAL watcher. */
> -	trigger_clear(&on_close_log);
> +	if (!relay->replica->anon) {
> +		trigger_clear(&on_close_log);
> +	}
>  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
>
>  	/* Join ack reader fiber. */
> @@ -673,7 +681,7 @@ void
>  relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  		struct vclock *replica_clock, uint32_t replica_version_id)
>  {
> -	assert(replica->id != REPLICA_ID_NIL);
> +	assert(replica->anon || replica->id != REPLICA_ID_NIL);
>  	struct relay *relay = replica->relay;
>  	assert(relay->state != RELAY_FOLLOW);
>  	/*
> @@ -681,7 +689,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	 * unless it has already been registered by initial
>  	 * join.
>  	 */
> -	if (replica->gc == NULL) {
> +	if (replica->gc == NULL && !replica->anon) {
>  		replica->gc = gc_consumer_register(replica_clock, "replica %s",
>  						   tt_uuid_str(&replica->uuid));
>  		if (replica->gc == NULL)
> @@ -691,7 +699,11 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	relay_start(relay, fd, sync, relay_send_row);
>  	auto relay_guard = make_scoped_guard([=] {
>  		relay_stop(relay);
> -		replica_on_relay_stop(replica);
> +		if (replica->anon) {
> +			replica_anon_delete(replica);
> +		} else {
> +			replica_on_relay_stop(replica);
> +		}
>  	});
>
>  	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> @@ -741,6 +753,14 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  {
>  	struct relay *relay = container_of(stream, struct relay, stream);
>  	assert(iproto_type_is_dml(packet->type));
> +	/*
> +	 * Replica-local requests generated while replica was
> +	 * anonymous have a zero instance id. Just skip all
> +	 * these rows.
> + */ > + if (packet->replica_id == REPLICA_ID_NIL) { > + return; > + } > /* > * Transform replica local requests to IPROTO_NOP so as to > * promote vclock on the replica without actually modifying > diff --git a/src/box/replication.cc b/src/box/replication.cc > index 81f19aa07..ce707811a 100644 > --- a/src/box/replication.cc > +++ b/src/box/replication.cc > @@ -53,6 +53,7 @@ int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL; > double replication_sync_lag = 10.0; /* seconds */ > double replication_sync_timeout = 300.0; /* seconds */ > bool replication_skip_conflict = false; > +bool replication_anon = false; > > struct replicaset replicaset; > > @@ -172,6 +173,7 @@ replica_new(void) > diag_raise(); > } > replica->id = 0; > + replica->anon = false; > replica->uuid = uuid_nil; > replica->applier = NULL; > replica->gc = NULL; > @@ -209,6 +211,19 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) > return replica; > } > > +struct replica * > +replicaset_add_anon(const struct tt_uuid *replica_uuid) > +{ > + assert(!tt_uuid_is_nil(replica_uuid)); > + assert(replica_by_uuid(replica_uuid) == NULL); > + > + struct replica *replica = replica_new(); > + replica->uuid = *replica_uuid; > + replica_hash_insert(&replicaset.hash, replica); > + replica->anon = true; > + return replica; > +} > + > void > replica_set_id(struct replica *replica, uint32_t replica_id) > { > @@ -220,11 +235,21 @@ replica_set_id(struct replica *replica, uint32_t replica_id) > /* Assign local replica id */ > assert(instance_id == REPLICA_ID_NIL); > instance_id = replica_id; > + } else if (replica->anon) { > + /* > + * Set replica gc on its transition from > + * anonymous to a normal one.
> + */ > + assert(replica->gc == NULL); > + replica->gc = gc_consumer_register(&replicaset.vclock, > + "replica %s", > + tt_uuid_str(&replica->uuid)); > } > replicaset.replica_by_id[replica_id] = replica; > > say_info("assigned id %d to replica %s", > replica->id, tt_uuid_str(&replica->uuid)); > + replica->anon = false; > } > > void > @@ -268,7 +293,7 @@ replica_clear_id(struct replica *replica) > } > } > > -static void > +void > replica_set_applier(struct replica *replica, struct applier *applier) > { > assert(replica->applier == NULL); > @@ -277,7 +302,7 @@ replica_set_applier(struct replica *replica, struct applier *applier) > &replica->on_applier_state); > } > > -static void > +void > replica_clear_applier(struct replica *replica) > { > assert(replica->applier != NULL); > @@ -880,6 +905,18 @@ replica_on_relay_stop(struct replica *replica) > } > } > > +void > +replica_anon_delete(struct replica *replica) > +{ > + assert(replica->gc == NULL); > + assert(replica->id == REPLICA_ID_NIL); > + /* We do not replicate from anonymous replicas */ > + assert(replica->applier == NULL); > + replica_hash_remove(&replicaset.hash, replica); > + replica_delete(replica); > +} > + > + > struct replica * > replicaset_first(void) > { > diff --git a/src/box/replication.h b/src/box/replication.h > index 470420592..978a09d41 100644 > --- a/src/box/replication.h > +++ b/src/box/replication.h > @@ -137,6 +137,12 @@ extern double replication_sync_timeout; > */ > extern bool replication_skip_conflict; > > +/** > + * Whether this replica will be anonymous, i.e. not be present > + * in the _cluster table and have a zero id. > + */ > +extern bool replication_anon; > + > /** > * Wait for the given period of time before trying to reconnect > * to a master. > @@ -265,6 +271,12 @@ struct replica { > * registered in the _cluster space yet. > */ > uint32_t id; > + /** > + * Whether this is an anonymous replica, i.e.
a read-only > + * replica that doesn't have an id and isn't present in > + * _cluster table. > + */ > + bool anon; > /** Applier fiber. */ > struct applier *applier; > /** Relay thread. */ > @@ -343,12 +355,21 @@ replica_set_id(struct replica *replica, uint32_t id); > void > replica_clear_id(struct replica *replica); > > +void > +replica_clear_applier(struct replica *replica); > + > +void > +replica_set_applier(struct replica * replica, struct applier * applier); > + > /** > * Unregister \a relay from the \a replica. > */ > void > replica_on_relay_stop(struct replica *replica); > > +void > +replica_anon_delete(struct replica *replica); > + > #if defined(__cplusplus) > } /* extern "C" */ > > @@ -364,6 +385,9 @@ replica_check_id(uint32_t replica_id); > struct replica * > replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid); > > +struct replica * > +replicaset_add_anon(const struct tt_uuid *replica_uuid); > + > /** > * Try to connect appliers to remote peers and receive UUID. > * Appliers that did not connect will connect asynchronously. > diff --git a/src/box/wal.c b/src/box/wal.c > index 5e2c13e0e..2b238b743 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -930,6 +930,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > if ((*row)->replica_id == 0) { > (*row)->lsn = vclock_inc(vclock_diff, instance_id) + > vclock_get(base, instance_id); > + /* > + * Note, an anonymous replica signs local > + * rows with a zero instance id. > + */ > (*row)->replica_id = instance_id; > /* Use lsn of the first local row as transaction id. */ > tsn = tsn == 0 ?
(*row)->lsn : tsn; > diff --git a/src/box/xrow.c b/src/box/xrow.c > index 18bf08971..37a565bcb 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -1148,11 +1148,40 @@ err: > return -1; > } > > +int > +xrow_encode_register(struct xrow_header *row, > + const struct tt_uuid *instance_uuid, > + const struct vclock *vclock) > +{ > + memset(row, 0, sizeof(*row)); > + size_t size = mp_sizeof_map(2) + > + mp_sizeof_uint(IPROTO_INSTANCE_UUID) + > + mp_sizeof_str(UUID_STR_LEN) + > + mp_sizeof_uint(IPROTO_VCLOCK) + mp_sizeof_vclock(vclock); > + char *buf = (char *) region_alloc(&fiber()->gc, size); > + if (buf == NULL) { > + diag_set(OutOfMemory, size, "region_alloc", "buf"); > + return -1; > + } > + char *data = buf; > + data = mp_encode_map(data, 2); > + data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); > + data = xrow_encode_uuid(data, instance_uuid); > + data = mp_encode_uint(data, IPROTO_VCLOCK); > + data = mp_encode_vclock(data, vclock); > + assert(data <= buf + size); > + row->body[0].iov_base = buf; > + row->body[0].iov_len = (data - buf); > + row->bodycnt = 1; > + row->type = IPROTO_REGISTER; > + return 0; > +} > + > int > xrow_encode_subscribe(struct xrow_header *row, > const struct tt_uuid *replicaset_uuid, > const struct tt_uuid *instance_uuid, > - const struct vclock *vclock) > + const struct vclock *vclock, bool anon) > { > memset(row, 0, sizeof(*row)); > size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); > @@ -1162,7 +1191,7 @@ xrow_encode_subscribe(struct xrow_header *row, > return -1; > } > char *data = buf; > - data = mp_encode_map(data, 4); > + data = mp_encode_map(data, 5); > data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); > data = xrow_encode_uuid(data, replicaset_uuid); > data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); > @@ -1171,6 +1200,8 @@ xrow_encode_subscribe(struct xrow_header *row, > data = mp_encode_vclock(data, vclock); > data = mp_encode_uint(data,
IPROTO_SERVER_VERSION); > data = mp_encode_uint(data, tarantool_version_id()); > + data = mp_encode_uint(data, IPROTO_REPLICA_ANON); > + data = mp_encode_bool(data, anon); > assert(data <= buf + size); > row->body[0].iov_base = buf; > row->body[0].iov_len = (data - buf); > @@ -1182,7 +1213,7 @@ xrow_encode_subscribe(struct xrow_header *row, > int > xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, > struct tt_uuid *instance_uuid, struct vclock *vclock, > - uint32_t *version_id) > + uint32_t *version_id, bool *anon) > { > if (row->bodycnt == 0) { > diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); > @@ -1245,6 +1276,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, > } > *version_id = mp_decode_uint(&d); > break; > + case IPROTO_REPLICA_ANON: > + if (anon == NULL) > + goto skip; > + if (mp_typeof(*d) != MP_BOOL) { > + xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, > + "invalid REPLICA_ANON flag"); > + return -1; > + } > + *anon = mp_decode_bool(&d); > + break; > default: skip: > mp_next(&d); /* value */ > } > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 60def2d3c..b8da3a0d0 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -301,12 +301,27 @@ xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot); > void > xrow_encode_vote(struct xrow_header *row); > > +/** > + * Encode REGISTER command. > + * @param[out] Row. > + * @param instance_uuid Instance uuid. > + * @param vclock Replication clock. > + * > + * @retval 0 Success. > + * @retval -1 Memory error. > + */ > +int > +xrow_encode_register(struct xrow_header *row, > + const struct tt_uuid *instance_uuid, > + const struct vclock *vclock); > + > /** > * Encode SUBSCRIBE command. > * @param[out] Row. > * @param replicaset_uuid Replica set uuid. > * @param instance_uuid Instance uuid. > * @param vclock Replication clock.
> + * @param anon Whether it is an anonymous subscribe request or not. > * > * @retval 0 Success. > * @retval -1 Memory error. > @@ -315,7 +330,7 @@ int > xrow_encode_subscribe(struct xrow_header *row, > const struct tt_uuid *replicaset_uuid, > const struct tt_uuid *instance_uuid, > - const struct vclock *vclock); > + const struct vclock *vclock, bool anon); > > /** > * Decode SUBSCRIBE command. > @@ -324,6 +339,7 @@ xrow_encode_subscribe(struct xrow_header *row, > * @param[out] instance_uuid. > * @param[out] vclock. > * @param[out] version_id. > + * @param[out] anon Whether it is an anonymous subscribe. > * > * @retval 0 Success. > * @retval -1 Memory or format error. > @@ -331,7 +347,7 @@ xrow_encode_subscribe(struct xrow_header *row, > int > xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, > struct tt_uuid *instance_uuid, struct vclock *vclock, > - uint32_t *version_id); > + uint32_t *version_id, bool *anon); > > /** > * Encode JOIN command. > @@ -355,7 +371,22 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); > static inline int > xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) > { > - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL); > + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL); > +} > + > +/** > + * Decode REGISTER request. > + * @param row Row to decode. > + * @param[out] instance_uuid Instance uuid. > + * @param[out] vclock Instance vclock. > + * @retval 0 Success. > + * @retval -1 Memory or format error.
> + */ > +static inline int > +xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid, > + struct vclock *vclock) > +{ > + return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL); > } > > /** > @@ -380,7 +411,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); > static inline int > xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) > { > - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL); > + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); > } > > /** > @@ -411,7 +442,7 @@ xrow_decode_subscribe_response(struct xrow_header *row, > struct tt_uuid *replicaset_uuid, > struct vclock *vclock) > { > - return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL); > + return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL); > } > > /** > @@ -769,15 +800,25 @@ xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot) > diag_raise(); > } > > +/** @copydoc xrow_encode_register. */ > +static inline void > +xrow_encode_register_xc(struct xrow_header *row, > + const struct tt_uuid *instance_uuid, > + const struct vclock *vclock) > +{ > + if (xrow_encode_register(row, instance_uuid, vclock) != 0) > + diag_raise(); > +} > + > /** @copydoc xrow_encode_subscribe.
*/ > static inline void > xrow_encode_subscribe_xc(struct xrow_header *row, > const struct tt_uuid *replicaset_uuid, > const struct tt_uuid *instance_uuid, > - const struct vclock *vclock) > + const struct vclock *vclock, bool anon) > { > if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, > - vclock) != 0) > + vclock, anon) != 0) > diag_raise(); > } > > @@ -786,10 +827,10 @@ static inline void > xrow_decode_subscribe_xc(struct xrow_header *row, > struct tt_uuid *replicaset_uuid, > struct tt_uuid *instance_uuid, struct vclock *vclock, > - uint32_t *replica_version_id) > + uint32_t *replica_version_id, bool *anon) > { > if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, > - vclock, replica_version_id) != 0) > + vclock, replica_version_id, anon) != 0) > diag_raise(); > } > > @@ -810,6 +851,15 @@ xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid) > diag_raise(); > } > > +/** @copydoc xrow_decode_register. */ > +static inline void > +xrow_decode_register_xc(struct xrow_header *row, struct tt_uuid *instance_uuid, > + struct vclock *vclock) > +{ > + if (xrow_decode_register(row, instance_uuid, vclock) != 0) > + diag_raise(); > +} > + > /** @copydoc xrow_encode_vclock. */ > static inline void > xrow_encode_vclock_xc(struct xrow_header *row, const struct vclock *vclock) > diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result > index 799297ba0..7aec1d715 100644 > --- a/test/app-tap/init_script.result > +++ b/test/app-tap/init_script.result > @@ -25,30 +25,31 @@ box.cfg > 20 pid_file:box.pid > 21 read_only:false > 22 readahead:16320 > -23 replication_connect_timeout:30 > -24 replication_skip_conflict:false > -25 replication_sync_lag:10 > -26 replication_sync_timeout:300 > -27 replication_timeout:1 > -28 slab_alloc_factor:1.05 > -29 strip_core:true > -30 too_long_threshold:0.5 > -31 vinyl_bloom_fpr:0.05 > -32 vinyl_cache:134217728 > -33 vinyl_dir:.
> -34 vinyl_max_tuple_size:1048576 > -35 vinyl_memory:134217728 > -36 vinyl_page_size:8192 > -37 vinyl_read_threads:1 > -38 vinyl_run_count_per_level:2 > -39 vinyl_run_size_ratio:3.5 > -40 vinyl_timeout:60 > -41 vinyl_write_threads:4 > -42 wal_dir:. > -43 wal_dir_rescan_delay:2 > -44 wal_max_size:268435456 > -45 wal_mode:write > -46 worker_pool_threads:4 > +23 replication_anon:false > +24 replication_connect_timeout:30 > +25 replication_skip_conflict:false > +26 replication_sync_lag:10 > +27 replication_sync_timeout:300 > +28 replication_timeout:1 > +29 slab_alloc_factor:1.05 > +30 strip_core:true > +31 too_long_threshold:0.5 > +32 vinyl_bloom_fpr:0.05 > +33 vinyl_cache:134217728 > +34 vinyl_dir:. > +35 vinyl_max_tuple_size:1048576 > +36 vinyl_memory:134217728 > +37 vinyl_page_size:8192 > +38 vinyl_read_threads:1 > +39 vinyl_run_count_per_level:2 > +40 vinyl_run_size_ratio:3.5 > +41 vinyl_timeout:60 > +42 vinyl_write_threads:4 > +43 wal_dir:. > +44 wal_dir_rescan_delay:2 > +45 wal_max_size:268435456 > +46 wal_mode:write > +47 worker_pool_threads:4 > -- > -- Test insert from detached fiber > -- > diff --git a/test/box/admin.result b/test/box/admin.result > index 6126f3a97..5a03a979a 100644 > --- a/test/box/admin.result > +++ b/test/box/admin.result > @@ -71,6 +71,8 @@ cfg_filter(box.cfg) > - false > - - readahead > - 16320 > + - - replication_anon > + - false > - - replication_connect_timeout > - 30 > - - replication_skip_conflict > diff --git a/test/box/cfg.result b/test/box/cfg.result > index 5370bb870..d6ce6b621 100644 > --- a/test/box/cfg.result > +++ b/test/box/cfg.result > @@ -59,6 +59,8 @@ cfg_filter(box.cfg) > | - false > | - - readahead > | - 16320 > + | - - replication_anon > + | - false > | - - replication_connect_timeout > | - 30 > | - - replication_skip_conflict > @@ -158,6 +160,8 @@ cfg_filter(box.cfg) > | - false > | - - readahead > | - 16320 > + | - - replication_anon > + | - false > | - - replication_connect_timeout > | - 30 > | - - 
replication_skip_conflict > diff --git a/test/replication/anon.lua b/test/replication/anon.lua > new file mode 100644 > index 000000000..2e7ee9983 > --- /dev/null > +++ b/test/replication/anon.lua > @@ -0,0 +1,13 @@ > +#!/usr/bin/env tarantool > + > +box.cfg({ > + listen = os.getenv("LISTEN"), > + replication = os.getenv("MASTER"), > + memtx_memory = 107374182, > + replication_timeout = 0.1, > + replication_connect_timeout = 0.5, > + read_only=true, > + replication_anon=true, > +}) > + > +require('console').listen(os.getenv('ADMIN')) > diff --git a/test/replication/anon.result b/test/replication/anon.result > new file mode 100644 > index 000000000..df84484b2 > --- /dev/null > +++ b/test/replication/anon.result > @@ -0,0 +1,259 @@ > +-- test-run result file version 2 > +env = require('test_run') > + | --- > + | ... > +vclock_diff = require('fast_replica').vclock_diff > + | --- > + | ... > +test_run = env.new() > + | --- > + | ... > + > +-- prepare master > +box.schema.user.grant('guest', 'replication') > + | --- > + | ... > +_ = box.schema.space.create('loc', {is_local=true}) > + | --- > + | ... > +_ = box.schema.space.create('temp', {temporary=true}) > + | --- > + | ... > +_ = box.schema.space.create('test') > + | --- > + | ... > +_ = box.space.loc:create_index('pk') > + | --- > + | ... > +_ = box.space.temp:create_index('pk') > + | --- > + | ... > +_ = box.space.test:create_index('pk') > + | --- > + | ... > +box.space.test:insert{1} > + | --- > + | - [1] > + | ... > + > +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"') > + | --- > + | - true > + | ... > +test_run:cmd('start server replica_anon') > + | --- > + | - true > + | ... > +test_run:cmd('switch replica_anon') > + | --- > + | - true > + | ... > + > +box.info.status > + | --- > + | - running > + | ... > +box.info.id > + | --- > + | - 0 > + | ... > +box.info.lsn > + | --- > + | - 0 > + | ...
> +test_run:wait_upstream(1, {status='follow'}) > + | --- > + | - true > + | ... > + > +-- Temporary spaces are accessible as read / write. > +for i = 1,10 do box.space.temp:insert{i} end > + | --- > + | ... > +box.space.temp:select{} > + | --- > + | - - [1] > + | - [2] > + | - [3] > + | - [4] > + | - [5] > + | - [6] > + | - [7] > + | - [8] > + | - [9] > + | - [10] > + | ... > + > +box.info.lsn > + | --- > + | - 0 > + | ... > + > +-- Same for local spaces. > +for i = 1,10 do box.space.loc:insert{i} end > + | --- > + | ... > +box.space.loc:select{} > + | --- > + | - - [1] > + | - [2] > + | - [3] > + | - [4] > + | - [5] > + | - [6] > + | - [7] > + | - [8] > + | - [9] > + | - [10] > + | ... > + > +-- Replica-local changes are accounted for in 0 vclock component. > +box.info.lsn > + | --- > + | - 10 > + | ... > +box.info.vclock[0] > + | --- > + | - 10 > + | ... > + > +-- Replica is read-only. > +box.cfg.read_only > + | --- > + | - true > + | ... > +box.cfg{read_only=false} > + | --- > + | - error: 'Incorrect value for option ''read_only'': the value may be set to false > + | only when replication_anon is false' > + | ... > + > +box.space.test:insert{2} > + | --- > + | - error: Can't modify data because this instance is in read-only mode. > + | ... > + > +box.space.loc:drop() > + | --- > + | - error: Can't modify data because this instance is in read-only mode. > + | ... > +box.space.loc:truncate() > + | --- > + | - error: Can't modify data because this instance is in read-only mode. > + | ... > + > +test_run:cmd('switch default') > + | --- > + | - true > + | ... > + > +-- Replica isn't visible on master. > +#box.info.replication > + | --- > + | - 1 > + | ... > + > +test_run:cmd('switch replica_anon') > + | --- > + | - true > + | ... > + > +-- Promote anonymous replica. > +box.cfg{replication_anon=false} > + | --- > + | ... > +-- Cannot switch back after becoming "normal".
> +box.cfg{replication_anon=true} > + | --- > + | - error: 'Incorrect value for option ''replication_anon'': cannot be turned on after > + | bootstrap has finished' > + | ... > + > +box.info.id > + | --- > + | - 2 > + | ... > +#box.info.replication > + | --- > + | - 2 > + | ... > +test_run:wait_upstream(1, {status='follow'}) > + | --- > + | - true > + | ... > +box.info.replication.downstream > + | --- > + | - null > + | ... > + > +old_lsn = box.info.vclock[2] or 0 > + | --- > + | ... > + > +-- Now read_only can be turned off. > +box.cfg{read_only=false} > + | --- > + | ... > +box.space.test:insert{2} > + | --- > + | - [2] > + | ... > +-- New changes are tracked under freshly assigned id. > +box.info.vclock[2] == old_lsn + 1 > + | --- > + | - true > + | ... > + > +test_run:cmd('switch default') > + | --- > + | - true > + | ... > + > +-- Other instances may replicate from a previously-anonymous one. > +test_run:cmd("set variable repl_source to 'replica_anon.listen'") > + | --- > + | - true > + | ... > +box.cfg{replication=repl_source} > + | --- > + | ... > +#box.info.replication > + | --- > + | - 2 > + | ... > +test_run:wait_upstream(2, {status='follow'}) > + | --- > + | - true > + | ... > +test_run:wait_downstream(2, {status='follow'}) > + | --- > + | - true > + | ... > +#box.info.vclock > + | --- > + | - 2 > + | ... > + > +-- cleanup > +box.cfg{replication=""} > + | --- > + | ... > +test_run:cmd('stop server replica_anon with cleanup=1') > + | --- > + | - true > + | ... > +box.space.test:drop() > + | --- > + | ... > +box.space.temp:drop() > + | --- > + | ... > +box.space.loc:drop() > + | --- > + | ... > +box.schema.user.revoke('guest', 'replication') > + | --- > + | ... > +test_run:cleanup_cluster() > + | --- > + | ...
> diff --git a/test/replication/anon.test.lua b/test/replication/anon.test.lua > new file mode 100644 > index 000000000..f151f9e8c > --- /dev/null > +++ b/test/replication/anon.test.lua > @@ -0,0 +1,89 @@ > +env = require('test_run') > +vclock_diff = require('fast_replica').vclock_diff > +test_run = env.new() > + > +-- prepare master > +box.schema.user.grant('guest', 'replication') > +_ = box.schema.space.create('loc', {is_local=true}) > +_ = box.schema.space.create('temp', {temporary=true}) > +_ = box.schema.space.create('test') > +_ = box.space.loc:create_index('pk') > +_ = box.space.temp:create_index('pk') > +_ = box.space.test:create_index('pk') > +box.space.test:insert{1} > + > +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"') > +test_run:cmd('start server replica_anon') > +test_run:cmd('switch replica_anon') > + > +box.info.status > +box.info.id > +box.info.lsn > +test_run:wait_upstream(1, {status='follow'}) > + > +-- Temporary spaces are accessible as read / write. > +for i = 1,10 do box.space.temp:insert{i} end > +box.space.temp:select{} > + > +box.info.lsn > + > +-- Same for local spaces. > +for i = 1,10 do box.space.loc:insert{i} end > +box.space.loc:select{} > + > +-- Replica-local changes are accounted for in 0 vclock component. > +box.info.lsn > +box.info.vclock[0] > + > +-- Replica is read-only. > +box.cfg.read_only > +box.cfg{read_only=false} > + > +box.space.test:insert{2} > + > +box.space.loc:drop() > +box.space.loc:truncate() > + > +test_run:cmd('switch default') > + > +-- Replica isn't visible on master. > +#box.info.replication > + > +test_run:cmd('switch replica_anon') > + > +-- Promote anonymous replica. > +box.cfg{replication_anon=false} > +-- Cannot switch back after becoming "normal".
> +box.cfg{replication_anon=true} > + > +box.info.id > +#box.info.replication > +test_run:wait_upstream(1, {status='follow'}) > +box.info.replication.downstream > + > +old_lsn = box.info.vclock[2] or 0 > + > +-- Now read_only can be turned off. > +box.cfg{read_only=false} > +box.space.test:insert{2} > +-- New changes are tracked under freshly assigned id. > +box.info.vclock[2] == old_lsn + 1 > + > +test_run:cmd('switch default') > + > +-- Other instances may replicate from a previously-anonymous one. > +test_run:cmd("set variable repl_source to 'replica_anon.listen'") > +box.cfg{replication=repl_source} > +#box.info.replication > +test_run:wait_upstream(2, {status='follow'}) > +test_run:wait_downstream(2, {status='follow'}) > +#box.info.vclock > + > +-- cleanup > +box.cfg{replication=""} > +test_run:cmd('stop server replica_anon with cleanup=1') > +box.space.test:drop() > +box.space.temp:drop() > +box.space.loc:drop() > +box.schema.user.revoke('guest', 'replication') > +test_run:cleanup_cluster() > diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg > index cd686a0e2..429c64df3 100644 > --- a/test/replication/suite.cfg > +++ b/test/replication/suite.cfg > @@ -1,4 +1,5 @@ > { > + "anon.test.lua": {}, > "misc.test.lua": {}, > "once.test.lua": {}, > "on_replace.test.lua": {}, > -- > 2.20.1 (Apple Git-117) >
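The `wal_assign_lsn()` and `relay_send_row()` hunks work together. While the instance is anonymous its `instance_id` is 0, so every local row it writes advances the 0-th vclock component and stays signed with `REPLICA_ID_NIL`. The relay then drops such rows instead of sending them to peers. Once the replica gets an id (2 in anon.test.lua), new rows are signed with that id and replicate normally. Below is a minimal C model of that flow. It is only a sketch under simplifying assumptions: `toy_row`, `toy_assign_lsn()`, `toy_relay_filter()`, `toy_demo()` and the `is_local` flag are hypothetical names, not Tarantool's structures or API. A single vclock array stands in for the `vclock_diff`/`base` pair, and the NOP branch follows the existing comment in `relay_send_row()` rather than its exact condition.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical, simplified model of the patch's logic; these are
 * not Tarantool's real types or functions.
 */
#define TOY_VCLOCK_MAX 32
#define TOY_REPLICA_ID_NIL 0

struct toy_row {
	uint32_t replica_id; /* 0 until the row is signed by the WAL */
	int64_t lsn;
	int is_local;        /* hypothetical: row touches a replica-local space */
};

enum toy_relay_action {
	TOY_RELAY_SKIP,     /* never sent to the peer */
	TOY_RELAY_SEND_NOP, /* sent as a NOP to promote the peer's vclock */
	TOY_RELAY_SEND,     /* sent as is */
};

/*
 * Modelled on wal_assign_lsn(): an unsigned row gets the next lsn
 * of this instance's vclock component and is stamped with
 * instance_id. For an anonymous replica instance_id is 0, so the
 * 0-th component grows and the row keeps replica_id == 0.
 */
static void
toy_assign_lsn(int64_t vclock[TOY_VCLOCK_MAX], uint32_t instance_id,
	       struct toy_row *row)
{
	if (row->replica_id == 0) {
		row->lsn = ++vclock[instance_id];
		row->replica_id = instance_id;
	}
}

/*
 * Modelled on relay_send_row(): rows signed with the nil id were
 * written while the instance was anonymous and are skipped; other
 * replica-local rows are turned into NOPs.
 */
static enum toy_relay_action
toy_relay_filter(const struct toy_row *row)
{
	if (row->replica_id == TOY_REPLICA_ID_NIL)
		return TOY_RELAY_SKIP;
	if (row->is_local)
		return TOY_RELAY_SEND_NOP;
	return TOY_RELAY_SEND;
}

/*
 * Replays the anon.test.lua scenario in miniature: ten local
 * inserts while anonymous, then one insert after getting id 2.
 */
static void
toy_demo(int64_t vclock[TOY_VCLOCK_MAX])
{
	for (int i = 0; i < 10; i++) {
		struct toy_row row = {0, 0, 1};
		toy_assign_lsn(vclock, 0, &row);
		/* Signed with the nil id: the relay drops it. */
		assert(toy_relay_filter(&row) == TOY_RELAY_SKIP);
	}
	struct toy_row row = {0, 0, 0};
	toy_assign_lsn(vclock, 2, &row);
	/* Signed with the new id: replicated as usual. */
	assert(toy_relay_filter(&row) == TOY_RELAY_SEND);
}
```

In this model, running `toy_demo()` on a zeroed vclock leaves `vclock[0] == 10` and `vclock[2] == 1`, which is the same shape the `box.info.vclock[0]` and `box.info.vclock[2] == old_lsn + 1` checks in anon.test.lua expect.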