[tarantool-patches] [PATCH v3] replication: remove old snapshot files not needed by replicas

Serge Petrenko sergepetrenko at tarantool.org
Thu Jun 28 18:41:20 MSK 2018


Garbage collection doesn't distinguish between consumers that need
checkpoint files, such as backup, and those that only need WALs, such as
replicas. A disconnected replica will 'hold' all checkpoint files created
after it became unsynchronised, even though it doesn't need them, which may
lead to disk space shortage. To fix this, we store the consumer's type and
treat consumers differently during garbage collection: now only the old
WALs are kept for replicas, and old checkpoints are kept for backup,
if any. Also change the tests to check the updated garbage collection correctly.

Closes #3444
---
https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3444-remove-old-shapshots-for-replicas
https://github.com/tarantool/tarantool/issues/3444

Changes in v3:
- Fixed indentation and code style

Changes in v2:
- prefixed variable names with 'checkpoint_'
  instead of 'snap_', so that there is no
  confusion with memtx snapshots
- likewise renamed the variable
  xlog_only to wal_only
- rewrote gc_run so that there is only a single
  loop over checkpoints, and also one excess old
  WAL is removed (it was unneeded, but kept due
  to a mistake). Now wal_collect_garbage or
  engine_collect_garbage are called only if they
  have work to do.
- fix tests to correctly check the amount of xlogs
  kept by garbage collection

 src/box/box.cc               |  4 +--
 src/box/gc.c                 | 67 +++++++++++++++++++++++++++++-----------
 src/box/gc.h                 |  5 ++-
 src/box/relay.cc             |  2 +-
 test/replication/gc.result   | 73 ++++++++++++++++++++++++++++++++++++++------
 test/replication/gc.test.lua | 41 +++++++++++++++++--------
 6 files changed, 148 insertions(+), 44 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index dcbf52e10..c5690e908 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Register the replica with the garbage collector. */
 	struct gc_consumer *gc = gc_consumer_register(
 		tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)),
-		vclock_sum(&start_vclock));
+		vclock_sum(&start_vclock), true);
 	if (gc == NULL)
 		diag_raise();
 	auto gc_guard = make_scoped_guard([=]{
@@ -2082,7 +2082,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg)
 			return -1;
 		}
 	} while (checkpoint_idx-- > 0);
-	backup_gc = gc_consumer_register("backup", vclock_sum(vclock));
+	backup_gc = gc_consumer_register("backup", vclock_sum(vclock), false);
 	if (backup_gc == NULL)
 		return -1;
 	int rc = engine_backup(vclock, cb, cb_arg);
diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..3c9157da2 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -61,6 +61,8 @@ struct gc_consumer {
 	char *name;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
+	/** The flag indicating that consumer only consumes WAL files. */
+	bool wal_only;
 };
 
 typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -69,8 +71,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 struct gc_state {
 	/** Number of checkpoints to maintain. */
 	int checkpoint_count;
-	/** Max signature garbage collection has been called for. */
-	int64_t signature;
+	/** Max signature WAL garbage collection has been called for. */
+	int64_t wal_signature;
+	/** Max signature checkpoint garbage collection has been called for. */
+	int64_t checkpoint_signature;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/**
@@ -104,7 +108,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
 
 /** Allocate a consumer object. */
 static struct gc_consumer *
-gc_consumer_new(const char *name, int64_t signature)
+gc_consumer_new(const char *name, int64_t signature, bool wal_only)
 {
 	struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
 	if (consumer == NULL) {
@@ -120,6 +124,7 @@ gc_consumer_new(const char *name, int64_t signature)
 		return NULL;
 	}
 	consumer->signature = signature;
+	consumer->wal_only = wal_only;
 	return consumer;
 }
 
@@ -135,7 +140,8 @@ gc_consumer_delete(struct gc_consumer *consumer)
 void
 gc_init(void)
 {
-	gc.signature = -1;
+	gc.wal_signature = -1;
+	gc.checkpoint_signature = -1;
 	gc_tree_new(&gc.consumers);
 	latch_create(&gc.latch);
 }
@@ -155,21 +161,34 @@ gc_free(void)
 	latch_destroy(&gc.latch);
 }
 
