[Tarantool-patches] [PATCH 5/5] replication: introduce anonymous replica.

Serge Petrenko sergepetrenko at tarantool.org
Mon Dec 16 16:28:33 MSK 2019


Hi!
A minor fixup which gets rid of occasional test failures.
Sorry for not noticing this right away.

diff --git a/src/box/box.cc b/src/box/box.cc
index 4c39e4971..efffa654f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -792,11 +792,10 @@ box_set_replication_anon(void)
                        if (applier == NULL)
                                continue;
                        replica_clear_applier(replica);
-                       replica->applier_sync_state = APPLIER_DISCONNECTED;
                        applier_stop(applier);
-                       applier_start(applier);
+                       replica->applier_sync_state = APPLIER_DISCONNECTED;
                        replica_set_applier(replica, applier);
-                       applier_resume_to_state(applier, APPLIER_CONNECTED, TIMEOUT_INFINITY);
+                       applier_start(applier);
                }
                /* Choose a master to send register request to. */
                struct replica *master = replicaset_leader();

--
Serge Petrenko
sergepetrenko at tarantool.org




> On 15 Dec 2019, at 23:58, sergepetrenko <sergepetrenko at tarantool.org> wrote:
> 
> This commit introduces anonymous replicas. Such replicas do not pollute
> the _cluster table; in return, they can only be read-only and have a
> zero id.
> An anonymous replica can be promoted to a normal one if needed.
> 
> Closes #3186
> 
> @TarantoolBot document
> Title: Document anonymous replica
> 
> There is a new type of replica in Tarantool: the anonymous replica. An
> anonymous replica is read-only (but you can still write to temporary and
> replica-local spaces), and it isn't present in the _cluster table.
> 
> Since an anonymous replica isn't registered in the _cluster table, there
> is no limit on the number of anonymous replicas in a replicaset. You can
> have as many of them as you want.
> 
> To make a replica anonymous, pass the option `replication_anon=true`
> to `box.cfg`. You also have to set `read_only` to true.
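
The option rule above can be sketched as a small stand-alone check. This
is only an illustration: `anon_cfg_error` is a hypothetical helper, not a
function from the patch, and the message text is made up for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical model of the rule: replication_anon may be true
 * only when read_only is true as well.
 * Returns NULL when the pair of options is acceptable, otherwise
 * a message naming the violated constraint.
 */
static const char *
anon_cfg_error(bool replication_anon, bool read_only)
{
	if (replication_anon && !read_only)
		return "replication_anon may be set to true only when "
		       "the instance is read-only";
	/* Every other combination is allowed. */
	return NULL;
}
```

Only the combination `replication_anon=true, read_only=false` is rejected;
a regular replica may be read-only or read-write.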
> 
> Let's go through anonymous replica bootstrap.
> Suppose we have a master configured with
> ```
> box.cfg{listen=3301}
> ```
> and a local space called "loc" created on it:
> ```
> box.schema.space.create('loc', {is_local=true})
> box.space.loc:create_index("pk")
> ```
> Now, to configure an anonymous replica, we have to issue `box.cfg`,
> as usual.
> ```
> box.cfg{replication_anon=true, read_only=true, replication=3301}
> ```
> As mentioned above, `replication_anon` may be set to true only together
> with `read_only`.
> The instance will fetch the master's snapshot and proceed to follow its
> changes. It will not receive an id, so its id will remain zero.
> ```
> tarantool> box.info.id
> ---
> - 0
> ...
> ```
> ```
> tarantool> box.info.replication
> ---
> - 1:
>    id: 1
>    uuid: 3c84f8d9-e34d-4651-969c-3d0ed214c60f
>    lsn: 4
>    upstream:
>      status: follow
>      idle: 0.6912029999985
>      peer:
>      lag: 0.00014615058898926
> ...
> ```
> Now we can use the replica.
> For example, we may do inserts into the local space:
> ```
> tarantool> for i = 1,10 do
>> box.space.loc:insert{i}
>> end
> ---
> ...
> ```
> Note that while the instance is anonymous, it increments the 0th
> component of its vclock:
> ```
> tarantool> box.info.vclock
> ---
> - {0: 10, 1: 4}
> ...
> ```
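
The vclock bookkeeping shown above can be modelled with a toy counter
array. This is a sketch under assumptions: `toy_vclock`,
`toy_vclock_inc` and the size of 4 components are hypothetical names and
values chosen for illustration, not Tarantool's `struct vclock`.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical toy size, chosen only to keep the example small. */
enum { TOY_VCLOCK_SIZE = 4 };

/* One LSN counter per instance id; id 0 is the anonymous slot. */
struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_SIZE];
};

/*
 * Record one local write made by the instance with the given id
 * and return the new LSN of that component. An anonymous replica
 * has id 0, so its local writes land in component 0.
 */
static int64_t
toy_vclock_inc(struct toy_vclock *vc, uint32_t instance_id)
{
	assert(instance_id < (uint32_t)TOY_VCLOCK_SIZE);
	return ++vc->lsn[instance_id];
}
```

Starting from `{1: 4}` and calling `toy_vclock_inc(&vc, 0)` ten times
gives `{0: 10, 1: 4}`, which matches the output above.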
> Let's now promote the replica to a normal one:
> ```
> tarantool> box.cfg{replication_anon=false}
> 2019-12-13 20:34:37.423 [71329] main I> assigned id 2 to replica 6a9c2ed2-b9e1-4c57-a0e8-51a46def7661
> 2019-12-13 20:34:37.424 [71329] main/102/interactive I> set 'replication_anon' configuration option to false
> ---
> ...
> 
> tarantool> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> subscribed
> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> remote vclock {1: 5} local vclock {0: 10, 1: 5}
> 2019-12-13 20:34:37.425 [71329] main/118/applierw/ C> leaving orphan mode
> ```
> The replica just received id 2. We can make it read-write now.
> ```
> box.cfg{read_only=false}
> 2019-12-13 20:35:46.392 [71329] main/102/interactive I> set 'read_only' configuration option to false
> ---
> ...
> 
> tarantool> box.schema.space.create('test')
> ---
> - engine: memtx
>  before_replace: 'function: 0x01109f9dc8'
>  on_replace: 'function: 0x01109f9d90'
>  ck_constraint: []
>  field_count: 0
>  temporary: false
>  index: []
>  is_local: false
>  enabled: false
>  name: test
>  id: 513
> - created
> ...
> 
> tarantool> box.info.vclock
> ---
> - {0: 10, 1: 5, 2: 2}
> ...
> ```
> Now the replica tracks its changes in the 2nd vclock component, as
> expected. It can also become a replication master from now on.
> 
> Side notes:
>  * You cannot replicate from an anonymous instance.
>  * To promote an anonymous instance to a regular one,
>    you first have to start it as anonymous, and only
>    then issue `box.cfg{replication_anon=false}`.
>  * In order for the deanonymization to succeed, the
>    instance must replicate from some read-write instance,
>    otherwise no one will be able to add it to the _cluster table.
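
The allowed transitions of `replication_anon` can be summarised as a
small decision function. It follows the branch structure of
`box_set_replication_anon()` in the patch below, but `anon_change` and
`classify_anon_change` are hypothetical names introduced here for
illustration only.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for the possible outcomes. */
enum anon_change {
	ANON_NOOP,           /* value did not change */
	ANON_PROMOTE,        /* anonymous -> regular: register on a master */
	ANON_SET_AT_STARTUP, /* enabled before box is configured */
	ANON_FORBIDDEN,      /* regular -> anonymous after bootstrap */
};

/*
 * Classify a request to change replication_anon from `current`
 * to `requested`. `is_configured` tells whether the initial
 * box.cfg has already finished.
 */
static enum anon_change
classify_anon_change(bool current, bool requested, bool is_configured)
{
	if (requested == current)
		return ANON_NOOP;
	if (!requested)
		return ANON_PROMOTE;
	if (!is_configured)
		return ANON_SET_AT_STARTUP;
	return ANON_FORBIDDEN;
}
```

Turning the option on is accepted only during the initial configuration;
turning it off is the promotion path described in the side notes.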
> ---
> src/box/applier.cc              |  58 ++++++-
> src/box/applier.h               |   4 +
> src/box/box.cc                  | 267 ++++++++++++++++++++++++++++++--
> src/box/box.h                   |  11 +-
> src/box/iproto.cc               |  16 +-
> src/box/iproto_constants.h      |   6 +
> src/box/lua/cfg.cc              |  14 +-
> src/box/lua/info.c              |   4 +-
> src/box/lua/load_cfg.lua        |   4 +
> src/box/recovery.cc             |   7 +-
> src/box/relay.cc                |  32 +++-
> src/box/replication.cc          |  41 ++++-
> src/box/replication.h           |  24 +++
> src/box/wal.c                   |   4 +
> src/box/xrow.c                  |  47 +++++-
> src/box/xrow.h                  |  68 ++++++--
> test/app-tap/init_script.result |  49 +++---
> test/box/admin.result           |   2 +
> test/box/cfg.result             |   4 +
> test/replication/anon.lua       |  13 ++
> test/replication/anon.result    | 259 +++++++++++++++++++++++++++++++
> test/replication/anon.test.lua  |  89 +++++++++++
> test/replication/suite.cfg      |   1 +
> 23 files changed, 957 insertions(+), 67 deletions(-)
> create mode 100644 test/replication/anon.lua
> create mode 100644 test/replication/anon.result
> create mode 100644 test/replication/anon.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 357369025..1445dd4d1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -452,6 +452,23 @@ applier_do_fetch_snapshot(struct applier *applier)
> 	return row_count;
> }
> 
> +static void
> +applier_fetch_snapshot(struct applier *applier)
> +{
> +	/* Send FETCH SNAPSHOT request */
> +	struct ev_io *coio = &applier->io;
> +	struct xrow_header row;
> +
> +	memset(&row, 0, sizeof(row));
> +	row.type = IPROTO_FETCH_SNAPSHOT;
> +	coio_write_xrow(coio, &row);
> +
> +	applier_set_state(applier, APPLIER_FETCH_SNAPSHOT);
> +	applier_do_fetch_snapshot(applier);
> +	applier_set_state(applier, APPLIER_FETCHED_SNAPSHOT);
> +	applier_set_state(applier, APPLIER_READY);
> +}
> +
> static uint64_t
> applier_do_register(struct applier *applier, uint64_t row_count)
> {
> @@ -497,6 +514,28 @@ applier_do_register(struct applier *applier, uint64_t row_count)
> 	return row_count;
> }
> 
> +static void
> +applier_register(struct applier *applier)
> +{
> +	/* Send REGISTER request */
> +	struct ev_io *coio = &applier->io;
> +	struct xrow_header row;
> +
> +	memset(&row, 0, sizeof(row));
> +	/*
> +	 * Send this instance's current vclock together
> +	 * with REGISTER request.
> +	 */
> +	xrow_encode_register(&row, &INSTANCE_UUID, box_vclock);
> +	row.type = IPROTO_REGISTER;
> +	coio_write_xrow(coio, &row);
> +
> +	applier_set_state(applier, APPLIER_REGISTER);
> +	applier_do_register(applier, 0);
> +	applier_set_state(applier, APPLIER_REGISTERED);
> +	applier_set_state(applier, APPLIER_READY);
> +}
> +
> /**
>  * Execute and process JOIN request (bootstrap the instance).
>  */
> @@ -828,7 +867,7 @@ applier_subscribe(struct applier *applier)
> 	vclock_create(&vclock);
> 	vclock_copy(&vclock, &replicaset.vclock);
> 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> -				 &vclock);
> +				 &vclock, replication_anon);
> 	coio_write_xrow(coio, &row);
> 
> 	/* Read SUBSCRIBE response */
> @@ -996,10 +1035,25 @@ applier_f(va_list ap)
> 			if (tt_uuid_is_nil(&REPLICASET_UUID)) {
> 				/*
> 				 * Execute JOIN if this is a bootstrap.
> +				 * In case of anonymous replication, don't
> +				 * join but just fetch master's snapshot.
> +				 *
> 				 * The join will pause the applier
> 				 * until WAL is created.
> 				 */
> -				applier_join(applier);
> +				if (replication_anon) {
> +					applier_fetch_snapshot(applier);
> +				} else {
> +					applier_join(applier);
> +				}
> +			}
> +			if (applier->version_id >= version_id(1, 7, 0) &&
> +			    !replication_anon && instance_id == REPLICA_ID_NIL) {
> +				/* anonymity was turned off while we were
> +				 * fetching a snapshot or following master.
> +				 * Register the replica now.
> +				 */
> +				applier_register(applier);
> 			}
> 			applier_subscribe(applier);
> 			/*
> diff --git a/src/box/applier.h b/src/box/applier.h
> index b406e6aaf..c9fdc2955 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -61,6 +61,10 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
> 	_(APPLIER_STOPPED, 10)                                       \
> 	_(APPLIER_DISCONNECTED, 11)                                  \
> 	_(APPLIER_LOADING, 12)                                       \
> +	_(APPLIER_FETCH_SNAPSHOT, 13)                                \
> +	_(APPLIER_FETCHED_SNAPSHOT, 14)                              \
> +	_(APPLIER_REGISTER, 15)                                      \
> +	_(APPLIER_REGISTERED, 16)                                    \
> 
> /** States for the applier */
> ENUM(applier_state, applier_STATE);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 981a5bac1..4c39e4971 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -223,9 +223,13 @@ error:
> 	return -1;
> }
> 
> +static bool
> +box_check_ro(void);
> +
> void
> -box_set_ro(bool ro)
> +box_set_ro()
> {
> +	bool ro = box_check_ro();
> 	if (ro == is_ro)
> 		return; /* nothing to do */
> 	if (ro)
> @@ -486,6 +490,32 @@ box_check_uuid(struct tt_uuid *uuid, const char *name)
> 	}
> }
> 
> +static bool
> +box_check_ro()
> +{
> +	bool ro = cfg_geti("read_only") != 0;
> +	bool anon = cfg_geti("replication_anon") != 0;
> +	if (anon && !ro) {
> +		tnt_raise(ClientError, ER_CFG, "read_only",
> +			  "the value may be set to false only when "
> +			  "replication_anon is false");
> +	}
> +	return ro;
> +}
> +
> +static bool
> +box_check_replication_anon(void)
> +{
> +	bool anon = cfg_geti("replication_anon") != 0;
> +	bool ro = cfg_geti("read_only") != 0;
> +	if (anon && !ro) {
> +		tnt_raise(ClientError, ER_CFG, "replication_anon",
> +			  "the value may be set to true only when "
> +			  "the instance is read-only");
> +	}
> +	return anon;
> +}
> +
> static void
> box_check_instance_uuid(struct tt_uuid *uuid)
> {
> @@ -740,6 +770,65 @@ box_set_replication_skip_conflict(void)
> 	replication_skip_conflict = cfg_geti("replication_skip_conflict");
> }
> 
> +void
> +box_set_replication_anon(void)
> +{
> +	bool anon = box_check_replication_anon();
> +	if (anon == replication_anon)
> +		return;
> +
> +	if (!anon) {
> +		/* Turn anonymous instance into a normal one. */
> +		replication_anon = anon;
> +		/*
> +		 * Reset all appliers. This will interrupt
> +		 * anonymous follow they're in and also update
> +		 * corresponding instance ballots so that we can
> +		 * use the latest info when choosing a replica to
> +		 * register on.
> +		 */
> +		replicaset_foreach(replica) {
> +			struct applier *applier = replica->applier;
> +			if (applier == NULL)
> +				continue;
> +			replica_clear_applier(replica);
> +			replica->applier_sync_state = APPLIER_DISCONNECTED;
> +			applier_stop(applier);
> +			applier_start(applier);
> +			replica_set_applier(replica, applier);
> +			applier_resume_to_state(applier, APPLIER_CONNECTED, TIMEOUT_INFINITY);
> +		}
> +		/* Choose a master to send register request to. */
> +		struct replica *master = replicaset_leader();
> +		assert(master != NULL && master->applier != NULL);
> +		struct applier *master_applier = master->applier;
> +		applier_resume_to_state(master_applier, APPLIER_REGISTER, TIMEOUT_INFINITY);
> +		applier_resume_to_state(master_applier, APPLIER_REGISTERED, TIMEOUT_INFINITY);
> +		applier_resume_to_state(master_applier, APPLIER_READY, TIMEOUT_INFINITY);
> +		applier_resume(master_applier);
> +		/**
> +		 * Restart other appliers to
> +		 * resend non-anonymous subscribe.
> +		 */
> +		replicaset_foreach(replica) {
> +			if (replica == master || replica->applier == NULL)
> +				continue;
> +			applier_resume(replica->applier);
> +		}
> +	} else if (!is_box_configured) {
> +		replication_anon = anon;
> +	} else {
> +		/*
> +		 * It is forbidden to turn a normal replica into
> +		 * an anonymous one.
> +		 */
> +		tnt_raise(ClientError, ER_CFG, "replication_anon",
> +			  "cannot be turned on after bootstrap"
> +			  " has finished");
> +	}
> +
> +}
> +
> void
> box_listen(void)
> {
> @@ -1379,6 +1468,132 @@ box_process_auth(struct auth_request *request, const char *salt)
> 	authenticate(user, len, salt, request->scramble);
> }
> 
> +void
> +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
> +{
> +
> +	assert(header->type == IPROTO_FETCH_SNAPSHOT);
> +
> +	/* Check that bootstrap has been finished */
> +	if (!is_box_configured)
> +		tnt_raise(ClientError, ER_LOADING);
> +
> +	/* Check permissions */
> +	access_check_universe_xc(PRIV_R);
> +
> +	/* Forbid replication with disabled WAL */
> +	if (wal_mode() == WAL_NONE) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "wal_mode = 'none'");
> +	}
> +
> +	say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
> +
> +	/* Send the snapshot data to the instance. */
> +	struct vclock start_vclock;
> +	relay_initial_join(io->fd, header->sync, &start_vclock);
> +	say_info("read-view sent.");
> +
> +	/* Remember master's vclock after the last request */
> +	struct vclock stop_vclock;
> +	vclock_copy(&stop_vclock, &replicaset.vclock);
> +
> +	/* Send end of snapshot data marker */
> +	struct xrow_header row;
> +	xrow_encode_vclock_xc(&row, &stop_vclock);
> +	row.sync = header->sync;
> +	coio_write_xrow(io, &row);
> +}
> +
> +void
> +box_process_register(struct ev_io *io, struct xrow_header *header)
> +{
> +	assert(header->type == IPROTO_REGISTER);
> +
> +	struct tt_uuid instance_uuid = uuid_nil;
> +	struct vclock vclock;
> +	xrow_decode_register_xc(header, &instance_uuid, &vclock);
> +
> +	if (!is_box_configured)
> +		tnt_raise(ClientError, ER_LOADING);
> +
> +	if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
> +		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
> +
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
> +	access_check_universe_xc(PRIV_R);
> +	/* We only get register requests from anonymous instances. */
> +	struct replica *replica = replica_by_uuid(&instance_uuid);
> +	assert(replica == NULL || replica->id == REPLICA_ID_NIL);
> +	/* See box_process_join() */
> +	box_check_writable_xc();
> +	struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
> +	access_check_space_xc(space, PRIV_W);
> +
> +	/* Forbid replication with disabled WAL */
> +	if (wal_mode() == WAL_NONE) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "wal_mode = 'none'");
> +	}
> +
> +	/*
> +	 * Register the replica as a WAL consumer so that
> +	 * it can resume FINAL JOIN where INITIAL JOIN ends.
> +	 */
> +	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
> +				"replica %s", tt_uuid_str(&instance_uuid));
> +	if (gc == NULL)
> +		diag_raise();
> +	auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
> +
> +	say_info("registering replica %s at %s",
> +		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
> +
> +	struct vclock start_vclock;
> +	vclock_copy(&start_vclock, &replicaset.vclock);
> +
> +	/**
> +	 * Call the server-side hook which stores the replica uuid
> +	 * in _cluster space.
> +	 */
> +	box_on_join(&instance_uuid);
> +
> +	ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
> +
> +	/* Remember master's vclock after the last request */
> +	struct vclock stop_vclock;
> +	vclock_copy(&stop_vclock, &replicaset.vclock);
> +
> +	/*
> +	 * Feed replica with WALs in range (start_vclock, stop_vclock)
> +	 * so that it gets its registration.
> +	 */
> +	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
> +	say_info("final data sent.");
> +
> +	struct xrow_header row;
> +	/* Send end of WAL stream marker */
> +	xrow_encode_vclock_xc(&row, &replicaset.vclock);
> +	row.sync = header->sync;
> +	coio_write_xrow(io, &row);
> +
> +	/*
> +	 * Advance the WAL consumer state to the position where
> +	 * FINAL JOIN ended and assign it to the replica.
> +	 */
> +	gc_consumer_advance(gc, &stop_vclock);
> +	replica = replica_by_uuid(&instance_uuid);
> +	if (replica->gc != NULL)
> +		gc_consumer_unregister(replica->gc);
> +	replica->gc = gc;
> +	gc_guard.is_active = false;
> +}
> +
> void
> box_process_join(struct ev_io *io, struct xrow_header *header)
> {
> @@ -1438,6 +1653,12 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
> 	if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
> 		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
> 
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
> 	/* Check permissions */
> 	access_check_universe_xc(PRIV_R);
> 
> @@ -1533,27 +1754,39 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
> 	if (!is_box_configured)
> 		tnt_raise(ClientError, ER_LOADING);
> 
> +
> 	struct tt_uuid replica_uuid = uuid_nil;
> 	struct vclock replica_clock;
> 	uint32_t replica_version_id;
> 	vclock_create(&replica_clock);
> +	bool anon;
> 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
> -				 &replica_clock, &replica_version_id);
> +				 &replica_clock, &replica_version_id, &anon);
> 
> 	/* Forbid connection to itself */
> 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
> 		tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
> 
> +	/* Forbid replication from an anonymous instance. */
> +	if (replication_anon) {
> +		tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
> +			  "replicating from an anonymous instance.");
> +	}
> +
> 	/* Check permissions */
> 	access_check_universe_xc(PRIV_R);
> 
> 	/* Check replica uuid */
> 	struct replica *replica = replica_by_uuid(&replica_uuid);
> -	if (replica == NULL || replica->id == REPLICA_ID_NIL) {
> +
> +	if (!anon && (replica == NULL || replica->id == REPLICA_ID_NIL)) {
> 		tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> 			  tt_uuid_str(&replica_uuid),
> 			  tt_uuid_str(&REPLICASET_UUID));
> 	}
> +	if (replica == NULL) {
> +		replica = replicaset_add_anon(&replica_uuid);
> +	}
> 
> 	/* Don't allow multiple relays for the same replica */
> 	if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
> @@ -1774,13 +2007,16 @@ bootstrap_from_master(struct replica *master)
> 	 */
> 
> 	assert(!tt_uuid_is_nil(&INSTANCE_UUID));
> -	applier_resume_to_state(applier, APPLIER_INITIAL_JOIN, TIMEOUT_INFINITY);
> -
> +	enum applier_state wait_state = replication_anon ? APPLIER_FETCH_SNAPSHOT :
> +						      APPLIER_INITIAL_JOIN;
> +	applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
> 	/*
> 	 * Process initial data (snapshot or dirty disk data).
> 	 */
> 	engine_begin_initial_recovery_xc(NULL);
> -	applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);
> +	wait_state = replication_anon ? APPLIER_FETCHED_SNAPSHOT :
> +				   APPLIER_FINAL_JOIN;
> +	applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
> 
> 	/*
> 	 * Process final data (WALs).
> @@ -1790,8 +2026,10 @@ bootstrap_from_master(struct replica *master)
> 	recovery_journal_create(&journal, &replicaset.vclock);
> 	journal_set(&journal.base);
> 
> -	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> -
> +	if (!replication_anon) {
> +		applier_resume_to_state(applier, APPLIER_JOINED,
> +					TIMEOUT_INFINITY);
> +	}
> 	/* Finalize the new replica */
> 	engine_end_recovery_xc();
> 
> @@ -2106,6 +2344,7 @@ box_cfg_xc(void)
> 	box_set_replication_sync_lag();
> 	box_set_replication_sync_timeout();
> 	box_set_replication_skip_conflict();
> +	box_set_replication_anon();
> 
> 	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
> 
> @@ -2136,14 +2375,20 @@ box_cfg_xc(void)
> 	}
> 	fiber_gc();
> 
> -	/* Check for correct registration of the instance in _cluster */
> -	{
> -		struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> +	/*
> +	 * Check for correct registration of the instance in _cluster
> +	 * The instance won't exist in _cluster space if it is an
> +	 * anonymous replica, add it manually.
> +	 */
> +	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> +	if (!replication_anon) {
> 		if (self == NULL || self->id == REPLICA_ID_NIL) {
> 			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> 				  tt_uuid_str(&INSTANCE_UUID),
> 				  tt_uuid_str(&REPLICASET_UUID));
> 		}
> +	} else if (self == NULL) {
> +		replicaset_add_anon(&INSTANCE_UUID);
> 	}
> 
> 	rmean_cleanup(rmean_box);
> diff --git a/src/box/box.h b/src/box/box.h
> index ccd527bd5..e4088d6b6 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -100,7 +100,7 @@ void
> box_atfork(void);
> 
> void
> -box_set_ro(bool ro);
> +box_set_ro();
> 
> bool
> box_is_ro(void);
> @@ -179,6 +179,14 @@ box_reset_stat(void);
> void
> box_process_auth(struct auth_request *request, const char *salt);
> 
> +/** Send current read view to the replica. */
> +void
> +box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header);
> +
> +/** Register a replica */
> +void
> +box_process_register(struct ev_io *io, struct xrow_header *header);
> +
> /**
>  * Join a replica.
>  *
> @@ -234,6 +242,7 @@ void box_set_replication_connect_quorum(void);
> void box_set_replication_sync_lag(void);
> void box_set_replication_sync_timeout(void);
> void box_set_replication_skip_conflict(void);
> +void box_set_replication_anon(void);
> void box_set_net_msg_max(void);
> 
> extern "C" {
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index c39b8e7bf..9e6bd2dd7 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1162,7 +1162,7 @@ static void
> net_send_error(struct cmsg *msg);
> 
> static void
> -tx_process_join_subscribe(struct cmsg *msg);
> +tx_process_replication(struct cmsg *msg);
> 
> static void
> net_end_join(struct cmsg *msg);
> @@ -1212,12 +1212,12 @@ static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = {
> };
> 
> static const struct cmsg_hop join_route[] = {
> -	{ tx_process_join_subscribe, &net_pipe },
> +	{ tx_process_replication, &net_pipe },
> 	{ net_end_join, NULL },
> };
> 
> static const struct cmsg_hop subscribe_route[] = {
> -	{ tx_process_join_subscribe, &net_pipe },
> +	{ tx_process_replication, &net_pipe },
> 	{ net_end_subscribe, NULL },
> };
> 
> @@ -1272,6 +1272,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> 		cmsg_init(&msg->base, misc_route);
> 		break;
> 	case IPROTO_JOIN:
> +	case IPROTO_FETCH_SNAPSHOT:
> +	case IPROTO_REGISTER:
> 		cmsg_init(&msg->base, join_route);
> 		*stop_input = true;
> 		break;
> @@ -1752,7 +1754,7 @@ error:
> }
> 
> static void
> -tx_process_join_subscribe(struct cmsg *m)
> +tx_process_replication(struct cmsg *m)
> {
> 	struct iproto_msg *msg = tx_accept_msg(m);
> 	struct iproto_connection *con = msg->connection;
> @@ -1768,6 +1770,12 @@ tx_process_join_subscribe(struct cmsg *m)
> 			 */
> 			box_process_join(&io, &msg->header);
> 			break;
> +		case IPROTO_FETCH_SNAPSHOT:
> +			box_process_fetch_snapshot(&io, &msg->header);
> +			break;
> +		case IPROTO_REGISTER:
> +			box_process_register(&io, &msg->header);
> +			break;
> 		case IPROTO_SUBSCRIBE:
> 			/*
> 			 * Subscribe never returns - unless there
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 5e8a7d483..cc8dd7cd7 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -120,6 +120,8 @@ enum iproto_key {
> 	 * }
> 	 */
> 	IPROTO_SQL_INFO = 0x42,
> +	/* Leave a gap between SQL keys and additional request keys */
> +	IPROTO_REPLICA_ANON = 0x50,
> 	IPROTO_KEY_MAX
> };
> 
> @@ -216,6 +218,10 @@ enum iproto_type {
> 	IPROTO_VOTE_DEPRECATED = 67,
> 	/** Vote request command for master election */
> 	IPROTO_VOTE = 68,
> +	/** Anonymous replication FETCH SNAPSHOT */
> +	IPROTO_FETCH_SNAPSHOT = 69,
> +	/** REGISTER request to leave anonymous replication */
> +	IPROTO_REGISTER = 70,
> 
> 	/** Vinyl run info stored in .index file */
> 	VY_INDEX_RUN_INFO = 100,
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index 4884ce013..f59470774 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -190,7 +190,7 @@ static int
> lbox_cfg_set_read_only(struct lua_State *L)
> {
> 	try {
> -		box_set_ro(cfg_geti("read_only") != 0);
> +		box_set_ro();
> 	} catch (Exception *) {
> 		luaT_error(L);
> 	}
> @@ -338,6 +338,17 @@ lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
> 	return 0;
> }
> 
> +static int
> +lbox_cfg_set_replication_anon(struct lua_State *L)
> +{
> +	try {
> +		box_set_replication_anon();
> +	} catch (Exception *) {
> +		luaT_error(L);
> +	}
> +	return 0;
> +}
> +
> static int
> lbox_cfg_set_replication_skip_conflict(struct lua_State *L)
> {
> @@ -377,6 +388,7 @@ box_lua_cfg_init(struct lua_State *L)
> 		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
> 		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
> 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
> +		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> 		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
> 		{NULL, NULL}
> 	};
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index e029e0e17..b5909a878 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -223,7 +223,7 @@ lbox_info_id(struct lua_State *L)
> 	 * at box.info.status.
> 	 */
> 	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> -	if (self != NULL && self->id != REPLICA_ID_NIL) {
> +	if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
> 		lua_pushinteger(L, self->id);
> 	} else {
> 		luaL_pushnull(L);
> @@ -243,7 +243,7 @@ lbox_info_lsn(struct lua_State *L)
> {
> 	/* See comments in lbox_info_id */
> 	struct replica *self = replica_by_uuid(&INSTANCE_UUID);
> -	if (self != NULL && self->id != REPLICA_ID_NIL) {
> +	if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
> 		luaL_pushint64(L, vclock_get(box_vclock, self->id));
> 	} else {
> 		luaL_pushint64(L, -1);
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 85617c8f0..9dee75b7d 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -77,6 +77,7 @@ local default_cfg = {
>     replication_connect_timeout = 30,
>     replication_connect_quorum = nil, -- connect all
>     replication_skip_conflict = false,
> +    replication_anon      = false,
>     feedback_enabled      = true,
>     feedback_host         = "https://feedback.tarantool.io",
>     feedback_interval     = 3600,
> @@ -140,6 +141,7 @@ local template_cfg = {
>     replication_connect_timeout = 'number',
>     replication_connect_quorum = 'number',
>     replication_skip_conflict = 'boolean',
> +    replication_anon      = 'boolean',
>     feedback_enabled      = 'boolean',
>     feedback_host         = 'string',
>     feedback_interval     = 'number',
> @@ -247,6 +249,7 @@ local dynamic_cfg = {
>     replication_sync_lag    = private.cfg_set_replication_sync_lag,
>     replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
> +    replication_anon        = private.cfg_set_replication_anon,
>     instance_uuid           = check_instance_uuid,
>     replicaset_uuid         = check_replicaset_uuid,
>     net_msg_max             = private.cfg_set_net_msg_max,
> @@ -301,6 +304,7 @@ local dynamic_cfg_skip_at_load = {
>     replication_sync_lag    = true,
>     replication_sync_timeout = true,
>     replication_skip_conflict = true,
> +    replication_anon        = true,
>     wal_dir_rescan_delay    = true,
>     custom_proc_title       = true,
>     force_recovery          = true,
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..64aa467b1 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -262,9 +262,12 @@ recover_xlog(struct recovery *r, struct xstream *stream,
> 
> 		/*
> 		 * All rows in xlog files have an assigned
> -		 * replica id.
> +		 * replica id. The only exception is anonymous
> +		 * replica, which has a zero instance id.
> +		 * In this case the only rows from such an instance
> +		 * can be for the local spaces.
> 		 */
> -		assert(row.replica_id != 0);
> +		assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL);
> 		/*
> 		 * We can promote the vclock either before or
> 		 * after xstream_write(): it only makes any impact
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index e849fcf4f..14644716d 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -569,11 +569,17 @@ relay_subscribe_f(va_list ap)
> 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
> 		  &relay->relay_pipe, NULL, NULL, cbus_process);
> 
> -	/* Setup garbage collection trigger. */
> +	/*
> +	 * Setup garbage collection trigger.
> +	 * Not needed for anonymous replicas, since they
> +	 * aren't registered with gc at all.
> +	 */
> 	struct trigger on_close_log = {
> 		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
> 	};
> -	trigger_add(&r->on_close_log, &on_close_log);
> +	if (!relay->replica->anon) {
> +		trigger_add(&r->on_close_log, &on_close_log);
> +	}
> 
> 	/* Setup WAL watcher for sending new rows to the replica. */
> 	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> @@ -652,7 +658,9 @@ relay_subscribe_f(va_list ap)
> 	say_crit("exiting the relay loop");
> 
> 	/* Clear garbage collector trigger and WAL watcher. */
> -	trigger_clear(&on_close_log);
> +	if (!relay->replica->anon) {
> +		trigger_clear(&on_close_log);
> +	}
> 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> 
> 	/* Join ack reader fiber. */
> @@ -673,7 +681,7 @@ void
> relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> 		struct vclock *replica_clock, uint32_t replica_version_id)
> {
> -	assert(replica->id != REPLICA_ID_NIL);
> +	assert(replica->anon || replica->id != REPLICA_ID_NIL);
> 	struct relay *relay = replica->relay;
> 	assert(relay->state != RELAY_FOLLOW);
> 	/*
> @@ -681,7 +689,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> 	 * unless it has already been registered by initial
> 	 * join.
> 	 */
> -	if (replica->gc == NULL) {
> +	if (replica->gc == NULL && !replica->anon) {
> 		replica->gc = gc_consumer_register(replica_clock, "replica %s",
> 						   tt_uuid_str(&replica->uuid));
> 		if (replica->gc == NULL)
> @@ -691,7 +699,11 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> 	relay_start(relay, fd, sync, relay_send_row);
> 	auto relay_guard = make_scoped_guard([=] {
> 		relay_stop(relay);
> -		replica_on_relay_stop(replica);
> +		if (replica->anon) {
> +			replica_anon_delete(replica);
> +		} else {
> +			replica_on_relay_stop(replica);
> +		}
> 	});
> 
> 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> @@ -741,6 +753,14 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
> {
> 	struct relay *relay = container_of(stream, struct relay, stream);
> 	assert(iproto_type_is_dml(packet->type));
> +	/*
> +	 * Replica-local requests generated while replica was
> +	 * anonymous have a zero instance id. Just skip all
> +	 * these rows.
> +	 */
> +	if (packet->replica_id == REPLICA_ID_NIL) {
> +		return;
> +	}
> 	/*
> 	 * Transform replica local requests to IPROTO_NOP so as to
> 	 * promote vclock on the replica without actually modifying
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 81f19aa07..ce707811a 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -53,6 +53,7 @@ int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
> double replication_sync_lag = 10.0; /* seconds */
> double replication_sync_timeout = 300.0; /* seconds */
> bool replication_skip_conflict = false;
> +bool replication_anon = false;
> 
> struct replicaset replicaset;
> 
> @@ -172,6 +173,7 @@ replica_new(void)
> 		diag_raise();
> 	}
> 	replica->id = 0;
> +	replica->anon = false;
> 	replica->uuid = uuid_nil;
> 	replica->applier = NULL;
> 	replica->gc = NULL;
> @@ -209,6 +211,19 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
> 	return replica;
> }
> 
> +struct replica *
> +replicaset_add_anon(const struct tt_uuid *replica_uuid)
> +{
> +	assert(!tt_uuid_is_nil(replica_uuid));
> +	assert(replica_by_uuid(replica_uuid) == NULL);
> +
> +	struct replica *replica = replica_new();
> +	replica->uuid = *replica_uuid;
> +	replica_hash_insert(&replicaset.hash, replica);
> +	replica->anon = true;
> +	return replica;
> +}
> +
> void
> replica_set_id(struct replica *replica, uint32_t replica_id)
> {
> @@ -220,11 +235,21 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
> 		/* Assign local replica id */
> 		assert(instance_id == REPLICA_ID_NIL);
> 		instance_id = replica_id;
> +	} else if (replica->anon) {
> +		/*
> +		 * Register a gc consumer for the replica on
> +		 * its transition from anonymous to normal.
> +		 */
> +		assert(replica->gc == NULL);
> +		replica->gc = gc_consumer_register(&replicaset.vclock,
> +						   "replica %s",
> +						   tt_uuid_str(&replica->uuid));
> 	}
> 	replicaset.replica_by_id[replica_id] = replica;
> 
> 	say_info("assigned id %d to replica %s",
> 		 replica->id, tt_uuid_str(&replica->uuid));
> +	replica->anon = false;
> }
> 
> void
> @@ -268,7 +293,7 @@ replica_clear_id(struct replica *replica)
> 	}
> }
> 
> -static void
> +void
> replica_set_applier(struct replica *replica, struct applier *applier)
> {
> 	assert(replica->applier == NULL);
> @@ -277,7 +302,7 @@ replica_set_applier(struct replica *replica, struct applier *applier)
> 		    &replica->on_applier_state);
> }
> 
> -static void
> +void
> replica_clear_applier(struct replica *replica)
> {
> 	assert(replica->applier != NULL);
> @@ -880,6 +905,18 @@ replica_on_relay_stop(struct replica *replica)
> 	}
> }
> 
> +void
> +replica_anon_delete(struct replica *replica)
> +{
> +	assert(replica->gc == NULL);
> +	assert(replica->id == REPLICA_ID_NIL);
> +	/* We do not replicate from anonymous replicas */
> +	assert(replica->applier == NULL);
> +	replica_hash_remove(&replicaset.hash, replica);
> +	replica_delete(replica);
> +}
> +
> +
> struct replica *
> replicaset_first(void)
> {
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 470420592..978a09d41 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -137,6 +137,12 @@ extern double replication_sync_timeout;
>  */
> extern bool replication_skip_conflict;
> 
> +/**
> + * Whether this replica will be anonymous or not, i.e. absent
> + * from the _cluster table and with a zero id.
> + */
> +extern bool replication_anon;
> +
> /**
>  * Wait for the given period of time before trying to reconnect
>  * to a master.
> @@ -265,6 +271,12 @@ struct replica {
> 	 * registered in the _cluster space yet.
> 	 */
> 	uint32_t id;
> +	/**
> +	 * Whether this is an anonymous replica, i.e. a read-only
> +	 * replica that doesn't have an id and isn't present in
> +	 * the _cluster table.
> +	 */
> +	bool anon;
> 	/** Applier fiber. */
> 	struct applier *applier;
> 	/** Relay thread. */
> @@ -343,12 +355,21 @@ replica_set_id(struct replica *replica, uint32_t id);
> void
> replica_clear_id(struct replica *replica);
> 
> +void
> +replica_clear_applier(struct replica *replica);
> +
> +void
> +replica_set_applier(struct replica *replica, struct applier *applier);
> +
> /**
>  * Unregister \a relay from the \a replica.
>  */
> void
> replica_on_relay_stop(struct replica *replica);
> 
> +void
> +replica_anon_delete(struct replica *replica);
> +
> #if defined(__cplusplus)
> } /* extern "C" */
> 
> @@ -364,6 +385,9 @@ replica_check_id(uint32_t replica_id);
> struct replica *
> replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
> 
> +struct replica *
> +replicaset_add_anon(const struct tt_uuid *replica_uuid);
> +
> /**
>  * Try to connect appliers to remote peers and receive UUID.
>  * Appliers that did not connect will connect asynchronously.
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 5e2c13e0e..2b238b743 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -930,6 +930,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
> 		if ((*row)->replica_id == 0) {
> 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
> 				      vclock_get(base, instance_id);
> +			/*
> +			 * Note, an anonymous replica signs local
> +			 * rows with a zero instance id.
> +			 */
> 			(*row)->replica_id = instance_id;
> 			/* Use lsn of the first local row as transaction id. */
> 			tsn = tsn == 0 ? (*row)->lsn : tsn;
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 18bf08971..37a565bcb 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -1148,11 +1148,40 @@ err:
> 	return -1;
> }
> 
> +int
> +xrow_encode_register(struct xrow_header *row,
> +		     const struct tt_uuid *instance_uuid,
> +		     const struct vclock *vclock)
> +{
> +	memset(row, 0, sizeof(*row));
> +	size_t size = mp_sizeof_map(2) +
> +		      mp_sizeof_uint(IPROTO_INSTANCE_UUID) +
> +		      mp_sizeof_str(UUID_STR_LEN) +
> +		      mp_sizeof_uint(IPROTO_VCLOCK) + mp_sizeof_vclock(vclock);
> +	char *buf = (char *) region_alloc(&fiber()->gc, size);
> +	if (buf == NULL) {
> +		diag_set(OutOfMemory, size, "region_alloc", "buf");
> +		return -1;
> +	}
> +	char *data = buf;
> +	data = mp_encode_map(data, 2);
> +	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
> +	data = xrow_encode_uuid(data, instance_uuid);
> +	data = mp_encode_uint(data, IPROTO_VCLOCK);
> +	data = mp_encode_vclock(data, vclock);
> +	assert(data <= buf + size);
> +	row->body[0].iov_base = buf;
> +	row->body[0].iov_len = (data - buf);
> +	row->bodycnt = 1;
> +	row->type = IPROTO_REGISTER;
> +	return 0;
> +}
> +
> int
> xrow_encode_subscribe(struct xrow_header *row,
> 		      const struct tt_uuid *replicaset_uuid,
> 		      const struct tt_uuid *instance_uuid,
> -		      const struct vclock *vclock)
> +		      const struct vclock *vclock, bool anon)
> {
> 	memset(row, 0, sizeof(*row));
> 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
> @@ -1162,7 +1191,7 @@ xrow_encode_subscribe(struct xrow_header *row,
> 		return -1;
> 	}
> 	char *data = buf;
> -	data = mp_encode_map(data, 4);
> +	data = mp_encode_map(data, 5);
> 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
> 	data = xrow_encode_uuid(data, replicaset_uuid);
> 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
> @@ -1171,6 +1200,8 @@ xrow_encode_subscribe(struct xrow_header *row,
> 	data = mp_encode_vclock(data, vclock);
> 	data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
> 	data = mp_encode_uint(data, tarantool_version_id());
> +	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
> +	data = mp_encode_bool(data, anon);
> 	assert(data <= buf + size);
> 	row->body[0].iov_base = buf;
> 	row->body[0].iov_len = (data - buf);
> @@ -1182,7 +1213,7 @@ xrow_encode_subscribe(struct xrow_header *row,
> int
> xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
> 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
> -		      uint32_t *version_id)
> +		      uint32_t *version_id, bool *anon)
> {
> 	if (row->bodycnt == 0) {
> 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
> @@ -1245,6 +1276,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
> 			}
> 			*version_id = mp_decode_uint(&d);
> 			break;
> +		case IPROTO_REPLICA_ANON:
> +			if (anon == NULL)
> +				goto skip;
> +			if (mp_typeof(*d) != MP_BOOL) {
> +				xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +						   "invalid REPLICA_ANON flag");
> +				return -1;
> +			}
> +			*anon = mp_decode_bool(&d);
> +			break;
> 		default: skip:
> 			mp_next(&d); /* value */
> 		}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 60def2d3c..b8da3a0d0 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -301,12 +301,27 @@ xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot);
> void
> xrow_encode_vote(struct xrow_header *row);
> 
> +/**
> + * Encode REGISTER command.
> + * @param[out] row Row.
> + * @param instance_uuid Instance uuid.
> + * @param vclock Replication clock.
> + *
> + * @retval 0 Success.
> + * @retval -1 Memory error.
> + */
> +int
> +xrow_encode_register(struct xrow_header *row,
> +		     const struct tt_uuid *instance_uuid,
> +		     const struct vclock *vclock);
> +
> /**
>  * Encode SUBSCRIBE command.
>  * @param[out] Row.
>  * @param replicaset_uuid Replica set uuid.
>  * @param instance_uuid Instance uuid.
>  * @param vclock Replication clock.
> + * @param anon Whether it is an anonymous subscribe request or not.
>  *
>  * @retval  0 Success.
>  * @retval -1 Memory error.
> @@ -315,7 +330,7 @@ int
> xrow_encode_subscribe(struct xrow_header *row,
> 		      const struct tt_uuid *replicaset_uuid,
> 		      const struct tt_uuid *instance_uuid,
> -		      const struct vclock *vclock);
> +		      const struct vclock *vclock, bool anon);
> 
> /**
>  * Decode SUBSCRIBE command.
> @@ -324,6 +339,7 @@ xrow_encode_subscribe(struct xrow_header *row,
>  * @param[out] instance_uuid.
>  * @param[out] vclock.
>  * @param[out] version_id.
> + * @param[out] anon Whether it is an anonymous subscribe.
>  *
>  * @retval  0 Success.
>  * @retval -1 Memory or format error.
> @@ -331,7 +347,7 @@ xrow_encode_subscribe(struct xrow_header *row,
> int
> xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
> 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
> -		      uint32_t *version_id);
> +		      uint32_t *version_id, bool *anon);
> 
> /**
>  * Encode JOIN command.
> @@ -355,7 +371,22 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
> static inline int
> xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
> {
> -	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL);
> +	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
> +}
> +
> +/**
> + * Decode REGISTER request.
> + * @param row Row to decode.
> + * @param[out] instance_uuid Instance uuid.
> + * @param[out] vclock Instance vclock.
> + * @retval 0 Success.
> + * @retval -1 Memory or format error.
> + */
> +static inline int
> +xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
> +		     struct vclock *vclock)
> +{
> +	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
> }
> 
> /**
> @@ -380,7 +411,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
> static inline int
> xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
> {
> -	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL);
> +	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
> }
> 
> /**
> @@ -411,7 +442,7 @@ xrow_decode_subscribe_response(struct xrow_header *row,
> 			       struct tt_uuid *replicaset_uuid,
> 			       struct vclock *vclock)
> {
> -	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL);
> +	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
> }
> 
> /**
> @@ -769,15 +800,25 @@ xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot)
> 		diag_raise();
> }
> 
> +/** @copydoc xrow_encode_register. */
> +static inline void
> +xrow_encode_register_xc(struct xrow_header *row,
> +		       const struct tt_uuid *instance_uuid,
> +		       const struct vclock *vclock)
> +{
> +	if (xrow_encode_register(row, instance_uuid, vclock) != 0)
> +		diag_raise();
> +}
> +
> /** @copydoc xrow_encode_subscribe. */
> static inline void
> xrow_encode_subscribe_xc(struct xrow_header *row,
> 			 const struct tt_uuid *replicaset_uuid,
> 			 const struct tt_uuid *instance_uuid,
> -			 const struct vclock *vclock)
> +			 const struct vclock *vclock, bool anon)
> {
> 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
> -				  vclock) != 0)
> +				  vclock, anon) != 0)
> 		diag_raise();
> }
> 
> @@ -786,10 +827,10 @@ static inline void
> xrow_decode_subscribe_xc(struct xrow_header *row,
> 			 struct tt_uuid *replicaset_uuid,
> 		         struct tt_uuid *instance_uuid, struct vclock *vclock,
> -			 uint32_t *replica_version_id)
> +			 uint32_t *replica_version_id, bool *anon)
> {
> 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
> -				  vclock, replica_version_id) != 0)
> +				  vclock, replica_version_id, anon) != 0)
> 		diag_raise();
> }
> 
> @@ -810,6 +851,15 @@ xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid)
> 		diag_raise();
> }
> 
> +/** @copydoc xrow_decode_register. */
> +static inline void
> +xrow_decode_register_xc(struct xrow_header *row, struct tt_uuid *instance_uuid,
> +			struct vclock *vclock)
> +{
> +	if (xrow_decode_register(row, instance_uuid, vclock) != 0)
> +		diag_raise();
> +}
> +
> /** @copydoc xrow_encode_vclock. */
> static inline void
> xrow_encode_vclock_xc(struct xrow_header *row, const struct vclock *vclock)
> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
> index 799297ba0..7aec1d715 100644
> --- a/test/app-tap/init_script.result
> +++ b/test/app-tap/init_script.result
> @@ -25,30 +25,31 @@ box.cfg
> 20	pid_file:box.pid
> 21	read_only:false
> 22	readahead:16320
> -23	replication_connect_timeout:30
> -24	replication_skip_conflict:false
> -25	replication_sync_lag:10
> -26	replication_sync_timeout:300
> -27	replication_timeout:1
> -28	slab_alloc_factor:1.05
> -29	strip_core:true
> -30	too_long_threshold:0.5
> -31	vinyl_bloom_fpr:0.05
> -32	vinyl_cache:134217728
> -33	vinyl_dir:.
> -34	vinyl_max_tuple_size:1048576
> -35	vinyl_memory:134217728
> -36	vinyl_page_size:8192
> -37	vinyl_read_threads:1
> -38	vinyl_run_count_per_level:2
> -39	vinyl_run_size_ratio:3.5
> -40	vinyl_timeout:60
> -41	vinyl_write_threads:4
> -42	wal_dir:.
> -43	wal_dir_rescan_delay:2
> -44	wal_max_size:268435456
> -45	wal_mode:write
> -46	worker_pool_threads:4
> +23	replication_anon:false
> +24	replication_connect_timeout:30
> +25	replication_skip_conflict:false
> +26	replication_sync_lag:10
> +27	replication_sync_timeout:300
> +28	replication_timeout:1
> +29	slab_alloc_factor:1.05
> +30	strip_core:true
> +31	too_long_threshold:0.5
> +32	vinyl_bloom_fpr:0.05
> +33	vinyl_cache:134217728
> +34	vinyl_dir:.
> +35	vinyl_max_tuple_size:1048576
> +36	vinyl_memory:134217728
> +37	vinyl_page_size:8192
> +38	vinyl_read_threads:1
> +39	vinyl_run_count_per_level:2
> +40	vinyl_run_size_ratio:3.5
> +41	vinyl_timeout:60
> +42	vinyl_write_threads:4
> +43	wal_dir:.
> +44	wal_dir_rescan_delay:2
> +45	wal_max_size:268435456
> +46	wal_mode:write
> +47	worker_pool_threads:4
> --
> -- Test insert from detached fiber
> --
> diff --git a/test/box/admin.result b/test/box/admin.result
> index 6126f3a97..5a03a979a 100644
> --- a/test/box/admin.result
> +++ b/test/box/admin.result
> @@ -71,6 +71,8 @@ cfg_filter(box.cfg)
>     - false
>   - - readahead
>     - 16320
> +  - - replication_anon
> +    - false
>   - - replication_connect_timeout
>     - 30
>   - - replication_skip_conflict
> diff --git a/test/box/cfg.result b/test/box/cfg.result
> index 5370bb870..d6ce6b621 100644
> --- a/test/box/cfg.result
> +++ b/test/box/cfg.result
> @@ -59,6 +59,8 @@ cfg_filter(box.cfg)
>  |     - false
>  |   - - readahead
>  |     - 16320
> + |   - - replication_anon
> + |     - false
>  |   - - replication_connect_timeout
>  |     - 30
>  |   - - replication_skip_conflict
> @@ -158,6 +160,8 @@ cfg_filter(box.cfg)
>  |     - false
>  |   - - readahead
>  |     - 16320
> + |   - - replication_anon
> + |     - false
>  |   - - replication_connect_timeout
>  |     - 30
>  |   - - replication_skip_conflict
> diff --git a/test/replication/anon.lua b/test/replication/anon.lua
> new file mode 100644
> index 000000000..2e7ee9983
> --- /dev/null
> +++ b/test/replication/anon.lua
> @@ -0,0 +1,13 @@
> +#!/usr/bin/env tarantool
> +
> +box.cfg({
> +    listen              = os.getenv("LISTEN"),
> +    replication         = os.getenv("MASTER"),
> +    memtx_memory        = 107374182,
> +    replication_timeout = 0.1,
> +    replication_connect_timeout = 0.5,
> +    read_only=true,
> +    replication_anon=true,
> +})
> +
> +require('console').listen(os.getenv('ADMIN'))
> diff --git a/test/replication/anon.result b/test/replication/anon.result
> new file mode 100644
> index 000000000..df84484b2
> --- /dev/null
> +++ b/test/replication/anon.result
> @@ -0,0 +1,259 @@
> +-- test-run result file version 2
> +env = require('test_run')
> + | ---
> + | ...
> +vclock_diff = require('fast_replica').vclock_diff
> + | ---
> + | ...
> +test_run = env.new()
> + | ---
> + | ...
> +
> +-- prepare master
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +_ = box.schema.space.create('loc', {is_local=true})
> + | ---
> + | ...
> +_ = box.schema.space.create('temp', {temporary=true})
> + | ---
> + | ...
> +_ = box.schema.space.create('test')
> + | ---
> + | ...
> +_ = box.space.loc:create_index('pk')
> + | ---
> + | ...
> +_ = box.space.temp:create_index('pk')
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +box.space.test:insert{1}
> + | ---
> + | - [1]
> + | ...
> +
> +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica_anon')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('switch replica_anon')
> + | ---
> + | - true
> + | ...
> +
> +box.info.status
> + | ---
> + | - running
> + | ...
> +box.info.id
> + | ---
> + | - 0
> + | ...
> +box.info.lsn
> + | ---
> + | - 0
> + | ...
> +test_run:wait_upstream(1, {status='follow'})
> + | ---
> + | - true
> + | ...
> +
> +-- Temporary spaces are accessible as read / write.
> +for i = 1,10 do box.space.temp:insert{i} end
> + | ---
> + | ...
> +box.space.temp:select{}
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + |   - [4]
> + |   - [5]
> + |   - [6]
> + |   - [7]
> + |   - [8]
> + |   - [9]
> + |   - [10]
> + | ...
> +
> +box.info.lsn
> + | ---
> + | - 0
> + | ...
> +
> +-- Same for local spaces.
> +for i = 1,10 do box.space.loc:insert{i} end
> + | ---
> + | ...
> +box.space.loc:select{}
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + |   - [4]
> + |   - [5]
> + |   - [6]
> + |   - [7]
> + |   - [8]
> + |   - [9]
> + |   - [10]
> + | ...
> +
> +-- Replica-local changes are accounted for in 0 vclock component.
> +box.info.lsn
> + | ---
> + | - 10
> + | ...
> +box.info.vclock[0]
> + | ---
> + | - 10
> + | ...
> +
> +-- Replica is read-only.
> +box.cfg.read_only
> + | ---
> + | - true
> + | ...
> +box.cfg{read_only=false}
> + | ---
> + | - error: 'Incorrect value for option ''read_only'': the value may be set to false
> + |     only when replication_anon is false'
> + | ...
> +
> +box.space.test:insert{2}
> + | ---
> + | - error: Can't modify data because this instance is in read-only mode.
> + | ...
> +
> +box.space.loc:drop()
> + | ---
> + | - error: Can't modify data because this instance is in read-only mode.
> + | ...
> +box.space.loc:truncate()
> + | ---
> + | - error: Can't modify data because this instance is in read-only mode.
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +
> +-- Replica isn't visible on master.
> +#box.info.replication
> + | ---
> + | - 1
> + | ...
> +
> +test_run:cmd('switch replica_anon')
> + | ---
> + | - true
> + | ...
> +
> +-- Promote anonymous replica.
> +box.cfg{replication_anon=false}
> + | ---
> + | ...
> +-- Cannot switch back after becoming "normal".
> +box.cfg{replication_anon=true}
> + | ---
> + | - error: 'Incorrect value for option ''replication_anon'': cannot be turned on after
> + |     bootstrap has finished'
> + | ...
> +
> +box.info.id
> + | ---
> + | - 2
> + | ...
> +#box.info.replication
> + | ---
> + | - 2
> + | ...
> +test_run:wait_upstream(1, {status='follow'})
> + | ---
> + | - true
> + | ...
> +box.info.replication.downstream
> + | ---
> + | - null
> + | ...
> +
> +old_lsn = box.info.vclock[2] or 0
> + | ---
> + | ...
> +
> +-- Now read_only can be turned off.
> +box.cfg{read_only=false}
> + | ---
> + | ...
> +box.space.test:insert{2}
> + | ---
> + | - [2]
> + | ...
> +-- New changes are tracked under freshly assigned id.
> +box.info.vclock[2] == old_lsn + 1
> + | ---
> + | - true
> + | ...
> +
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +
> +-- Other instances may replicate from a previously-anonymous one.
> +test_run:cmd("set variable repl_source to 'replica_anon.listen'")
> + | ---
> + | - true
> + | ...
> +box.cfg{replication=repl_source}
> + | ---
> + | ...
> +#box.info.replication
> + | ---
> + | - 2
> + | ...
> +test_run:wait_upstream(2, {status='follow'})
> + | ---
> + | - true
> + | ...
> +test_run:wait_downstream(2, {status='follow'})
> + | ---
> + | - true
> + | ...
> +#box.info.vclock
> + | ---
> + | - 2
> + | ...
> +
> +-- cleanup
> +box.cfg{replication=""}
> + | ---
> + | ...
> +test_run:cmd('stop server replica_anon with cleanup=1')
> + | ---
> + | - true
> + | ...
> +box.space.test:drop()
> + | ---
> + | ...
> +box.space.temp:drop()
> + | ---
> + | ...
> +box.space.loc:drop()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> +test_run:cleanup_cluster()
> + | ---
> + | ...
> diff --git a/test/replication/anon.test.lua b/test/replication/anon.test.lua
> new file mode 100644
> index 000000000..f151f9e8c
> --- /dev/null
> +++ b/test/replication/anon.test.lua
> @@ -0,0 +1,89 @@
> +env = require('test_run')
> +vclock_diff = require('fast_replica').vclock_diff
> +test_run = env.new()
> +
> +-- prepare master
> +box.schema.user.grant('guest', 'replication')
> +_ = box.schema.space.create('loc', {is_local=true})
> +_ = box.schema.space.create('temp', {temporary=true})
> +_ = box.schema.space.create('test')
> +_ = box.space.loc:create_index('pk')
> +_ = box.space.temp:create_index('pk')
> +_ = box.space.test:create_index('pk')
> +box.space.test:insert{1}
> +
> +test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon.lua"')
> +test_run:cmd('start server replica_anon')
> +test_run:cmd('switch replica_anon')
> +
> +box.info.status
> +box.info.id
> +box.info.lsn
> +test_run:wait_upstream(1, {status='follow'})
> +
> +-- Temporary spaces are accessible as read / write.
> +for i = 1,10 do box.space.temp:insert{i} end
> +box.space.temp:select{}
> +
> +box.info.lsn
> +
> +-- Same for local spaces.
> +for i = 1,10 do box.space.loc:insert{i} end
> +box.space.loc:select{}
> +
> +-- Replica-local changes are accounted for in 0 vclock component.
> +box.info.lsn
> +box.info.vclock[0]
> +
> +-- Replica is read-only.
> +box.cfg.read_only
> +box.cfg{read_only=false}
> +
> +box.space.test:insert{2}
> +
> +box.space.loc:drop()
> +box.space.loc:truncate()
> +
> +test_run:cmd('switch default')
> +
> +-- Replica isn't visible on master.
> +#box.info.replication
> +
> +test_run:cmd('switch replica_anon')
> +
> +-- Promote anonymous replica.
> +box.cfg{replication_anon=false}
> +-- Cannot switch back after becoming "normal".
> +box.cfg{replication_anon=true}
> +
> +box.info.id
> +#box.info.replication
> +test_run:wait_upstream(1, {status='follow'})
> +box.info.replication.downstream
> +
> +old_lsn = box.info.vclock[2] or 0
> +
> +-- Now read_only can be turned off.
> +box.cfg{read_only=false}
> +box.space.test:insert{2}
> +-- New changes are tracked under freshly assigned id.
> +box.info.vclock[2] == old_lsn + 1
> +
> +test_run:cmd('switch default')
> +
> +-- Other instances may replicate from a previously-anonymous one.
> +test_run:cmd("set variable repl_source to 'replica_anon.listen'")
> +box.cfg{replication=repl_source}
> +#box.info.replication
> +test_run:wait_upstream(2, {status='follow'})
> +test_run:wait_downstream(2, {status='follow'})
> +#box.info.vclock
> +
> +-- cleanup
> +box.cfg{replication=""}
> +test_run:cmd('stop server replica_anon with cleanup=1')
> +box.space.test:drop()
> +box.space.temp:drop()
> +box.space.loc:drop()
> +box.schema.user.revoke('guest', 'replication')
> +test_run:cleanup_cluster()
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index cd686a0e2..429c64df3 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -1,4 +1,5 @@
> {
> +    "anon.test.lua": {},
>     "misc.test.lua": {},
>     "once.test.lua": {},
>     "on_replace.test.lua": {},
> -- 
> 2.20.1 (Apple Git-117)
> 
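
For readers following the xrow.c hunks above: the SUBSCRIBE body gains one
more map entry (IPROTO_REPLICA_ANON, a boolean), and the decoder skips the
key when the caller passes anon == NULL. Below is a minimal standalone
sketch of that idea, not the actual implementation. It does not use
msgpuck; the toy_* and put_* helpers are hypothetical, the key numbers are
assumptions made for the illustration, and every value is limited to a
single MessagePack byte to keep the example short.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Key numbers assumed for this sketch only. */
enum {
	KEY_SERVER_VERSION = 0x06,
	KEY_REPLICA_ANON = 0x50,
};

/* MessagePack fixmap header, at most 15 pairs. */
static uint8_t *
put_map(uint8_t *p, unsigned n)
{
	assert(n < 16);
	*p++ = (uint8_t)(0x80 | n);
	return p;
}

/* MessagePack positive fixint, 0..127. */
static uint8_t *
put_uint(uint8_t *p, unsigned v)
{
	assert(v < 128);
	*p++ = (uint8_t)v;
	return p;
}

/* MessagePack bool: 0xc2 is false, 0xc3 is true. */
static uint8_t *
put_bool(uint8_t *p, bool v)
{
	*p++ = v ? 0xc3 : 0xc2;
	return p;
}

/*
 * Build a toy SUBSCRIBE body. with_anon == false imitates a peer
 * that predates the flag and never sends the key.
 */
static size_t
toy_encode_subscribe(uint8_t *buf, unsigned version, bool with_anon,
		     bool anon)
{
	uint8_t *p = put_map(buf, with_anon ? 2 : 1);
	p = put_uint(p, KEY_SERVER_VERSION);
	p = put_uint(p, version);
	if (with_anon) {
		p = put_uint(p, KEY_REPLICA_ANON);
		p = put_bool(p, anon);
	}
	return (size_t)(p - buf);
}

/*
 * Decode the toy body. Returns 0 on success, -1 on malformed input.
 * *anon is left untouched when the key is absent, so the caller
 * chooses the default. A NULL anon means "not interested".
 */
static int
toy_decode_subscribe(const uint8_t *buf, size_t len, unsigned *version,
		     bool *anon)
{
	if (len == 0 || (buf[0] & 0xf0) != 0x80)
		return -1;
	unsigned n = buf[0] & 0x0f;
	const uint8_t *p = buf + 1;
	const uint8_t *end = buf + len;
	for (unsigned i = 0; i < n; i++) {
		if (end - p < 2)
			return -1;
		uint8_t key = *p++;
		uint8_t val = *p++;
		switch (key) {
		case KEY_SERVER_VERSION:
			if (val >= 128)
				return -1;
			*version = val;
			break;
		case KEY_REPLICA_ANON:
			if (anon == NULL)
				break; /* skip, as in the patch */
			if (val != 0xc2 && val != 0xc3)
				return -1;
			*anon = (val == 0xc3);
			break;
		default:
			/* Unknown key: its one-byte value is already consumed. */
			break;
		}
	}
	return 0;
}
```

A body encoded without the key decodes fine and leaves the caller's default
in place, which is presumably how a master would treat a SUBSCRIBE coming
from an older replica. A non-boolean value under the key is rejected only
when the caller actually asks for the flag.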


