[tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Sat Jun 9 17:05:10 MSK 2018


The garbage collector does not delete an xlog until the replica
notifies the master with a newer vclock. This can lead to an
out-of-disk-space error, which is not the right behaviour since
it will stop the master.

List of changes:
- Promote the error from wal_thread to tx via cpipe.
- Introduce states for gc_consumers (OK and OFF). GC does not
take into account consumers with state OFF.
- Add an error injection and a test.

Closes #3397
---
ticket: https://github.com/tarantool/tarantool/issues/3397
branch: https://github.com/tarantool/tarantool/tree/kbelyavs/gh-3397-force-del-logs-on-no-disk-space

 src/box/gc.c                                      |  52 ++++++++++-
 src/box/gc.h                                      |  13 +++
 src/box/wal.cc                                    |  56 +++++++++--
 src/errinj.h                                      |   1 +
 src/fio.c                                         |   7 ++
 test/box/errinj.result                            |   2 +
 test/replication/force_gc_on_err_nospace.result   | 107 ++++++++++++++++++++++
 test/replication/force_gc_on_err_nospace.test.lua |  38 ++++++++
 8 files changed, 265 insertions(+), 11 deletions(-)
 create mode 100644 test/replication/force_gc_on_err_nospace.result
 create mode 100644 test/replication/force_gc_on_err_nospace.test.lua

diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..efe033aaf 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -59,6 +59,8 @@ struct gc_consumer {
 	gc_node_t node;
 	/** Human-readable name. */
 	char *name;
+	/** Dead or alive. GC ignores dead consumers. */
+	enum gc_consumer_state state;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
 };
@@ -78,6 +80,8 @@ struct gc_state {
 	 * garbage collection callbacks.
 	 */
 	struct latch latch;
+	/** On ENOSPC error force gc to clean xdir. */
+	bool force_clean_flag;
 };
 static struct gc_state gc;
 
@@ -132,6 +136,16 @@ gc_consumer_delete(struct gc_consumer *consumer)
 	free(consumer);
 }
 
+/** First consumer of @a consumers in state CONSUMER_OK, or NULL. */
+static struct gc_consumer *
+gc_tree_first_alive(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->state != CONSUMER_OK)
+		consumer = gc_tree_next(consumers, consumer);
+	return consumer;
+}
+
 void
 gc_init(void)
 {
@@ -162,7 +176,7 @@ gc_run(void)
 	assert(checkpoint_count > 0);
 
 	/* Look up the consumer that uses the oldest snapshot. */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
@@ -216,6 +230,38 @@ gc_set_checkpoint_count(int checkpoint_count)
 	gc.checkpoint_count = checkpoint_count;
 }
 
+/**
+ * React to a change of the WAL out-of-disk-space condition.
+ * A call repeating the current flag value does nothing, so
+ * consumers are switched off only when the flag turns true.
+ */
+void
+gc_xdir_clean_notify(bool force_clean)
+{
+	if (force_clean == gc.force_clean_flag)
+		return;
+	gc.force_clean_flag = force_clean;
+	if (!force_clean)
+		return;
+	/*
+	 * Switch off the alive consumer(s) with the oldest
+	 * signature and rerun garbage collection, which then no
+	 * longer takes them into account. Meant for running out
+	 * of disk space because of a disconnected replica.
+	 * NOTE(review): nothing here turns a consumer back to
+	 * CONSUMER_OK once the flag is cleared - confirm intended.
+	 */
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
+	if (leftmost == NULL)
+		return;
+	int64_t signature = leftmost->signature;
+	do {
+		leftmost->state = CONSUMER_OFF;
+		leftmost = gc_tree_first_alive(&gc.consumers);
+	} while (leftmost != NULL && leftmost->signature <= signature);
+	gc_run();
+}
+
 struct gc_consumer *
 gc_consumer_register(const char *name, int64_t signature)
 {
@@ -237,7 +283,7 @@ gc_consumer_unregister(struct gc_consumer *consumer)
 	 * Rerun garbage collection after removing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > signature)
 		gc_run();
 }
@@ -270,7 +316,7 @@ gc_consumer_advance(struct gc_consumer *consumer, int64_t signature)
 	 * Rerun garbage collection after advancing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > prev_signature)
 		gc_run();
 }
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..a91e78858 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -31,6 +31,7 @@
  * SUCH DAMAGE.
  */
 
+#include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -40,6 +41,11 @@ extern "C" {
 
 struct gc_consumer;
 