+/** Find the consumer that uses the oldest checkpoint. */
+struct gc_consumer *
+gc_tree_first_checkpoint(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->wal_only)
+		consumer = gc_tree_next(consumers, consumer);
+	return consumer;
+}
+
 void
 gc_run(void)
 {
 	int checkpoint_count = gc.checkpoint_count;
 	assert(checkpoint_count > 0);
 
-	/* Look up the consumer that uses the oldest snapshot. */
+	/* Look up the consumer that uses the oldest WAL. */
 	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	/* Look up the consumer that uses the oldest checkpoint. */
+	struct gc_consumer *leftmost_checkpoint =
+		gc_tree_first_checkpoint(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
-	 * We have to maintain @checkpoint_count oldest snapshots,
-	 * plus we can't remove snapshots that are still in use.
+	 * We have to maintain @checkpoint_count oldest checkpoints,
+	 * plus we can't remove checkpoints that are still in use.
 	 */
-	int64_t gc_signature = -1;
+	int64_t gc_checkpoint_signature = -1;
 
 	struct checkpoint_iterator checkpoints;
 	checkpoint_iterator_init(&checkpoints);
@@ -178,17 +197,19 @@ gc_run(void)
 	while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) {
 		if (--checkpoint_count > 0)
 			continue;
-		if (leftmost != NULL &&
-		    leftmost->signature < vclock_sum(vclock))
+		if (leftmost_checkpoint != NULL &&
+			leftmost_checkpoint->signature < vclock_sum(vclock))
 			continue;
-		gc_signature = vclock_sum(vclock);
+		gc_checkpoint_signature = vclock_sum(vclock);
 		break;
 	}
 
-	if (gc_signature <= gc.signature)
-		return; /* nothing to do */
+	int64_t gc_wal_signature = MIN(gc_checkpoint_signature, leftmost != NULL ?
+		leftmost->signature : INT64_MAX);
 
-	gc.signature = gc_signature;
+	if (gc_checkpoint_signature <= gc.checkpoint_signature &&
+		gc_wal_signature <= gc.wal_signature)
+		return; /* nothing to do */
 
 	/*
 	 * Engine callbacks may sleep, because they use coio for
@@ -197,6 +218,7 @@ gc_run(void)
 	 * executions.
 	 */
 	latch_lock(&gc.latch);
+
 	/*
 	 * Run garbage collection.
 	 *
@@ -204,8 +226,17 @@ gc_run(void)
 	 * collection for memtx snapshots first and abort if it
 	 * fails - see comment to memtx_engine_collect_garbage().
 	 */
-	if (engine_collect_garbage(gc_signature) == 0)
-		wal_collect_garbage(gc_signature);
+	int rc = 0;
+
+	if (gc_checkpoint_signature > gc.checkpoint_signature) {
+		gc.checkpoint_signature = gc_checkpoint_signature;
+		rc = engine_collect_garbage(gc_checkpoint_signature);
+	}
+	if (gc_wal_signature > gc.wal_signature) {
+		gc.wal_signature = gc_wal_signature;
+		if (rc == 0)
+			wal_collect_garbage(gc_wal_signature);
+	}
 
 	latch_unlock(&gc.latch);
 }
