From mboxrd@z Thu Jan 1 00:00:00 1970
From: Serge Petrenko <sergepetrenko@tarantool.org>
Subject: [tarantool-patches] [PATCH v3] replication: remove old snapshot files not needed by replicas
Date: Thu, 28 Jun 2018 18:41:20 +0300
Message-Id: <20180628154120.87155-1-sergepetrenko@tarantool.org>
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Id: tarantool-patches
To: tarantool-patches@freelists.org
Cc: Serge Petrenko <sergepetrenko@tarantool.org>

Garbage collection doesn't distinguish between consumers that need
checkpoint files, such as backup, and those that only need WALs, such
as replicas. A disconnected replica will 'hold' every checkpoint file
created after it fell behind, even though it doesn't need them, which
may lead to a disk space shortage.

To fix this, store the consumer type and treat consumers differently
during garbage collection: only the old WALs are now kept for replicas,
while old checkpoints are kept for backup, if any. The tests are also
updated to check the new garbage collection behaviour.

Closes #3444
---
https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3444-remove-old-shapshots-for-replicas
https://github.com/tarantool/tarantool/issues/3444

Changes in v3:
- Fixed indentation and code style.

Changes in v2:
- Prefixed variable names with 'checkpoint_' instead of 'snap_', so
  that there is no confusion with memtx snapshots.
- Likewise, renamed the xlog_only flag to wal_only.
- Rewrote gc_run() so that there is only a single loop over the
  checkpoints, and one excess old WAL is removed as well (it was
  unneeded, but kept due to a mistake). Now wal_collect_garbage()
  and engine_collect_garbage() are called only when they have work
  to do.
- Fixed the tests to correctly check the number of xlogs kept by
  garbage collection.

 src/box/box.cc               |  4 +--
 src/box/gc.c                 | 67 +++++++++++++++++++++++++++++-----------
 src/box/gc.h                 |  5 ++-
 src/box/relay.cc             |  2 +-
 test/replication/gc.result   | 73 ++++++++++++++++++++++++++++++++++++++------
 test/replication/gc.test.lua | 41 +++++++++++++++++--------
 6 files changed, 148 insertions(+), 44 deletions(-)
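For reviewers, here is a tiny self-contained model of the retention rule
gc_run() follows with this patch applied. It is an illustration only and
not part of the diff: plain arrays stand in for the consumer rb-tree and
the checkpoint iterator, the struct and all signature values are invented,
and only the names gc_run(), checkpoint_count and wal_only come from the
patch itself.

/*
 * Illustration only -- NOT part of the patch. Simplified model of the
 * decision gc_run() makes after this change.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

struct consumer {
	int64_t signature; /* oldest vclock signature still needed */
	bool wal_only;     /* true for a replica, false for backup */
};

int
main(void)
{
	/* Invented checkpoint signatures, oldest first. */
	int64_t checkpoints[] = {100, 200, 300, 400};
	int n_checkpoints = (int)(sizeof(checkpoints) / sizeof(checkpoints[0]));
	/* Invented consumers: a lagging replica and a backup. */
	struct consumer consumers[] = {
		{150, true},  /* replica: needs WALs from signature 150 on */
		{250, false}, /* backup: needs the checkpoint at or below 250 */
	};
	int n_consumers = (int)(sizeof(consumers) / sizeof(consumers[0]));
	int checkpoint_count = 2; /* box.cfg.checkpoint_count */

	/* Oldest signature needed by any consumer - it pins WALs. */
	int64_t leftmost = INT64_MAX;
	/* Oldest signature needed by a non-wal_only consumer - it pins checkpoints. */
	int64_t leftmost_checkpoint = INT64_MAX;
	for (int i = 0; i < n_consumers; i++) {
		leftmost = MIN(leftmost, consumers[i].signature);
		if (!consumers[i].wal_only)
			leftmost_checkpoint = MIN(leftmost_checkpoint,
						  consumers[i].signature);
	}

	/*
	 * Walk checkpoints from newest to oldest: keep the newest
	 * checkpoint_count of them plus anything a backup still needs.
	 */
	int64_t gc_checkpoint_signature = -1;
	for (int i = n_checkpoints - 1; i >= 0; i--) {
		if (--checkpoint_count > 0)
			continue;
		if (leftmost_checkpoint < checkpoints[i])
			continue;
		gc_checkpoint_signature = checkpoints[i];
		break;
	}
	/* WALs are additionally pinned by wal_only consumers (replicas). */
	int64_t gc_wal_signature = MIN(gc_checkpoint_signature, leftmost);

	printf("collect checkpoints older than %lld and WALs older than %lld\n",
	       (long long)gc_checkpoint_signature,
	       (long long)gc_wal_signature);
	return 0;
}

With these numbers the replica (signature 150, wal_only = true) pins only
WALs, while the backup (signature 250, wal_only = false) additionally pins
one old checkpoint: the program prints "collect checkpoints older than 200
and WALs older than 150", which is exactly the distinction this patch
introduces.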
diff --git a/src/box/box.cc b/src/box/box.cc
index dcbf52e10..c5690e908 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Register the replica with the garbage collector. */
 	struct gc_consumer *gc = gc_consumer_register(
 		tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)),
-		vclock_sum(&start_vclock));
+		vclock_sum(&start_vclock), true);
 	if (gc == NULL)
 		diag_raise();
 	auto gc_guard = make_scoped_guard([=]{
@@ -2082,7 +2082,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg)
 			return -1;
 		}
 	} while (checkpoint_idx-- > 0);
-	backup_gc = gc_consumer_register("backup", vclock_sum(vclock));
+	backup_gc = gc_consumer_register("backup", vclock_sum(vclock), false);
 	if (backup_gc == NULL)
 		return -1;
 	int rc = engine_backup(vclock, cb, cb_arg);
diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..3c9157da2 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -61,6 +61,8 @@ struct gc_consumer {
 	char *name;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
+	/** The flag indicating that consumer only consumes WAL files. */
+	bool wal_only;
 };
 
 typedef rb_tree(struct gc_consumer) gc_tree_t;
@@ -69,8 +71,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 struct gc_state {
 	/** Number of checkpoints to maintain. */
 	int checkpoint_count;
-	/** Max signature garbage collection has been called for. */
-	int64_t signature;
+	/** Max signature WAL garbage collection has been called for. */
+	int64_t wal_signature;
+	/** Max signature checkpoint garbage collection has been called for. */
+	int64_t checkpoint_signature;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/**
@@ -104,7 +108,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
 
 /** Allocate a consumer object. */
 static struct gc_consumer *
-gc_consumer_new(const char *name, int64_t signature)
+gc_consumer_new(const char *name, int64_t signature, bool wal_only)
 {
 	struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
 	if (consumer == NULL) {
@@ -120,6 +124,7 @@ gc_consumer_new(const char *name, int64_t signature)
 		return NULL;
 	}
 	consumer->signature = signature;
+	consumer->wal_only = wal_only;
 	return consumer;
 }
 
@@ -135,7 +140,8 @@ gc_consumer_delete(struct gc_consumer *consumer)
 void
 gc_init(void)
 {
-	gc.signature = -1;
+	gc.wal_signature = -1;
+	gc.checkpoint_signature = -1;
 	gc_tree_new(&gc.consumers);
 	latch_create(&gc.latch);
 }
@@ -155,21 +161,34 @@ gc_free(void)
 	latch_destroy(&gc.latch);
 }
 
+/** Find the consumer that uses the oldest checkpoint. */
+struct gc_consumer *
+gc_tree_first_checkpoint(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->wal_only)
+		consumer = gc_tree_next(consumers, consumer);
+	return consumer;
+}
+
 void
 gc_run(void)
 {
 	int checkpoint_count = gc.checkpoint_count;
 	assert(checkpoint_count > 0);
 
-	/* Look up the consumer that uses the oldest snapshot. */
+	/* Look up the consumer that uses the oldest WAL. */
 	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	/* Look up the consumer that uses the oldest checkpoint. */
+	struct gc_consumer *leftmost_checkpoint =
+		gc_tree_first_checkpoint(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
-	 * We have to maintain @checkpoint_count oldest snapshots,
-	 * plus we can't remove snapshots that are still in use.
+	 * We have to maintain @checkpoint_count oldest checkpoints,
+	 * plus we can't remove checkpoints that are still in use.
 	 */
-	int64_t gc_signature = -1;
+	int64_t gc_checkpoint_signature = -1;
 
 	struct checkpoint_iterator checkpoints;
 	checkpoint_iterator_init(&checkpoints);
@@ -178,17 +197,19 @@ gc_run(void)
 	while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) {
 		if (--checkpoint_count > 0)
 			continue;
-		if (leftmost != NULL &&
-		    leftmost->signature < vclock_sum(vclock))
+		if (leftmost_checkpoint != NULL &&
+		    leftmost_checkpoint->signature < vclock_sum(vclock))
 			continue;
-		gc_signature = vclock_sum(vclock);
+		gc_checkpoint_signature = vclock_sum(vclock);
 		break;
 	}
 
-	if (gc_signature <= gc.signature)
-		return; /* nothing to do */
+	int64_t gc_wal_signature = MIN(gc_checkpoint_signature, leftmost != NULL ?
+				       leftmost->signature : INT64_MAX);
 
-	gc.signature = gc_signature;
+	if (gc_checkpoint_signature <= gc.checkpoint_signature &&
+	    gc_wal_signature <= gc.wal_signature)
+		return; /* nothing to do */
 
 	/*
 	 * Engine callbacks may sleep, because they use coio for
@@ -197,6 +218,7 @@ gc_run(void)
 	 * executions.
 	 */
 	latch_lock(&gc.latch);
+
 	/*
 	 * Run garbage collection.
 	 *
@@ -204,8 +226,17 @@ gc_run(void)
 	 * collection for memtx snapshots first and abort if it
 	 * fails - see comment to memtx_engine_collect_garbage().
 	 */
-	if (engine_collect_garbage(gc_signature) == 0)
-		wal_collect_garbage(gc_signature);
+	int rc = 0;
+
+	if (gc_checkpoint_signature > gc.checkpoint_signature) {
+		gc.checkpoint_signature = gc_checkpoint_signature;
+		rc = engine_collect_garbage(gc_checkpoint_signature);
+	}
+	if (gc_wal_signature > gc.wal_signature) {
+		gc.wal_signature = gc_wal_signature;
+		if (rc == 0)
+			wal_collect_garbage(gc_wal_signature);
+	}
 
 	latch_unlock(&gc.latch);
 }
@@ -217,9 +248,9 @@ gc_set_checkpoint_count(int checkpoint_count)
 }
 
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature)
+gc_consumer_register(const char *name, int64_t signature, bool wal_only)
 {
-	struct gc_consumer *consumer = gc_consumer_new(name, signature);
+	struct gc_consumer *consumer = gc_consumer_new(name, signature, wal_only);
 	if (consumer != NULL)
 		gc_tree_insert(&gc.consumers, consumer);
 	return consumer;
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..36edd7740 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -33,6 +33,7 @@
 
 #include <stddef.h>
 #include <stdint.h>
+#include <stdbool.h>
 
 #if defined(__cplusplus)
 extern "C" {
@@ -74,12 +75,14 @@ gc_set_checkpoint_count(int checkpoint_count);
  * @signature until the consumer is unregistered or advanced.
  * @name is a human-readable name of the consumer, it will
  * be used for reporting the consumer to the user.
+ * @wal_only is a flag reporting whether consumer only depends
+ * on WAL files.
  *
  * Returns a pointer to the new consumer object or NULL on
 * memory allocation failure.
  */
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature);
+gc_consumer_register(const char *name, int64_t signature, bool wal_only);
 
 /**
  * Unregister a consumer and invoke garbage collection
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc540b..9567519d0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(
 			tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)),
-			vclock_sum(replica_clock));
+			vclock_sum(replica_clock), true);
 		if (replica->gc == NULL)
 			diag_raise();
 	}
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 7d6644ae6..084530e8a 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -1,3 +1,6 @@
+fio = require 'fio'
+---
+...
 test_run = require('test_run').new()
 ---
 ...
@@ -115,6 +118,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
@@ -122,16 +129,34 @@ box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
 - ok
 ...
 -- Send more data to the replica.
-for i = 1, 100 do s:auto_increment{} end
+-- Need to do 2 snapshots here, otherwise the replica would
+-- only require 1 xlog and that case would be
+-- indistinguishable from wrong operation.
+for i = 1, 50 do s:auto_increment{} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+for i = 1, 50 do s:auto_increment{} end
 ---
 ...
+box.snapshot()
+---
+- ok
+...
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -166,6 +191,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -199,9 +228,13 @@ fiber.sleep(0.1) -- wait for master to relay data
 ---
 ...
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+---
+- true
+...
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 ---
 - true
 ...
@@ -265,6 +298,10 @@ wait_gc(1)
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 -- Stop the replica.
 test_run:cmd("stop server replica")
 ---
@@ -274,8 +311,18 @@ test_run:cmd("cleanup server replica")
 ---
 - true
 ...
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
+-- Once again, we need 2 snapshots because after 1 snapshot
+-- with no insertions after it the replica would need only
+-- 1 xlog, which is stored anyway.
+_ = s:auto_increment{}
+---
+...
+box.snapshot()
+---
+- ok
+...
 _ = s:auto_increment{}
 ---
 ...
@@ -283,11 +330,15 @@ box.snapshot()
 ---
 - ok
 ...
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
 ...
--- The checkpoint should only be deleted after the replica
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+---
+- true
+...
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 ---
@@ -296,6 +347,10 @@ test_run:cleanup_cluster()
 ---
 - true
 ...
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+---
+- true
+...
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 3a680075e..710c99ea7 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -1,3 +1,4 @@
+fio = require 'fio'
 test_run = require('test_run').new()
 engine = test_run:get_cfg('engine')
 replica_set = require('fast_replica')
@@ -60,18 +61,25 @@ test_run:cmd("switch default")
 -- the replica released the corresponding checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Make sure the replica will receive data it is subscribed
 -- to long enough for us to invoke garbage collection.
 box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
 
 -- Send more data to the replica.
-for i = 1, 100 do s:auto_increment{} end
+-- Need to do 2 snapshots here, otherwise the replica would
+-- only require 1 xlog and that case would be
+-- indistinguishable from wrong operation.
+for i = 1, 50 do s:auto_increment{} end
+box.snapshot()
+for i = 1, 50 do s:auto_increment{} end
+box.snapshot()
 
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
 -- Remove the timeout injection so that the replica catches
 -- up quickly.
@@ -87,7 +95,7 @@ test_run:cmd("switch default")
 -- from the old checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -103,9 +111,10 @@ box.snapshot() -- rotate xlog
 for i = 1, 5 do s:auto_increment{} end
 fiber.sleep(0.1) -- wait for master to relay data
 -- Garbage collection must not delete the old xlog file
--- (and the corresponding snapshot), because it is still
--- needed by the replica.
-#box.info.gc().checkpoints == 2 or box.info.gc()
+-- because it is still needed by the replica, but remove
+-- the old snapshot.
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 test_run:cmd("switch replica")
 -- Unblock the replica and make it fail to apply a row.
 box.info.replication[1].upstream.message == nil
@@ -125,22 +134,28 @@ test_run:cmd("switch default")
 -- Now it's safe to drop the old xlog.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 -- Stop the replica.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 
--- Invoke garbage collection. Check that it doesn't remove
--- the checkpoint last used by the replica.
+-- Invoke garbage collection. Check that it removes the old
+-- checkpoint, but keeps the xlog last used by the replica.
+-- Once again, we need 2 snapshots because after 1 snapshot
+-- with no insertions after it the replica would need only
+-- 1 xlog, which is stored anyway.
+_ = s:auto_increment{}
+box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
-#box.info.gc().checkpoints == 2 or box.info.gc()
+#box.info.gc().checkpoints == 1 or box.info.gc()
+#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
--- The checkpoint should only be deleted after the replica
+-- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-
+#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --

-- 
2.15.2 (Apple Git-101.1)