Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err
@ 2018-06-06  9:27 Konstantin Belyavskiy
  2018-06-07  4:24 ` [tarantool-patches] " Georgy Kirichenko
  0 siblings, 1 reply; 4+ messages in thread
From: Konstantin Belyavskiy @ 2018-06-06  9:27 UTC (permalink / raw)
  To: georgy; +Cc: tarantool-patches

The garbage collector does not delete xlog files until the replica
notifies the master with a newer vclock. This can lead to running out
of disk space, which is not the right behaviour since it will stop
the master.

List of changes:
- Promoting error from wal_thread via extended journal entry.
- Introduce states for gc_consumers (OK and OFF). GC should not
take into account consumers with state OFF.
- Add an error injection and a test.

WARNING:
Several issues left:
1. This approach calls gc if a write fails because of ENOSPC. This
aborts the last transaction. But the initial proposal was to use
fallocate to force a cleanup in advance.
2. In case of multiple errors we could shoot down all appliers, so
some simple check (e.g. the size of the LSN gap) could be used.

Proposal for #3397
---
ticket: https://github.com/tarantool/tarantool/issues/3397
branch: https://github.com/tarantool/tarantool/tree/kbelyavs/gh-3397-force-del-logs-on-no-disk-space
 src/box/box.cc                                    |   7 +-
 src/box/errcode.h                                 |   1 +
 src/box/gc.c                                      |  31 ++++++-
 src/box/gc.h                                      |  13 +++
 src/box/journal.h                                 |   2 +
 src/box/txn.c                                     |   5 +-
 src/box/wal.cc                                    |   7 +-
 src/errinj.h                                      |   1 +
 src/fio.c                                         |   7 ++
 test/box/errinj.result                            |   2 +
 test/box/misc.result                              |   1 +
 test/replication/force_gc_on_err_nospace.result   | 103 ++++++++++++++++++++++
 test/replication/force_gc_on_err_nospace.test.lua |  36 ++++++++
 13 files changed, 210 insertions(+), 6 deletions(-)
 create mode 100644 test/replication/force_gc_on_err_nospace.result
 create mode 100644 test/replication/force_gc_on_err_nospace.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index e3eb2738f..0d25fb502 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -180,8 +180,13 @@ process_rw(struct request *request, struct space *space, struct tuple **result)
 	 * when WAL is written in autocommit mode.
 	 */
 	TupleRefNil ref(tuple);
-	if (txn_commit_stmt(txn, request) != 0)
+	if (txn_commit_stmt(txn, request) != 0) {
+		struct error *e = diag_last_error(diag_get());
+		if (e != NULL && e->type == &type_ClientError &&
+		    box_error_code(e) == ER_NO_DISK_SPACE)
+			gc_leftmost_mark_dead();
 		return -1;
+	}
 	if (result != NULL) {
 		if (tuple != NULL && tuple_bless(tuple) == NULL)
 			return -1;
diff --git a/src/box/errcode.h b/src/box/errcode.h
index a0759f8f4..a5cf99825 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -206,6 +206,7 @@ struct errcode_record {
 	/*151 */_(ER_WRONG_COLLATION_OPTIONS,	"Wrong collation options (field %u): %s") \
 	/*152 */_(ER_NULLABLE_PRIMARY,		"Primary index of the space '%s' can not contain nullable parts") \
 	/*153 */_(ER_NULLABLE_MISMATCH,		"Field %d is %s in space format, but %s in index parts") \
+	/*154 */_(ER_NO_DISK_SPACE,		"Failed to write to disk: no space left") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..8f3dfa368 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -59,6 +59,8 @@ struct gc_consumer {
 	gc_node_t node;
 	/** Human-readable name. */
 	char *name;
+	/** status, can be marked as dead if replica is not response */
+	enum gc_consumer_state state;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
 };
@@ -132,6 +134,15 @@ gc_consumer_delete(struct gc_consumer *consumer)
 	free(consumer);
 }
 
+static struct gc_consumer *
+gc_tree_first_alive(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->state != CONSUMER_OK)
+		consumer = gc_tree_next(&gc.consumers, consumer);
+	return consumer;
+}
+
 void
 gc_init(void)
 {
@@ -162,7 +173,7 @@ gc_run(void)
 	assert(checkpoint_count > 0);
 
 	/* Look up the consumer that uses the oldest snapshot. */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
@@ -237,9 +248,23 @@ gc_consumer_unregister(struct gc_consumer *consumer)
 	 * Rerun garbage collection after removing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
+	if (leftmost == NULL || leftmost->signature > signature)
+		gc_run();
+}
+
+void
+gc_leftmost_mark_dead(void)
+{
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
+	if (leftmost == NULL)
+		return; // do nothing
+	leftmost->state = CONSUMER_OFF;
+	int64_t signature = leftmost->signature;
+	leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > signature)
 		gc_run();
+
 }
 
 void
@@ -270,7 +295,7 @@ gc_consumer_advance(struct gc_consumer *consumer, int64_t signature)
 	 * Rerun garbage collection after advancing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > prev_signature)
 		gc_run();
 }
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..74aa7ce83 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -40,6 +40,11 @@ extern "C" {
 
 struct gc_consumer;
 
+enum gc_consumer_state {
+	CONSUMER_OK = 0,
+	CONSUMER_OFF = 1,
+};
+
 /**
  * Initialize the garbage collection state.
  */
@@ -88,6 +93,14 @@ gc_consumer_register(const char *name, int64_t signature);
 void
 gc_consumer_unregister(struct gc_consumer *consumer);
 
+/**
+ * Remove consumer with least recent vclock and invoke
+ * garbage collection. Originally created for cases with
+ * running out of disk space because of disconnected replica.
+ */
+void
+gc_leftmost_mark_dead(void);
+
 /**
  * Advance the vclock signature tracked by a consumer and
  * invoke garbage collection if needed.
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd1..e0ca240a2 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -62,6 +62,8 @@ struct journal_entry {
 	 * The number of rows in the request.
 	 */
 	int n_rows;
+	/** Contains last error if any */
+	int errcode;
 	/**
 	 * The rows.
 	 */
diff --git a/src/box/txn.c b/src/box/txn.c
index e25c0e0e0..fd54cfb4a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -270,7 +270,10 @@ txn_write_to_wal(struct txn *txn)
 		 * pending rollbacks are processed.
 		 */
 		fiber_reschedule();
-		diag_set(ClientError, ER_WAL_IO);
+		struct error *e = diag_last_error(diag_get());
+		if (e == NULL || e->type != &type_ClientError ||
+		    box_error_code(e) != ER_NO_DISK_SPACE)
+			diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 	} else if (stop - start > too_long_threshold) {
 		say_warn("too long WAL write: %d rows at LSN %lld: %.3f sec",
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 099c70caa..647fbd6b1 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -34,6 +34,7 @@
 #include "fiber.h"
 #include "fio.h"
 #include "errinj.h"
+#include "error.h"
 
 #include "xlog.h"
 #include "xrow.h"
@@ -629,12 +630,13 @@ wal_write_to_disk(struct cmsg *msg)
 	/*
 	 * Iterate over requests (transactions)
 	 */
-	struct journal_entry *entry;
+	struct journal_entry *entry, *prev_entry = NULL;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&writer->vclock);
 		int rc = xlog_write_entry(l, entry);
+		prev_entry = entry;
 		if (rc < 0)
 			goto done;
 		if (rc > 0)
@@ -652,6 +654,7 @@ done:
 		/* Until we can pass the error to tx, log it and clear. */
 		error_log(error);
 		diag_clear(diag_get());
+		prev_entry->errcode = errno;
 	}
 	/*
 	 * We need to start rollback from the first request
@@ -788,6 +791,8 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 			--last;
 		}
 	}
+	if (entry->errcode == ENOSPC)
+		diag_set(ClientError, ER_NO_DISK_SPACE);
 	return entry->res;
 }
 
diff --git a/src/errinj.h b/src/errinj.h
index ed69b6cb0..438095294 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -109,6 +109,7 @@ struct errinj {
 	_(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_LOG_ROTATE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..cdea11e87 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "fio.h"
+#include "errinj.h"
 
 #include <sys/types.h>
 
@@ -141,6 +142,12 @@ fio_writev(int fd, struct iovec *iov, int iovcnt)
 	ssize_t nwr;
 restart:
 	nwr = writev(fd, iov, iovcnt);
+	/* Simulate running out of disk space to force the gc to clean logs. */
+	struct errinj *inj = errinj(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL);
+	if (inj != NULL && inj->bparam) {
+		errno = ENOSPC;
+		nwr = -1;
+	}
 	if (nwr < 0) {
 		if (errno == EINTR) {
 			errno = 0;
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 9e3a0bfab..4d85156a9 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -54,6 +54,8 @@ errinj.info()
     state: false
   ERRINJ_VY_RUN_WRITE:
     state: false
+  ERRINJ_NO_DISK_SPACE:
+    state: false
   ERRINJ_RELAY_FINAL_SLEEP:
     state: false
   ERRINJ_VY_RUN_DISCARD:
diff --git a/test/box/misc.result b/test/box/misc.result
index cd3af7f9e..0253a0cde 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -353,6 +353,7 @@ t;
   - 'box.error.NONMASTER : 6'
   - 'box.error.MEMTX_MAX_TUPLE_SIZE : 110'
   - 'box.error.DROP_FUNCTION : 71'
+  - 'box.error.NO_DISK_SPACE : 154'
   - 'box.error.CFG : 59'
   - 'box.error.NO_SUCH_FIELD : 37'
   - 'box.error.CONNECTION_TO_SELF : 117'
diff --git a/test/replication/force_gc_on_err_nospace.result b/test/replication/force_gc_on_err_nospace.result
new file mode 100644
index 000000000..ddddd55fa
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.result
@@ -0,0 +1,103 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+s = box.schema.space.create('test', {engine = engine});
+---
+...
+index = s:create_index('primary', {type = 'tree'})
+---
+...
+errinj = box.error.injection
+---
+...
+fio = require('fio')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+for i = 1, 5 do s:insert{i} box.snapshot() end
+---
+...
+s:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 6
+...
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+---
+- ok
+...
+function insert(a) s:insert(a) end
+---
+...
+_, err = pcall(insert, {6})
+---
+...
+err:match("ailed to write")
+---
+- ailed to write
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 2
+...
+-- cleanup
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("restart server default with cleanup=1")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/force_gc_on_err_nospace.test.lua b/test/replication/force_gc_on_err_nospace.test.lua
new file mode 100644
index 000000000..2e1e3e75d
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.test.lua
@@ -0,0 +1,36 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+
+s = box.schema.space.create('test', {engine = engine});
+index = s:create_index('primary', {type = 'tree'})
+
+errinj = box.error.injection
+fio = require('fio')
+
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+
+test_run:cmd("switch replica")
+repl = box.cfg.replication
+box.cfg{replication = ""}
+test_run:cmd("switch default")
+
+for i = 1, 5 do s:insert{i} box.snapshot() end
+s:select()
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+function insert(a) s:insert(a) end
+_, err = pcall(insert, {6})
+err:match("ailed to write")
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+
+-- cleanup
+test_run:cmd("switch replica")
+test_run:cmd("restart server default with cleanup=1")
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [tarantool-patches] Re: [PATCH] replication: force gc to clean xdir on ENOSPC err
  2018-06-06  9:27 [tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err Konstantin Belyavskiy
@ 2018-06-07  4:24 ` Georgy Kirichenko
  0 siblings, 0 replies; 4+ messages in thread