+enum gc_consumer_state {
+	CONSUMER_OK = 0, /* Alive: gc takes the consumer into account. */
+	CONSUMER_OFF = 1, /* Dead: gc ignores the consumer. */
+};
+
 /**
  * Initialize the garbage collection state.
  */
@@ -88,6 +94,13 @@ gc_consumer_register(const char *name, int64_t signature);
 void
 gc_consumer_unregister(struct gc_consumer *consumer);
 
+/**
+ * Set or clear the out-of-disk-space flag. When it turns true, the
+ * oldest alive consumers, if any, are switched off and gc is run.
+ */
+void
+gc_xdir_clean_notify(bool force_clean);
+
 /**
  * Advance the vclock signature tracked by a consumer and
  * invoke garbage collection if needed.
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 099c70caa..9dd8347d5 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -41,11 +41,14 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "gc.h"
 
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
+/** Set on an ENOSPC write error, cleared by the next error-free write. */
+static bool err_nospace = false;
 
 static int64_t
 wal_write(struct journal *, struct journal_entry *);
@@ -59,6 +62,11 @@ struct wal_thread {
 	struct cord cord;
 	/** A pipe from 'tx' thread to 'wal' */
 	struct cpipe wal_pipe;
+	/**
+	 * Return pipe from 'wal' to tx'. This is a
+	 * priority pipe and DOES NOT support yield.
+	 */
+	struct cpipe tx_prio_pipe;
 	/** Return pipe from 'wal' to tx' */
 	struct cpipe tx_pipe;
 };
@@ -129,6 +137,10 @@ struct wal_msg: public cmsg {
 	struct stailq rollback;
 };
 
+struct gc_msg: public cmsg {
+	bool io_err; /* Value the handler passes to gc_xdir_clean_notify(). */
+};
+
 /**
  * Vinyl metadata log writer.
  */
@@ -154,7 +166,7 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-	{wal_write_to_disk, &wal_thread.tx_pipe},
+	{wal_write_to_disk, &wal_thread.tx_prio_pipe},
 	{tx_schedule_commit, NULL},
 };
 
@@ -414,7 +426,7 @@ wal_checkpoint(struct vclock *vclock, bool rotate)
 		return 0;
 	}
 	static struct cmsg_hop wal_checkpoint_route[] = {
-		{wal_checkpoint_f, &wal_thread.tx_pipe},
+		{wal_checkpoint_f, &wal_thread.tx_prio_pipe},
 		{wal_checkpoint_done_f, NULL},
 	};
 	vclock_create(vclock);
@@ -453,7 +465,7 @@ wal_collect_garbage(int64_t lsn)
 	struct wal_gc_msg msg;
 	msg.lsn = lsn;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
@@ -544,7 +556,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 		 * list.
 		 */
 		{ wal_writer_clear_bus, &wal_thread.wal_pipe },
-		{ wal_writer_clear_bus, &wal_thread.tx_pipe },
+		{ wal_writer_clear_bus, &wal_thread.tx_prio_pipe },
 		/*
 		 * Step 2: writer->rollback queue contains all
 		 * messages which need to be rolled back,
@@ -562,7 +574,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	 * all input until rollback mode is off.
 	 */
 	cmsg_init(&writer->in_rollback, rollback_route);
-	cpipe_push(&wal_thread.tx_pipe, &writer->in_rollback);
+	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
 }
 
 static void
@@ -581,6 +593,14 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
 	}
 }
 