@@ -217,9 +248,9 @@ gc_set_checkpoint_count(int checkpoint_count)
 }
 
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature)
+gc_consumer_register(const char *name, int64_t signature, bool wal_only)
 {
-	struct gc_consumer *consumer = gc_consumer_new(name, signature);
+	struct gc_consumer *consumer = gc_consumer_new(name, signature, wal_only);
 	if (consumer != NULL)
 		gc_tree_insert(&gc.consumers, consumer);
 	return consumer;
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..36edd7740 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -33,6 +33,7 @@
 
 #include <stddef.h>
 #include <stdint.h>
+#include <stdbool.h>
 
 #if defined(__cplusplus)
 extern "C" {
@@ -74,12 +75,14 @@ gc_set_checkpoint_count(int checkpoint_count);
  * @signature until the consumer is unregistered or advanced.
  * @name is a human-readable name of the consumer, it will
  * be used for reporting the consumer to the user.
+ * @wal_only is a flag reporting whether consumer only depends
+ * on WAL files.
  *
  * Returns a pointer to the new consumer object or NULL on
  * memory allocation failure.
  */
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature);
+gc_consumer_register(const char *name, int64_t signature, bool wal_only);
 
 /**
  * Unregister a consumer and invoke garbage collection
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc540b..9567519d0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(
 			tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)),
-			vclock_sum(replica_clock));
+			vclock_sum(replica_clock), true);
 		if (replica->gc == NULL)
 			diag_raise();
 	}
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 7d6644ae6..084530e8a 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -1,3 +1,6 @@
+fio = require 'fio'
+---
+...
 test_run = require('test_run').new()
 ---
 ...
@@ -115,6 +118,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -122,16 +129,34 @@ box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
 - ok
 ...
 -- Send more data to the replica.
-for i = 1, 100 do s:auto_increment{} end
+-- Need to do 2 snapshots here, otherwise the replica would
+-- only require 1 xlog and that case would be
+-- undistingvishable from wrong operation.
+for i = 1, 50 do s:auto_increment{} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+for i = 1, 50 do s:auto_increment{} end
 ---
 ...
+box.snapshot()
+---
+- ok
+...
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -166,6 +191,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -199,9 +228,13 @@ fiber.sleep(0.1) -- wait for master to relay data
 ---
 ...
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -265,6 +298,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Stop the replica.
 test_run:cmd("stop server replica")
 ---
@@ -274,8 +311,18 @@ test_run:cmd("cleanup server replica")
 ---
 - true
 ...
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
+-- once again, need 2 snapshots because after 1 snapshot
+-- with no insertions after it the replica would need only
+-- 1 xlog, which is stored anyways.
+_ = s:auto_increment{}
+---
+...
+box.snapshot()
+---
+- ok
+...
 _ = s:auto_increment{}
 ---
 ...
@@ -283,11 +330,15 @@ box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
 ...
--- The checkpoint should only be deleted after the replica
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+---
+- true
+...
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 ---
@@ -296,6 +347,10 @@ test_run:cleanup_cluster()
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 3a680075e..710c99ea7 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -1,3 +1,4 @@
+fio = require 'fio'
 test_run = require('test_run').new()
 engine = test_run:get_cfg('engine')
 replica_set = require('fast_replica')
@@ -60,18 +61,25 @@ test_run:cmd("switch default")
 -- the replica released the corresponding checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
 
 -- Send more data to the replica.
-for i = 1, 100 do s:auto_increment{} end
+-- Need to do 2 snapshots here, otherwise the replica would
+-- only require 1 xlog and that case would be
+-- undistingvishable from wrong operation.
+for i = 1, 50 do s:auto_increment{} end
+box.snapshot()
+for i = 1, 50 do s:auto_increment{} end
+box.snapshot()
 
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
 -- Remove the timeout injection so that the replica catches
 -- up quickly.
@@ -87,7 +95,7 @@ test_run:cmd("switch default")
 -- from the old checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -103,9 +111,10 @@ box.snapshot() -- rotate xlog
 for i = 1, 5 do s:auto_increment{} end
 fiber.sleep(0.1) -- wait for master to relay data
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 test_run:cmd("switch replica")
 -- Unblock the replica and make it fail to apply a row.
 box.info.replication[1].upstream.message == nil
@@ -125,22 +134,28 @@ test_run:cmd("switch default")
 -- Now it's safe to drop the old xlog.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Stop the replica.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
+-- once again, need 2 snapshots because after 1 snapshot
+-- with no insertions after it the replica would need only
+-- 1 xlog, which is stored anyways.
+_ = s:auto_increment{}
+box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
--- The checkpoint should only be deleted after the replica
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
-- 
2.15.2 (Apple Git-101.1)





More information about the Tarantool-patches mailing list