From: Georgy Kirichenko @ 2018-06-07  4:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Konstantin Belyavskiy

[-- Attachment #1: Type: text/plain, Size: 1007 bytes --]

On Wednesday, June 6, 2018 12:27:52 PM MSK Konstantin Belyavskiy wrote:
> Garbage collector do not delete xlog unless replica do not notify
> master with newer vclock. This can lead to running out of disk
> space error and this is not right behaviour since it will stop the
> master.
> 

> @@ -180,8 +180,13 @@ process_rw(struct request *request, struct space
> *space, struct tuple **result) * when WAL is written in autocommit mode.
>  	 */
>  	TupleRefNil ref(tuple);
> -	if (txn_commit_stmt(txn, request) != 0)
> +	if (txn_commit_stmt(txn, request) != 0) {
1. Here you will mark as many replicas dead as you have in-flight
committing requests.
2. I think you should send some message from the wal thread to advance gc. In
that case you will be able to use fallocate without any tx thread changes.
> +		struct error *e = diag_last_error(diag_get());
> +		if (e != NULL && e->type == &type_ClientError &&
> +		    box_error_code(e) == ER_NO_DISK_SPACE)
> +			gc_leftmost_mark_dead();
>  		return -1;

[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err
  2018-06-09 14:05 [tarantool-patches] " Konstantin Belyavskiy
@ 2018-06-18 12:35 ` Vladimir Davydov
  0 siblings, 0 replies; 4+ messages in thread
From: Vladimir Davydov @ 2018-06-18 12:35 UTC (permalink / raw)
  To: Konstantin Belyavskiy; +Cc: georgy, tarantool-patches

On Sat, Jun 09, 2018 at 05:05:10PM +0300, Konstantin Belyavskiy wrote:
> Garbage collector do not delete xlog unless replica do not notify
> master with newer vclock. This can lead to running out of disk
> space error and this is not right behaviour since it will stop the
> master.

AFAIU this is the second iteration of this patch. If so, you should
have appended the version number to the subject prefix (i.e. PATCH v2)
according to our guideline:

  https://tarantool.io/en/doc/1.9/dev_guide/developer_guidelines.html#how-to-submit-a-patch-for-review

> 
> List of changes:
> - Promoting error from wal_thread to tx via cpipe.
> - Introduce states for gc_consumers (OK and OFF). GC does not
> take into account consumers with state OFF.
> - Add an error injection and a test.

The list of changes between different versions of the same patch should
go after the link to the branch and the ticket. For example see

  https://www.freelists.org/post/tarantool-patches/PATCH-v2-box-make-gc-info-public

> 
> Closes #3397
> ---
> ticket: https://github.com/tarantool/tarantool/issues/3397
> branch: https://github.com/tarantool/tarantool/tree/kbelyavs/gh-3397-force-del-logs-on-no-disk-space
> 
>  src/box/gc.c                                      |  52 ++++++++++-
>  src/box/gc.h                                      |  13 +++
>  src/box/wal.cc                                    |  56 +++++++++--
>  src/errinj.h                                      |   1 +
>  src/fio.c                                         |   7 ++
>  test/box/errinj.result                            |   2 +
>  test/replication/force_gc_on_err_nospace.result   | 107 ++++++++++++++++++++++
>  test/replication/force_gc_on_err_nospace.test.lua |  38 ++++++++
>  8 files changed, 265 insertions(+), 11 deletions(-)
>  create mode 100644 test/replication/force_gc_on_err_nospace.result
>  create mode 100644 test/replication/force_gc_on_err_nospace.test.lua
> 
> diff --git a/src/box/gc.c b/src/box/gc.c
> index 12e68f3dc..efe033aaf 100644
> --- a/src/box/gc.c
> +++ b/src/box/gc.c
> @@ -59,6 +59,8 @@ struct gc_consumer {
>  	gc_node_t node;
>  	/** Human-readable name. */
>  	char *name;
> +	/** Dead or alive. GC ignores dead consumers. */
> +	enum gc_consumer_state state;

Keeping dead consumers around is pointless. A consumer must be removed
from the tree as soon as it is evicted.

> +void
> +gc_xdir_clean_notify(bool force_clean)
> +{
> +	if (force_clean == gc.force_clean_flag)
> +		return; // nothing to do
> +	else if (force_clean) { // xdir clean required
> +		gc.force_clean_flag = true;

We don't use C++ style comments in the code.

> +		/**
> +		 * Mark consumer with least recent vclock as "dead" and
> +		 * invoke garbage collection. If nothing to delete find
> +		 * next alive consumer etc. Originally created for
> +		 * cases with running out of disk space because of
> +		 * disconnected replica.
> +		 */
> +		struct gc_consumer *leftmost =
> +		    gc_tree_first_alive(&gc.consumers);
> +		if (leftmost == NULL)
> +			return; // do nothing
> +		int64_t signature = leftmost->signature;
> +		while (true) {
> +			leftmost->state = CONSUMER_OFF;
> +			leftmost = gc_tree_first_alive(&gc.consumers);
> +			if (leftmost == NULL ||
> +			    leftmost->signature > signature) {
> +				gc_run();
> +				return;

A consumer may correspond to a backup procedure, in which case it must
not be evicted, even on ENOSPC.

> +			}
> +		}
> +	} else
> +		gc.force_clean_flag = false;
> +}

> diff --git a/src/box/wal.cc b/src/box/wal.cc
> index 099c70caa..9dd8347d5 100644
> --- a/src/box/wal.cc
> +++ b/src/box/wal.cc
> @@ -59,6 +62,11 @@ struct wal_thread {
>  	struct cord cord;
>  	/** A pipe from 'tx' thread to 'wal' */
>  	struct cpipe wal_pipe;
> +	/**
> +	 * Return pipe from 'wal' to tx'. This is a
> +	 * priority pipe and DOES NOT support yield.
> +	 */
> +	struct cpipe tx_prio_pipe;
>  	/** Return pipe from 'wal' to tx' */
>  	struct cpipe tx_pipe;
>  };
> @@ -154,7 +166,7 @@ static void
>  tx_schedule_commit(struct cmsg *msg);
>  
>  static struct cmsg_hop wal_request_route[] = {
> -	{wal_write_to_disk, &wal_thread.tx_pipe},
> +	{wal_write_to_disk, &wal_thread.tx_prio_pipe},

AFAICS you rename wal_thread.tx_pipe to tx_prio_pipe and create a new
pipe called wal_thread.tx_pipe to be used for garbage collection, all in
one patch. This is very difficult to review. Provided you really need
it, renaming should be done in a separate patch with a good explanation
why you're doing what you're doing. Currently, I fail to see a reason.

> +static void
> +gc_status_update(struct cmsg *msg)
> +{
> +	msg->route = NULL;
> +	say_info("QQQ: gc_status_update");

What is this?

> +	gc_xdir_clean_notify(static_cast<gc_msg*>(msg)->io_err);
> +}
> +
>  static void
>  wal_write_to_disk(struct cmsg *msg)
>  {
> @@ -647,11 +667,29 @@ wal_write_to_disk(struct cmsg *msg)
>  	last_committed = stailq_last(&wal_msg->commit);
>  
>  done:
> +	bool need_send = false;
>  	struct error *error = diag_last_error(diag_get());
>  	if (error) {
>  		/* Until we can pass the error to tx, log it and clear. */
>  		error_log(error);
>  		diag_clear(diag_get());
> +		if (errno == ENOSPC) {
> +			err_nospace = true;
> +			need_send = true;
> +		}
> +	} else if (err_nospace) {
> +		/** Clear flag and send message to gc. */
> +		err_nospace = false;
> +		need_send = true;
> +	}
> +	if (need_send) {

This juggling with flags looks confusing. What are you trying to achieve
here? Avoid invoking the garbage collector twice on an ENOSPC error? At the
very least, this code should be accompanied by a comment.

Also, AFAICS you fail a transaction that hit ENOSPC and triggered
garbage collection. I don't think it's good. I see two ways of
rectifying this: either retry WAL write after invoking garbage
collection or use fallocate to reserve some space beyond the end of
the file before writing to WAL.

> +		struct gc_msg msg;
> +		static const struct cmsg_hop route[] = {
> +			{gc_status_update, NULL}
> +		};
> +		cmsg_init(&msg, route);
> +		msg.io_err = err_nospace;
> +		cpipe_push(&wal_thread.tx_pipe, &msg);
>  	}

Pushing a message defined on stack to a pipe and not waiting for it to
be delivered doesn't look right.

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err
@ 2018-06-09 14:05 Konstantin Belyavskiy
  2018-06-18 12:35 ` Vladimir Davydov
  0 siblings, 1 reply; 4+ messages in thread
From: Konstantin Belyavskiy @ 2018-06-09 14:05 UTC (permalink / raw)
  To: georgy; +Cc: tarantool-patches

The garbage collector does not delete xlog files until the replica
notifies the master with a newer vclock. This can lead to running out
of disk space, which is not the right behaviour since it will stop
the master.

List of changes:
- Promoting error from wal_thread to tx via cpipe.
- Introduce states for gc_consumers (OK and OFF). GC does not
take into account consumers with state OFF.
- Add an error injection and a test.

Closes #3397
---
ticket: https://github.com/tarantool/tarantool/issues/3397
branch: https://github.com/tarantool/tarantool/tree/kbelyavs/gh-3397-force-del-logs-on-no-disk-space

 src/box/gc.c                                      |  52 ++++++++++-
 src/box/gc.h                                      |  13 +++
 src/box/wal.cc                                    |  56 +++++++++--
 src/errinj.h                                      |   1 +
 src/fio.c                                         |   7 ++
 test/box/errinj.result                            |   2 +
 test/replication/force_gc_on_err_nospace.result   | 107 ++++++++++++++++++++++
 test/replication/force_gc_on_err_nospace.test.lua |  38 ++++++++
 8 files changed, 265 insertions(+), 11 deletions(-)
 create mode 100644 test/replication/force_gc_on_err_nospace.result
 create mode 100644 test/replication/force_gc_on_err_nospace.test.lua

diff --git a/src/box/gc.c b/src/box/gc.c
index 12e68f3dc..efe033aaf 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -59,6 +59,8 @@ struct gc_consumer {
 	gc_node_t node;
 	/** Human-readable name. */
 	char *name;
+	/** Dead or alive. GC ignores dead consumers. */
+	enum gc_consumer_state state;
 	/** The vclock signature tracked by this consumer. */
 	int64_t signature;
 };
@@ -78,6 +80,8 @@ struct gc_state {
 	 * garbage collection callbacks.
 	 */
 	struct latch latch;
+	/** On ENOSPC error force gc to clean xdir. */
+	bool force_clean_flag;
 };
 static struct gc_state gc;
 
@@ -132,6 +136,16 @@ gc_consumer_delete(struct gc_consumer *consumer)
 	free(consumer);
 }
 
+static struct gc_consumer *
+gc_tree_first_alive(gc_tree_t *consumers)
+{
+	struct gc_consumer *consumer = gc_tree_first(consumers);
+	while (consumer != NULL && consumer->state != CONSUMER_OK)
+		consumer = gc_tree_next(&gc.consumers, consumer);
+	return consumer;
+}
+
+
 void
 gc_init(void)
 {
@@ -162,7 +176,7 @@ gc_run(void)
 	assert(checkpoint_count > 0);
 
 	/* Look up the consumer that uses the oldest snapshot. */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 
 	/*
 	 * Find the oldest checkpoint that must be preserved.
@@ -216,6 +230,38 @@ gc_set_checkpoint_count(int checkpoint_count)
 	gc.checkpoint_count = checkpoint_count;
 }
 
+void
+gc_xdir_clean_notify(bool force_clean)
+{
+	if (force_clean == gc.force_clean_flag)
+		return; // nothing to do
+	else if (force_clean) { // xdir clean required
+		gc.force_clean_flag = true;
+		/**
+		 * Mark consumer with least recent vclock as "dead" and
+		 * invoke garbage collection. If nothing to delete find
+		 * next alive consumer etc. Originally created for
+		 * cases with running out of disk space because of
+		 * disconnected replica.
+		 */
+		struct gc_consumer *leftmost =
+		    gc_tree_first_alive(&gc.consumers);
+		if (leftmost == NULL)
+			return; // do nothing
+		int64_t signature = leftmost->signature;
+		while (true) {
+			leftmost->state = CONSUMER_OFF;
+			leftmost = gc_tree_first_alive(&gc.consumers);
+			if (leftmost == NULL ||
+			    leftmost->signature > signature) {
+				gc_run();
+				return;
+			}
+		}
+	} else
+		gc.force_clean_flag = false;
+}
+
 struct gc_consumer *
 gc_consumer_register(const char *name, int64_t signature)
 {
@@ -237,7 +283,7 @@ gc_consumer_unregister(struct gc_consumer *consumer)
 	 * Rerun garbage collection after removing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > signature)
 		gc_run();
 }
