[tarantool-patches] [PATCH] replication: remove old snapshot files not needed by replicas
Serge Petrenko
sergepetrenko at tarantool.org
Wed Jun 27 17:08:22 MSK 2018
Closes #3444
branch: https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3444-remove-old-shapshots-for-replicas
issue [3444]: https://github.com/tarantool/tarantool/issues/3444
---
src/box/box.cc | 4 +--
src/box/gc.c | 69 +++++++++++++++++++++++++++++++++++---------
src/box/gc.h | 9 +++++-
src/box/relay.cc | 2 +-
test/replication/gc.result | 47 +++++++++++++++++++++++++-----
test/replication/gc.test.lua | 29 ++++++++++---------
6 files changed, 122 insertions(+), 38 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index dcbf52e10..c5690e908 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
/* Register the replica with the garbage collector. */
struct gc_consumer *gc = gc_consumer_register(
tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)),
- vclock_sum(&start_vclock));
+ vclock_sum(&start_vclock), true);
if (gc == NULL)
diag_raise();
auto gc_guard = make_scoped_guard([=]{
@@ -2082,7 +2082,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg)
return -1;
}
} while (checkpoint_idx-- > 0);
- backup_gc = gc_consumer_register("backup", vclock_sum(vclock));
+ backup_gc = gc_consumer_register("backup", vclock_sum(vclock), false);
if (backup_gc == NULL)
return -1;
int rc = engine_backup(vclock, cb, cb_arg);
diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..288cc7236 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -61,6 +61,8 @@ struct gc_consumer {
char *name;
/** The vclock signature tracked by this consumer. */
int64_t signature;
+ /** Set if the consumer needs only xlog files, not snapshots. */
+ bool xlog_only;
};
typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -69,8 +71,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
struct gc_state {
/** Number of checkpoints to maintain. */
int checkpoint_count;
- /** Max signature garbage collection has been called for. */
- int64_t signature;
+ /** Max signature WAL garbage collection has been called for. */
+ int64_t xlog_signature;
+ /** Max signature snapshot garbage collection has been called for. */
+ int64_t snap_signature;
/** Registered consumers, linked by gc_consumer::node. */
gc_tree_t consumers;
/**
@@ -104,7 +108,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
/** Allocate a consumer object. */
static struct gc_consumer *
-gc_consumer_new(const char *name, int64_t signature)
+gc_consumer_new(const char *name, int64_t signature, bool xlog_only)
{
struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
if (consumer == NULL) {
@@ -120,6 +124,7 @@ gc_consumer_new(const char *name, int64_t signature)
return NULL;
}
consumer->signature = signature;
+ consumer->xlog_only = xlog_only;
return consumer;
}
@@ -135,7 +140,8 @@ gc_consumer_delete(struct gc_consumer *consumer)
void
gc_init(void)
{
- gc.signature = -1;
+ gc.xlog_signature = -1;
+ gc.snap_signature = -1;
gc_tree_new(&gc.consumers);
latch_create(&gc.latch);
}
@@ -155,21 +161,33 @@ gc_free(void)
latch_destroy(&gc.latch);
}
+/** Find the first non-xlog-only consumer: the one using the oldest snapshot. */
+struct gc_consumer *
+gc_first_snap(gc_tree_t *consumers)
+{
+ struct gc_consumer *consumer = gc_tree_first(consumers);
+ while (consumer != NULL && consumer->xlog_only)
+ consumer = gc_tree_next(consumers, consumer);
+ return consumer;
+}
+
void
gc_run(void)
{
int checkpoint_count = gc.checkpoint_count;
assert(checkpoint_count > 0);
- /* Look up the consumer that uses the oldest snapshot. */
+ /* Look up the consumer that uses the oldest WAL. */
struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+ /* Look up the consumer that uses the oldest snapshot. */
+ struct gc_consumer *leftmost_snap = gc_first_snap(&gc.consumers);
/*
* Find the oldest checkpoint that must be preserved.
* We have to maintain @checkpoint_count oldest snapshots,
* plus we can't remove snapshots that are still in use.
*/
- int64_t gc_signature = -1;
+ int64_t gc_xlog_signature = -1;
struct checkpoint_iterator checkpoints;
checkpoint_iterator_init(&checkpoints);
@@ -181,14 +199,33 @@ gc_run(void)
if (leftmost != NULL &&
leftmost->signature < vclock_sum(vclock))
continue;
- gc_signature = vclock_sum(vclock);
+ gc_xlog_signature = vclock_sum(vclock);
break;
}
- if (gc_signature <= gc.signature)
+ int64_t gc_snap_signature = -1;
+ checkpoint_count = gc.checkpoint_count;
+
+ checkpoint_iterator_init(&checkpoints);
+
+ while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) {
+ if (--checkpoint_count > 0)
+ continue;
+ if (leftmost_snap != NULL &&
+ leftmost_snap->signature < vclock_sum(vclock))
+ continue;
+ gc_snap_signature = vclock_sum(vclock);
+ break;
+ }
+
+ if (gc_snap_signature <= gc.snap_signature &&
+ gc_xlog_signature <= gc.xlog_signature)
return; /* nothing to do */
- gc.signature = gc_signature;
+ if (gc_snap_signature > gc.snap_signature)
+ gc.snap_signature = gc_snap_signature;
+ if (gc_xlog_signature > gc.xlog_signature)
+ gc.xlog_signature = gc_xlog_signature;
/*
* Engine callbacks may sleep, because they use coio for
@@ -204,8 +241,8 @@ gc_run(void)
* collection for memtx snapshots first and abort if it
* fails - see comment to memtx_engine_collect_garbage().
*/
- if (engine_collect_garbage(gc_signature) == 0)
- wal_collect_garbage(gc_signature);
+ if (engine_collect_garbage(gc_snap_signature) == 0)
+ wal_collect_garbage(gc_xlog_signature);
latch_unlock(&gc.latch);
}
@@ -217,9 +254,9 @@ gc_set_checkpoint_count(int checkpoint_count)
}
struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature)
+gc_consumer_register(const char *name, int64_t signature, bool xlog_only)
{
- struct gc_consumer *consumer = gc_consumer_new(name, signature);
+ struct gc_consumer *consumer = gc_consumer_new(name, signature, xlog_only);
if (consumer != NULL)
gc_tree_insert(&gc.consumers, consumer);
return consumer;
@@ -287,6 +324,12 @@ gc_consumer_signature(const struct gc_consumer *consumer)
return consumer->signature;
}
+bool
+gc_consumer_xlog_only(const struct gc_consumer *consumer)
+{
+ return consumer->xlog_only;
+}
+
struct gc_consumer *
gc_consumer_iterator_next(struct gc_consumer_iterator *it)
{
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..c9a1d6558 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -33,6 +33,7 @@
#include <stddef.h>
#include <stdint.h>
+#include <stdbool.h>
#if defined(__cplusplus)
extern "C" {
@@ -74,12 +75,14 @@ gc_set_checkpoint_count(int checkpoint_count);
* @signature until the consumer is unregistered or advanced.
* @name is a human-readable name of the consumer, it will
* be used for reporting the consumer to the user.
+ * @xlog_only tells whether the consumer needs only xlog files;
+ * if set, it does not prevent removal of old snapshots.
*
* Returns a pointer to the new consumer object or NULL on
* memory allocation failure.
*/
struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature);
+gc_consumer_register(const char *name, int64_t signature, bool xlog_only);
/**
* Unregister a consumer and invoke garbage collection
@@ -103,6 +106,10 @@ gc_consumer_name(const struct gc_consumer *consumer);
int64_t
gc_consumer_signature(const struct gc_consumer *consumer);
+/** Return whether consumer only consumes xlog files. */
+bool
+gc_consumer_xlog_only(const struct gc_consumer *consumer);
+
/**
* Iterator over registered consumers. The iterator is valid
* as long as the caller doesn't yield.
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc540b..9567519d0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
if (replica->gc == NULL) {
replica->gc = gc_consumer_register(
tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)),
- vclock_sum(replica_clock));
+ vclock_sum(replica_clock), true);
if (replica->gc == NULL)
diag_raise();
}
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 7d6644ae6..adbe04ca2 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -1,3 +1,6 @@
+fio = require 'fio'
+---
+...
test_run = require('test_run').new()
---
...
@@ -115,6 +118,10 @@ wait_gc(1)
---
- true
...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
-- Make sure the replica will receive data it is subscribed
-- to long enough for us to invoke garbage collection.
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -131,7 +138,11 @@ box.snapshot()
---
- ok
...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
---
- true
...
@@ -166,6 +177,10 @@ wait_gc(1)
---
- true
...
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+---
+- true
+...
--
-- Check that the master doesn't delete xlog files sent to the
-- replica until it receives a confirmation that the data has
@@ -199,9 +214,13 @@ fiber.sleep(0.1) -- wait for master to relay data
---
...
-- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
---
- true
...
@@ -265,6 +284,10 @@ wait_gc(1)
---
- true
...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
-- Stop the replica.
test_run:cmd("stop server replica")
---
@@ -274,8 +297,8 @@ test_run:cmd("cleanup server replica")
---
- true
...
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
_ = s:auto_increment{}
---
...
@@ -283,11 +306,15 @@ box.snapshot()
---
- ok
...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
---
- true
...
--- The checkpoint should only be deleted after the replica
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+---
+- true
+...
+-- The xlog should only be deleted after the replica
-- is unregistered.
test_run:cleanup_cluster()
---
@@ -296,6 +323,10 @@ test_run:cleanup_cluster()
---
- true
...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
--
-- Test that concurrent invocation of the garbage collector works fine.
--
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 3a680075e..2b9ab0cf0 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -1,3 +1,4 @@
+fio = require 'fio'
test_run = require('test_run').new()
engine = test_run:get_cfg('engine')
replica_set = require('fast_replica')
@@ -60,7 +61,7 @@ test_run:cmd("switch default")
-- the replica released the corresponding checkpoint.
wait_gc(1)
#box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
-- Make sure the replica will receive data it is subscribed
-- to long enough for us to invoke garbage collection.
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -71,8 +72,8 @@ for i = 1, 100 do s:auto_increment{} end
-- Invoke garbage collection. Check that it doesn't remove
-- xlogs needed by the replica.
box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
-
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
-- Remove the timeout injection so that the replica catches
-- up quickly.
box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
@@ -87,7 +88,7 @@ test_run:cmd("switch default")
-- from the old checkpoint.
wait_gc(1)
#box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
--
-- Check that the master doesn't delete xlog files sent to the
-- replica until it receives a confirmation that the data has
@@ -103,9 +104,10 @@ box.snapshot() -- rotate xlog
for i = 1, 5 do s:auto_increment{} end
fiber.sleep(0.1) -- wait for master to relay data
-- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
test_run:cmd("switch replica")
-- Unblock the replica and make it fail to apply a row.
box.info.replication[1].upstream.message == nil
@@ -125,22 +127,23 @@ test_run:cmd("switch default")
-- Now it's safe to drop the old xlog.
wait_gc(1)
#box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
-- Stop the replica.
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
_ = s:auto_increment{}
box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
--- The checkpoint should only be deleted after the replica
+-- The xlog should only be deleted after the replica
-- is unregistered.
test_run:cleanup_cluster()
#box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
--
-- Test that concurrent invocation of the garbage collector works fine.
--
--
2.15.2 (Apple Git-101.1)
More information about the Tarantool-patches
mailing list