Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Serge Petrenko <sergepetrenko@tarantool.org>
Subject: [tarantool-patches] [PATCH] replication: remove old snapshot files not needed by replicas
Date: Wed, 27 Jun 2018 17:08:22 +0300	[thread overview]
Message-ID: <20180627140822.46368-1-sergepetrenko@tarantool.org> (raw)

Closes #3444

branch: https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3444-remove-old-shapshots-for-replicas
issue [3444]: https://github.com/tarantool/tarantool/issues/3444
---
 src/box/box.cc               |  4 +--
 src/box/gc.c                 | 69 +++++++++++++++++++++++++++++++++++---------
 src/box/gc.h                 |  9 +++++-
 src/box/relay.cc             |  2 +-
 test/replication/gc.result   | 47 +++++++++++++++++++++++++-----
 test/replication/gc.test.lua | 29 ++++++++++---------
 6 files changed, 122 insertions(+), 38 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index dcbf52e10..c5690e908 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Register the replica with the garbage collector. */
 	struct gc_consumer *gc = gc_consumer_register(
 		tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)),
-		vclock_sum(&start_vclock));
+		vclock_sum(&start_vclock), true);
 	if (gc == NULL)
 		diag_raise();
 	auto gc_guard = make_scoped_guard([=]{
@@ -2082,7 +2082,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg)
 			return -1;
 		}
 	} while (checkpoint_idx-- > 0);
-	backup_gc = gc_consumer_register("backup", vclock_sum(vclock));
+	backup_gc = gc_consumer_register("backup", vclock_sum(vclock), false);
 	if (backup_gc == NULL)
 		return -1;
 	int rc = engine_backup(vclock, cb, cb_arg);
diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..288cc7236 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -61,6 +61,8 @@ struct gc_consumer {
 	char *name;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
+	/** Set if the consumer needs only xlog files, not snapshots. */
+	bool xlog_only;
 };
 
 typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -69,8 +71,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 struct gc_state {
 	/** Number of checkpoints to maintain. */
 	int checkpoint_count;
-	/** Max signature garbage collection has been called for. */
-	int64_t signature;
+	/** Max signature WAL garbage collection has been called for. */
+	int64_t xlog_signature;
+	/** Max signature snapshot garbage collection has been called for. */
+	int64_t snap_signature;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/**
@@ -104,7 +108,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
 
 /** Allocate a consumer object. */
 static struct gc_consumer *
-gc_consumer_new(const char *name, int64_t signature)
+gc_consumer_new(const char *name, int64_t signature, bool xlog_only)
 {
 	struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
 	if (consumer == NULL) {
@@ -120,6 +124,7 @@ gc_consumer_new(const char *name, int64_t signature)
 		return NULL;
 	}
 	consumer->signature = signature;
+	consumer->xlog_only = xlog_only;
 	return consumer;
 }
 
@@ -135,7 +140,8 @@ gc_consumer_delete(struct gc_consumer *consumer)
 void
 gc_init(void)
 {
-	gc.signature = -1;
+	gc.xlog_signature = -1;
+	gc.snap_signature = -1;
 	gc_tree_new(&gc.consumers);
 	latch_create(&gc.latch);
 }
@@ -155,21 +161,33 @@ gc_free(void)
 	latch_destroy(&gc.latch);
 }
 
+/** Return the first consumer in tree order that is not xlog-only, or NULL. */
+static struct gc_consumer *
+gc_first_snap(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->xlog_only)
+		consumer = gc_tree_next(consumers, consumer);
+	return consumer;
+}
+}
+
 void
 gc_run(void)
 {
 	int checkpoint_count = gc.checkpoint_count;
 	assert(checkpoint_count > 0);
 
-	/* Look up the consumer that uses the oldest snapshot. */
+	/* Look up the consumer that uses the oldest WAL. */
 	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	/* Look up the consumer that uses the oldest snapshot. */
+	struct gc_consumer *leftmost_snap = gc_first_snap(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
 	 * We have to maintain @checkpoint_count oldest snapshots,
 	 * plus we can't remove snapshots that are still in use.
 	 */
-	int64_t gc_signature = -1;
+	int64_t gc_xlog_signature = -1;
 
 	struct checkpoint_iterator checkpoints;
 	checkpoint_iterator_init(&checkpoints);
@@ -181,14 +199,33 @@ gc_run(void)
 		if (leftmost != NULL &&
 		    leftmost->signature < vclock_sum(vclock))
 			continue;
-		gc_signature = vclock_sum(vclock);
+		gc_xlog_signature = vclock_sum(vclock);
 		break;
 	}
 
-	if (gc_signature <= gc.signature)
+	int64_t gc_snap_signature = -1;
+	checkpoint_count = gc.checkpoint_count;
+
+	checkpoint_iterator_init(&checkpoints);
+
+	while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) {
+		if (--checkpoint_count > 0)
+			continue;
+		if (leftmost_snap != NULL &&
+		    leftmost_snap->signature < vclock_sum(vclock))
+			continue;
+		gc_snap_signature = vclock_sum(vclock);
+		break;
+	}
+
+	if (gc_snap_signature <= gc.snap_signature &&
+	    gc_xlog_signature <= gc.xlog_signature)
 		return; /* nothing to do */
 
-	gc.signature = gc_signature;
+	if (gc_snap_signature > gc.snap_signature)
+		gc.snap_signature = gc_snap_signature;
+	if (gc_xlog_signature > gc.xlog_signature)
+		gc.xlog_signature = gc_xlog_signature;
 
 	/*
 	 * Engine callbacks may sleep, because they use coio for
@@ -204,8 +241,8 @@ gc_run(void)
 	 * collection for memtx snapshots first and abort if it
 	 * fails - see comment to memtx_engine_collect_garbage().
 	 */
-	if (engine_collect_garbage(gc_signature) == 0)
-		wal_collect_garbage(gc_signature);
+	if (engine_collect_garbage(gc_snap_signature) == 0)
+		wal_collect_garbage(gc_xlog_signature);
 
 	latch_unlock(&gc.latch);
 }
@@ -217,9 +254,9 @@ gc_set_checkpoint_count(int checkpoint_count)
 }
 
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature)
+gc_consumer_register(const char *name, int64_t signature, bool xlog_only)
 {
-	struct gc_consumer *consumer = gc_consumer_new(name, signature);
+	struct gc_consumer *consumer = gc_consumer_new(name, signature, xlog_only);
 	if (consumer != NULL)
 		gc_tree_insert(&gc.consumers, consumer);
 	return consumer;
@@ -287,6 +324,12 @@ gc_consumer_signature(const struct gc_consumer *consumer)
 	return consumer->signature;
 }
 
+bool
+gc_consumer_xlog_only(const struct gc_consumer *consumer)
+{
+	return consumer->xlog_only;
+}
+
 struct gc_consumer *
 gc_consumer_iterator_next(struct gc_consumer_iterator *it)
 {
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..c9a1d6558 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -33,6 +33,7 @@
 
 #include <stddef.h>
 #include <stdint.h>
+#include <stdbool.h>
 
 #if defined(__cplusplus)
 extern "C" {
@@ -74,12 +75,14 @@ gc_set_checkpoint_count(int checkpoint_count);
  * @signature until the consumer is unregistered or advanced.
  * @name is a human-readable name of the consumer, it will
  * be used for reporting the consumer to the user.
+ * @xlog_only is true if the consumer needs only xlog files,
+ * false if it needs snapshot files to be kept as well.
  *
  * Returns a pointer to the new consumer object or NULL on
  * memory allocation failure.
  */
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature);
+gc_consumer_register(const char *name, int64_t signature, bool xlog_only);
 
 /**
  * Unregister a consumer and invoke garbage collection
@@ -103,6 +106,10 @@ gc_consumer_name(const struct gc_consumer *consumer);
 int64_t
 gc_consumer_signature(const struct gc_consumer *consumer);
 
+/** Return whether consumer only consumes xlog files. */
+bool
+gc_consumer_xlog_only(const struct gc_consumer *consumer);
+
 /**
  * Iterator over registered consumers. The iterator is valid
  * as long as the caller doesn't yield.
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc540b..9567519d0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(
 			tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)),
-			vclock_sum(replica_clock));
+			vclock_sum(replica_clock), true);
 		if (replica->gc == NULL)
 			diag_raise();
 	}
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 7d6644ae6..adbe04ca2 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -1,3 +1,6 @@
+fio = require 'fio'
+---
+...
 test_run = require('test_run').new()
 ---
 ...
@@ -115,6 +118,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -131,7 +138,11 @@ box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -166,6 +177,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -199,9 +214,13 @@ fiber.sleep(0.1) -- wait for master to relay data
 ---
 ...
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -265,6 +284,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Stop the replica.
 test_run:cmd("stop server replica")
 ---
@@ -274,8 +297,8 @@ test_run:cmd("cleanup server replica")
 ---
 - true
 ...
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
 _ = s:auto_increment{}
 ---
 ...
@@ -283,11 +306,15 @@ box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
 ...
--- The checkpoint should only be deleted after the replica
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+---
+- true
+...
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 ---
@@ -296,6 +323,10 @@ test_run:cleanup_cluster()
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 3a680075e..2b9ab0cf0 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -1,3 +1,4 @@
+fio = require 'fio'
 test_run = require('test_run').new()
 engine = test_run:get_cfg('engine')
 replica_set = require('fast_replica')
@@ -60,7 +61,7 @@ test_run:cmd("switch default")
 -- the replica released the corresponding checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -71,8 +72,8 @@ for i = 1, 100 do s:auto_increment{} end
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
-
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 -- Remove the timeout injection so that the replica catches
 -- up quickly.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
@@ -87,7 +88,7 @@ test_run:cmd("switch default")
 -- from the old checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -103,9 +104,10 @@ box.snapshot() -- rotate xlog
 for i = 1, 5 do s:auto_increment{} end
 fiber.sleep(0.1) -- wait for master to relay data
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 test_run:cmd("switch replica")
 -- Unblock the replica and make it fail to apply a row.
 box.info.replication[1].upstream.message == nil
@@ -125,22 +127,23 @@ test_run:cmd("switch default")
 -- Now it's safe to drop the old xlog.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Stop the replica.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
 _ = s:auto_increment{}
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
--- The checkpoint should only be deleted after the replica
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
-- 
2.15.2 (Apple Git-101.1)

             reply	other threads:[~2018-06-27 14:08 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-06-27 14:08 Serge Petrenko [this message]
2018-06-28  8:58 ` Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20180627140822.46368-1-sergepetrenko@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH] replication: remove old snapshot files not needed by replicas' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox