Tarantool development patches archive
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: gorcunov@gmail.com, v.shpilevoy@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
Date: Wed, 24 Feb 2021 22:35:49 +0300
Message-ID: <20210224193549.70017-1-sergepetrenko@tarantool.org>

Since the introduction of asynchronous commit, which doesn't wait for a
WAL write to succeed, it's quite easy to clog WAL with huge amounts of
write requests. For now, it's only possible from an applier, since it's
the only user of async commit at the moment.

This happens when a replica is syncing with the master and reads new
transactions at a pace higher than it can write them to WAL (see the
docbot request for a detailed explanation).

To mitigate this, introduce limits on not-yet-finished WAL write
requests.

Two new counters are added to the WAL writer: queue_size (in bytes) and
queue_len (in WAL messages), together with configuration settings:
`wal_queue_max_size` and `wal_queue_max_len`.
Size and length are increased on every newly submitted request and
decreased once the tx thread receives a confirmation that a specific
request was written.
Actually, the limits are added to an abstract journal, but take effect
only for the WAL writer.

Once the size or length reaches its maximum value, the applier is blocked
until some of the write requests are finished.

The size limit isn't strict, i.e. if there's at least one free byte, the
whole write request fits and no blocking is involved.
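
The non-strict check can be illustrated with a minimal standalone model.
This is only a sketch, not the patch code: `struct queue_model` and the
`model_*` helpers are hypothetical names, and the comparison mirrors the
`>` used by `journal_queue_is_full()` in this patch, so the queue counts
as full only after a limit is exceeded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the journal queue counters. */
struct queue_model {
	int64_t max_size;
	int64_t size;
	int64_t max_len;
	int64_t len;
};

/* Full only once a limit is exceeded, hence the non-strict behaviour. */
static bool
model_is_full(const struct queue_model *q)
{
	return q->size > q->max_size || q->len > q->max_len;
}

/* Account for a newly submitted request of approx_len bytes. */
static void
model_on_append(struct queue_model *q, int64_t approx_len)
{
	q->len++;
	q->size += approx_len;
}

/* Account for a request the WAL has finished writing. */
static void
model_on_complete(struct queue_model *q, int64_t approx_len)
{
	q->len--;
	q->size -= approx_len;
	assert(q->len >= 0 && q->size >= 0);
}
```

With `max_size = 1` an empty queue is not full, so a 100-byte request is
admitted whole; the model then reports full until that request completes.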

The feature is ready for `box.commit{is_async=true}`. Once it's
implemented, it should check whether the queue is full and let the user
decide what to do next: either wait or roll the transaction back.

Part of #5536

@TarantoolBot document
Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'

`wal_queue_max_size` and `wal_queue_max_len` put a limit on the number
of concurrent write requests submitted to WAL.
`wal_queue_max_size` is measured in number of bytes to be written (0
means unlimited), and `wal_queue_max_len` is measured in number of
transactions (0 means unlimited).
These options only affect replica behaviour at the moment, and default
to 0. They limit the pace at which a replica reads new transactions from
the master.
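
The "0 means unlimited" convention can be sketched as a small mapping
from the configured value to the limit used internally. This is an
illustration under assumptions: `normalize_queue_limit()` is a
hypothetical helper, and returning -1 for a negative setting is this
sketch's own choice for signalling an invalid value.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper: map a configured limit to the value stored
 * internally. Returns -1 for a negative (invalid) setting.
 */
static int64_t
normalize_queue_limit(int64_t cfg_value)
{
	if (cfg_value < 0)
		return -1;
	/* 0 means "unlimited". */
	if (cfg_value == 0)
		return INT64_MAX;
	return cfg_value;
}
```

Storing "unlimited" as `INT64_MAX` keeps the fullness check a plain
comparison, with no special case for a disabled limit.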

Here's when these options come in handy:
Imagine there are 2 servers, a master and a replica, and the replica is
down for some period of time. While the replica is down, the master
serves requests at a reasonable pace, possibly close to its WAL
throughput limit. Once the replica reconnects, it has to receive all the
data the master has piled up. There's no limit on the speed at which the
master sends the data to the replica, and no limit on the rate at which
the replica's applier submits the corresponding write requests to WAL.
As a result, the replica's WAL can never keep up with the requests, and
the number of pending requests grows constantly. There's no limit on the
memory that WAL write requests take, so this clogging of the WAL write
queue may even lead to the replica using up all the available memory.

Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are
set, appliers will stop reading new transactions once the limit is
reached. This will let WAL process all the requests that have piled up
and free all the excess memory.
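
The effect of the limit can be shown with a toy model of the scenario
above. It is a sketch under stated assumptions, not a measurement:
`simulate_peak_pending()` is a hypothetical function, time is split into
abstract ticks, and transactions the applier does not read simply stay
on the master.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Toy model: each tick the applier may submit up to `in_rate`
 * transactions and the WAL finishes up to `out_rate` of them.
 * `max_len` caps pending requests (0 means unlimited).
 * Returns the peak number of pending requests.
 */
static int64_t
simulate_peak_pending(int64_t ticks, int64_t in_rate, int64_t out_rate,
		      int64_t max_len)
{
	int64_t limit = max_len == 0 ? INT64_MAX : max_len;
	int64_t pending = 0, peak = 0;
	for (int64_t t = 0; t < ticks; t++) {
		/* The applier stops once the queue exceeds the limit. */
		for (int64_t i = 0; i < in_rate && pending <= limit; i++)
			pending++;
		if (pending > peak)
			peak = pending;
		/* The WAL completes up to out_rate requests per tick. */
		pending -= pending < out_rate ? pending : out_rate;
	}
	return peak;
}
```

For 1000 ticks with 3 transactions arriving and 1 written per tick, the
model peaks at 2001 pending requests without a limit and at 11 with
`max_len = 10`: the queue stays bounded regardless of how long the
replica was down.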
---
https://github.com/tarantool/tarantool/issues/5536
https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom

Changes in v3:
  - Extract queue internals into new struct journal_queue.
  - Move is_ready flag to queue from queue entry.
  - Minor refactoring.

Changes in v2:
  - Move queue logic to journal.

 src/box/applier.cc                          |   9 ++
 src/box/box.cc                              |  52 ++++++++
 src/box/box.h                               |   2 +
 src/box/journal.c                           |  71 ++++++++++
 src/box/journal.h                           | 136 +++++++++++++++++++-
 src/box/lua/cfg.cc                          |  18 +++
 src/box/lua/load_cfg.lua                    |   6 +
 src/box/wal.c                               |  14 ++
 src/box/wal.h                               |  14 ++
 test/app-tap/init_script.result             |   2 +
 test/box-tap/cfg.test.lua                   |   4 +-
 test/box/admin.result                       |   4 +
 test/box/cfg.result                         |   8 ++
 test/replication/gh-5536-wal-limit.result   | 132 +++++++++++++++++++
 test/replication/gh-5536-wal-limit.test.lua |  58 +++++++++
 test/replication/suite.cfg                  |   1 +
 test/replication/suite.ini                  |   2 +-
 17 files changed, 525 insertions(+), 8 deletions(-)
 create mode 100644 test/replication/gh-5536-wal-limit.result
 create mode 100644 test/replication/gh-5536-wal-limit.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 553db76fc..27ddd0f29 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		goto success;
 	}
 
+	/*
+	 * Do not spam WAL with excess write requests, let it process what's
+	 * piled up first.
+	 * This is done before opening the transaction to avoid problems with
+	 * yielding inside it.
+	 */
+	if (journal_queue_is_full() || journal_queue_has_waiters())
+		journal_wait_queue();
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
diff --git a/src/box/box.cc b/src/box/box.cc
index 26cbe8aab..9a3b092d0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name)
 	return (enum wal_mode) mode;
 }
 
+static int64_t
+box_check_wal_queue_max_len(void)
+{
+	int64_t len = cfg_geti64("wal_queue_max_len");
+	if (len < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
+			 "wal_queue_max_len must be >= 0");
+	}
+	/* Unlimited. */
+	if (len == 0)
+		len = INT64_MAX;
+	return len;
+}
+
+static int64_t
+box_check_wal_queue_max_size(void)
+{
+	int64_t size = cfg_geti64("wal_queue_max_size");
+	if (size < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
+			 "wal_queue_max_size must be >= 0");
+	}
+	/* Unlimited. */
+	if (size == 0)
+		size = INT64_MAX;
+	return size;
+}
+
 static void
 box_check_readahead(int readahead)
 {
@@ -875,6 +903,10 @@ box_check_config(void)
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
 	box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	box_check_wal_mode(cfg_gets("wal_mode"));
+	if (box_check_wal_queue_max_size() < 0)
+		diag_raise();
+	if (box_check_wal_queue_max_len() < 0)
+		diag_raise();
 	if (box_check_memory_quota("memtx_memory") < 0)
 		diag_raise();
 	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
@@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void)
 	wal_set_checkpoint_threshold(threshold);
 }
 
+int
+box_set_wal_queue_max_size(void)
+{
+	int64_t size = box_check_wal_queue_max_size();
+	if (size < 0)
+		return -1;
+	wal_set_queue_max_size(size);
+	return 0;
+}
+
+int
+box_set_wal_queue_max_len(void)
+{
+	int64_t len = box_check_wal_queue_max_len();
+	if (len < 0)
+		return -1;
+	wal_set_queue_max_len(len);
+	return 0;
+}
+
 void
 box_set_vinyl_memory(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index b68047a95..4f5b4b617 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -239,6 +239,8 @@ void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
 void box_set_checkpoint_wal_threshold(void);
+int box_set_wal_queue_max_size(void);
+int box_set_wal_queue_max_len(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/journal.c b/src/box/journal.c
index cb320b557..4c31a3dfe 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -34,6 +34,16 @@
 
 struct journal *current_journal = NULL;
 
+struct journal_queue journal_queue = {
+	.max_size = INT64_MAX,
+	.size = 0,
+	.max_len = INT64_MAX,
+	.len = 0,
+	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
+	.is_awake = false,
+	.is_ready = false,
+};
+
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
 		  journal_write_async_f write_async_cb,
@@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
 			     complete_data);
 	return entry;
 }
+
+struct journal_queue_entry {
+	/** The fiber waiting for queue space to free. */
+	struct fiber *fiber;
+	/** A link in all waiting fibers list. */
+	struct rlist in_queue;
+};
+
+/**
+ * Wake up the first waiter in the journal queue.
+ */
+static inline void
+journal_queue_wakeup_first(void)
+{
+	struct journal_queue_entry *e;
+	if (rlist_empty(&journal_queue.waiters))
+		goto out;
+	/*
+	 * When the queue isn't forcefully emptied, no need to wake everyone
+	 * else up until there's some free space.
+	 */
+	if (!journal_queue.is_ready && journal_queue_is_full())
+		goto out;
+	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
+			in_queue);
+	fiber_wakeup(e->fiber);
+	return;
+out:
+	journal_queue.is_awake = false;
+	journal_queue.is_ready = false;
+}
+
+void
+journal_queue_wakeup(bool force_ready)
+{
+	assert(!rlist_empty(&journal_queue.waiters));
+	if (journal_queue.is_awake)
+		return;
+	journal_queue.is_awake = true;
+	journal_queue.is_ready = force_ready;
+	journal_queue_wakeup_first();
+}
+
+void
+journal_wait_queue(void)
+{
+	struct journal_queue_entry entry = {
+		.fiber = fiber(),
+	};
+	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
+	/*
+	 * Will be woken up by either queue emptying or a synchronous write.
+	 */
+	while (journal_queue_is_full() && !journal_queue.is_ready)
+		fiber_yield();
+
+	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
+	rlist_del(&entry.in_queue);
+
+	journal_queue_wakeup_first();
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index 5d8d5a726..8fec5b27e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region,
 		  journal_write_async_f write_async_cb,
 		  void *complete_data);
 
+struct journal_queue {
+	/** Maximal size of entries enqueued in journal (in bytes). */
+	int64_t max_size;
+	/** Current approximate size of journal queue. */
+	int64_t size;
+	/** Maximal allowed length of journal queue, in entries. */
+	int64_t max_len;
+	/** Current journal queue length. */
+	int64_t len;
+	/**
+	 * The fibers waiting for some space to free in journal queue.
+	 * Once some space is freed they will be woken up in the same order they
+	 * entered the queue.
+	 */
+	struct rlist waiters;
+	/**
+	 * Whether the queue is being woken or not. Used to avoid multiple
+	 * concurrent wake-ups.
+	 */
+	bool is_awake;
+	/**
+	 * A flag used to tell the waiting fibers they may proceed even if the
+	 * queue is full, i.e. force them to submit a write request.
+	 */
+	bool is_ready;
+};
+
+/** A single queue for all journal instances. */
+extern struct journal_queue journal_queue;
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
@@ -124,6 +154,82 @@ struct journal {
 		     struct journal_entry *entry);
 };
 
+/**
+ * Depending on the step of recovery and instance configuration
+ * points at a concrete implementation of the journal.
+ */
+extern struct journal *current_journal;
+
+/**
+ * Wake the journal queue up.
+ * @param force_ready whether waiters should proceed even if the queue is still
+ *                    full.
+ */
+void
+journal_queue_wakeup(bool force_ready);
+
+/**
+ * Check whether any of the queue size limits is reached.
+ * If the queue is full, we must wait for some of the entries to be written
+ * before proceeding with a new asynchronous write request.
+ */
+static inline bool
+journal_queue_is_full(void)
+{
+	return journal_queue.size > journal_queue.max_size ||
+	       journal_queue.len > journal_queue.max_len;
+}
+
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+	return !rlist_empty(&journal_queue.waiters);
+}
+
+/** Yield until there's some space in the journal queue. */
+void
+journal_wait_queue(void);
+
+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(int64_t size)
+{
+	journal_queue.max_size = size;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(int64_t len)
+{
+	journal_queue.max_len = len;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+	journal_queue.len++;
+	journal_queue.size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+	journal_queue.len--;
+	journal_queue.size -= entry->approx_len;
+	assert(journal_queue.len >= 0);
+	assert(journal_queue.size >= 0);
+}
+
 /**
  * Complete asynchronous write.
  */
@@ -131,15 +237,14 @@ static inline void
 journal_async_complete(struct journal_entry *entry)
 {
 	assert(entry->write_async_cb != NULL);
+
+	journal_queue_on_complete(entry);
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+
 	entry->write_async_cb(entry);
 }
 
-/**
- * Depending on the step of recovery and instance configuration
- * points at a concrete implementation of the journal.
- */
-extern struct journal *current_journal;
-
 /**
  * Write a single entry to the journal in synchronous way.
  *
@@ -148,6 +253,18 @@ extern struct journal *current_journal;
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	if (journal_queue_has_waiters()) {
+		/*
+		 * It's a synchronous write, so it's fine to wait a bit more for
+		 * everyone else to be written. They'll wake us up back
+		 * afterwards.
+		 */
+		journal_queue_wakeup(true);
+		journal_wait_queue();
+	}
+
+	journal_queue_on_append(entry);
+
 	return current_journal->write(current_journal, entry);
 }
 
@@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
 static inline int
 journal_write_async(struct journal_entry *entry)
 {
+	/*
+	 * It's the job of the caller to check whether the queue is full prior
+	 * to submitting the request.
+	 */
+	assert(!journal_queue_is_full() || journal_queue.is_ready);
+	journal_queue_on_append(entry);
+
 	return current_journal->write_async(current_journal, entry);
 }
 
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 2d3ccbf0e..35f410710 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_size() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_len() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
@@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
+		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
+		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 574c8bef4..c11a9e103 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -71,6 +71,8 @@ local default_cfg = {
     wal_mode            = "write",
     wal_max_size        = 256 * 1024 * 1024,
     wal_dir_rescan_delay= 2,
+    wal_queue_max_size  = 0,
+    wal_queue_max_len   = 0,
     force_recovery      = false,
     replication         = nil,
     instance_uuid       = nil,
@@ -163,6 +165,8 @@ local template_cfg = {
     coredump            = 'boolean',
     checkpoint_interval = 'number',
     checkpoint_wal_threshold = 'number',
+    wal_queue_max_size  = 'number',
+    wal_queue_max_len   = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -277,6 +281,8 @@ local dynamic_cfg = {
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
     checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
+    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
+    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = ifdef_feedback_set_params,
     feedback_crashinfo      = ifdef_feedback_set_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index 937d47ba9..4a0381cf4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
+void
+wal_set_queue_max_size(int64_t size)
+{
+	assert(&wal_writer_singleton.base == current_journal);
+	journal_queue_set_max_size(size);
+}
+
+void
+wal_set_queue_max_len(int64_t len)
+{
+	assert(&wal_writer_singleton.base == current_journal);
+	journal_queue_set_max_len(len);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
diff --git a/src/box/wal.h b/src/box/wal.h
index ca43dc6eb..1db32f66f 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
+/**
+ * Set the pending write limit in bytes. Once the limit is reached, new
+ * writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_size(int64_t size);
+
+/**
+ * Set the pending write limit in journal entries. Once the limit is reached,
+ * new writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_len(int64_t len);
+
 /**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 16c5b01d2..7a224e50e 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -56,6 +56,8 @@ wal_dir:.
 wal_dir_rescan_delay:2
 wal_max_size:268435456
 wal_mode:write
+wal_queue_max_len:0
+wal_queue_max_size:0
 worker_pool_threads:4
 --
 -- Test insert from detached fiber
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index a577f023d..3276ddf64 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
 local fio = require('fio')
 local uuid = require('uuid')
 local msgpack = require('msgpack')
-test:plan(108)
+test:plan(110)
 
 --------------------------------------------------------------------------------
 -- Invalid values
@@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
 invalid('vinyl_run_size_ratio', 1)
 invalid('vinyl_bloom_fpr', 0)
 invalid('vinyl_bloom_fpr', 1.1)
+invalid('wal_queue_max_size', -1)
+invalid('wal_queue_max_len', -1)
 
 local function invalid_combinations(name, val)
     local status, result = pcall(box.cfg, val)
diff --git a/test/box/admin.result b/test/box/admin.result
index 05debe673..c818f4f9f 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -133,6 +133,10 @@ cfg_filter(box.cfg)
     - 268435456
   - - wal_mode
     - write
+  - - wal_queue_max_len
+    - 0
+  - - wal_queue_max_size
+    - 0
   - - worker_pool_threads
     - 4
 ...
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 22a720c2c..19f322e7d 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -121,6 +121,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
@@ -236,6 +240,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
new file mode 100644
index 000000000..f7799baa8
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+replication_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout=1000}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+ | ---
+ | ...
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+ | ---
+ | ...
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+for i = 1,10 do box.space.test:insert{i} end
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+ | ---
+ | - true
+ | ...
+require('fiber').sleep(0.5)
+ | ---
+ | ...
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+ | ---
+ | - true
+ | ...
+assert(box.space.test:count() == 10)
+ | ---
+ | - true
+ | ...
+assert(box.info.replication[1].upstream.status == 'follow')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
new file mode 100644
index 000000000..1e7d61ff7
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.test.lua
@@ -0,0 +1,58 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+replication_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout=1000}
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+test_run:switch('replica')
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+test_run:switch('default')
+
+for i = 1,10 do box.space.test:insert{i} end
+
+test_run:switch('replica')
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+require('fiber').sleep(0.5)
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+assert(box.space.test:count() == 10)
+assert(box.info.replication[1].upstream.status == 'follow')
+
+test_run:switch('default')
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index c80430afc..7e7004592 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -37,6 +37,7 @@
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
     "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
+    "gh-5536-wal-limit.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index e4812df37..89abfabff 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
-- 
2.24.3 (Apple Git-128)


Thread overview: 30+ messages
2021-02-24 19:35 Serge Petrenko via Tarantool-patches [this message]
2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
2021-02-26  7:18     ` Konstantin Osipov via Tarantool-patches
2021-02-26 20:23       ` Vladislav Shpilevoy via Tarantool-patches
2021-02-26 21:20         ` Konstantin Osipov via Tarantool-patches
2021-02-26 22:44           ` Vladislav Shpilevoy via Tarantool-patches
2021-02-27 13:27             ` Konstantin Osipov via Tarantool-patches
2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
2021-03-01 21:46     ` Konstantin Osipov via Tarantool-patches
2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
2021-03-01 19:08   ` Serge Petrenko via Tarantool-patches
2021-03-01 22:05     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-02 17:51       ` Serge Petrenko via Tarantool-patches
2021-03-03 20:59         ` Vladislav Shpilevoy via Tarantool-patches
2021-03-09 15:10           ` Serge Petrenko via Tarantool-patches
2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
2021-03-10  8:18   ` Konstantin Osipov via Tarantool-patches
2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
2021-03-13 19:14       ` Konstantin Osipov via Tarantool-patches
2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
2021-03-16 20:27           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-17 12:14             ` Serge Petrenko via Tarantool-patches
2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-19 11:32             ` Serge Petrenko via Tarantool-patches
2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches