[Tarantool-patches] [PATCH v2 5/5] replication: introduce anonymous replica.
sergepetrenko
sergepetrenko at tarantool.org
Wed Dec 25 15:47:02 MSK 2019
From: Serge Petrenko <sergepetrenko at tarantool.org>
This commit introduces anonymous replicas. Such replicas do not pollute
the _cluster table (in return, they can only be read-only and have a zero id).
An anonymous replica can be promoted to a normal one if needed.
Closes #3186
@TarantoolBot document
Title: Document anonymous replica
There is a new type of replica in Tarantool: an anonymous one. An anonymous
replica is read-only (but you can still write to temporary and
replica-local spaces), and it isn't present in the _cluster table.
Since an anonymous replica isn't registered in the _cluster table, there is no
limit on the number of anonymous replicas in a replicaset. You can have as
many of them as you want.
To make a replica anonymous, pass the option
`replication_anon=true` to `box.cfg`. You also have to set `read_only`
to true.
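The constraint can be illustrated with a minimal C sketch. It mirrors the checks that `box_check_replication_anon()` and `box_check_ro()` perform in this patch. `check_anon_cfg()` and its error text are hypothetical and are not part of Tarantool's API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper: validate a (replication_anon, read_only) pair
 * the way box.cfg does in this patch. Returns 0 when the combination
 * is allowed, or -1 with a message written into errbuf.
 */
static int
check_anon_cfg(bool replication_anon, bool read_only,
	       char *errbuf, size_t errlen)
{
	assert(errbuf != NULL && errlen > 0);
	if (replication_anon && !read_only) {
		/*
		 * An anonymous instance has no id, so it must not
		 * write to replicated (global) spaces.
		 */
		snprintf(errbuf, errlen, "replication_anon may be set to true "
			 "only when the instance is read-only");
		return -1;
	}
	errbuf[0] = '\0';
	return 0;
}
```

For example, `check_anon_cfg(true, false, buf, sizeof(buf))` returns -1. This corresponds to `box.cfg{replication_anon=true}` being rejected on a read-write instance.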
Let's walk through the bootstrap of an anonymous replica.
Suppose we have a master configured with
```
box.cfg{listen=3301}
```
We have also created a local space called "loc":
```
box.schema.space.create('loc', {is_local=true})
box.space.loc:create_index("pk")
```
Now, to configure an anonymous replica, we have to issue `box.cfg`,
as usual.
```
box.cfg{replication_anon=true, read_only=true, replication=3301}
```
As mentioned above, `replication_anon` may be set to true only together
with `read_only=true`.
The instance will fetch the master's snapshot and then follow its
changes. It will not receive an id, so its id will remain zero.
```
tarantool> box.info.id
---
- 0
...
```
```
tarantool> box.info.replication
---
- 1:
    id: 1
    uuid: 3c84f8d9-e34d-4651-969c-3d0ed214c60f
    lsn: 4
    upstream:
      status: follow
      idle: 0.6912029999985
      peer:
      lag: 0.00014615058898926
...
```
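The choice between the new FETCH_SNAPSHOT request and the regular JOIN can be sketched in C. This is a simplified model of the decision that `applier_f()` makes in this patch. It ignores version checks, errors and retries, and it does not model the anonymity flag (`IPROTO_REPLICA_ANON`) that the real SUBSCRIBE carries. `replica_request_sequence()` is a hypothetical helper that only lists the request types an applier would send.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical model of the requests an applier sends to its master.
 *   anon         - value of replication_anon on this instance
 *   bootstrapped - the instance already has data (REPLICASET_UUID set)
 *   has_id       - the instance already has a non-zero id in _cluster
 * Fills out[] (at least 3 slots) and returns the number of requests.
 */
static size_t
replica_request_sequence(bool anon, bool bootstrapped, bool has_id,
			 const char *out[], size_t cap)
{
	size_t n = 0;
	assert(cap >= 3);
	if (!bootstrapped) {
		if (anon) {
			/* Copy the data only; no _cluster entry is made. */
			out[n++] = "FETCH_SNAPSHOT";
		} else {
			/* JOIN both copies the data and assigns an id. */
			out[n++] = "JOIN";
			has_id = true;
		}
	}
	/* A deanonymized instance must get an id before subscribing. */
	if (!anon && !has_id)
		out[n++] = "REGISTER";
	out[n++] = "SUBSCRIBE";
	return n;
}
```

The call `replica_request_sequence(false, true, false, ...)` models the promotion shown further on. An already bootstrapped anonymous instance with `replication_anon` switched off sends REGISTER before a regular SUBSCRIBE.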
Now we can use the replica.
For example, we may do inserts into the local space:
```
tarantool> for i = 1,10 do
> box.space.loc:insert{i}
> end
---
...
```
Note that while the instance is anonymous, it increments the 0-th
component of its vclock:
```
tarantool> box.info.vclock
---
- {0: 10, 1: 4}
...
```
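A simplified C model of LSN assignment shows why component 0 grows. It loosely follows `wal_assign_lsn()` from this patch: a locally generated row is signed with the instance's own id, and that id is 0 (`REPLICA_ID_NIL`) for an anonymous replica. `struct row_sketch`, `assign_local_lsn()` and the fixed `VCLOCK_MAX` of 32 are assumptions made for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed size of the vclock array for this sketch. */
#define VCLOCK_MAX 32

/* Hypothetical, stripped-down WAL row. */
struct row_sketch {
	uint32_t replica_id; /* 0 until the row is signed */
	int64_t lsn;
	bool is_local;       /* row belongs to a replica-local space */
};

/*
 * Sign a locally generated row: bump this instance's vclock
 * component and stamp the row with instance_id. An anonymous
 * replica has instance_id == 0, so it bumps vclock[0], and it may
 * only produce rows for local spaces.
 */
static void
assign_local_lsn(int64_t vclock[VCLOCK_MAX], uint32_t instance_id,
		 struct row_sketch *row)
{
	assert(instance_id < VCLOCK_MAX);
	assert(row->replica_id == 0);
	assert(instance_id != 0 || row->is_local);
	row->lsn = ++vclock[instance_id];
	row->replica_id = instance_id;
}
```

In this model, ten local inserts with `instance_id == 0` on top of `vclock[1] == 4` leave the vclock at `{0: 10, 1: 4}`. This matches the output above.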
Let's now promote the replica to a normal one:
```
tarantool> box.cfg{replication_anon=false}
2019-12-13 20:34:37.423 [71329] main I> assigned id 2 to replica 6a9c2ed2-b9e1-4c57-a0e8-51a46def7661
2019-12-13 20:34:37.424 [71329] main/102/interactive I> set 'replication_anon' configuration option to false
---
...
tarantool> 2019-12-13 20:34:37.424 [71329] main/117/applier/ I> subscribed
2019-12-13 20:34:37.424 [71329] main/117/applier/ I> remote vclock {1: 5} local vclock {0: 10, 1: 5}
2019-12-13 20:34:37.425 [71329] main/118/applierw/ C> leaving orphan mode
```
The replica just received id 2. We can make it read-write now.
```
tarantool> box.cfg{read_only=false}
2019-12-13 20:35:46.392 [71329] main/102/interactive I> set 'read_only' configuration option to false
---
...
tarantool> box.schema.space.create('test')
---
- engine: memtx
  before_replace: 'function: 0x01109f9dc8'
  on_replace: 'function: 0x01109f9d90'
  ck_constraint: []
  field_count: 0
  temporary: false
  index: []
  is_local: false
  enabled: false
  name: test
  id: 513
- created
...
tarantool> box.info.vclock
---
- {0: 10, 1: 5, 2: 2}
...
```
Now the replica tracks its changes in the 2nd vclock component, as expected.
From now on, it can also become a replication master.
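Once the promoted replica starts serving as a master, the rows it wrote while anonymous still carry replica id 0. The relay in this patch skips them (see the `REPLICA_ID_NIL` check added to `relay_send_row()`). Below is a minimal C model of that filter, using a hypothetical `relay_filter()` over an in-memory log. The existing transformation of local rows into `IPROTO_NOP` is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, stripped-down WAL row. */
struct row_sketch {
	uint32_t replica_id;
	int64_t lsn;
};

/*
 * Copy into out[] the rows a relay would forward to a subscriber.
 * Rows signed with replica id 0 were written while this instance
 * was anonymous (local spaces only) and are never sent.
 * Returns the number of rows copied; out must hold n entries.
 */
static size_t
relay_filter(const struct row_sketch *log, size_t n,
	     struct row_sketch *out)
{
	size_t sent = 0;
	for (size_t i = 0; i < n; i++) {
		if (log[i].replica_id == 0)
			continue;
		out[sent++] = log[i];
	}
	return sent;
}
```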
Side notes:
* You cannot replicate from an anonymous instance.
* To promote an anonymous instance to a regular one,
you first have to start it as anonymous, and only
then issue `box.cfg{replication_anon=false}`.
* For the deanonymization to succeed, the
instance must replicate from some read-write instance;
otherwise no one will be able to add it to the _cluster table.
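The checks behind these notes can be summarized in a C sketch of how a master screens REGISTER and SUBSCRIBE requests. It follows the order used by `box_process_register()` and `box_process_subscribe()` in this patch. Permission, WAL-mode and loading checks are omitted. `enum verdict`, `check_register()` and `check_subscribe()` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical outcomes; comments name what the patch raises. */
enum verdict {
	VERDICT_OK,
	VERDICT_FROM_ANON,       /* ER_UNSUPPORTED: anonymous master */
	VERDICT_NOT_ANON,        /* ER_REPLICA_NOT_ANON */
	VERDICT_READ_ONLY,       /* box_check_writable_xc() fails */
	VERDICT_UNKNOWN_REPLICA, /* ER_UNKNOWN_REPLICA */
};

/*
 * self_anon/self_ro describe the master receiving the request;
 * requester_id is 0 if the requester has no id in _cluster.
 */
static enum verdict
check_register(bool self_anon, bool self_ro, uint32_t requester_id)
{
	if (self_anon)
		return VERDICT_FROM_ANON;
	/* Only anonymous instances may register. */
	if (requester_id != 0)
		return VERDICT_NOT_ANON;
	/* Adding a row to _cluster requires a writable master. */
	if (self_ro)
		return VERDICT_READ_ONLY;
	return VERDICT_OK;
}

static enum verdict
check_subscribe(bool self_anon, bool requester_anon, uint32_t requester_id)
{
	if (self_anon)
		return VERDICT_FROM_ANON;
	/* A non-anonymous subscriber must already be in _cluster. */
	if (!requester_anon && requester_id == 0)
		return VERDICT_UNKNOWN_REPLICA;
	return VERDICT_OK;
}
```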
---
src/box/applier.cc | 59 +++++-
src/box/applier.h | 4 +
src/box/box.cc | 265 ++++++++++++++++++++++++--
src/box/box.h | 11 +-
src/box/errcode.h | 1 +
src/box/iproto.cc | 16 +-
src/box/iproto_constants.h | 6 +
src/box/lua/cfg.cc | 14 +-
src/box/lua/info.c | 4 +-
src/box/lua/load_cfg.lua | 4 +
src/box/recovery.cc | 7 +-
src/box/relay.cc | 28 ++-
src/box/replication.cc | 41 +++-
src/box/replication.h | 24 +++
src/box/wal.c | 4 +
src/box/xrow.c | 49 ++++-
src/box/xrow.h | 68 ++++++-
test/app-tap/init_script.result | 49 ++---
test/box/admin.result | 2 +
test/box/cfg.result | 4 +
test/box/misc.result | 1 +
test/replication/anon.lua | 13 ++
test/replication/anon.result | 326 ++++++++++++++++++++++++++++++++
test/replication/anon.test.lua | 118 ++++++++++++
test/replication/anon1.lua | 1 +
test/replication/anon2.lua | 1 +
test/replication/suite.cfg | 1 +
27 files changed, 1053 insertions(+), 68 deletions(-)
create mode 100644 test/replication/anon.lua
create mode 100644 test/replication/anon.result
create mode 100644 test/replication/anon.test.lua
create mode 120000 test/replication/anon1.lua
create mode 120000 test/replication/anon2.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index f4f9d0670..2939a0f61 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -400,7 +400,7 @@ applier_wait_snapshot(struct applier *applier)
* response, but a stream of rows from checkpoint.
*/
if (applier->version_id >= version_id(1, 7, 0)) {
- /* Decode JOIN response */
+ /* Decode JOIN/FETCH_SNAPSHOT response */
coio_read_xrow(coio, ibuf, &row);
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* re-throw error */
@@ -452,6 +452,23 @@ applier_wait_snapshot(struct applier *applier)
return row_count;
}
+static void
+applier_fetch_snapshot(struct applier *applier)
+{
+ /* Send FETCH SNAPSHOT request */
+ struct ev_io *coio = &applier->io;
+ struct xrow_header row;
+
+ memset(&row, 0, sizeof(row));
+ row.type = IPROTO_FETCH_SNAPSHOT;
+ coio_write_xrow(coio, &row);
+
+ applier_set_state(applier, APPLIER_FETCH_SNAPSHOT);
+ applier_wait_snapshot(applier);
+ applier_set_state(applier, APPLIER_FETCHED_SNAPSHOT);
+ applier_set_state(applier, APPLIER_READY);
+}
+
static uint64_t
applier_wait_register(struct applier *applier, uint64_t row_count)
{
@@ -497,6 +514,28 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
return row_count;
}
+static void
+applier_register(struct applier *applier)
+{
+ /* Send REGISTER request */
+ struct ev_io *coio = &applier->io;
+ struct xrow_header row;
+
+ memset(&row, 0, sizeof(row));
+ /*
+ * Send this instance's current vclock together
+ * with REGISTER request.
+ */
+ xrow_encode_register(&row, &INSTANCE_UUID, box_vclock);
+ row.type = IPROTO_REGISTER;
+ coio_write_xrow(coio, &row);
+
+ applier_set_state(applier, APPLIER_REGISTER);
+ applier_wait_register(applier, 0);
+ applier_set_state(applier, APPLIER_REGISTERED);
+ applier_set_state(applier, APPLIER_READY);
+}
+
/**
* Execute and process JOIN request (bootstrap the instance).
*/
@@ -828,7 +867,7 @@ applier_subscribe(struct applier *applier)
vclock_create(&vclock);
vclock_copy(&vclock, &replicaset.vclock);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &vclock);
+ &vclock, replication_anon);
coio_write_xrow(coio, &row);
/* Read SUBSCRIBE response */
@@ -996,10 +1035,24 @@ applier_f(va_list ap)
if (tt_uuid_is_nil(&REPLICASET_UUID)) {
/*
* Execute JOIN if this is a bootstrap.
+ * In case of anonymous replication, don't
+ * join but just fetch master's snapshot.
+ *
* The join will pause the applier
* until WAL is created.
*/
- applier_join(applier);
+ if (replication_anon)
+ applier_fetch_snapshot(applier);
+ else
+ applier_join(applier);
+ }
+ if (applier->version_id >= version_id(1, 7, 0) &&
+ !replication_anon && instance_id == REPLICA_ID_NIL) {
+ /* anonymity was turned off while we were
+ * fetching a snapshot or following master.
+ * Register the replica now.
+ */
+ applier_register(applier);
}
applier_subscribe(applier);
/*
diff --git a/src/box/applier.h b/src/box/applier.h
index b406e6aaf..c9fdc2955 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -61,6 +61,10 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
_(APPLIER_STOPPED, 10) \
_(APPLIER_DISCONNECTED, 11) \
_(APPLIER_LOADING, 12) \
+ _(APPLIER_FETCH_SNAPSHOT, 13) \
+ _(APPLIER_FETCHED_SNAPSHOT, 14) \
+ _(APPLIER_REGISTER, 15) \
+ _(APPLIER_REGISTERED, 16) \
/** States for the applier */
ENUM(applier_state, applier_STATE);
diff --git a/src/box/box.cc b/src/box/box.cc
index 56d07e634..a93e2503e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -223,9 +223,13 @@ error:
return -1;
}
+static bool
+box_check_ro(void);
+
void
-box_set_ro(bool ro)
+box_set_ro()
{
+ bool ro = box_check_ro();
if (ro == is_ro)
return; /* nothing to do */
if (ro)
@@ -486,6 +490,32 @@ box_check_uuid(struct tt_uuid *uuid, const char *name)
}
}
+static bool
+box_check_ro()
+{
+ bool ro = cfg_geti("read_only") != 0;
+ bool anon = cfg_geti("replication_anon") != 0;
+ if (anon && !ro) {
+ tnt_raise(ClientError, ER_CFG, "read_only",
+ "the value may be set to false only when "
+ "replication_anon is false");
+ }
+ return ro;
+}
+
+static bool
+box_check_replication_anon(void)
+{
+ bool anon = cfg_geti("replication_anon") != 0;
+ bool ro = cfg_geti("read_only") != 0;
+ if (anon && !ro) {
+ tnt_raise(ClientError, ER_CFG, "replication_anon",
+ "the value may be set to true only when "
+ "the instance is read-only");
+ }
+ return anon;
+}
+
static void
box_check_instance_uuid(struct tt_uuid *uuid)
{
@@ -740,6 +770,64 @@ box_set_replication_skip_conflict(void)
replication_skip_conflict = cfg_geti("replication_skip_conflict");
}
+void
+box_set_replication_anon(void)
+{
+ bool anon = box_check_replication_anon();
+ if (anon == replication_anon)
+ return;
+
+ if (!anon) {
+ /* Turn anonymous instance into a normal one. */
+ replication_anon = anon;
+ /*
+ * Reset all appliers. This will interrupt
+ * anonymous follow they're in so that one of
+ * them can register and others resend a
+ * non-anonymous subscribe.
+ */
+ replicaset_foreach(replica) {
+ struct applier *applier = replica->applier;
+ if (applier == NULL)
+ continue;
+ replica_clear_applier(replica);
+ applier_stop(applier);
+ replica->applier_sync_state = APPLIER_DISCONNECTED;
+ replica_set_applier(replica, applier);
+ }
+ /* Choose a master to send register request to. */
+ struct replica *master = replicaset_leader();
+ assert(master != NULL && master->applier != NULL);
+ struct applier *master_applier = master->applier;
+
+ applier_start(master_applier);
+
+ applier_resume_to_state(master_applier, APPLIER_REGISTER, TIMEOUT_INFINITY);
+ applier_resume_to_state(master_applier, APPLIER_REGISTERED, TIMEOUT_INFINITY);
+ applier_resume_to_state(master_applier, APPLIER_READY, TIMEOUT_INFINITY);
+ applier_resume(master_applier);
+ /**
+ * Restart other appliers to
+ * resend non-anonymous subscribe.
+ */
+ replicaset_foreach(replica) {
+ if (replica != master && replica->applier)
+ applier_start(replica->applier);
+ }
+ } else if (!is_box_configured) {
+ replication_anon = anon;
+ } else {
+ /*
+ * It is forbidden to turn a normal replica into
+ * an anonymous one.
+ */
+ tnt_raise(ClientError, ER_CFG, "replication_anon",
+ "cannot be turned on after bootstrap"
+ " has finished");
+ }
+
+}
+
void
box_listen(void)
{
@@ -1379,6 +1467,132 @@ box_process_auth(struct auth_request *request, const char *salt)
authenticate(user, len, salt, request->scramble);
}
+void
+box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
+{
+ assert(header->type == IPROTO_FETCH_SNAPSHOT);
+
+ /* Check that bootstrap has been finished */
+ if (!is_box_configured)
+ tnt_raise(ClientError, ER_LOADING);
+
+ /* Check permissions */
+ access_check_universe_xc(PRIV_R);
+
+ /* Forbid replication with disabled WAL */
+ if (wal_mode() == WAL_NONE) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
+ "wal_mode = 'none'");
+ }
+
+ say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
+
+ /* Send the snapshot data to the instance. */
+ struct vclock start_vclock;
+ relay_initial_join(io->fd, header->sync, &start_vclock);
+ say_info("read-view sent.");
+
+ /* Remember master's vclock after the last request */
+ struct vclock stop_vclock;
+ vclock_copy(&stop_vclock, &replicaset.vclock);
+
+ /* Send end of snapshot data marker */
+ struct xrow_header row;
+ xrow_encode_vclock_xc(&row, &stop_vclock);
+ row.sync = header->sync;
+ coio_write_xrow(io, &row);
+}
+
+void
+box_process_register(struct ev_io *io, struct xrow_header *header)
+{
+ assert(header->type == IPROTO_REGISTER);
+
+ struct tt_uuid instance_uuid = uuid_nil;
+ struct vclock vclock;
+ xrow_decode_register_xc(header, &instance_uuid, &vclock);
+
+ if (!is_box_configured)
+ tnt_raise(ClientError, ER_LOADING);
+
+ if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
+ tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
+
+ /* Forbid replication from an anonymous instance. */
+ if (replication_anon) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
+ "replicating from an anonymous instance.");
+ }
+
+ access_check_universe_xc(PRIV_R);
+ /* We only get register requests from anonymous instances. */
+ struct replica *replica = replica_by_uuid(&instance_uuid);
+ if (replica && replica->id != REPLICA_ID_NIL) {
+ tnt_raise(ClientError, ER_REPLICA_NOT_ANON,
+ tt_uuid_str(&instance_uuid));
+ }
+ /* See box_process_join() */
+ box_check_writable_xc();
+ struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
+ access_check_space_xc(space, PRIV_W);
+
+ /* Forbid replication with disabled WAL */
+ if (wal_mode() == WAL_NONE) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
+ "wal_mode = 'none'");
+ }
+
+ struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
+ "replica %s", tt_uuid_str(&instance_uuid));
+ if (gc == NULL)
+ diag_raise();
+ auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
+
+ say_info("registering replica %s at %s",
+ tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
+
+ struct vclock start_vclock;
+ vclock_copy(&start_vclock, &replicaset.vclock);
+
+ /**
+ * Call the server-side hook which stores the replica uuid
+ * in _cluster space.
+ */
+ box_on_join(&instance_uuid);
+
+ ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
+
+ /* Remember master's vclock after the last request */
+ struct vclock stop_vclock;
+ vclock_copy(&stop_vclock, &replicaset.vclock);
+
+ /*
+ * Feed replica with WALs in range
+ * (start_vclock, stop_vclock) so that it gets its
+ * registration.
+ */
+ relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
+ say_info("final data sent.");
+
+ struct xrow_header row;
+ /* Send end of WAL stream marker */
+ xrow_encode_vclock_xc(&row, &replicaset.vclock);
+ row.sync = header->sync;
+ coio_write_xrow(io, &row);
+
+ /*
+ * Advance the WAL consumer state to the position where
+ * registration was complete and assign it to the
+ * replica.
+ */
+ gc_consumer_advance(gc, &stop_vclock);
+ replica = replica_by_uuid(&instance_uuid);
+ if (replica->gc != NULL)
+ gc_consumer_unregister(replica->gc);
+ replica->gc = gc;
+ gc_guard.is_active = false;
+}
+
void
box_process_join(struct ev_io *io, struct xrow_header *header)
{
@@ -1438,6 +1652,12 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
if (tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID))
tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
+ /* Forbid replication from an anonymous instance. */
+ if (replication_anon) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
+ "replicating from an anonymous instance.");
+ }
+
/* Check permissions */
access_check_universe_xc(PRIV_R);
@@ -1537,23 +1757,33 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
struct vclock replica_clock;
uint32_t replica_version_id;
vclock_create(&replica_clock);
+ bool anon;
xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
- &replica_clock, &replica_version_id);
+ &replica_clock, &replica_version_id, &anon);
/* Forbid connection to itself */
if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
tnt_raise(ClientError, ER_CONNECTION_TO_SELF);
+ /* Forbid replication from an anonymous instance. */
+ if (replication_anon) {
+ tnt_raise(ClientError, ER_UNSUPPORTED, "Replication",
+ "replicating from an anonymous instance.");
+ }
+
/* Check permissions */
access_check_universe_xc(PRIV_R);
/* Check replica uuid */
struct replica *replica = replica_by_uuid(&replica_uuid);
- if (replica == NULL || replica->id == REPLICA_ID_NIL) {
+
+ if (!anon && (replica == NULL || replica->id == REPLICA_ID_NIL)) {
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
tt_uuid_str(&replica_uuid),
tt_uuid_str(&REPLICASET_UUID));
}
+ if (replica == NULL)
+ replica = replicaset_add_anon(&replica_uuid);
/* Don't allow multiple relays for the same replica */
if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
@@ -1774,13 +2004,17 @@ bootstrap_from_master(struct replica *master)
*/
assert(!tt_uuid_is_nil(&INSTANCE_UUID));
- applier_resume_to_state(applier, APPLIER_INITIAL_JOIN, TIMEOUT_INFINITY);
-
+ enum applier_state wait_state = replication_anon ?
+ APPLIER_FETCH_SNAPSHOT :
+ APPLIER_INITIAL_JOIN;
+ applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
/*
* Process initial data (snapshot or dirty disk data).
*/
engine_begin_initial_recovery_xc(NULL);
- applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);
+ wait_state = replication_anon ? APPLIER_FETCHED_SNAPSHOT :
+ APPLIER_FINAL_JOIN;
+ applier_resume_to_state(applier, wait_state, TIMEOUT_INFINITY);
/*
* Process final data (WALs).
@@ -1790,8 +2024,10 @@ bootstrap_from_master(struct replica *master)
recovery_journal_create(&journal, &replicaset.vclock);
journal_set(&journal.base);
- applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
-
+ if (!replication_anon) {
+ applier_resume_to_state(applier, APPLIER_JOINED,
+ TIMEOUT_INFINITY);
+ }
/* Finalize the new replica */
engine_end_recovery_xc();
@@ -2106,6 +2342,7 @@ box_cfg_xc(void)
box_set_replication_sync_lag();
box_set_replication_sync_timeout();
box_set_replication_skip_conflict();
+ box_set_replication_anon();
struct gc_checkpoint *checkpoint = gc_last_checkpoint();
@@ -2136,14 +2373,20 @@ box_cfg_xc(void)
}
fiber_gc();
- /* Check for correct registration of the instance in _cluster */
- {
- struct replica *self = replica_by_uuid(&INSTANCE_UUID);
+ /*
+ * Check for correct registration of the instance in _cluster
+ * The instance won't exist in _cluster space if it is an
+ * anonymous replica, add it manually.
+ */
+ struct replica *self = replica_by_uuid(&INSTANCE_UUID);
+ if (!replication_anon) {
if (self == NULL || self->id == REPLICA_ID_NIL) {
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
tt_uuid_str(&INSTANCE_UUID),
tt_uuid_str(&REPLICASET_UUID));
}
+ } else if (self == NULL) {
+ replicaset_add_anon(&INSTANCE_UUID);
}
rmean_cleanup(rmean_box);
diff --git a/src/box/box.h b/src/box/box.h
index ccd527bd5..e4088d6b6 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -100,7 +100,7 @@ void
box_atfork(void);
void
-box_set_ro(bool ro);
+box_set_ro();
bool
box_is_ro(void);
@@ -179,6 +179,14 @@ box_reset_stat(void);
void
box_process_auth(struct auth_request *request, const char *salt);
+/** Send current read view to the replica. */
+void
+box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header);
+
+/** Register a replica */
+void
+box_process_register(struct ev_io *io, struct xrow_header *header);
+
/**
* Join a replica.
*
@@ -234,6 +242,7 @@ void box_set_replication_connect_quorum(void);
void box_set_replication_sync_lag(void);
void box_set_replication_sync_timeout(void);
void box_set_replication_skip_conflict(void);
+void box_set_replication_anon(void);
void box_set_net_msg_max(void);
extern "C" {
diff --git a/src/box/errcode.h b/src/box/errcode.h
index c660b1c70..dce69c9c0 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -258,6 +258,7 @@ struct errcode_record {
/*203 */_(ER_BOOTSTRAP_READONLY, "Trying to bootstrap a local read-only instance as master") \
/*204 */_(ER_SQL_FUNC_WRONG_RET_COUNT, "SQL expects exactly one argument returned from %s, got %d")\
/*205 */_(ER_FUNC_INVALID_RETURN_TYPE, "Function '%s' returned value of invalid type: expected %s got %s") \
+ /*206 */_(ER_REPLICA_NOT_ANON, "Replica '%s' is not anonymous and cannot register.") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index c39b8e7bf..9e6bd2dd7 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1162,7 +1162,7 @@ static void
net_send_error(struct cmsg *msg);
static void
-tx_process_join_subscribe(struct cmsg *msg);
+tx_process_replication(struct cmsg *msg);
static void
net_end_join(struct cmsg *msg);
@@ -1212,12 +1212,12 @@ static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = {
};
static const struct cmsg_hop join_route[] = {
- { tx_process_join_subscribe, &net_pipe },
+ { tx_process_replication, &net_pipe },
{ net_end_join, NULL },
};
static const struct cmsg_hop subscribe_route[] = {
- { tx_process_join_subscribe, &net_pipe },
+ { tx_process_replication, &net_pipe },
{ net_end_subscribe, NULL },
};
@@ -1272,6 +1272,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
cmsg_init(&msg->base, misc_route);
break;
case IPROTO_JOIN:
+ case IPROTO_FETCH_SNAPSHOT:
+ case IPROTO_REGISTER:
cmsg_init(&msg->base, join_route);
*stop_input = true;
break;
@@ -1752,7 +1754,7 @@ error:
}
static void
-tx_process_join_subscribe(struct cmsg *m)
+tx_process_replication(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
@@ -1768,6 +1770,12 @@ tx_process_join_subscribe(struct cmsg *m)
*/
box_process_join(&io, &msg->header);
break;
+ case IPROTO_FETCH_SNAPSHOT:
+ box_process_fetch_snapshot(&io, &msg->header);
+ break;
+ case IPROTO_REGISTER:
+ box_process_register(&io, &msg->header);
+ break;
case IPROTO_SUBSCRIBE:
/*
* Subscribe never returns - unless there
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 5e8a7d483..10a9a0102 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -120,6 +120,8 @@ enum iproto_key {
* }
*/
IPROTO_SQL_INFO = 0x42,
+ /* Leave a gap between SQL keys and additional request keys */
+ IPROTO_REPLICA_ANON = 0x50,
IPROTO_KEY_MAX
};
@@ -216,6 +218,10 @@ enum iproto_type {
IPROTO_VOTE_DEPRECATED = 67,
/** Vote request command for master election */
IPROTO_VOTE = 68,
+ /** Anonymous replication FETCH SNAPSHOT. */
+ IPROTO_FETCH_SNAPSHOT = 69,
+ /** REGISTER request to leave anonymous replication. */
+ IPROTO_REGISTER = 70,
/** Vinyl run info stored in .index file */
VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 4884ce013..f59470774 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -190,7 +190,7 @@ static int
lbox_cfg_set_read_only(struct lua_State *L)
{
try {
- box_set_ro(cfg_geti("read_only") != 0);
+ box_set_ro();
} catch (Exception *) {
luaT_error(L);
}
@@ -338,6 +338,17 @@ lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_replication_anon(struct lua_State *L)
+{
+ try {
+ box_set_replication_anon();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
static int
lbox_cfg_set_replication_skip_conflict(struct lua_State *L)
{
@@ -377,6 +388,7 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
+ {"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
{NULL, NULL}
};
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index e029e0e17..b5909a878 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -223,7 +223,7 @@ lbox_info_id(struct lua_State *L)
* at box.info.status.
*/
struct replica *self = replica_by_uuid(&INSTANCE_UUID);
- if (self != NULL && self->id != REPLICA_ID_NIL) {
+ if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
lua_pushinteger(L, self->id);
} else {
luaL_pushnull(L);
@@ -243,7 +243,7 @@ lbox_info_lsn(struct lua_State *L)
{
/* See comments in lbox_info_id */
struct replica *self = replica_by_uuid(&INSTANCE_UUID);
- if (self != NULL && self->id != REPLICA_ID_NIL) {
+ if (self != NULL && (self->id != REPLICA_ID_NIL || replication_anon)) {
luaL_pushint64(L, vclock_get(box_vclock, self->id));
} else {
luaL_pushint64(L, -1);
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 85617c8f0..9dee75b7d 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -77,6 +77,7 @@ local default_cfg = {
replication_connect_timeout = 30,
replication_connect_quorum = nil, -- connect all
replication_skip_conflict = false,
+ replication_anon = false,
feedback_enabled = true,
feedback_host = "https://feedback.tarantool.io",
feedback_interval = 3600,
@@ -140,6 +141,7 @@ local template_cfg = {
replication_connect_timeout = 'number',
replication_connect_quorum = 'number',
replication_skip_conflict = 'boolean',
+ replication_anon = 'boolean',
feedback_enabled = 'boolean',
feedback_host = 'string',
feedback_interval = 'number',
@@ -247,6 +249,7 @@ local dynamic_cfg = {
replication_sync_lag = private.cfg_set_replication_sync_lag,
replication_sync_timeout = private.cfg_set_replication_sync_timeout,
replication_skip_conflict = private.cfg_set_replication_skip_conflict,
+ replication_anon = private.cfg_set_replication_anon,
instance_uuid = check_instance_uuid,
replicaset_uuid = check_replicaset_uuid,
net_msg_max = private.cfg_set_net_msg_max,
@@ -301,6 +304,7 @@ local dynamic_cfg_skip_at_load = {
replication_sync_lag = true,
replication_sync_timeout = true,
replication_skip_conflict = true,
+ replication_anon = true,
wal_dir_rescan_delay = true,
custom_proc_title = true,
force_recovery = true,
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..64aa467b1 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -262,9 +262,12 @@ recover_xlog(struct recovery *r, struct xstream *stream,
/*
* All rows in xlog files have an assigned
- * replica id.
+ * replica id. The only exception is anonymous
+ * replica, which has a zero instance id.
+ * In this case the only rows from such an instance
+ * can be for the local spaces.
*/
- assert(row.replica_id != 0);
+ assert(row.replica_id != 0 || row.group_id == GROUP_LOCAL);
/*
* We can promote the vclock either before or
* after xstream_write(): it only makes any impact
diff --git a/src/box/relay.cc b/src/box/relay.cc
index e849fcf4f..22029751f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -569,11 +569,16 @@ relay_subscribe_f(va_list ap)
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- /* Setup garbage collection trigger. */
+ /*
+ * Setup garbage collection trigger.
+ * Not needed for anonymous replicas, since they
+ * aren't registered with gc at all.
+ */
struct trigger on_close_log = {
RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
};
- trigger_add(&r->on_close_log, &on_close_log);
+ if (!relay->replica->anon)
+ trigger_add(&r->on_close_log, &on_close_log);
/* Setup WAL watcher for sending new rows to the replica. */
wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
@@ -652,7 +657,8 @@ relay_subscribe_f(va_list ap)
say_crit("exiting the relay loop");
/* Clear garbage collector trigger and WAL watcher. */
- trigger_clear(&on_close_log);
+ if (!relay->replica->anon)
+ trigger_clear(&on_close_log);
wal_clear_watcher(&relay->wal_watcher, cbus_process);
/* Join ack reader fiber. */
@@ -673,7 +679,7 @@ void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_clock, uint32_t replica_version_id)
{
- assert(replica->id != REPLICA_ID_NIL);
+ assert(replica->anon || replica->id != REPLICA_ID_NIL);
struct relay *relay = replica->relay;
assert(relay->state != RELAY_FOLLOW);
/*
@@ -681,7 +687,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
* unless it has already been registered by initial
* join.
*/
- if (replica->gc == NULL) {
+ if (replica->gc == NULL && !replica->anon) {
replica->gc = gc_consumer_register(replica_clock, "replica %s",
tt_uuid_str(&replica->uuid));
if (replica->gc == NULL)
@@ -691,7 +697,10 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
relay_start(relay, fd, sync, relay_send_row);
auto relay_guard = make_scoped_guard([=] {
relay_stop(relay);
- replica_on_relay_stop(replica);
+ if (replica->anon)
+ replica_anon_delete(replica);
+ else
+ replica_on_relay_stop(replica);
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
@@ -741,6 +750,13 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
assert(iproto_type_is_dml(packet->type));
+ /*
+ * Replica-local requests generated while replica was
+ * anonymous have a zero instance id. Just skip all
+ * these rows.
+ */
+ if (packet->replica_id == REPLICA_ID_NIL)
+ return;
/*
* Transform replica local requests to IPROTO_NOP so as to
* promote vclock on the replica without actually modifying
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 81f19aa07..ce707811a 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -53,6 +53,7 @@ int replication_connect_quorum = REPLICATION_CONNECT_QUORUM_ALL;
double replication_sync_lag = 10.0; /* seconds */
double replication_sync_timeout = 300.0; /* seconds */
bool replication_skip_conflict = false;
+bool replication_anon = false;
struct replicaset replicaset;
@@ -172,6 +173,7 @@ replica_new(void)
diag_raise();
}
replica->id = 0;
+ replica->anon = false;
replica->uuid = uuid_nil;
replica->applier = NULL;
replica->gc = NULL;
@@ -209,6 +211,19 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
return replica;
}
+struct replica *
+replicaset_add_anon(const struct tt_uuid *replica_uuid)
+{
+ assert(!tt_uuid_is_nil(replica_uuid));
+ assert(replica_by_uuid(replica_uuid) == NULL);
+
+ struct replica *replica = replica_new();
+ replica->uuid = *replica_uuid;
+ replica_hash_insert(&replicaset.hash, replica);
+ replica->anon = true;
+ return replica;
+}
+
void
replica_set_id(struct replica *replica, uint32_t replica_id)
{
@@ -220,11 +235,21 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
+ } else if (replica->anon) {
+ /*
+ * Set replica gc on its transition from
+ * anonymous to a normal one.
+ */
+ assert(replica->gc == NULL);
+ replica->gc = gc_consumer_register(&replicaset.vclock,
+ "replica %s",
+ tt_uuid_str(&replica->uuid));
}
replicaset.replica_by_id[replica_id] = replica;
say_info("assigned id %d to replica %s",
replica->id, tt_uuid_str(&replica->uuid));
+ replica->anon = false;
}
void
@@ -268,7 +293,7 @@ replica_clear_id(struct replica *replica)
}
}
-static void
+void
replica_set_applier(struct replica *replica, struct applier *applier)
{
assert(replica->applier == NULL);
@@ -277,7 +302,7 @@ replica_set_applier(struct replica *replica, struct applier *applier)
&replica->on_applier_state);
}
-static void
+void
replica_clear_applier(struct replica *replica)
{
assert(replica->applier != NULL);
@@ -880,6 +905,18 @@ replica_on_relay_stop(struct replica *replica)
}
}
+void
+replica_anon_delete(struct replica *replica)
+{
+ assert(replica->gc == NULL);
+ assert(replica->id == REPLICA_ID_NIL);
+ /* We do not replicate from anonymous replicas */
+ assert(replica->applier == NULL);
+ replica_hash_remove(&replicaset.hash, replica);
+ replica_delete(replica);
+}
+
+
struct replica *
replicaset_first(void)
{
diff --git a/src/box/replication.h b/src/box/replication.h
index 470420592..978a09d41 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -137,6 +137,12 @@ extern double replication_sync_timeout;
*/
extern bool replication_skip_conflict;
+/**
+ * Whether this replica will be anonymous or not, i.e. be absent
+ * from the _cluster table and have a zero id.
+ */
+extern bool replication_anon;
+
/**
* Wait for the given period of time before trying to reconnect
* to a master.
@@ -265,6 +271,12 @@ struct replica {
* registered in the _cluster space yet.
*/
uint32_t id;
+ /**
+ * Whether this is an anonymous replica, i.e. a read-only
+ * replica that doesn't have an id and isn't present in the
+ * _cluster table.
+ */
+ bool anon;
/** Applier fiber. */
struct applier *applier;
/** Relay thread. */
@@ -343,12 +355,21 @@ replica_set_id(struct replica *replica, uint32_t id);
void
replica_clear_id(struct replica *replica);
+void
+replica_clear_applier(struct replica *replica);
+
+void
+replica_set_applier(struct replica *replica, struct applier *applier);
+
/**
* Unregister \a relay from the \a replica.
*/
void
replica_on_relay_stop(struct replica *replica);
+void
+replica_anon_delete(struct replica *replica);
+
#if defined(__cplusplus)
} /* extern "C" */
@@ -364,6 +385,9 @@ replica_check_id(uint32_t replica_id);
struct replica *
replicaset_add(uint32_t replica_id, const struct tt_uuid *instance_uuid);
+struct replica *
+replicaset_add_anon(const struct tt_uuid *replica_uuid);
+
/**
* Try to connect appliers to remote peers and receive UUID.
* Appliers that did not connect will connect asynchronously.
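For review purposes, the anonymous replica lifecycle introduced above (replicaset_add_anon(), the anon branch of replica_set_id() and replica_anon_delete()) can be modelled with a few lines of standalone C. This is a simplified sketch, not the actual replication.cc code: struct toy_replica and the toy_* functions are hypothetical, the GC consumer is reduced to a boolean, and the local-instance branch of replica_set_id() is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for REPLICA_ID_NIL. */
enum { TOY_REPLICA_ID_NIL = 0 };

/* Hypothetical, heavily simplified model of struct replica. */
struct toy_replica {
	uint32_t id;	/* stays 0 while the replica is anonymous */
	bool anon;	/* true until an id is assigned */
	bool has_gc;	/* models replica->gc != NULL */
};

/* Models replicaset_add_anon(): no id and no GC consumer. */
static struct toy_replica *
toy_replica_add_anon(void)
{
	struct toy_replica *r = calloc(1, sizeof(*r));
	if (r == NULL)
		return NULL;
	r->id = TOY_REPLICA_ID_NIL;
	r->anon = true;
	return r;
}

/*
 * Models only the anonymous branch of replica_set_id(): the GC
 * consumer is registered at promotion time, then the anon flag
 * is cleared unconditionally.
 */
static void
toy_replica_set_id(struct toy_replica *r, uint32_t id)
{
	assert(id != TOY_REPLICA_ID_NIL);
	if (r->anon) {
		assert(!r->has_gc);
		r->has_gc = true;
	}
	r->id = id;
	r->anon = false;
}

/*
 * Models replica_anon_delete(): only legal for a replica that
 * never got an id and therefore never pinned any WAL files.
 */
static void
toy_replica_anon_delete(struct toy_replica *r)
{
	assert(r->id == TOY_REPLICA_ID_NIL);
	assert(!r->has_gc);
	free(r);
}
```

The point of registering the consumer lazily is that an anonymous replica, which may disappear at any time, never holds back WAL garbage collection on the master; only a promoted replica does.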
diff --git a/src/box/wal.c b/src/box/wal.c
index 5e2c13e0e..2b238b743 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -930,6 +930,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
if ((*row)->replica_id == 0) {
(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
vclock_get(base, instance_id);
+ /*
+ * Note, an anonymous replica signs local
+ * rows with a zero instance id.
+ */
(*row)->replica_id = instance_id;
/* Use lsn of the first local row as transaction id. */
tsn = tsn == 0 ? (*row)->lsn : tsn;
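The comment added to wal_assign_lsn() means that, while instance_id is still 0, every local row of an anonymous replica is accounted for in vclock component 0. This is what anon.test.lua checks via box.info.vclock[0] after ten inserts into the local space. A minimal model, assuming a fixed-size array in place of struct vclock (toy_vclock, toy_row and toy_assign_local_lsn are hypothetical names):

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical capacity; the real struct vclock also keeps a map. */
#define TOY_VCLOCK_MAX 32

struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Bump component `id` and return its new value, roughly as vclock_inc() does. */
static int64_t
toy_vclock_inc(struct toy_vclock *v, uint32_t id)
{
	assert(id < TOY_VCLOCK_MAX);
	return ++v->lsn[id];
}

/*
 * Sign a locally generated row (replica_id == 0) the way
 * wal_assign_lsn() does: the lsn is the base value plus the
 * running diff for this instance's component.
 */
static void
toy_assign_local_lsn(struct toy_row *row, struct toy_vclock *diff,
		     const struct toy_vclock *base, uint32_t instance_id)
{
	assert(row->replica_id == 0);
	row->lsn = toy_vclock_inc(diff, instance_id) + base->lsn[instance_id];
	row->replica_id = instance_id;
}
```

With instance_id == 0 the row keeps replica_id 0, so after promotion new rows start landing in the freshly assigned component instead.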
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 18bf08971..27623d5df 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1148,11 +1148,40 @@ err:
return -1;
}
+int
+xrow_encode_register(struct xrow_header *row,
+ const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock)
+{
+ memset(row, 0, sizeof(*row));
+ size_t size = mp_sizeof_map(2) +
+ mp_sizeof_uint(IPROTO_INSTANCE_UUID) +
+ mp_sizeof_str(UUID_STR_LEN) +
+ mp_sizeof_uint(IPROTO_VCLOCK) + mp_sizeof_vclock(vclock);
+ char *buf = (char *) region_alloc(&fiber()->gc, size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc", "buf");
+ return -1;
+ }
+ char *data = buf;
+ data = mp_encode_map(data, 2);
+ data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
+ data = xrow_encode_uuid(data, instance_uuid);
+ data = mp_encode_uint(data, IPROTO_VCLOCK);
+ data = mp_encode_vclock(data, vclock);
+ assert(data <= buf + size);
+ row->body[0].iov_base = buf;
+ row->body[0].iov_len = (data - buf);
+ row->bodycnt = 1;
+ row->type = IPROTO_REGISTER;
+ return 0;
+}
+
int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock)
+ const struct vclock *vclock, bool anon)
{
memset(row, 0, sizeof(*row));
size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
@@ -1162,7 +1191,7 @@ xrow_encode_subscribe(struct xrow_header *row,
return -1;
}
char *data = buf;
- data = mp_encode_map(data, 4);
+ data = mp_encode_map(data, 5);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1171,6 +1200,8 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_vclock(data, vclock);
data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
data = mp_encode_uint(data, tarantool_version_id());
+ data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
+ data = mp_encode_bool(data, anon);
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
@@ -1182,7 +1213,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id)
+ uint32_t *version_id, bool *anon)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1198,6 +1229,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
return -1;
}
+ if (anon)
+ *anon = false;
d = data;
uint32_t map_size = mp_decode_map(&d);
for (uint32_t i = 0; i < map_size; i++) {
@@ -1245,6 +1278,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*version_id = mp_decode_uint(&d);
break;
+ case IPROTO_REPLICA_ANON:
+ if (anon == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_BOOL) {
+ xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+ "invalid REPLICA_ANON flag");
+ return -1;
+ }
+ *anon = mp_decode_bool(&d);
+ break;
default: skip:
mp_next(&d); /* value */
}
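The REPLICA_ANON handling in xrow_decode_subscribe() follows a backward-compatible pattern: the out flag defaults to false, a request that omits the key (for instance one sent by an older peer) still decodes, a non-boolean value is rejected, and passing NULL skips the key, as xrow_decode_join() and the other wrappers do. The sketch below illustrates this over a tiny hand-rolled MsgPack subset (fixmap, positive fixint, bool) instead of the msgpuck library; the key values are hypothetical and are not the real IPROTO_* constants.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical key values, chosen for illustration only. */
enum { TOY_KEY_VERSION = 0x06, TOY_KEY_ANON = 0x50 };

/*
 * Encode {VERSION: version, ANON: anon} as a fixmap whose keys
 * and values are positive fixints or bools. Returns bytes written.
 */
static size_t
toy_encode_subscribe(uint8_t *buf, uint8_t version, bool anon)
{
	uint8_t *p = buf;
	*p++ = 0x80 | 2;		/* fixmap with 2 entries */
	*p++ = TOY_KEY_VERSION;
	*p++ = version & 0x7f;		/* positive fixint */
	*p++ = TOY_KEY_ANON;
	*p++ = anon ? 0xc3 : 0xc2;	/* MsgPack true / false */
	return (size_t)(p - buf);
}

/*
 * Decode the map. *anon defaults to false when the key is absent;
 * anon == NULL means the caller is not interested and the key is
 * skipped. Returns 0 on success, -1 on malformed input.
 */
static int
toy_decode_subscribe(const uint8_t *p, size_t len, bool *anon)
{
	const uint8_t *end = p + len;
	if (anon != NULL)
		*anon = false;
	if (p >= end || (*p & 0xf0) != 0x80)
		return -1;		/* not a fixmap */
	uint8_t map_size = *p++ & 0x0f;
	for (uint8_t i = 0; i < map_size; i++) {
		if (end - p < 2 || *p > 0x7f)
			return -1;	/* truncated or bad key */
		uint8_t key = *p++;
		uint8_t val = *p++;
		bool is_bool = (val == 0xc2 || val == 0xc3);
		if (!is_bool && val > 0x7f)
			return -1;	/* outside the supported subset */
		if (key == TOY_KEY_ANON && anon != NULL) {
			if (!is_bool)
				return -1;	/* invalid REPLICA_ANON flag */
			*anon = (val == 0xc3);
		}
		/* Any other key is skipped, like the default branch. */
	}
	return 0;
}
```

Growing the map from 4 to 5 entries on the encoding side is therefore safe as long as every decoder skips keys it does not know.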
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 60def2d3c..b8da3a0d0 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -301,12 +301,27 @@ xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot);
void
xrow_encode_vote(struct xrow_header *row);
+/**
+ * Encode REGISTER command.
+ * @param[out] Row.
+ * @param instance_uuid Instance uuid.
+ * @param vclock Replication clock.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+xrow_encode_register(struct xrow_header *row,
+ const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock);
+
/**
* Encode SUBSCRIBE command.
* @param[out] Row.
* @param replicaset_uuid Replica set uuid.
* @param instance_uuid Instance uuid.
* @param vclock Replication clock.
+ * @param anon Whether it is an anonymous subscribe request or not.
*
* @retval 0 Success.
* @retval -1 Memory error.
@@ -315,7 +330,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock);
+ const struct vclock *vclock, bool anon);
/**
* Decode SUBSCRIBE command.
@@ -324,6 +339,7 @@ xrow_encode_subscribe(struct xrow_header *row,
* @param[out] instance_uuid.
* @param[out] vclock.
* @param[out] version_id.
+ * @param[out] anon Whether it is an anonymous subscribe.
*
* @retval 0 Success.
* @retval -1 Memory or format error.
@@ -331,7 +347,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id);
+ uint32_t *version_id, bool *anon);
/**
* Encode JOIN command.
@@ -355,7 +371,22 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
static inline int
xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
{
- return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL);
+ return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
+}
+
+/**
+ * Decode REGISTER request.
+ * @param row Row to decode.
+ * @param[out] instance_uuid Instance uuid.
+ * @param[out] vclock Instance vclock.
+ * @retval 0 Success.
+ * @retval -1 Memory or format error.
+ */
+static inline int
+xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
+ struct vclock *vclock)
+{
+ return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
}
/**
@@ -380,7 +411,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
static inline int
xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
{
- return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL);
+ return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
}
/**
@@ -411,7 +442,7 @@ xrow_decode_subscribe_response(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct vclock *vclock)
{
- return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL);
+ return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
}
/**
@@ -769,15 +800,25 @@ xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot)
diag_raise();
}
+/** @copydoc xrow_encode_register. */
+static inline void
+xrow_encode_register_xc(struct xrow_header *row,
+ const struct tt_uuid *instance_uuid,
+ const struct vclock *vclock)
+{
+ if (xrow_encode_register(row, instance_uuid, vclock) != 0)
+ diag_raise();
+}
+
/** @copydoc xrow_encode_subscribe. */
static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock)
+ const struct vclock *vclock, bool anon)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock) != 0)
+ vclock, anon) != 0)
diag_raise();
}
@@ -786,10 +827,10 @@ static inline void
xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *replica_version_id)
+ uint32_t *replica_version_id, bool *anon)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock, replica_version_id) != 0)
+ vclock, replica_version_id, anon) != 0)
diag_raise();
}
@@ -810,6 +851,15 @@ xrow_decode_join_xc(struct xrow_header *row, struct tt_uuid *instance_uuid)
diag_raise();
}
+/** @copydoc xrow_decode_register. */
+static inline void
+xrow_decode_register_xc(struct xrow_header *row, struct tt_uuid *instance_uuid,
+ struct vclock *vclock)
+{
+ if (xrow_decode_register(row, instance_uuid, vclock) != 0)
+ diag_raise();
+}
+
/** @copydoc xrow_encode_vclock. */
static inline void
xrow_encode_vclock_xc(struct xrow_header *row, const struct vclock *vclock)
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 799297ba0..7aec1d715 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -25,30 +25,31 @@ box.cfg
20 pid_file:box.pid
21 read_only:false
22 readahead:16320
-23 replication_connect_timeout:30
-24 replication_skip_conflict:false
-25 replication_sync_lag:10
-26 replication_sync_timeout:300
-27 replication_timeout:1
-28 slab_alloc_factor:1.05
-29 strip_core:true
-30 too_long_threshold:0.5
-31 vinyl_bloom_fpr:0.05
-32 vinyl_cache:134217728
-33 vinyl_dir:.
-34 vinyl_max_tuple_size:1048576
-35 vinyl_memory:134217728
-36 vinyl_page_size:8192
-37 vinyl_read_threads:1
-38 vinyl_run_count_per_level:2
-39 vinyl_run_size_ratio:3.5
-40 vinyl_timeout:60
-41 vinyl_write_threads:4
-42 wal_dir:.
-43 wal_dir_rescan_delay:2
-44 wal_max_size:268435456
-45 wal_mode:write
-46 worker_pool_threads:4
+23 replication_anon:false
+24 replication_connect_timeout:30
+25 replication_skip_conflict:false
+26 replication_sync_lag:10
+27 replication_sync_timeout:300
+28 replication_timeout:1
+29 slab_alloc_factor:1.05
+30 strip_core:true
+31 too_long_threshold:0.5
+32 vinyl_bloom_fpr:0.05
+33 vinyl_cache:134217728
+34 vinyl_dir:.
+35 vinyl_max_tuple_size:1048576
+36 vinyl_memory:134217728
+37 vinyl_page_size:8192
+38 vinyl_read_threads:1
+39 vinyl_run_count_per_level:2
+40 vinyl_run_size_ratio:3.5
+41 vinyl_timeout:60
+42 vinyl_write_threads:4
+43 wal_dir:.
+44 wal_dir_rescan_delay:2
+45 wal_max_size:268435456
+46 wal_mode:write
+47 worker_pool_threads:4
--
-- Test insert from detached fiber
--
diff --git a/test/box/admin.result b/test/box/admin.result
index 6126f3a97..5a03a979a 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -71,6 +71,8 @@ cfg_filter(box.cfg)
- false
- - readahead
- 16320
+ - - replication_anon
+ - false
- - replication_connect_timeout
- 30
- - replication_skip_conflict
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 5370bb870..d6ce6b621 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -59,6 +59,8 @@ cfg_filter(box.cfg)
| - false
| - - readahead
| - 16320
+ | - - replication_anon
+ | - false
| - - replication_connect_timeout
| - 30
| - - replication_skip_conflict
@@ -158,6 +160,8 @@ cfg_filter(box.cfg)
| - false
| - - readahead
| - 16320
+ | - - replication_anon
+ | - false
| - - replication_connect_timeout
| - 30
| - - replication_skip_conflict
diff --git a/test/box/misc.result b/test/box/misc.result
index d2a20307a..134caef63 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -554,6 +554,7 @@ t;
203: box.error.BOOTSTRAP_READONLY
204: box.error.SQL_FUNC_WRONG_RET_COUNT
205: box.error.FUNC_INVALID_RETURN_TYPE
+ 206: box.error.REPLICA_NOT_ANON
...
test_run:cmd("setopt delimiter ''");
---
diff --git a/test/replication/anon.lua b/test/replication/anon.lua
new file mode 100644
index 000000000..2e7ee9983
--- /dev/null
+++ b/test/replication/anon.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ replication = os.getenv("MASTER"),
+ memtx_memory = 107374182,
+ replication_timeout = 0.1,
+ replication_connect_timeout = 0.5,
+ read_only=true,
+ replication_anon=true,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/anon.result b/test/replication/anon.result
new file mode 100644
index 000000000..f6a85e7d0
--- /dev/null
+++ b/test/replication/anon.result
@@ -0,0 +1,326 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+vclock_diff = require('fast_replica').vclock_diff
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+
+--
+-- gh-3186 Anonymous replicas.
+--
+-- Prepare master.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('loc', {is_local=true})
+ | ---
+ | ...
+_ = box.schema.space.create('temp', {temporary=true})
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.loc:create_index('pk')
+ | ---
+ | ...
+_ = box.space.temp:create_index('pk')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+box.space.test:insert{1}
+ | ---
+ | - [1]
+ | ...
+
+test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica_anon')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch replica_anon')
+ | ---
+ | - true
+ | ...
+
+box.info.status
+ | ---
+ | - running
+ | ...
+box.info.id
+ | ---
+ | - 0
+ | ...
+box.info.lsn
+ | ---
+ | - 0
+ | ...
+test_run:wait_upstream(1, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+-- Temporary spaces are accessible as read / write.
+for i = 1,10 do box.space.temp:insert{i} end
+ | ---
+ | ...
+box.space.temp:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | ...
+
+box.info.lsn
+ | ---
+ | - 0
+ | ...
+
+-- Same for local spaces.
+for i = 1,10 do box.space.loc:insert{i} end
+ | ---
+ | ...
+box.space.loc:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | ...
+
+-- Replica-local changes are accounted for in 0 vclock component.
+box.info.lsn
+ | ---
+ | - 10
+ | ...
+box.info.vclock[0]
+ | ---
+ | - 10
+ | ...
+
+-- Replica is read-only.
+box.cfg.read_only
+ | ---
+ | - true
+ | ...
+box.cfg{read_only=false}
+ | ---
+ | - error: 'Incorrect value for option ''read_only'': the value may be set to false
+ | only when replication_anon is false'
+ | ...
+
+box.space.test:insert{2}
+ | ---
+ | - error: Can't modify data because this instance is in read-only mode.
+ | ...
+
+box.space.loc:drop()
+ | ---
+ | - error: Can't modify data because this instance is in read-only mode.
+ | ...
+box.space.loc:truncate()
+ | ---
+ | - error: Can't modify data because this instance is in read-only mode.
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+-- Replica isn't visible on master.
+#box.info.replication
+ | ---
+ | - 1
+ | ...
+
+-- Test that replication (even anonymous) from an anonymous
+-- instance is forbidden. An anonymous replica will fetch
+-- a snapshot though.
+test_run:cmd([[create server replica_anon2 with rpl_master=replica_anon,\
+ script="replication/anon2.lua"]])
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica_anon2')
+ | ---
+ | - true
+ | ...
+test_run:wait_log('replica_anon2',\
+ 'Replication does not support replicating from an anonymous instance',\
+ nil, 10)
+ | ---
+ | - Replication does not support replicating from an anonymous instance
+ | ...
+test_run:cmd('switch replica_anon2')
+ | ---
+ | - true
+ | ...
+a = box.info.vclock[1]
+ | ---
+ | ...
+-- The instance did fetch a snapshot.
+a > 0
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+box.space.test:insert{2}
+ | ---
+ | - [2]
+ | ...
+test_run:cmd("switch replica_anon2")
+ | ---
+ | - true
+ | ...
+-- Second replica doesn't follow master through the
+-- 1st one. Replication from an anonymous instance
+-- is forbidden indeed.
+box.info.vclock[1] == a or box.info.vclock[1]
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('switch replica_anon')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server replica_anon2')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica_anon2')
+ | ---
+ | - true
+ | ...
+
+-- Promote anonymous replica.
+box.cfg{replication_anon=false}
+ | ---
+ | ...
+-- Cannot switch back after becoming "normal".
+box.cfg{replication_anon=true}
+ | ---
+ | - error: 'Incorrect value for option ''replication_anon'': cannot be turned on after
+ | bootstrap has finished'
+ | ...
+
+box.info.id
+ | ---
+ | - 2
+ | ...
+#box.info.replication
+ | ---
+ | - 2
+ | ...
+test_run:wait_upstream(1, {status='follow'})
+ | ---
+ | - true
+ | ...
+box.info.replication.downstream
+ | ---
+ | - null
+ | ...
+
+old_lsn = box.info.vclock[2] or 0
+ | ---
+ | ...
+
+-- Now read_only can be turned off.
+box.cfg{read_only=false}
+ | ---
+ | ...
+box.space.test:insert{3}
+ | ---
+ | - [3]
+ | ...
+-- New changes are tracked under freshly assigned id.
+box.info.vclock[2] == old_lsn + 1
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+-- Other instances may replicate from a previously-anonymous one.
+test_run:cmd("set variable repl_source to 'replica_anon.listen'")
+ | ---
+ | - true
+ | ...
+box.cfg{replication=repl_source}
+ | ---
+ | ...
+#box.info.replication
+ | ---
+ | - 2
+ | ...
+test_run:wait_upstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+#box.info.vclock
+ | ---
+ | - 2
+ | ...
+
+-- Cleanup.
+box.cfg{replication=""}
+ | ---
+ | ...
+test_run:cmd('stop server replica_anon')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica_anon')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.space.temp:drop()
+ | ---
+ | ...
+box.space.loc:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
diff --git a/test/replication/anon.test.lua b/test/replication/anon.test.lua
new file mode 100644
index 000000000..1877ec8bb
--- /dev/null
+++ b/test/replication/anon.test.lua
@@ -0,0 +1,118 @@
+env = require('test_run')
+vclock_diff = require('fast_replica').vclock_diff
+test_run = env.new()
+
+
+--
+-- gh-3186 Anonymous replicas.
+--
+-- Prepare master.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.schema.space.create('temp', {temporary=true})
+_ = box.schema.space.create('test')
+_ = box.space.loc:create_index('pk')
+_ = box.space.temp:create_index('pk')
+_ = box.space.test:create_index('pk')
+box.space.test:insert{1}
+
+test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
+test_run:cmd('start server replica_anon')
+test_run:cmd('switch replica_anon')
+
+box.info.status
+box.info.id
+box.info.lsn
+test_run:wait_upstream(1, {status='follow'})
+
+-- Temporary spaces are accessible as read / write.
+for i = 1,10 do box.space.temp:insert{i} end
+box.space.temp:select{}
+
+box.info.lsn
+
+-- Same for local spaces.
+for i = 1,10 do box.space.loc:insert{i} end
+box.space.loc:select{}
+
+-- Replica-local changes are accounted for in 0 vclock component.
+box.info.lsn
+box.info.vclock[0]
+
+-- Replica is read-only.
+box.cfg.read_only
+box.cfg{read_only=false}
+
+box.space.test:insert{2}
+
+box.space.loc:drop()
+box.space.loc:truncate()
+
+test_run:cmd('switch default')
+
+-- Replica isn't visible on master.
+#box.info.replication
+
+-- Test that replication (even anonymous) from an anonymous
+-- instance is forbidden. An anonymous replica will fetch
+-- a snapshot though.
+test_run:cmd([[create server replica_anon2 with rpl_master=replica_anon,\
+ script="replication/anon2.lua"]])
+test_run:cmd('start server replica_anon2')
+test_run:wait_log('replica_anon2',\
+ 'Replication does not support replicating from an anonymous instance',\
+ nil, 10)
+test_run:cmd('switch replica_anon2')
+a = box.info.vclock[1]
+-- The instance did fetch a snapshot.
+a > 0
+test_run:cmd('switch default')
+box.space.test:insert{2}
+test_run:cmd("switch replica_anon2")
+-- Second replica doesn't follow master through the
+-- 1st one. Replication from an anonymous instance
+-- is forbidden indeed.
+box.info.vclock[1] == a or box.info.vclock[1]
+
+test_run:cmd('switch replica_anon')
+
+test_run:cmd('stop server replica_anon2')
+test_run:cmd('delete server replica_anon2')
+
+-- Promote anonymous replica.
+box.cfg{replication_anon=false}
+-- Cannot switch back after becoming "normal".
+box.cfg{replication_anon=true}
+
+box.info.id
+#box.info.replication
+test_run:wait_upstream(1, {status='follow'})
+box.info.replication.downstream
+
+old_lsn = box.info.vclock[2] or 0
+
+-- Now read_only can be turned off.
+box.cfg{read_only=false}
+box.space.test:insert{3}
+-- New changes are tracked under freshly assigned id.
+box.info.vclock[2] == old_lsn + 1
+
+test_run:cmd('switch default')
+
+-- Other instances may replicate from a previously-anonymous one.
+test_run:cmd("set variable repl_source to 'replica_anon.listen'")
+box.cfg{replication=repl_source}
+#box.info.replication
+test_run:wait_upstream(2, {status='follow'})
+test_run:wait_downstream(2, {status='follow'})
+#box.info.vclock
+
+-- Cleanup.
+box.cfg{replication=""}
+test_run:cmd('stop server replica_anon')
+test_run:cmd('delete server replica_anon')
+box.space.test:drop()
+box.space.temp:drop()
+box.space.loc:drop()
+box.schema.user.revoke('guest', 'replication')
+test_run:cleanup_cluster()
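The box.cfg rules exercised by anon.test.lua can be summarised as a small state check: an anonymous replica must stay read-only, it can be promoted by setting replication_anon=false, and replication_anon cannot be turned back on after bootstrap. The C sketch below models those rules only. toy_cfg_check() and its result codes are hypothetical; the real checks are performed by box's configuration validation and produce the error messages shown in anon.result.

```c
#include <stdbool.h>

/* Hypothetical model of the two relevant box.cfg options. */
struct toy_cfg {
	bool read_only;
	bool replication_anon;
};

enum toy_cfg_result {
	TOY_CFG_OK,
	/* replication_anon cannot be turned on after bootstrap */
	TOY_CFG_ANON_AFTER_BOOTSTRAP,
	/* an anonymous replica must remain read-only */
	TOY_CFG_ANON_REQUIRES_READ_ONLY,
};

/*
 * Decide whether switching from `cur` to `next` is allowed.
 * `bootstrapped` tells whether the instance has finished bootstrap.
 */
static enum toy_cfg_result
toy_cfg_check(const struct toy_cfg *cur, const struct toy_cfg *next,
	      bool bootstrapped)
{
	if (next->replication_anon && !cur->replication_anon && bootstrapped)
		return TOY_CFG_ANON_AFTER_BOOTSTRAP;
	if (next->replication_anon && !next->read_only)
		return TOY_CFG_ANON_REQUIRES_READ_ONLY;
	return TOY_CFG_OK;
}
```

Promotion is a one-way transition: once an id is taken from _cluster, going back to anonymous would leave a stale registration behind, which is why the test expects the second box.cfg{replication_anon=true} to fail.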
diff --git a/test/replication/anon1.lua b/test/replication/anon1.lua
new file mode 120000
index 000000000..6638147e5
--- /dev/null
+++ b/test/replication/anon1.lua
@@ -0,0 +1 @@
+anon.lua
\ No newline at end of file
diff --git a/test/replication/anon2.lua b/test/replication/anon2.lua
new file mode 120000
index 000000000..6638147e5
--- /dev/null
+++ b/test/replication/anon2.lua
@@ -0,0 +1 @@
+anon.lua
\ No newline at end of file
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index cd686a0e2..429c64df3 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -1,4 +1,5 @@
{
+ "anon.test.lua": {},
"misc.test.lua": {},
"once.test.lua": {},
"on_replace.test.lua": {},
--
2.20.1 (Apple Git-117)