@@ -270,7 +316,7 @@ gc_consumer_advance(struct gc_consumer *consumer, int64_t signature)
 	 * Rerun garbage collection after advancing the consumer
 	 * if it referenced the oldest vclock.
 	 */
-	struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
+	struct gc_consumer *leftmost = gc_tree_first_alive(&gc.consumers);
 	if (leftmost == NULL || leftmost->signature > prev_signature)
 		gc_run();
 }
diff --git a/src/box/gc.h b/src/box/gc.h
index 634ce6d38..a91e78858 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -31,6 +31,7 @@
  * SUCH DAMAGE.
  */
 
+#include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
 
@@ -40,6 +41,11 @@ extern "C" {
 
 struct gc_consumer;
 
+enum gc_consumer_state {
+	CONSUMER_OK = 0,
+	CONSUMER_OFF = 1,
+};
+
 /**
  * Initialize the garbage collection state.
  */
@@ -88,6 +94,13 @@ gc_consumer_register(const char *name, int64_t signature);
 void
 gc_consumer_unregister(struct gc_consumer *consumer);
 
+/**
+ * Notify gc to clean xdir because of running out
+ * of disk space.
+ */
+void
+gc_xdir_clean_notify(bool force_clean);
+
 /**
  * Advance the vclock signature tracked by a consumer and
  * invoke garbage collection if needed.
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 099c70caa..9dd8347d5 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -41,11 +41,14 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "gc.h"
 
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
+/** Variable to store previous disk write result. */
+bool err_nospace = false;
 
 static int64_t
 wal_write(struct journal *, struct journal_entry *);
@@ -59,6 +62,11 @@ struct wal_thread {
 	struct cord cord;
 	/** A pipe from 'tx' thread to 'wal' */
 	struct cpipe wal_pipe;
+	/**
+	 * Return pipe from 'wal' to tx'. This is a
+	 * priority pipe and DOES NOT support yield.
+	 */
+	struct cpipe tx_prio_pipe;
 	/** Return pipe from 'wal' to tx' */
 	struct cpipe tx_pipe;
 };
@@ -129,6 +137,10 @@ struct wal_msg: public cmsg {
 	struct stailq rollback;
 };
 
+struct gc_msg: public cmsg {
+	bool io_err;
+};
+
 /**
  * Vinyl metadata log writer.
  */
@@ -154,7 +166,7 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-	{wal_write_to_disk, &wal_thread.tx_pipe},
+	{wal_write_to_disk, &wal_thread.tx_prio_pipe},
 	{tx_schedule_commit, NULL},
 };
 
@@ -414,7 +426,7 @@ wal_checkpoint(struct vclock *vclock, bool rotate)
 		return 0;
 	}
 	static struct cmsg_hop wal_checkpoint_route[] = {
-		{wal_checkpoint_f, &wal_thread.tx_pipe},
+		{wal_checkpoint_f, &wal_thread.tx_prio_pipe},
 		{wal_checkpoint_done_f, NULL},
 	};
 	vclock_create(vclock);
@@ -453,7 +465,7 @@ wal_collect_garbage(int64_t lsn)
 	struct wal_gc_msg msg;
 	msg.lsn = lsn;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
@@ -544,7 +556,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 		 * list.
 		 */
 		{ wal_writer_clear_bus, &wal_thread.wal_pipe },
-		{ wal_writer_clear_bus, &wal_thread.tx_pipe },
+		{ wal_writer_clear_bus, &wal_thread.tx_prio_pipe },
 		/*
 		 * Step 2: writer->rollback queue contains all
 		 * messages which need to be rolled back,
@@ -562,7 +574,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	 * all input until rollback mode is off.
 	 */
 	cmsg_init(&writer->in_rollback, rollback_route);
-	cpipe_push(&wal_thread.tx_pipe, &writer->in_rollback);
+	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
 }
 
 static void
