* [RFC PATCH 01/12] recovery: drop unused recovery_exit
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
---
src/box/recovery.cc | 8 --------
src/box/recovery.h | 4 ----
2 files changed, 12 deletions(-)
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index f8ec3971..a5e0ad34 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -169,14 +169,6 @@ recovery_delete(struct recovery *r)
free(r);
}
-void
-recovery_exit(struct recovery *r)
-{
- /* Avoid fibers, there is no event loop */
- r->watcher = NULL;
- recovery_delete(r);
-}
-
/**
* Read all rows in a file starting from the last position.
* Advance the position. If end of file is reached,
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 562a1fbb..b7e4a312 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -68,10 +68,6 @@ recovery_new(const char *wal_dirname, bool force_recovery,
void
recovery_delete(struct recovery *r);
-/* to be called at exit */
-void
-recovery_exit(struct recovery *r);
-
void
recovery_follow_local(struct recovery *r, struct xstream *stream,
const char *name, ev_tstamp wal_dir_rescan_delay);
--
2.11.0
* [RFC PATCH 02/12] recovery: constify vclock argument
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Neither recovery_new() nor recover_remaining_wals() needs to modify it.
---
src/box/recovery.cc | 6 +++---
src/box/recovery.h | 4 ++--
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index a5e0ad34..cf348d29 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -102,7 +102,7 @@ struct XlogGapError: public XlogError {
*/
struct recovery *
recovery_new(const char *wal_dirname, bool force_recovery,
- struct vclock *vclock)
+ const struct vclock *vclock)
{
struct recovery *r = (struct recovery *)
calloc(1, sizeof(*r));
@@ -178,7 +178,7 @@ recovery_delete(struct recovery *r)
*/
static void
recover_xlog(struct recovery *r, struct xstream *stream,
- struct vclock *stop_vclock)
+ const struct vclock *stop_vclock)
{
struct xrow_header row;
uint64_t row_count = 0;
@@ -238,7 +238,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
*/
void
recover_remaining_wals(struct recovery *r, struct xstream *stream,
- struct vclock *stop_vclock, bool scan_dir)
+ const struct vclock *stop_vclock, bool scan_dir)
{
struct vclock *clock;
diff --git a/src/box/recovery.h b/src/box/recovery.h
index b7e4a312..3a950e47 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -63,7 +63,7 @@ struct recovery {
struct recovery *
recovery_new(const char *wal_dirname, bool force_recovery,
- struct vclock *vclock);
+ const struct vclock *vclock);
void
recovery_delete(struct recovery *r);
@@ -94,6 +94,6 @@ recovery_finalize(struct recovery *r, struct xstream *stream);
*/
void
recover_remaining_wals(struct recovery *r, struct xstream *stream,
- struct vclock *stop_vclock, bool scan_dir);
+ const struct vclock *stop_vclock, bool scan_dir);
#endif /* TARANTOOL_RECOVERY_H_INCLUDED */
--
2.11.0
* [RFC PATCH 03/12] applier: remove extra new line in log message printed on connect
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
An extra newline looks ugly in the log:
2018-06-06 15:22:22.682 [9807] main/101/interactive C> Tarantool 1.10.1-58-gd2272132
2018-06-06 15:22:22.682 [9807] main/101/interactive C> log level 5
2018-06-06 15:22:22.682 [9807] main/101/interactive I> mapping 268435456 bytes for memtx tuple arena...
2018-06-06 15:22:22.683 [9807] main/101/interactive I> mapping 134217728 bytes for vinyl tuple arena...
2018-06-06 15:22:22.692 [9807] main/101/interactive I> recovery start
2018-06-06 15:22:22.692 [9807] main/101/interactive I> recovering from `./00000000000000000006.snap'
2018-06-06 15:22:22.721 [9807] main/106/applier/ I> remote master is 1.10.1 at 0.0.0.0:44441
2018-06-06 15:22:22.723 [9807] main/106/applier/ C> leaving orphan mode
2018-06-06 15:22:22.723 [9807] main/101/interactive C> replica set sync complete, quorum of 1 replicas formed
2018-06-06 15:22:22.723 [9807] main/101/interactive I> ready to accept requests
---
src/box/applier.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index d3ccc24b..556502bf 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -198,7 +198,7 @@ applier_connect(struct applier *applier)
}
if (applier->version_id != greeting.version_id) {
- say_info("remote master is %u.%u.%u at %s\r\n",
+ say_info("remote master is %u.%u.%u at %s",
version_id_major(greeting.version_id),
version_id_minor(greeting.version_id),
version_id_patch(greeting.version_id),
--
2.11.0
* [RFC PATCH 04/12] xrow: add helper function for encoding vclock
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Add mp_sizeof_vclock() and mp_encode_vclock() helpers so as not to
duplicate the same code over and over again.
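For reference, the encoded representation is simply a MsgPack map of
replica id to LSN. A decoding counterpart could look like the sketch
below (illustrative only, not part of this patch; it assumes well-formed
input and omits the type checks a real decoder would need):
    static inline const char *
    mp_decode_vclock(const char *data, struct vclock *vclock)
    {
            vclock_create(vclock);
            uint32_t size = mp_decode_map(&data);
            for (uint32_t i = 0; i < size; i++) {
                    uint32_t id = (uint32_t) mp_decode_uint(&data);
                    int64_t lsn = (int64_t) mp_decode_uint(&data);
                    vclock_follow(vclock, id, lsn);
            }
            return data;
    }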
---
src/box/xrow.c | 58 +++++++++++++++++++++++++++-------------------------------
1 file changed, 27 insertions(+), 31 deletions(-)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b3f81a86..532e1296 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,6 +43,27 @@
#include "scramble.h"
#include "iproto_constants.h"
+static inline uint32_t
+mp_sizeof_vclock(const struct vclock *vclock)
+{
+ uint32_t size = vclock_size(vclock);
+ return mp_sizeof_map(size) + size * (mp_sizeof_uint(UINT32_MAX) +
+ mp_sizeof_uint(UINT64_MAX));
+}
+
+static inline char *
+mp_encode_vclock(char *data, const struct vclock *vclock)
+{
+ data = mp_encode_map(data, vclock_size(vclock));
+ struct vclock_iterator it;
+ vclock_iterator_init(&it, vclock);
+ vclock_foreach(&it, replica) {
+ data = mp_encode_uint(data, replica.id);
+ data = mp_encode_uint(data, replica.lsn);
+ }
+ return data;
+}
+
int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end)
@@ -265,11 +286,8 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
uint32_t schema_version, const struct vclock *vclock,
bool read_only)
{
- uint32_t replicaset_size = vclock_size(vclock);
size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(2) +
- mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(replicaset_size) +
- replicaset_size * (mp_sizeof_uint(UINT32_MAX) +
- mp_sizeof_uint(UINT64_MAX)) +
+ mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(vclock) +
mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(true);
char *buf = obuf_reserve(out, max_size);
@@ -284,13 +302,7 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
data = mp_encode_uint(data, IPROTO_SERVER_IS_RO);
data = mp_encode_bool(data, read_only);
data = mp_encode_uint(data, IPROTO_VCLOCK);
- data = mp_encode_map(data, replicaset_size);
- struct vclock_iterator it;
- vclock_iterator_init(&it, vclock);
- vclock_foreach(&it, replica) {
- data = mp_encode_uint(data, replica.id);
- data = mp_encode_uint(data, replica.lsn);
- }
+ data = mp_encode_vclock(data, vclock);
size_t size = data - buf;
assert(size <= max_size);
@@ -806,9 +818,7 @@ xrow_encode_subscribe(struct xrow_header *row,
const struct vclock *vclock)
{
memset(row, 0, sizeof(*row));
- uint32_t replicaset_size = vclock_size(vclock);
- size_t size = XROW_BODY_LEN_MAX + replicaset_size *
- (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX));
+ size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
char *buf = (char *) region_alloc(&fiber()->gc, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -821,13 +831,7 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
data = xrow_encode_uuid(data, instance_uuid);
data = mp_encode_uint(data, IPROTO_VCLOCK);
- data = mp_encode_map(data, replicaset_size);
- struct vclock_iterator it;
- vclock_iterator_init(&it, vclock);
- vclock_foreach(&it, replica) {
- data = mp_encode_uint(data, replica.id);
- data = mp_encode_uint(data, replica.lsn);
- }
+ data = mp_encode_vclock(data, vclock);
data = mp_encode_uint(data, IPROTO_SERVER_VERSION);
data = mp_encode_uint(data, tarantool_version_id());
assert(data <= buf + size);
@@ -971,9 +975,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock)
memset(row, 0, sizeof(*row));
/* Add vclock to response body */
- uint32_t replicaset_size = vclock_size(vclock);
- size_t size = 8 + replicaset_size *
- (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX));
+ size_t size = 8 + mp_sizeof_vclock(vclock);
char *buf = (char *) region_alloc(&fiber()->gc, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -982,13 +984,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock)
char *data = buf;
data = mp_encode_map(data, 1);
data = mp_encode_uint(data, IPROTO_VCLOCK);
- data = mp_encode_map(data, replicaset_size);
- struct vclock_iterator it;
- vclock_iterator_init(&it, vclock);
- vclock_foreach(&it, replica) {
- data = mp_encode_uint(data, replica.id);
- data = mp_encode_uint(data, replica.lsn);
- }
+ data = mp_encode_vclock(data, vclock);
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
--
2.11.0
* [RFC PATCH 05/12] box: retrieve instance uuid before starting local recovery
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
In order to find out if the current instance fell too much behind its
peers in the cluster and so needs to be rebootstrapped, we need to
connect it to remote peers before proceeding to local recovery. The
problem is that box.cfg.replication may have an entry corresponding to
the instance itself, so before connecting we have to start listening
for incoming connections. Since an instance is supposed to send its
uuid in the greeting message, we also have to initialize INSTANCE_UUID
early, before we start local recovery. So this patch makes the memtx
engine constructor not only scan the snapshot directory, but also read
the header of the most recent snapshot to initialize INSTANCE_UUID.
Needed for #461
---
src/box/box.cc | 18 ++++++++++--------
src/box/memtx_engine.c | 11 ++++++++++-
2 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 61bfa117..e1bf3934 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1839,6 +1839,15 @@ box_cfg_xc(void)
}
bool is_bootstrap_leader = false;
if (last_checkpoint_lsn >= 0) {
+ /* Check instance UUID. */
+ assert(!tt_uuid_is_nil(&INSTANCE_UUID));
+ if (!tt_uuid_is_nil(&instance_uuid) &&
+ !tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) {
+ tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
+ tt_uuid_str(&instance_uuid),
+ tt_uuid_str(&INSTANCE_UUID));
+ }
+
struct wal_stream wal_stream;
wal_stream_create(&wal_stream, cfg_geti64("rows_per_wal"));
@@ -1882,7 +1891,6 @@ box_cfg_xc(void)
cfg_getd("wal_dir_rescan_delay"));
title("hot_standby");
- assert(!tt_uuid_is_nil(&INSTANCE_UUID));
/*
* Leave hot standby mode, if any, only
* after acquiring the lock.
@@ -1902,13 +1910,7 @@ box_cfg_xc(void)
recovery_finalize(recovery, &wal_stream.base);
engine_end_recovery_xc();
- /* Check replica set and instance UUID. */
- if (!tt_uuid_is_nil(&instance_uuid) &&
- !tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) {
- tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
- tt_uuid_str(&instance_uuid),
- tt_uuid_str(&INSTANCE_UUID));
- }
+ /* Check replica set UUID. */
if (!tt_uuid_is_nil(&replicaset_uuid) &&
!tt_uuid_is_equal(&replicaset_uuid, &REPLICASET_UUID)) {
tnt_raise(ClientError, ER_REPLICASET_UUID_MISMATCH,
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index fac84ce1..df0c6c28 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -164,7 +164,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
struct xlog_cursor cursor;
if (xlog_cursor_open(&cursor, filename) < 0)
return -1;
- INSTANCE_UUID = cursor.meta.instance_uuid;
int rc;
struct xrow_header row;
@@ -1001,6 +1000,16 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
if (xdir_scan(&memtx->snap_dir) != 0)
goto fail;
+ int64_t snap_signature = xdir_last_vclock(&memtx->snap_dir, NULL);
+ if (snap_signature >= 0) {
+ struct xlog_cursor cursor;
+ if (xdir_open_cursor(&memtx->snap_dir,
+ snap_signature, &cursor) != 0)
+ goto fail;
+ INSTANCE_UUID = cursor.meta.instance_uuid;
+ xlog_cursor_close(&cursor, false);
+ }
+
stailq_create(&memtx->gc_queue);
memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f);
if (memtx->gc_fiber == NULL)
--
2.11.0
* Re: [RFC PATCH 05/12] box: retrieve instance uuid before starting local recovery
From: Konstantin Osipov @ 2018-06-08 4:22 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/06/06 20:45]:
> In order to find out if the current instance fell too much behind its
> peers in the cluster and so needs to be rebootstrapped, we need to
> connect it to remote peers before proceeding to local recovery. The
> problem is that box.cfg.replication may have an entry corresponding to
> the instance itself, so before connecting we have to start listening
> for incoming connections. Since an instance is supposed to send its
> uuid in the greeting message, we also have to initialize INSTANCE_UUID
> early, before we start local recovery. So this patch makes the memtx
> engine constructor not only scan the snapshot directory, but also read
> the header of the most recent snapshot to initialize INSTANCE_UUID.
Please, as usual, add a similar comment to the code snippet you're
adding, so that one does not have to dig into the history to understand
the code.
Otherwise LGTM.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [RFC PATCH 06/12] box: refactor hot standby recovery
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Currently, we start a hot standby fiber even if not in hot standby mode
(see recovery_follow_local). And we scan the wal directory twice - first
time in recovery_follow_local(), second time in recovery_finalize().
Let's factor out recover_remaining_wals() from those functions and call
it explicitly. And let's call follow_local() and stop_local() only
if in hot standby mode.
Needed for #461
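For orientation, the recovery flow in box_cfg_xc() after this patch is
roughly the following (a simplified sketch based on the diff below;
error handling is omitted):
    recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
    if (wal_dir_lock < 0) {
            /* The wal_dir is locked by another instance: hot standby. */
            recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
                                  cfg_getd("wal_dir_rescan_delay"));
            /* ... wait until the wal_dir lock is acquired ... */
            recovery_stop_local(recovery);
            /* hot_standby_f() rescans the WALs once more before exiting. */
            box_bind();
    }
    recovery_finalize(recovery);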
---
src/box/box.cc | 12 +++++++-----
src/box/recovery.cc | 12 ++----------
src/box/recovery.h | 2 +-
3 files changed, 10 insertions(+), 16 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e1bf3934..9105ed19 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1887,16 +1887,17 @@ box_cfg_xc(void)
&last_checkpoint_vclock);
engine_begin_final_recovery_xc();
- recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
- cfg_getd("wal_dir_rescan_delay"));
- title("hot_standby");
-
+ recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
/*
* Leave hot standby mode, if any, only
* after acquiring the lock.
*/
if (wal_dir_lock < 0) {
+ title("hot_standby");
say_info("Entering hot standby mode");
+ recovery_follow_local(recovery, &wal_stream.base,
+ "hot_standby",
+ cfg_getd("wal_dir_rescan_delay"));
while (true) {
if (path_lock(cfg_gets("wal_dir"),
&wal_dir_lock))
@@ -1905,9 +1906,10 @@ box_cfg_xc(void)
break;
fiber_sleep(0.1);
}
+ recovery_stop_local(recovery);
box_bind();
}
- recovery_finalize(recovery, &wal_stream.base);
+ recovery_finalize(recovery);
engine_end_recovery_xc();
/* Check replica set UUID. */
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index cf348d29..5ef1f979 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -313,12 +313,8 @@ recover_current_wal:
}
void
-recovery_finalize(struct recovery *r, struct xstream *stream)
+recovery_finalize(struct recovery *r)
{
- recovery_stop_local(r);
-
- recover_remaining_wals(r, stream, NULL, true);
-
recovery_close_log(r);
/*
@@ -490,6 +486,7 @@ hot_standby_f(va_list ap)
subscription.events = 0;
}
+ recover_remaining_wals(r, stream, NULL, true);
return 0;
}
@@ -498,11 +495,6 @@ recovery_follow_local(struct recovery *r, struct xstream *stream,
const char *name, ev_tstamp wal_dir_rescan_delay)
{
/*
- * Scan wal_dir and recover all existing at the moment xlogs.
- * Blocks until finished.
- */
- recover_remaining_wals(r, stream, NULL, true);
- /*
* Start 'hot_standby' background fiber to follow xlog changes.
* It will pick up from the position of the currently open
* xlog.
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 3a950e47..6aba922b 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -76,7 +76,7 @@ void
recovery_stop_local(struct recovery *r);
void
-recovery_finalize(struct recovery *r, struct xstream *stream);
+recovery_finalize(struct recovery *r);
#if defined(__cplusplus)
} /* extern "C" */
--
2.11.0
* Re: [RFC PATCH 06/12] box: refactor hot standby recovery
From: Konstantin Osipov @ 2018-06-08 4:40 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/06/06 20:45]:
> Currently, we start a hot standby fiber even if not in hot standby mode
> (see recovery_follow_local). And we scan the wal directory twice - first
> time in recovery_follow_local(), second time in recovery_finalize().
> Let's factor out recover_remaining_wals() from those functions and call
> it explicitly. And let's call follow_local() and stop_local() only
> if in hot standby mode.
In hot standby mode, we need to make sure we scan all the WALs at
least once after we acquired a path lock.
As far as I understand this patch, it removes this property.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [RFC PATCH 06/12] box: refactor hot standby recovery
From: Vladimir Davydov @ 2018-06-08 6:43 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Fri, Jun 08, 2018 at 07:40:45AM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/06/06 20:45]:
> > Currently, we start a hot standby fiber even if not in hot standby mode
> > (see recovery_follow_local). And we scan the wal directory twice - first
> > time in recovery_follow_local(), second time in recovery_finalize().
> > Let's factor out recover_remaining_wals() from those functions and call
> > it explicitly. And let's call follow_local() and stop_local() only
> > if in hot standby mode.
>
> In hot standby mode, we need to make sure we scan all the WALs at
> least once after we acquired a path lock.
>
> As far as I understand this patch, it removes this property.
No, it doesn't. See, even though it removes recover_remaining_wals()
from recovery_finalize(), it makes hot_standby_f() call it before
returning, so that we will rescan the xlog dir just before leaving
hot standby mode:
> @@ -490,6 +486,7 @@ hot_standby_f(va_list ap)
>
> subscription.events = 0;
> }
> + recover_remaining_wals(r, stream, NULL, true);
> return 0;
> }
* Re: [RFC PATCH 06/12] box: refactor hot standby recovery
From: Konstantin Osipov @ 2018-06-08 13:15 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [18/06/08 11:28]:
> No, it doesn't. See, even though it removes recover_remaining_wals()
> from recovery_finalize(), it makes hot_standby_f() call it before
> returning, so that we will rescan the xlog dir just before leaving
> hot standby mode:
This is quite tricky. And I don't see why you need to do it this
way - why not add recover_remaining_wals to box_init instead,
where it will be more visible.
> > @@ -490,6 +486,7 @@ hot_standby_f(va_list ap)
> >
> > subscription.events = 0;
> > }
> > + recover_remaining_wals(r, stream, NULL, true);
> > return 0;
> > }
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [RFC PATCH 06/12] box: refactor hot standby recovery
From: Vladimir Davydov @ 2018-06-08 13:30 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Fri, Jun 08, 2018 at 04:15:24PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/06/08 11:28]:
> > No, it doesn't. See, even though it removes recover_remaining_wals()
> > from recovery_finalize(), it makes hot_standby_f() call it before
> > returning, so that we will rescan the xlog dir just before leaving
> > hot standby mode:
>
> This is quite tricky. And I don't see why you need to do it this
> way - why not add recover_remaining_wals to box_init instead,
> where it will be more visible.
Hmm, I guess you're right. I'll move the invocation of
recover_remaining_wals() out of hot_standby_f() so that it is called
right after recovery_stop_local().
Just so you understand why I'm doing this: if this patch set is applied,
we will init replicaset.vclock before reading the snapshot in case of
normal recovery, while in case of hot standby we will also have to
promote it after recovery_stop_local(). I want to separate those two
paths so that we don't set replicaset.vclock twice in case of normal
recovery.
>
> > > @@ -490,6 +486,7 @@ hot_standby_f(va_list ap)
> > >
> > > subscription.events = 0;
> > > }
> > > + recover_remaining_wals(r, stream, NULL, true);
> > > return 0;
* [RFC PATCH 07/12] box: retrieve end vclock before starting local recovery
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
In order to find out if the current instance fell too much behind its
peers in the cluster and so needs to be rebootstrapped, we need to know
its vclock before we start local recovery. To do that, let's scan the
most recent xlog. In the future, we can optimize this by either storing
the end vclock in the xlog EOF marker or by creating a new xlog on
server stop.
Needed for #461
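For illustration (made-up numbers): if the newest xlog starts at vclock
{1: 10} and contains rows with (replica_id, lsn) = (1, 11), (1, 12),
(2, 5), the computed end vclock is {1: 12, 2: 5} - the vclock the
instance will be able to report to its peers before replaying anything
locally.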
---
src/box/box.cc | 20 +++++++++++++-------
src/box/recovery.cc | 23 +++++++++++++++++++++++
src/box/recovery.h | 3 +++
3 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 9105ed19..b072f788 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1858,6 +1858,14 @@ box_cfg_xc(void)
auto guard = make_scoped_guard([=]{ recovery_delete(recovery); });
/*
+ * Initialize the replica set vclock from recovery.
+ * The local WAL may contain rows from remote masters,
+ * so we must reflect this in replicaset vclock to
+ * not attempt to apply these rows twice.
+ */
+ recovery_end_vclock(recovery, &replicaset.vclock);
+
+ /*
* recovery->vclock is needed by Vinyl to filter
* WAL rows that were dumped before restart.
*
@@ -1907,6 +1915,11 @@ box_cfg_xc(void)
fiber_sleep(0.1);
}
recovery_stop_local(recovery);
+ /*
+ * Advance replica set vclock to reflect records
+ * applied in hot standby mode.
+ */
+ vclock_copy(&replicaset.vclock, &recovery->vclock);
box_bind();
}
recovery_finalize(recovery);
@@ -1922,13 +1935,6 @@ box_cfg_xc(void)
/* Clear the pointer to journal before it goes out of scope */
journal_set(NULL);
- /*
- * Initialize the replica set vclock from recovery.
- * The local WAL may contain rows from remote masters,
- * so we must reflect this in replicaset vclock to
- * not attempt to apply these rows twice.
- */
- vclock_copy(&replicaset.vclock, &recovery->vclock);
/** Begin listening only when the local recovery is complete. */
box_listen();
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 5ef1f979..8bf081d6 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -137,6 +137,29 @@ recovery_new(const char *wal_dirname, bool force_recovery,
return r;
}
+void
+recovery_end_vclock(struct recovery *r, struct vclock *end_vclock)
+{
+ xdir_scan_xc(&r->wal_dir);
+
+ struct vclock *vclock = vclockset_last(&r->wal_dir.index);
+ if (vclock == NULL || vclock_compare(vclock, &r->vclock) < 0) {
+ /* No xlogs after last checkpoint. */
+ vclock_copy(end_vclock, &r->vclock);
+ return;
+ }
+
+ /* Scan the last xlog to find end vclock. */
+ vclock_copy(end_vclock, vclock);
+ struct xlog_cursor cursor;
+ if (xdir_open_cursor(&r->wal_dir, vclock_sum(vclock), &cursor) != 0)
+ return;
+ struct xrow_header row;
+ while (xlog_cursor_next(&cursor, &row, true) == 0)
+ vclock_follow(end_vclock, row.replica_id, row.lsn);
+ xlog_cursor_close(&cursor, false);
+}
+
static inline void
recovery_close_log(struct recovery *r)
{
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6aba922b..1ae6f2c3 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -69,6 +69,9 @@ void
recovery_delete(struct recovery *r);
void
+recovery_end_vclock(struct recovery *r, struct vclock *end_vclock);
+
+void
recovery_follow_local(struct recovery *r, struct xstream *stream,
const char *name, ev_tstamp wal_dir_rescan_delay);
--
2.11.0
* [RFC PATCH 08/12] box: open the port before starting local recovery
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
In order to find out if the current instance fell too much behind its
peers in the cluster and so needs to be re-bootstrapped, we need to
connect it to remote peers before proceeding to local recovery. The
problem is that box.cfg.replication may have an entry corresponding to
the instance itself, so before connecting we have to start listening
for incoming connections. So this patch moves the call to box_listen()
before recovery is started, unless the instance is in hot standby mode.
It also folds box_bind() into box_listen(), as the former is no longer
needed as a separate function.
Needed for #461
---
src/box/box.cc | 25 ++++++-------------------
src/box/box.h | 1 -
src/box/iproto.cc | 23 ++++++-----------------
src/box/iproto.h | 5 +----
src/box/lua/cfg.cc | 1 -
5 files changed, 13 insertions(+), 42 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index b072f788..fce5ebb1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -681,17 +681,11 @@ box_set_replication_skip_conflict(void)
}
void
-box_bind(void)
+box_listen(void)
{
const char *uri = cfg_gets("listen");
box_check_uri(uri, "listen");
- iproto_bind(uri);
-}
-
-void
-box_listen(void)
-{
- iproto_listen();
+ iproto_listen(uri);
}
void
@@ -1829,13 +1823,6 @@ box_cfg_xc(void)
*/
if (!cfg_geti("hot_standby") || last_checkpoint_lsn < 0)
tnt_raise(ClientError, ER_ALREADY_RUNNING, cfg_gets("wal_dir"));
- } else {
- /*
- * Try to bind the port before recovery, to fail
- * early if the port is busy. In hot standby mode,
- * the port is most likely busy.
- */
- box_bind();
}
bool is_bootstrap_leader = false;
if (last_checkpoint_lsn >= 0) {
@@ -1865,6 +1852,9 @@ box_cfg_xc(void)
*/
recovery_end_vclock(recovery, &replicaset.vclock);
+ if (wal_dir_lock >= 0)
+ box_listen();
+
/*
* recovery->vclock is needed by Vinyl to filter
* WAL rows that were dumped before restart.
@@ -1920,7 +1910,7 @@ box_cfg_xc(void)
* applied in hot standby mode.
*/
vclock_copy(&replicaset.vclock, &recovery->vclock);
- box_bind();
+ box_listen();
}
recovery_finalize(recovery);
engine_end_recovery_xc();
@@ -1936,9 +1926,6 @@ box_cfg_xc(void)
/* Clear the pointer to journal before it goes out of scope */
journal_set(NULL);
- /** Begin listening only when the local recovery is complete. */
- box_listen();
-
title("orphan");
/* Wait for the cluster to start up */
diff --git a/src/box/box.h b/src/box/box.h
index d3967891..182e1b72 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -170,7 +170,6 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header);
void
box_check_config();
-void box_bind(void);
void box_listen(void);
void box_set_replication(void);
void box_set_log_level(void);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 76844555..c6b13934 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1763,7 +1763,6 @@ iproto_init()
/** Available IProto configuration changes. */
enum iproto_cfg_op {
- IPROTO_CFG_BIND,
IPROTO_CFG_MSG_MAX,
IPROTO_CFG_LISTEN
};
@@ -1801,12 +1800,6 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
int old;
try {
switch (cfg_msg->op) {
- case IPROTO_CFG_BIND:
- if (evio_service_is_active(&binary))
- evio_service_stop(&binary);
- if (cfg_msg->uri != NULL)
- evio_service_bind(&binary, cfg_msg->uri);
- break;
case IPROTO_CFG_MSG_MAX:
cpipe_set_max_input(&tx_pipe,
cfg_msg->iproto_msg_max / 2);
@@ -1817,7 +1810,11 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
break;
case IPROTO_CFG_LISTEN:
if (evio_service_is_active(&binary))
+ evio_service_stop(&binary);
+ if (cfg_msg->uri != NULL) {
+ evio_service_bind(&binary, cfg_msg->uri);
evio_service_listen(&binary);
+ }
break;
default:
unreachable();
@@ -1837,19 +1834,11 @@ iproto_do_cfg(struct iproto_cfg_msg *msg)
}
void
-iproto_bind(const char *uri)
-{
- struct iproto_cfg_msg cfg_msg;
- iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_BIND);
- cfg_msg.uri = uri;
- iproto_do_cfg(&cfg_msg);
-}
-
-void
-iproto_listen()
+iproto_listen(const char *uri)
{
struct iproto_cfg_msg cfg_msg;
iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_LISTEN);
+ cfg_msg.uri = uri;
iproto_do_cfg(&cfg_msg);
}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index b6591469..b9a6cf8f 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -75,10 +75,7 @@ void
iproto_init();
void
-iproto_bind(const char *uri);
-
-void
-iproto_listen();
+iproto_listen(const char *uri);
void
iproto_set_msg_max(int iproto_msg_max);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5afebc94..0f6b8a5a 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -69,7 +69,6 @@ static int
lbox_cfg_set_listen(struct lua_State *L)
{
try {
- box_bind();
box_listen();
} catch (Exception *) {
luaT_error(L);
--
2.11.0
* [RFC PATCH 09/12] box: connect to remote peers before starting local recovery
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
box_sync_replication() can now be called before recovery, right after
box_listen(). This is a step toward detecting if the instance fell too
much behind its peers in the cluster and so needs to be rebootstrapped.
Needed for #461
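With patches 5-9 applied, the startup sequence of an instance that has
local data becomes, roughly (a simplified sketch; the hot standby branch
and error handling are omitted):
    memtx_engine_new()        /* reads INSTANCE_UUID from the last snapshot */
    recovery_end_vclock()     /* initializes replicaset.vclock from the last xlog */
    box_listen()              /* the greeting already carries INSTANCE_UUID */
    box_sync_replication()    /* box.cfg.replication may point back at us */
    /* ... local recovery: snapshot, then xlogs ... */
    engine_end_recovery_xc()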
---
src/box/box.cc | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index fce5ebb1..2ff9fb5f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1852,8 +1852,10 @@ box_cfg_xc(void)
*/
recovery_end_vclock(recovery, &replicaset.vclock);
- if (wal_dir_lock >= 0)
+ if (wal_dir_lock >= 0) {
box_listen();
+ box_sync_replication(replication_connect_timeout, false);
+ }
/*
* recovery->vclock is needed by Vinyl to filter
@@ -1911,6 +1913,7 @@ box_cfg_xc(void)
*/
vclock_copy(&replicaset.vclock, &recovery->vclock);
box_listen();
+ box_sync_replication(replication_connect_timeout, false);
}
recovery_finalize(recovery);
engine_end_recovery_xc();
@@ -1925,11 +1928,6 @@ box_cfg_xc(void)
/* Clear the pointer to journal before it goes out of scope */
journal_set(NULL);
-
- title("orphan");
-
- /* Wait for the cluster to start up */
- box_sync_replication(replication_connect_timeout, false);
} else {
if (!tt_uuid_is_nil(&instance_uuid))
INSTANCE_UUID = instance_uuid;
@@ -1941,8 +1939,6 @@ box_cfg_xc(void)
*/
box_listen();
- title("orphan");
-
/*
* Wait for the cluster to start up.
*
@@ -1978,6 +1974,8 @@ box_cfg_xc(void)
rmean_cleanup(rmean_box);
+ title("orphan");
+
/*
* If this instance is a leader of a newly bootstrapped
* cluster, it is uptodate by definition so leave the
--
2.11.0
* [RFC PATCH 10/12] box: factor out local recovery function
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
- Factor out local_recovery() from box_cfg_xc(). Make it set up
replication and handle the local recovery and hot standby cases.
- Move replication setup in case of initial bootstrap from box_cfg_xc()
to bootstrap() to make bootstrap() consistent with local_recovery().
- Move initial snapshot creation from bootstrap() to bootstrap_master()
and bootstrap_from_master().
Needed for #461
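The resulting top-level structure of box_cfg_xc() is roughly (a
simplified sketch based on the diff below):
    if (last_checkpoint_lsn >= 0) {
            /* Recover the instance from the local directory. */
            local_recovery(&instance_uuid, &replicaset_uuid,
                           &last_checkpoint_vclock);
    } else {
            /* Bootstrap a new instance, possibly from a remote master. */
            bootstrap(&instance_uuid, &replicaset_uuid,
                      &is_bootstrap_leader);
    }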
---
src/box/box.cc | 277 +++++++++++++++++++++++++++++++--------------------------
1 file changed, 150 insertions(+), 127 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 2ff9fb5f..9b2c2e2a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1644,6 +1644,11 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
/* Set UUID of a new replica set */
box_set_replicaset_uuid(replicaset_uuid);
+
+ /* Make the initial checkpoint */
+ if (engine_begin_checkpoint() ||
+ engine_commit_checkpoint(&replicaset.vclock))
+ panic("failed to create a checkpoint");
}
/**
@@ -1698,6 +1703,11 @@ bootstrap_from_master(struct replica *master)
/* Switch applier to initial state */
applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
assert(applier->state == APPLIER_READY);
+
+ /* Make the initial checkpoint */
+ if (engine_begin_checkpoint() ||
+ engine_commit_checkpoint(&replicaset.vclock))
+ panic("failed to create a checkpoint");
}
/**
@@ -1708,8 +1718,31 @@ bootstrap_from_master(struct replica *master)
* the leader of a new cluster
*/
static void
-bootstrap(const struct tt_uuid *replicaset_uuid, bool *is_bootstrap_leader)
-{
+bootstrap(const struct tt_uuid *instance_uuid,
+ const struct tt_uuid *replicaset_uuid,
+ bool *is_bootstrap_leader)
+{
+ /* Initialize instance UUID. */
+ assert(tt_uuid_is_nil(&INSTANCE_UUID));
+ if (!tt_uuid_is_nil(instance_uuid))
+ INSTANCE_UUID = *instance_uuid;
+ else
+ tt_uuid_create(&INSTANCE_UUID);
+ /*
+ * Begin listening on the socket to enable
+ * master-master replication leader election.
+ */
+ box_listen();
+ /*
+ * Wait for the cluster to start up.
+ *
+ * Note, when bootstrapping a new instance, we have to
+ * connect to all masters to make sure all replicas
+ * receive the same replica set UUID when a new cluster
+ * is deployed.
+ */
+ box_sync_replication(TIMEOUT_INFINITY, true);
+
/* Use the first replica by URI as a bootstrap leader */
struct replica *master = replicaset_leader();
assert(master == NULL || master->applier != NULL);
@@ -1727,9 +1760,116 @@ bootstrap(const struct tt_uuid *replicaset_uuid, bool *is_bootstrap_leader)
bootstrap_master(replicaset_uuid);
*is_bootstrap_leader = true;
}
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
- panic("failed to create a checkpoint");
+}
+
+/**
+ * Recover the instance from the local directory.
+ * Enter hot standby if the directory is locked.
+ */
+static void
+local_recovery(const struct tt_uuid *instance_uuid,
+ const struct tt_uuid *replicaset_uuid,
+ const struct vclock *checkpoint_vclock)
+{
+ /* Check instance UUID. */
+ assert(!tt_uuid_is_nil(&INSTANCE_UUID));
+ if (!tt_uuid_is_nil(instance_uuid) &&
+ !tt_uuid_is_equal(instance_uuid, &INSTANCE_UUID)) {
+ tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
+ tt_uuid_str(instance_uuid),
+ tt_uuid_str(&INSTANCE_UUID));
+ }
+
+ struct wal_stream wal_stream;
+ wal_stream_create(&wal_stream, cfg_geti64("rows_per_wal"));
+
+ struct recovery *recovery;
+ recovery = recovery_new(cfg_gets("wal_dir"),
+ cfg_geti("force_recovery"),
+ checkpoint_vclock);
+ auto guard = make_scoped_guard([=]{ recovery_delete(recovery); });
+
+ /*
+ * Initialize the replica set vclock from recovery.
+ * The local WAL may contain rows from remote masters,
+ * so we must reflect this in replicaset vclock to
+ * not attempt to apply these rows twice.
+ */
+ recovery_end_vclock(recovery, &replicaset.vclock);
+
+ if (wal_dir_lock >= 0) {
+ box_listen();
+ box_sync_replication(replication_connect_timeout, false);
+ }
+
+ /*
+ * recovery->vclock is needed by Vinyl to filter
+ * WAL rows that were dumped before restart.
+ *
+ * XXX: Passing an internal member of the recovery
+ * object to an engine is an ugly hack. Instead we
+ * should introduce space_vtab::apply_wal_row method
+ * and explicitly pass the statement LSN to it.
+ */
+ engine_begin_initial_recovery_xc(&recovery->vclock);
+
+ struct memtx_engine *memtx;
+ memtx = (struct memtx_engine *)engine_by_name("memtx");
+ assert(memtx != NULL);
+
+ struct recovery_journal journal;
+ recovery_journal_create(&journal, &recovery->vclock);
+ journal_set(&journal.base);
+
+ /*
+ * We explicitly request memtx to recover its
+ * snapshot as a separate phase since it contains
+ * data for system spaces, and triggers on
+ * recovery of system spaces issue DDL events in
+ * other engines.
+ */
+ memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
+
+ engine_begin_final_recovery_xc();
+ recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+ /*
+ * Leave hot standby mode, if any, only after
+ * acquiring the lock.
+ */
+ if (wal_dir_lock < 0) {
+ title("hot_standby");
+ say_info("Entering hot standby mode");
+ recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
+ cfg_getd("wal_dir_rescan_delay"));
+ while (true) {
+ if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock))
+ diag_raise();
+ if (wal_dir_lock >= 0)
+ break;
+ fiber_sleep(0.1);
+ }
+ recovery_stop_local(recovery);
+ /*
+ * Advance replica set vclock to reflect records
+ * applied in hot standby mode.
+ */
+ vclock_copy(&replicaset.vclock, &recovery->vclock);
+ box_listen();
+ box_sync_replication(replication_connect_timeout, false);
+ }
+ recovery_finalize(recovery);
+ engine_end_recovery_xc();
+
+ /* Check replica set UUID. */
+ if (!tt_uuid_is_nil(replicaset_uuid) &&
+ !tt_uuid_is_equal(replicaset_uuid, &REPLICASET_UUID)) {
+ tnt_raise(ClientError, ER_REPLICASET_UUID_MISMATCH,
+ tt_uuid_str(replicaset_uuid),
+ tt_uuid_str(&REPLICASET_UUID));
+ }
+
+ /* Clear the pointer to journal before it goes out of scope */
+ journal_set(NULL);
}
static void
@@ -1826,130 +1966,13 @@ box_cfg_xc(void)
}
bool is_bootstrap_leader = false;
if (last_checkpoint_lsn >= 0) {
- /* Check instance UUID. */
- assert(!tt_uuid_is_nil(&INSTANCE_UUID));
- if (!tt_uuid_is_nil(&instance_uuid) &&
- !tt_uuid_is_equal(&instance_uuid, &INSTANCE_UUID)) {
- tnt_raise(ClientError, ER_INSTANCE_UUID_MISMATCH,
- tt_uuid_str(&instance_uuid),
- tt_uuid_str(&INSTANCE_UUID));
- }
-
- struct wal_stream wal_stream;
- wal_stream_create(&wal_stream, cfg_geti64("rows_per_wal"));
-
- struct recovery *recovery;
- recovery = recovery_new(cfg_gets("wal_dir"),
- cfg_geti("force_recovery"),
- &last_checkpoint_vclock);
- auto guard = make_scoped_guard([=]{ recovery_delete(recovery); });
-
- /*
- * Initialize the replica set vclock from recovery.
- * The local WAL may contain rows from remote masters,
- * so we must reflect this in replicaset vclock to
- * not attempt to apply these rows twice.
- */
- recovery_end_vclock(recovery, &replicaset.vclock);
-
- if (wal_dir_lock >= 0) {
- box_listen();
- box_sync_replication(replication_connect_timeout, false);
- }
-
- /*
- * recovery->vclock is needed by Vinyl to filter
- * WAL rows that were dumped before restart.
- *
- * XXX: Passing an internal member of the recovery
- * object to an engine is an ugly hack. Instead we
- * should introduce Engine::applyWALRow method and
- * explicitly pass the statement LSN to it.
- */
- engine_begin_initial_recovery_xc(&recovery->vclock);
-
- struct memtx_engine *memtx;
- memtx = (struct memtx_engine *)engine_by_name("memtx");
- assert(memtx != NULL);
-
- struct recovery_journal journal;
- recovery_journal_create(&journal, &recovery->vclock);
- journal_set(&journal.base);
-
- /**
- * We explicitly request memtx to recover its
- * snapshot as a separate phase since it contains
- * data for system spaces, and triggers on
- * recovery of system spaces issue DDL events in
- * other engines.
- */
- memtx_engine_recover_snapshot_xc(memtx,
- &last_checkpoint_vclock);
-
- engine_begin_final_recovery_xc();
- recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
- /*
- * Leave hot standby mode, if any, only
- * after acquiring the lock.
- */
- if (wal_dir_lock < 0) {
- title("hot_standby");
- say_info("Entering hot standby mode");
- recovery_follow_local(recovery, &wal_stream.base,
- "hot_standby",
- cfg_getd("wal_dir_rescan_delay"));
- while (true) {
- if (path_lock(cfg_gets("wal_dir"),
- &wal_dir_lock))
- diag_raise();
- if (wal_dir_lock >= 0)
- break;
- fiber_sleep(0.1);
- }
- recovery_stop_local(recovery);
- /*
- * Advance replica set vclock to reflect records
- * applied in hot standby mode.
- */
- vclock_copy(&replicaset.vclock, &recovery->vclock);
- box_listen();
- box_sync_replication(replication_connect_timeout, false);
- }
- recovery_finalize(recovery);
- engine_end_recovery_xc();
-
- /* Check replica set UUID. */
- if (!tt_uuid_is_nil(&replicaset_uuid) &&
- !tt_uuid_is_equal(&replicaset_uuid, &REPLICASET_UUID)) {
- tnt_raise(ClientError, ER_REPLICASET_UUID_MISMATCH,
- tt_uuid_str(&replicaset_uuid),
- tt_uuid_str(&REPLICASET_UUID));
- }
-
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
+ /* Recover the instance from the local directory */
+ local_recovery(&instance_uuid, &replicaset_uuid,
+ &last_checkpoint_vclock);
} else {
- if (!tt_uuid_is_nil(&instance_uuid))
- INSTANCE_UUID = instance_uuid;
- else
- tt_uuid_create(&INSTANCE_UUID);
- /*
- * Begin listening on the socket to enable
- * master-master replication leader election.
- */
- box_listen();
-
- /*
- * Wait for the cluster to start up.
- *
- * Note, when bootstrapping a new instance, we have to
- * connect to all masters to make sure all replicas
- * receive the same replica set UUID when a new cluster
- * is deployed.
- */
- box_sync_replication(TIMEOUT_INFINITY, true);
/* Bootstrap a new master */
- bootstrap(&replicaset_uuid, &is_bootstrap_leader);
+ bootstrap(&instance_uuid, &replicaset_uuid,
+ &is_bootstrap_leader);
}
fiber_gc();
--
2.11.0
* [RFC PATCH 11/12] applier: inquire oldest vclock on connect
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
Introduce a new iproto command IPROTO_GET_GC_VCLOCK that returns the
vclock of the oldest checkpoint available at the master. Use this
command when the applier is connected to set applier->gc_vclock. We will
need it to check whether a replica fell too much behind its peers in
the cluster and so needs to be rebootstrapped.
Needed for #461
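For illustration, the exchange added by this patch looks roughly like
this (a sketch inferred from the encoders below; header keys are the
standard iproto ones):
    request:  header { IPROTO_REQUEST_TYPE = IPROTO_GET_GC_VCLOCK, IPROTO_SYNC = sync }, no body
    response: header { IPROTO_REQUEST_TYPE = IPROTO_OK, IPROTO_SYNC = sync, IPROTO_SCHEMA_VERSION = ... },
              body   { IPROTO_VCLOCK = { replica_id: lsn, ... } }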
---
src/box/applier.cc | 15 +++++++++++++++
src/box/applier.h | 2 ++
src/box/box.cc | 12 ++++++++++++
src/box/box.h | 3 +++
src/box/iproto.cc | 7 +++++++
src/box/iproto_constants.h | 2 ++
src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 31 +++++++++++++++++++++++++++++++
8 files changed, 108 insertions(+)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..8d750dc6 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -228,6 +228,21 @@ applier_connect(struct applier *applier)
&applier->remote_is_ro);
}
+ /**
+ * Tarantool >= 1.10.1: send an IPROTO_GET_GC_VCLOCK message
+ * to find out the oldest vclock available at the remote end.
+ * Needed to check if the replica has to be rebootstrapped.
+ */
+ if (applier->version_id >= version_id(1, 10, 1)) {
+ xrow_encode_get_gc_vclock(&row);
+ coio_write_xrow(coio, &row);
+ coio_read_xrow(coio, ibuf, &row);
+ if (row.type != IPROTO_OK)
+ xrow_decode_error_xc(&row);
+ vclock_create(&applier->gc_vclock);
+ xrow_decode_vclock_xc(&row, &applier->gc_vclock);
+ }
+
applier_set_state(applier, APPLIER_CONNECTED);
/* Detect connection to itself */
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..d0ae1ed1 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -96,6 +96,8 @@ struct applier {
uint32_t version_id;
/** Remote vclock at time of connect. */
struct vclock vclock;
+ /** Oldest vclock available at remote at time of connect. */
+ struct vclock gc_vclock;
/** Remote peer mode, true if read-only, default: false */
bool remote_is_ro;
/** Remote address */
diff --git a/src/box/box.cc b/src/box/box.cc
index 9b2c2e2a..c10124ea 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1540,6 +1540,18 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
replica_version_id);
}
+void
+box_get_gc_vclock(struct vclock *vclock)
+{
+ struct checkpoint_iterator it;
+ checkpoint_iterator_init(&it);
+ const struct vclock *oldest = checkpoint_iterator_next(&it);
+ if (oldest != NULL)
+ vclock_copy(vclock, oldest);
+ else
+ vclock_create(vclock);
+}
+
/** Insert a new cluster into _schema */
static void
box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..10c54102 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header);
void
box_process_subscribe(struct ev_io *io, struct xrow_header *header);
+void
+box_get_gc_vclock(struct vclock *vclock);
+
/**
* Check Lua configuration before initialization or
* in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index c6b13934..fdb286ad 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1062,6 +1062,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
*stop_input = true;
break;
case IPROTO_REQUEST_VOTE:
+ case IPROTO_GET_GC_VCLOCK:
cmsg_init(&msg->base, misc_route);
break;
case IPROTO_AUTH:
@@ -1423,6 +1424,7 @@ tx_process_misc(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
struct obuf *out = con->tx.p_obuf;
+ struct vclock vclock;
tx_fiber_init(con->session, msg->header.sync);
@@ -1446,6 +1448,11 @@ tx_process_misc(struct cmsg *m)
&replicaset.vclock,
cfg_geti("read_only"));
break;
+ case IPROTO_GET_GC_VCLOCK:
+ box_get_gc_vclock(&vclock);
+ iproto_reply_vclock_xc(out, msg->header.sync,
+ ::schema_version, &vclock);
+ break;
default:
unreachable();
}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 46d47719..cb2fdbf1 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -156,6 +156,8 @@ enum iproto_type {
IPROTO_SUBSCRIBE = 66,
/** Vote request command for master election */
IPROTO_REQUEST_VOTE = 67,
+ /** Command to inquire garbage collection state */
+ IPROTO_GET_GC_VCLOCK = 68,
/** Vinyl run info stored in .index file */
VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 532e1296..dc5fa0a2 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -282,6 +282,35 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version)
}
int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+ const struct vclock *vclock)
+{
+ size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+ mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(vclock);
+
+ char *buf = obuf_reserve(out, max_size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, max_size,
+ "obuf_alloc", "buf");
+ return -1;
+ }
+
+ char *data = buf + IPROTO_HEADER_LEN;
+ data = mp_encode_map(data, 1);
+ data = mp_encode_uint(data, IPROTO_VCLOCK);
+ data = mp_encode_vclock(data, vclock);
+ size_t size = data - buf;
+ assert(size <= max_size);
+
+ iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+ size - IPROTO_HEADER_LEN);
+
+ char *ptr = obuf_alloc(out, size);
+ assert(ptr == buf);
+ return 0;
+}
+
+int
iproto_reply_request_vote(struct obuf *out, uint64_t sync,
uint32_t schema_version, const struct vclock *vclock,
bool read_only)
@@ -811,6 +840,13 @@ xrow_encode_request_vote(struct xrow_header *row)
row->type = IPROTO_REQUEST_VOTE;
}
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row)
+{
+ memset(row, 0, sizeof(*row));
+ row->type = IPROTO_GET_GC_VCLOCK;
+}
+
int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b10bf26d..edf16ec2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -230,6 +230,13 @@ void
xrow_encode_request_vote(struct xrow_header *row);
/**
+ * Encode a request for gc state inquiry.
+ * @param row[out] Row to encode into.
+ */
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row);
+
+/**
* Encode SUBSCRIBE command.
* @param[out] Row.
* @param replicaset_uuid Replica set uuid.
@@ -393,6 +400,21 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version);
* @param sync Request sync.
* @param schema_version.
* @param vclock.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+ const struct vclock *vclock);
+
+/**
+ * Encode iproto header with IPROTO_OK response code
+ * and vclock in the body.
+ * @param out Encode to.
+ * @param sync Request sync.
+ * @param schema_version.
+ * @param vclock.
* @param read_only.
*
* @retval 0 Success.
@@ -646,6 +668,15 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
diag_raise();
}
+/** @copydoc iproto_reply_vclock. */
+static inline void
+iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version,
+ const struct vclock *vclock)
+{
+ if (iproto_reply_vclock(out, sync, schema_version, vclock) != 0)
+ diag_raise();
+}
+
/** @copydoc iproto_reply_request_vote_xc. */
static inline void
iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
--
2.11.0
* [RFC PATCH 12/12] replication: rebootstrap instance on startup if it fell behind
From: Vladimir Davydov @ 2018-06-06 17:45 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
If a replica fell too much behind its peers in the cluster and the xlog
files needed for it to get up to speed have been removed, it won't be
able to proceed without a rebootstrap. This patch makes the recovery
procedure detect such cases and initiate the rebootstrap procedure if
necessary.
Closes #461
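For illustration (made-up numbers): if the master's oldest retained
checkpoint has vclock {1: 100} while the restarting replica has only
recovered up to {1: 80}, the xlogs covering LSNs 81-100 have already
been garbage-collected on the master, so the replica cannot simply
subscribe. replicaset_needs_rejoin() detects this (the remote gc vclock
is ahead of the local vclock) and local_recovery() falls back to
bootstrap_from_master().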
---
src/box/box.cc | 9 ++
src/box/replication.cc | 15 +++
src/box/replication.h | 9 ++
test/replication/replica_rejoin.result | 194 +++++++++++++++++++++++++++++++
test/replication/replica_rejoin.test.lua | 73 ++++++++++++
test/replication/suite.cfg | 1 +
6 files changed, 301 insertions(+)
create mode 100644 test/replication/replica_rejoin.result
create mode 100644 test/replication/replica_rejoin.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index c10124ea..a83dbff5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1777,6 +1777,9 @@ bootstrap(const struct tt_uuid *instance_uuid,
/**
* Recover the instance from the local directory.
* Enter hot standby if the directory is locked.
+ * Invoke rebootstrap if the instance fell too much
+ * behind its peers in the replica set and needs
+ * to be rebootstrapped.
*/
static void
local_recovery(const struct tt_uuid *instance_uuid,
@@ -1812,6 +1815,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
if (wal_dir_lock >= 0) {
box_listen();
box_sync_replication(replication_connect_timeout, false);
+
+ struct replica *master;
+ if (replicaset_needs_rejoin(&master)) {
+ say_info("replica is too old, initiating rejoin");
+ return bootstrap_from_master(master);
+ }
}
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e17698..0dda5dec 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -625,6 +625,21 @@ error:
"failed to connect to one or more replicas");
}
+bool
+replicaset_needs_rejoin(struct replica **master)
+{
+ replicaset_foreach(replica) {
+ if (replica->applier != NULL &&
+ vclock_compare(&replica->applier->gc_vclock,
+ &replicaset.vclock) > 0) {
+ *master = replica;
+ return true;
+ }
+ }
+ *master = NULL;
+ return false;
+}
+
void
replicaset_follow(void)
{
diff --git a/src/box/replication.h b/src/box/replication.h
index fdf995c3..e8b391af 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -360,6 +360,15 @@ replicaset_connect(struct applier **appliers, int count,
double timeout, bool connect_all);
/**
+ * Check if the current instance has fallen too far behind
+ * its peers in the replica set and needs to be rebootstrapped.
+ * If it has, return true and set @master to the instance
+ * to use for rebootstrap; otherwise return false.
+ */
+bool
+replicaset_needs_rejoin(struct replica **master);
+
+/**
* Resume all appliers registered with the replica set.
*/
void
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
new file mode 100644
index 00000000..2deccd01
--- /dev/null
+++ b/test/replication/replica_rejoin.result
@@ -0,0 +1,194 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cleanup_cluster()
+---
+...
+--
+-- gh-461: check that a replica refetches the last checkpoint
+-- in case it fell behind the master.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test')
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+_ = box.space.test:insert{1}
+---
+...
+_ = box.space.test:insert{2}
+---
+...
+_ = box.space.test:insert{3}
+---
+...
+-- Join a replica, then stop it.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [1]
+ - [2]
+ - [3]
+...
+_ = box.schema.space.create('replica') -- will disappear after rejoin
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+-- Restart the server to purge the replica from
+-- the garbage collection state.
+test_run:cmd("restart server default")
+-- Make some checkpoints to remove old xlogs.
+checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+_ = box.space.test:delete{1}
+---
+...
+_ = box.space.test:insert{10}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = box.space.test:delete{2}
+---
+...
+_ = box.space.test:insert{20}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = box.space.test:delete{3}
+---
+...
+_ = box.space.test:insert{30}
+---
+...
+#box.info.gc().checkpoints -- 1
+---
+- 1
+...
+box.cfg{checkpoint_count = checkpoint_count}
+---
+...
+-- Restart the replica. Since xlogs have been removed,
+-- it is supposed to rejoin without changing id.
+test_run:cmd("start server replica")
+---
+- true
+...
+box.info.replication[2].downstream.vclock ~= nil or box.info
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [10]
+ - [20]
+ - [30]
+...
+box.space.replica == nil -- was removed by rejoin
+---
+- true
+...
+_ = box.schema.space.create('replica')
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- Make sure the replica follows new changes.
+for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
+---
+...
+vclock = test_run:get_vclock('default')
+---
+...
+_ = test_run:wait_vclock('replica', vclock)
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [10, 10]
+ - [20, 20]
+ - [30, 30]
+...
+-- Check that restart works as usual.
+test_run:cmd("restart server replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [10, 10]
+ - [20, 20]
+ - [30, 30]
+...
+box.space.replica ~= nil
+---
+- true
+...
+-- Cleanup.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
new file mode 100644
index 00000000..5bd92119
--- /dev/null
+++ b/test/replication/replica_rejoin.test.lua
@@ -0,0 +1,73 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cleanup_cluster()
+
+--
+-- gh-461: check that a replica refetches the last checkpoint
+-- in case it fell behind the master.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+_ = box.space.test:insert{1}
+_ = box.space.test:insert{2}
+_ = box.space.test:insert{3}
+
+-- Join a replica, then stop it.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+_ = box.schema.space.create('replica') -- will disappear after rejoin
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+
+-- Restart the server to purge the replica from
+-- the garbage collection state.
+test_run:cmd("restart server default")
+
+-- Make some checkpoints to remove old xlogs.
+checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+_ = box.space.test:delete{1}
+_ = box.space.test:insert{10}
+box.snapshot()
+_ = box.space.test:delete{2}
+_ = box.space.test:insert{20}
+box.snapshot()
+_ = box.space.test:delete{3}
+_ = box.space.test:insert{30}
+#box.info.gc().checkpoints -- 1
+box.cfg{checkpoint_count = checkpoint_count}
+
+-- Restart the replica. Since xlogs have been removed,
+-- it is supposed to rejoin without changing id.
+test_run:cmd("start server replica")
+box.info.replication[2].downstream.vclock ~= nil or box.info
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+box.space.replica == nil -- was removed by rejoin
+_ = box.schema.space.create('replica')
+test_run:cmd("switch default")
+
+-- Make sure the replica follows new changes.
+for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
+vclock = test_run:get_vclock('default')
+_ = test_run:wait_vclock('replica', vclock)
+test_run:cmd("switch replica")
+box.space.test:select()
+
+-- Check that restart works as usual.
+test_run:cmd("restart server replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+box.space.replica ~= nil
+
+-- Cleanup.
+test_run:cmd("switch default")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5a..2b609f16 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
"wal_off.test.lua": {},
"hot_standby.test.lua": {},
"rebootstrap.test.lua": {},
+ "replica_rejoin.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.11.0
^ permalink raw reply [flat|nested] 22+ messages in thread