From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 4A1462042B for ; Wed, 27 Jun 2018 10:08:47 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id MAo5xdTaHy-O for ; Wed, 27 Jun 2018 10:08:47 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 618F81FBA3 for ; Wed, 27 Jun 2018 10:08:46 -0400 (EDT) From: Serge Petrenko Subject: [tarantool-patches] [PATCH] replication: remove old snapshot files not needed by replicas Date: Wed, 27 Jun 2018 17:08:22 +0300 Message-Id: <20180627140822.46368-1-sergepetrenko@tarantool.org> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Serge Petrenko Closes #3444 branch: https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3444-remove-old-shapshots-for-replicas issue [3444]: https://github.com/tarantool/tarantool/issues/3444 --- src/box/box.cc | 4 +-- src/box/gc.c | 69 +++++++++++++++++++++++++++++++++++--------- src/box/gc.h | 9 +++++- src/box/relay.cc | 2 +- test/replication/gc.result | 47 +++++++++++++++++++++++++----- test/replication/gc.test.lua | 29 ++++++++++--------- 6 files changed, 122 insertions(+), 38 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index dcbf52e10..c5690e908 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* 
Register the replica with the garbage collector. */ struct gc_consumer *gc = gc_consumer_register( tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)), - vclock_sum(&start_vclock)); + vclock_sum(&start_vclock), true); if (gc == NULL) diag_raise(); auto gc_guard = make_scoped_guard([=]{ @@ -2082,7 +2082,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg) return -1; } } while (checkpoint_idx-- > 0); - backup_gc = gc_consumer_register("backup", vclock_sum(vclock)); + backup_gc = gc_consumer_register("backup", vclock_sum(vclock), false); if (backup_gc == NULL) return -1; int rc = engine_backup(vclock, cb, cb_arg); diff --git a/src/box/gc.c b/src/box/gc.c index 12e68f3dc..288cc7236 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -61,6 +61,8 @@ struct gc_consumer { char *name; /** The vclock signature tracked by this consumer. */ int64_t signature; + /** Set if the consumer only consumes xlog files, not snapshots. */ + bool xlog_only; }; typedef rb_tree(struct gc_consumer) gc_tree_t; @@ -69,8 +71,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t; struct gc_state { /** Number of checkpoints to maintain. */ int checkpoint_count; - /** Max signature garbage collection has been called for. */ - int64_t signature; + /** Max signature WAL garbage collection has been called for. */ + int64_t xlog_signature; + /** Max signature snapshot garbage collection has been called for. */ + int64_t snap_signature; /** Registered consumers, linked by gc_consumer::node. */ gc_tree_t consumers; /** @@ -104,7 +108,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t, /** Allocate a consumer object.
*/ static struct gc_consumer * -gc_consumer_new(const char *name, int64_t signature) +gc_consumer_new(const char *name, int64_t signature, bool xlog_only) { struct gc_consumer *consumer = calloc(1, sizeof(*consumer)); if (consumer == NULL) { @@ -120,6 +124,7 @@ gc_consumer_new(const char *name, int64_t signature) return NULL; } consumer->signature = signature; + consumer->xlog_only = xlog_only; return consumer; } @@ -135,7 +140,8 @@ gc_consumer_delete(struct gc_consumer *consumer) void gc_init(void) { - gc.signature = -1; + gc.xlog_signature = -1; + gc.snap_signature = -1; gc_tree_new(&gc.consumers); latch_create(&gc.latch); } @@ -155,21 +161,33 @@ gc_free(void) latch_destroy(&gc.latch); } +/** Find the consumer that uses the oldest snapshot */ +struct gc_consumer * +gc_first_snap(gc_tree_t *consumers) +{ + struct gc_consumer *consumer = gc_tree_first(consumers); + while (consumer != NULL && consumer->xlog_only) + consumer = gc_tree_next(consumers, consumer); + return consumer; +} + void gc_run(void) { int checkpoint_count = gc.checkpoint_count; assert(checkpoint_count > 0); - /* Look up the consumer that uses the oldest snapshot. */ + /* Look up the consumer that uses the oldest WAL */ struct gc_consumer *leftmost = gc_tree_first(&gc.consumers); + /* Look up the consumer that uses the oldest snapshot. */ + struct gc_consumer *leftmost_snap = gc_first_snap(&gc.consumers); /* * Find the oldest checkpoint that must be preserved. * We have to maintain @checkpoint_count oldest snapshots, * plus we can't remove snapshots that are still in use. 
*/ - int64_t gc_signature = -1; + int64_t gc_xlog_signature = -1; struct checkpoint_iterator checkpoints; checkpoint_iterator_init(&checkpoints); @@ -181,14 +199,33 @@ gc_run(void) if (leftmost != NULL && leftmost->signature < vclock_sum(vclock)) continue; - gc_signature = vclock_sum(vclock); + gc_xlog_signature = vclock_sum(vclock); break; } - if (gc_signature <= gc.signature) + int64_t gc_snap_signature = -1; + checkpoint_count = gc.checkpoint_count; + + checkpoint_iterator_init(&checkpoints); + + while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) { + if (--checkpoint_count > 0) + continue; + if (leftmost_snap != NULL && + leftmost_snap->signature < vclock_sum(vclock)) + continue; + gc_snap_signature = vclock_sum(vclock); + break; + } + + if (gc_snap_signature <= gc.snap_signature && + gc_xlog_signature <= gc.xlog_signature) return; /* nothing to do */ - gc.signature = gc_signature; + if (gc_snap_signature > gc.snap_signature) + gc.snap_signature = gc_snap_signature; + if (gc_xlog_signature > gc.xlog_signature) + gc.xlog_signature = gc_xlog_signature; /* * Engine callbacks may sleep, because they use coio for @@ -204,8 +241,8 @@ gc_run(void) * collection for memtx snapshots first and abort if it * fails - see comment to memtx_engine_collect_garbage(). 
*/ - if (engine_collect_garbage(gc_signature) == 0) - wal_collect_garbage(gc_signature); + if (engine_collect_garbage(gc_snap_signature) == 0) + wal_collect_garbage(gc_xlog_signature); latch_unlock(&gc.latch); } @@ -217,9 +254,9 @@ gc_set_checkpoint_count(int checkpoint_count) } struct gc_consumer * -gc_consumer_register(const char *name, int64_t signature) +gc_consumer_register(const char *name, int64_t signature, bool xlog_only) { - struct gc_consumer *consumer = gc_consumer_new(name, signature); + struct gc_consumer *consumer = gc_consumer_new(name, signature, xlog_only); if (consumer != NULL) gc_tree_insert(&gc.consumers, consumer); return consumer; @@ -287,6 +324,12 @@ gc_consumer_signature(const struct gc_consumer *consumer) return consumer->signature; } +bool +gc_consumer_xlog_only(const struct gc_consumer *consumer) +{ + return consumer->xlog_only; +} + struct gc_consumer * gc_consumer_iterator_next(struct gc_consumer_iterator *it) { diff --git a/src/box/gc.h b/src/box/gc.h index 634ce6d38..c9a1d6558 100644 --- a/src/box/gc.h +++ b/src/box/gc.h @@ -33,6 +33,7 @@ #include #include +#include #if defined(__cplusplus) extern "C" { @@ -74,12 +75,14 @@ gc_set_checkpoint_count(int checkpoint_count); * @signature until the consumer is unregistered or advanced. * @name is a human-readable name of the consumer, it will * be used for reporting the consumer to the user. + * @xlog_only is a flag reporting whether the consumer only + * consumes xlog files. * * Returns a pointer to the new consumer object or NULL on * memory allocation failure. */ struct gc_consumer * -gc_consumer_register(const char *name, int64_t signature); +gc_consumer_register(const char *name, int64_t signature, bool xlog_only); /** * Unregister a consumer and invoke garbage collection @@ -103,6 +106,10 @@ gc_consumer_name(const struct gc_consumer *consumer); int64_t gc_consumer_signature(const struct gc_consumer *consumer); +/** Return whether the consumer only consumes xlog files.
*/ +bool +gc_consumer_xlog_only(const struct gc_consumer *consumer); + /** * Iterator over registered consumers. The iterator is valid * as long as the caller doesn't yield. diff --git a/src/box/relay.cc b/src/box/relay.cc index a25cc540b..9567519d0 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, if (replica->gc == NULL) { replica->gc = gc_consumer_register( tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)), - vclock_sum(replica_clock)); + vclock_sum(replica_clock), true); if (replica->gc == NULL) diag_raise(); } diff --git a/test/replication/gc.result b/test/replication/gc.result index 7d6644ae6..adbe04ca2 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -1,3 +1,6 @@ +fio = require 'fio' +--- +... test_run = require('test_run').new() --- ... @@ -115,6 +118,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- Make sure the replica will receive data it is subscribed -- to long enough for us to invoke garbage collection. box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) @@ -131,7 +138,11 @@ box.snapshot() --- - ok ... -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() +--- +- true +... +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- - true ... @@ -166,6 +177,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +--- +- true +... -- -- Check that the master doesn't delete xlog files sent to the -- replica until it receives a confirmation that the data has @@ -199,9 +214,13 @@ fiber.sleep(0.1) -- wait for master to relay data --- ... -- Garbage collection must not delete the old xlog file --- (and the corresponding snapshot), because it is still --- needed by the replica. 
-#box.info.gc().checkpoints == 2 or box.info.gc() +-- because it is still needed by the replica, but remove +-- the old snapshot. +#box.info.gc().checkpoints == 1 or box.info.gc() +--- +- true +... +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- - true ... @@ -265,6 +284,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- Stop the replica. test_run:cmd("stop server replica") --- @@ -274,8 +297,8 @@ test_run:cmd("cleanup server replica") --- - true ... --- Invoke garbage collection. Check that it doesn't remove --- the checkpoint last used by the replica. +-- Invoke garbage collection. Check that it removes the old +-- checkpoint, but keeps the xlog last used by the replica. _ = s:auto_increment{} --- ... @@ -283,11 +306,15 @@ box.snapshot() --- - ok ... -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() --- - true ... --- The checkpoint should only be deleted after the replica +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +--- +- true +... +-- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() --- @@ -296,6 +323,10 @@ test_run:cleanup_cluster() --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- -- Test that concurrent invocation of the garbage collector works fine. -- diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 3a680075e..2b9ab0cf0 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -1,3 +1,4 @@ +fio = require 'fio' test_run = require('test_run').new() engine = test_run:get_cfg('engine') replica_set = require('fast_replica') @@ -60,7 +61,7 @@ test_run:cmd("switch default") -- the replica released the corresponding checkpoint. 
wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- Make sure the replica will receive data it is subscribed -- to long enough for us to invoke garbage collection. box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) @@ -71,8 +72,8 @@ for i = 1, 100 do s:auto_increment{} end -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. box.snapshot() -#box.info.gc().checkpoints == 2 or box.info.gc() - +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') -- Remove the timeout injection so that the replica catches -- up quickly. box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) @@ -87,7 +88,7 @@ test_run:cmd("switch default") -- from the old checkpoint. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') -- -- Check that the master doesn't delete xlog files sent to the -- replica until it receives a confirmation that the data has @@ -103,9 +104,10 @@ box.snapshot() -- rotate xlog for i = 1, 5 do s:auto_increment{} end fiber.sleep(0.1) -- wait for master to relay data -- Garbage collection must not delete the old xlog file --- (and the corresponding snapshot), because it is still --- needed by the replica. -#box.info.gc().checkpoints == 2 or box.info.gc() +-- because it is still needed by the replica, but remove +-- the old snapshot. +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') test_run:cmd("switch replica") -- Unblock the replica and make it fail to apply a row. box.info.replication[1].upstream.message == nil @@ -125,22 +127,23 @@ test_run:cmd("switch default") -- Now it's safe to drop the old xlog. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- Stop the replica. 
test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") --- Invoke garbage collection. Check that it doesn't remove --- the checkpoint last used by the replica. +-- Invoke garbage collection. Check that it removes the old +-- checkpoint, but keeps the xlog last used by the replica. _ = s:auto_increment{} box.snapshot() -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- The checkpoint should only be deleted after the replica +-- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- -- Test that concurrent invocation of the garbage collector works fine. -- -- 2.15.2 (Apple Git-101.1)