@@ -581,6 +593,14 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
 	}
 }
 
+static void
+gc_status_update(struct cmsg *msg)
+{
+	msg->route = NULL;
+	say_info("QQQ: gc_status_update");
+	gc_xdir_clean_notify(static_cast<gc_msg*>(msg)->io_err);
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg)
 {
@@ -647,11 +667,29 @@ wal_write_to_disk(struct cmsg *msg)
 	last_committed = stailq_last(&wal_msg->commit);
 
 done:
+	bool need_send = false;
 	struct error *error = diag_last_error(diag_get());
 	if (error) {
 		/* Until we can pass the error to tx, log it and clear. */
 		error_log(error);
 		diag_clear(diag_get());
+		if (errno == ENOSPC) {
+			err_nospace = true;
+			need_send = true;
+		}
+	} else if (err_nospace) {
+		/** Clear flag and send message to gc. */
+		err_nospace = false;
+		need_send = true;
+	}
+	if (need_send) {
+		struct gc_msg msg;
+		static const struct cmsg_hop route[] = {
+			{gc_status_update, NULL}
+		};
+		cmsg_init(&msg, route);
+		msg.io_err = err_nospace;
+		cpipe_push(&wal_thread.tx_pipe, &msg);
 	}
 	/*
 	 * We need to start rollback from the first request
@@ -691,7 +729,8 @@ wal_thread_f(va_list ap)
 	 * endpoint, to ensure that WAL messages are delivered
 	 * even when tx fiber pool is used up by net messages.
 	 */
-	cpipe_create(&wal_thread.tx_pipe, "tx_prio");
+	cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio");
+	cpipe_create(&wal_thread.tx_pipe, "tx");
 
 	cbus_loop(&endpoint);
 
@@ -703,6 +742,7 @@ wal_thread_f(va_list ap)
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_close(&vy_log_writer.xlog, false);
 
+	cpipe_destroy(&wal_thread.tx_prio_pipe);
 	cpipe_destroy(&wal_thread.tx_pipe);
 	return 0;
 }