+/** cmsg handler: hand the ENOSPC state carried by a gc_msg over to gc. */
+static void
+gc_status_update(struct cmsg *msg)
+{
+	msg->route = NULL;
+	gc_xdir_clean_notify(static_cast<gc_msg*>(msg)->io_err);
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg)
 {
@@ -647,11 +667,29 @@ wal_write_to_disk(struct cmsg *msg)
 	last_committed = stailq_last(&wal_msg->commit);
 
 done:
+	bool need_send = false;
 	struct error *error = diag_last_error(diag_get());
 	if (error) {
 		/* Until we can pass the error to tx, log it and clear. */
 		error_log(error);
 		diag_clear(diag_get());
+		if (errno == ENOSPC) {
+			err_nospace = true;
+			need_send = true;
+		}
+	} else if (err_nospace) {
+		/** Clear flag and send message to gc. */
+		err_nospace = false;
+		need_send = true;
+	}
+	if (need_send) {
+		struct gc_msg msg;
+		static const struct cmsg_hop route[] = {
+			{gc_status_update, NULL}
+		};
+		cmsg_init(&msg, route);
+		msg.io_err = err_nospace;
+		cpipe_push(&wal_thread.tx_pipe, &msg);
 	}
 	/*
 	 * We need to start rollback from the first request
@@ -691,7 +729,8 @@ wal_thread_f(va_list ap)
 	 * endpoint, to ensure that WAL messages are delivered
 	 * even when tx fiber pool is used up by net messages.
 	 */
-	cpipe_create(&wal_thread.tx_pipe, "tx_prio");
+	cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio");
+	cpipe_create(&wal_thread.tx_pipe, "tx");
 
 	cbus_loop(&endpoint);
 
@@ -703,6 +742,7 @@ wal_thread_f(va_list ap)
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_close(&vy_log_writer.xlog, false);
 
+	cpipe_destroy(&wal_thread.tx_prio_pipe);
 	cpipe_destroy(&wal_thread.tx_pipe);
 	return 0;
 }
@@ -843,7 +883,7 @@ wal_write_vy_log(struct journal_entry *entry)
 	struct wal_write_vy_log_msg msg;
 	msg.entry= entry;
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 			   wal_write_vy_log_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 	return rc;
@@ -863,7 +903,7 @@ wal_rotate_vy_log()
 {
 	struct cbus_call_msg msg;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
diff --git a/src/errinj.h b/src/errinj.h
index ed69b6cb0..438095294 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -109,6 +109,7 @@ struct errinj {
 	_(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_LOG_ROTATE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..cdea11e87 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "fio.h"
+#include "errinj.h"
 
 #include <sys/types.h>
 
@@ -141,6 +142,12 @@ fio_writev(int fd, struct iovec *iov, int iovcnt)
 	ssize_t nwr;
 restart:
 	nwr = writev(fd, iov, iovcnt);
+	/* Simulate running out of disk space to force the gc to clean logs. */
+	struct errinj *inj = errinj(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL);
+	if (inj != NULL && inj->bparam) {
+		errno = ENOSPC;
+		nwr = -1;
+	}
 	if (nwr < 0) {
 		if (errno == EINTR) {
 			errno = 0;
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 9e3a0bfab..4d85156a9 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -54,6 +54,8 @@ errinj.info()
     state: false
   ERRINJ_VY_RUN_WRITE:
     state: false
+  ERRINJ_NO_DISK_SPACE:
+    state: false
   ERRINJ_RELAY_FINAL_SLEEP:
     state: false
   ERRINJ_VY_RUN_DISCARD:
diff --git a/test/replication/force_gc_on_err_nospace.result b/test/replication/force_gc_on_err_nospace.result
new file mode 100644
index 000000000..904a36ddc
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.result
@@ -0,0 +1,107 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+s = box.schema.space.create('test', {engine = engine});
+---
+...
+index = s:create_index('primary', {type = 'tree'})
+---
+...
+errinj = box.error.injection
+---
+...
+fio = require('fio')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+for i = 1, 5 do s:insert{i} box.snapshot() end
+---
+...
+s:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 6
+...
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+---
+- ok
+...
+function insert(a) s:insert(a) end
+---
+...
+_, err = pcall(insert, {6})
+---
+...
+err:match("ailed to write")
+---
+- ailed to write
+...
+-- add a little timeout so gc could finish job
+require('fiber').sleep(0.01)
+---
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 2
+...
+-- cleanup
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("restart server default with cleanup=1")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/force_gc_on_err_nospace.test.lua b/test/replication/force_gc_on_err_nospace.test.lua
new file mode 100644
index 000000000..fea4aa73b
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.test.lua
@@ -0,0 +1,38 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+
+s = box.schema.space.create('test', {engine = engine});
+index = s:create_index('primary', {type = 'tree'})
+
+errinj = box.error.injection
+fio = require('fio')
+
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+
+test_run:cmd("switch replica")
+repl = box.cfg.replication
+box.cfg{replication = ""}
+test_run:cmd("switch default")
+
+for i = 1, 5 do s:insert{i} box.snapshot() end
+s:select()
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+function insert(a) s:insert(a) end
+_, err = pcall(insert, {6})
+err:match("ailed to write")
+-- add a little timeout so gc could finish job
+require('fiber').sleep(0.01)
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+
+-- cleanup
+test_run:cmd("switch replica")
+test_run:cmd("restart server default with cleanup=1")
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list