@@ -843,7 +883,7 @@ wal_write_vy_log(struct journal_entry *entry)
 	struct wal_write_vy_log_msg msg;
 	msg.entry= entry;
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 			   wal_write_vy_log_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 	return rc;
@@ -863,7 +903,7 @@ wal_rotate_vy_log()
 {
 	struct cbus_call_msg msg;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
 		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
diff --git a/src/errinj.h b/src/errinj.h
index ed69b6cb0..438095294 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -109,6 +109,7 @@ struct errinj {
 	_(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_LOG_ROTATE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..cdea11e87 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "fio.h"
+#include "errinj.h"
 
 #include <sys/types.h>
 
@@ -141,6 +142,12 @@ fio_writev(int fd, struct iovec *iov, int iovcnt)
 	ssize_t nwr;
 restart:
 	nwr = writev(fd, iov, iovcnt);
+	/* Simulate running out of disk space to force the gc to clean logs. */
+	struct errinj *inj = errinj(ERRINJ_NO_DISK_SPACE, ERRINJ_BOOL);
+	if (inj != NULL && inj->bparam) {
+		errno = ENOSPC;
+		nwr = -1;
+	}
 	if (nwr < 0) {
 		if (errno == EINTR) {
 			errno = 0;
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 9e3a0bfab..4d85156a9 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -54,6 +54,8 @@ errinj.info()
     state: false
   ERRINJ_VY_RUN_WRITE:
     state: false
+  ERRINJ_NO_DISK_SPACE:
+    state: false
   ERRINJ_RELAY_FINAL_SLEEP:
     state: false
   ERRINJ_VY_RUN_DISCARD:
diff --git a/test/replication/force_gc_on_err_nospace.result b/test/replication/force_gc_on_err_nospace.result
new file mode 100644
index 000000000..904a36ddc
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.result
@@ -0,0 +1,107 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+s = box.schema.space.create('test', {engine = engine});
+---
+...
+index = s:create_index('primary', {type = 'tree'})
+---
+...
+errinj = box.error.injection
+---
+...
+fio = require('fio')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+repl = box.cfg.replication
+---
+...
+box.cfg{replication = ""}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+for i = 1, 5 do s:insert{i} box.snapshot() end
+---
+...
+s:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 6
+...
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+---
+- ok
+...
+function insert(a) s:insert(a) end
+---
+...
+_, err = pcall(insert, {6})
+---
+...
+err:match("ailed to write")
+---
+- ailed to write
+...
+-- add a little timeout so gc could finish job
+require('fiber').sleep(0.01)
+---
+...
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+---
+- 2
+...
+-- cleanup
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("restart server default with cleanup=1")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/force_gc_on_err_nospace.test.lua b/test/replication/force_gc_on_err_nospace.test.lua
new file mode 100644
index 000000000..fea4aa73b
--- /dev/null
+++ b/test/replication/force_gc_on_err_nospace.test.lua
@@ -0,0 +1,38 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+
+s = box.schema.space.create('test', {engine = engine});
+index = s:create_index('primary', {type = 'tree'})
+
+errinj = box.error.injection
+fio = require('fio')
+
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+
+test_run:cmd("switch replica")
+repl = box.cfg.replication
+box.cfg{replication = ""}
+test_run:cmd("switch default")
+
+for i = 1, 5 do s:insert{i} box.snapshot() end
+s:select()
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+errinj.set("ERRINJ_NO_DISK_SPACE", true)
+function insert(a) s:insert(a) end
+_, err = pcall(insert, {6})
+err:match("ailed to write")
+-- add a little timeout so gc could finish job
+require('fiber').sleep(0.01)
+#fio.glob(fio.pathjoin(fio.abspath("."), 'master/*.xlog'))
+
+-- cleanup
+test_run:cmd("switch replica")
+test_run:cmd("restart server default with cleanup=1")
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2018-06-18 12:35 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-06-06  9:27 [tarantool-patches] [PATCH] replication: force gc to clean xdir on ENOSPC err Konstantin Belyavskiy
2018-06-07  4:24 ` [tarantool-patches] " Georgy Kirichenko
2018-06-09 14:05 [tarantool-patches] " Konstantin Belyavskiy
2018-06-18 12:35 ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox