Tarantool development patches archive
* [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
@ 2021-02-24 19:35 Serge Petrenko via Tarantool-patches
  2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
                   ` (4 more replies)
  0 siblings, 5 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-24 19:35 UTC (permalink / raw)
  To: gorcunov, v.shpilevoy; +Cc: tarantool-patches

Since the introduction of asynchronous commit, which doesn't wait for a
WAL write to succeed, it's quite easy to clog WAL with huge amounts of
write requests. For now, it's only possible from an applier, since it's
the only user of async commit at the moment.

This happens when replica is syncing with master and reads new
transactions at a pace higher than it can write them to WAL (see docbot
request for detailed explanation).

To ameliorate such behavior, we need to introduce some limit on
not-yet-finished WAL write requests. This is what this commit is trying
to do.
Two new counters are added to wal writer: queue_size (in bytes) and
queue_len (in wal messages) together with configuration settings:
`wal_queue_max_size` and `wal_queue_max_len`.
Size and length are increased on every new submitted request, and are
decreased once the tx receives a confirmation that a specific request
was written.
Actually, the limits are added to an abstract journal, but take effect
only for wal writer.

Once size or length reach their maximum values, applier is blocked until
some of the write requests are finished.

The size limit isn't strict, i.e. if there's at least one free byte, the
whole write request fits and no blocking is involved.
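
To illustrate the non-strict limit, here is a minimal standalone sketch. It
is not the patch code: `sketch_queue` and its helpers are hypothetical names,
and only the `size > max_size` comparison mirrors `journal_queue_is_full()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the queue counters; names are illustrative. */
struct sketch_queue {
	int64_t max_size; /* limit, in bytes */
	int64_t size;     /* bytes currently pending */
};

/* The queue counts as full only once the pending size exceeds the limit. */
static bool
sketch_queue_is_full(const struct sketch_queue *q)
{
	return q->size > q->max_size;
}

/*
 * Admit a request if the queue is not full yet. The request size is not
 * compared against the remaining space, so the limit may be overshot.
 * Returns true if admitted, false if the caller would have to wait.
 */
static bool
sketch_queue_try_append(struct sketch_queue *q, int64_t request_size)
{
	if (sketch_queue_is_full(q))
		return false;
	q->size += request_size;
	return true;
}

/* Account for a finished write. */
static void
sketch_queue_complete(struct sketch_queue *q, int64_t request_size)
{
	q->size -= request_size;
	assert(q->size >= 0);
}
```

With `max_size = 1`, a 100-byte request is still admitted into an empty
queue, the next one has to wait, and it is admitted once the first completes.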

The feature is ready for `box.commit{is_async=true}`. Once it's
implemented, it should check whether the queue is full and let the user
decide what to do next: either wait or roll the tx back.

Part of #5536

@TarantoolBot document
Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'

`wal_queue_max_size` and `wal_queue_max_len` limit the volume of
concurrent write requests submitted to WAL.
`wal_queue_max_size` is measured in bytes to be written and
`wal_queue_max_len` is measured in transactions. For both options 0
means unlimited.
These options only affect replica behaviour at the moment, and default
to 0. They limit the pace at which a replica reads new transactions from
the master.

Here's when these options come in handy:
Imagine there are 2 servers, a master and a replica, and the replica is
down for some period of time. While the replica is down, the master
serves requests at a reasonable pace, possibly close to its WAL
throughput limit. Once the replica reconnects, it has to receive all the
data the master has piled up. Currently there's no limit on the speed at
which the master sends the data to the replica, and no limit on the rate
at which the replica's applier submits the corresponding write requests
to WAL. As a result, the replica's WAL can't keep up with the requests,
and the number of pending requests keeps growing. There's also no limit
on the memory that WAL write requests take, so this clogging of the WAL
write queue may even lead to the replica using up all the available
memory.

Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are
set, appliers will stop reading new transactions once the limit is
reached. This will let WAL process all the requests that have piled up
and free all the excess memory.
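
The effect of the limit can be shown with a toy model. This is a sketch
under assumptions, not a measurement: `sketch_peak_pending` is a hypothetical
helper, the rates are made up, and only the "stop once `len > max_len`" rule
follows the description above.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Toy model: on each tick the applier tries to submit `in_rate` requests
 * and WAL completes up to `out_rate` of them. max_len == 0 means unlimited.
 * Returns the largest number of pending requests seen over `ticks` ticks.
 */
static int64_t
sketch_peak_pending(int64_t in_rate, int64_t out_rate, int64_t max_len,
		    int ticks)
{
	assert(in_rate >= 0 && out_rate >= 0 && max_len >= 0);
	int64_t pending = 0;
	int64_t peak = 0;
	for (int t = 0; t < ticks; t++) {
		for (int64_t i = 0; i < in_rate; i++) {
			/* The applier stops reading once the limit is exceeded. */
			if (max_len != 0 && pending > max_len)
				break;
			pending++;
		}
		if (pending > peak)
			peak = pending;
		/* WAL drains what it can during this tick. */
		pending -= pending < out_rate ? pending : out_rate;
	}
	return peak;
}
```

With 3 requests arriving and 1 completing per tick, 100 ticks without a
limit peak at 201 pending requests and keep growing with the tick count,
while `max_len = 10` caps the peak at 11.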
---
https://github.com/tarantool/tarantool/issues/5536
https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom

Changes in v3:
  - Extract queue internals into new struct journal_queue.
  - Move is_ready flag to queue from queue entry.
  - Minor refactoring.

Changes in v2:
  - Move queue logic to journal.

 src/box/applier.cc                          |   9 ++
 src/box/box.cc                              |  52 ++++++++
 src/box/box.h                               |   2 +
 src/box/journal.c                           |  71 ++++++++++
 src/box/journal.h                           | 136 +++++++++++++++++++-
 src/box/lua/cfg.cc                          |  18 +++
 src/box/lua/load_cfg.lua                    |   6 +
 src/box/wal.c                               |  14 ++
 src/box/wal.h                               |  14 ++
 test/app-tap/init_script.result             |   2 +
 test/box-tap/cfg.test.lua                   |   4 +-
 test/box/admin.result                       |   4 +
 test/box/cfg.result                         |   8 ++
 test/replication/gh-5536-wal-limit.result   | 132 +++++++++++++++++++
 test/replication/gh-5536-wal-limit.test.lua |  58 +++++++++
 test/replication/suite.cfg                  |   1 +
 test/replication/suite.ini                  |   2 +-
 17 files changed, 525 insertions(+), 8 deletions(-)
 create mode 100644 test/replication/gh-5536-wal-limit.result
 create mode 100644 test/replication/gh-5536-wal-limit.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 553db76fc..27ddd0f29 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		goto success;
 	}
 
+	/*
+	 * Do not spam WAL with excess write requests, let it process what's
+	 * piled up first.
+	 * This is done before opening the transaction to avoid problems with
+	 * yielding inside it.
+	 */
+	if (journal_queue_is_full() || journal_queue_has_waiters())
+		journal_wait_queue();
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
diff --git a/src/box/box.cc b/src/box/box.cc
index 26cbe8aab..9a3b092d0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name)
 	return (enum wal_mode) mode;
 }
 
+static int64_t
+box_check_wal_queue_max_len(void)
+{
+	int64_t len = cfg_geti64("wal_queue_max_len");
+	if (len < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
+			 "wal_queue_max_len must be >= 0");
+	}
+	/* Unlimited. */
+	if (len == 0)
+		len = INT64_MAX;
+	return len;
+}
+
+static int64_t
+box_check_wal_queue_max_size(void)
+{
+	int64_t size = cfg_geti64("wal_queue_max_size");
+	if (size < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
+			 "wal_queue_max_size must be >= 0");
+	}
+	/* Unlimited. */
+	if (size == 0)
+		size = INT64_MAX;
+	return size;
+}
+
 static void
 box_check_readahead(int readahead)
 {
@@ -875,6 +903,10 @@ box_check_config(void)
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
 	box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	box_check_wal_mode(cfg_gets("wal_mode"));
+	if (box_check_wal_queue_max_size() < 0)
+		diag_raise();
+	if (box_check_wal_queue_max_len() < 0)
+		diag_raise();
 	if (box_check_memory_quota("memtx_memory") < 0)
 		diag_raise();
 	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
@@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void)
 	wal_set_checkpoint_threshold(threshold);
 }
 
+int
+box_set_wal_queue_max_size(void)
+{
+	int64_t size = box_check_wal_queue_max_size();
+	if (size < 0)
+		return -1;
+	wal_set_queue_max_size(size);
+	return 0;
+}
+
+int
+box_set_wal_queue_max_len(void)
+{
+	int64_t len = box_check_wal_queue_max_len();
+	if (len < 0)
+		return -1;
+	wal_set_queue_max_len(len);
+	return 0;
+}
+
 void
 box_set_vinyl_memory(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index b68047a95..4f5b4b617 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -239,6 +239,8 @@ void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
 void box_set_checkpoint_wal_threshold(void);
+int box_set_wal_queue_max_size(void);
+int box_set_wal_queue_max_len(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/journal.c b/src/box/journal.c
index cb320b557..4c31a3dfe 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -34,6 +34,16 @@
 
 struct journal *current_journal = NULL;
 
+struct journal_queue journal_queue = {
+	.max_size = INT64_MAX,
+	.size = 0,
+	.max_len = INT64_MAX,
+	.len = 0,
+	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
+	.is_awake = false,
+	.is_ready = false,
+};
+
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
 		  journal_write_async_f write_async_cb,
@@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
 			     complete_data);
 	return entry;
 }
+
+struct journal_queue_entry {
+	/** The fiber waiting for queue space to free. */
+	struct fiber *fiber;
+	/** A link in all waiting fibers list. */
+	struct rlist in_queue;
+};
+
+/**
+ * Wake up the first waiter in the journal queue.
+ */
+static inline void
+journal_queue_wakeup_first(void)
+{
+	struct journal_queue_entry *e;
+	if (rlist_empty(&journal_queue.waiters))
+		goto out;
+	/*
+	 * When the queue isn't forcefully emptied, no need to wake everyone
+	 * else up until there's some free space.
+	 */
+	if (!journal_queue.is_ready && journal_queue_is_full())
+		goto out;
+	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
+			in_queue);
+	fiber_wakeup(e->fiber);
+	return;
+out:
+	journal_queue.is_awake = false;
+	journal_queue.is_ready = false;
+}
+
+void
+journal_queue_wakeup(bool force_ready)
+{
+	assert(!rlist_empty(&journal_queue.waiters));
+	if (journal_queue.is_awake)
+		return;
+	journal_queue.is_awake = true;
+	journal_queue.is_ready = force_ready;
+	journal_queue_wakeup_first();
+}
+
+void
+journal_wait_queue(void)
+{
+	struct journal_queue_entry entry = {
+		.fiber = fiber(),
+	};
+	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
+	/*
+	 * Will be woken up by either queue emptying or a synchronous write.
+	 */
+	while (journal_queue_is_full() && !journal_queue.is_ready)
+		fiber_yield();
+
+	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
+	rlist_del(&entry.in_queue);
+
+	journal_queue_wakeup_first();
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index 5d8d5a726..8fec5b27e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region,
 		  journal_write_async_f write_async_cb,
 		  void *complete_data);
 
+struct journal_queue {
+	/** Maximal size of entries enqueued in journal (in bytes). */
+	int64_t max_size;
+	/** Current approximate size of journal queue. */
+	int64_t size;
+	/** Maximal allowed length of journal queue, in entries. */
+	int64_t max_len;
+	/** Current journal queue length. */
+	int64_t len;
+	/**
+	 * The fibers waiting for some space to free in journal queue.
+	 * Once some space is freed they will be woken up in the same order they
+	 * entered the queue.
+	 */
+	struct rlist waiters;
+	/**
+	 * Whether the queue is being woken or not. Used to avoid multiple
+	 * concurrent wake-ups.
+	 */
+	bool is_awake;
+	/**
+	 * A flag used to tell the waiting fibers they may proceed even if the
+	 * queue is full, i.e. force them to submit a write request.
+	 */
+	bool is_ready;
+};
+
+/** A single queue for all journal instances. */
+extern struct journal_queue journal_queue;
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
@@ -124,6 +154,82 @@ struct journal {
 		     struct journal_entry *entry);
 };
 
+/**
+ * Depending on the step of recovery and instance configuration
+ * points at a concrete implementation of the journal.
+ */
+extern struct journal *current_journal;
+
+/**
+ * Wake the journal queue up.
+ * @param force_ready whether waiters should proceed even if the queue is still
+ *                    full.
+ */
+void
+journal_queue_wakeup(bool force_ready);
+
+/**
+ * Check whether any of the queue size limits is reached.
+ * If the queue is full, we must wait for some of the entries to be written
+ * before proceeding with a new asynchronous write request.
+ */
+static inline bool
+journal_queue_is_full(void)
+{
+	return journal_queue.size > journal_queue.max_size ||
+	       journal_queue.len > journal_queue.max_len;
+}
+
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+	return !rlist_empty(&journal_queue.waiters);
+}
+
+/** Yield until there's some space in the journal queue. */
+void
+journal_wait_queue(void);
+
+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(int64_t size)
+{
+	journal_queue.max_size = size;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(int64_t len)
+{
+	journal_queue.max_len = len;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+	journal_queue.len++;
+	journal_queue.size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+	journal_queue.len--;
+	journal_queue.size -= entry->approx_len;
+	assert(journal_queue.len >= 0);
+	assert(journal_queue.size >= 0);
+}
+
 /**
  * Complete asynchronous write.
  */
@@ -131,15 +237,14 @@ static inline void
 journal_async_complete(struct journal_entry *entry)
 {
 	assert(entry->write_async_cb != NULL);
+
+	journal_queue_on_complete(entry);
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+
 	entry->write_async_cb(entry);
 }
 
-/**
- * Depending on the step of recovery and instance configuration
- * points at a concrete implementation of the journal.
- */
-extern struct journal *current_journal;
-
 /**
  * Write a single entry to the journal in synchronous way.
  *
@@ -148,6 +253,18 @@ extern struct journal *current_journal;
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	if (journal_queue_has_waiters()) {
+		/*
+		 * It's a synchronous write, so it's fine to wait a bit more for
+		 * everyone else to be written. They'll wake us up back
+		 * afterwards.
+		 */
+		journal_queue_wakeup(true);
+		journal_wait_queue();
+	}
+
+	journal_queue_on_append(entry);
+
 	return current_journal->write(current_journal, entry);
 }
 
@@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
 static inline int
 journal_write_async(struct journal_entry *entry)
 {
+	/*
+	 * It's the job of the caller to check whether the queue is full prior
+	 * to submitting the request.
+	 */
+	assert(!journal_queue_is_full() || journal_queue.is_ready);
+	journal_queue_on_append(entry);
+
 	return current_journal->write_async(current_journal, entry);
 }
 
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 2d3ccbf0e..35f410710 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_size() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_len() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
@@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
+		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
+		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 574c8bef4..c11a9e103 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -71,6 +71,8 @@ local default_cfg = {
     wal_mode            = "write",
     wal_max_size        = 256 * 1024 * 1024,
     wal_dir_rescan_delay= 2,
+    wal_queue_max_size  = 0,
+    wal_queue_max_len   = 0,
     force_recovery      = false,
     replication         = nil,
     instance_uuid       = nil,
@@ -163,6 +165,8 @@ local template_cfg = {
     coredump            = 'boolean',
     checkpoint_interval = 'number',
     checkpoint_wal_threshold = 'number',
+    wal_queue_max_size  = 'number',
+    wal_queue_max_len   = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -277,6 +281,8 @@ local dynamic_cfg = {
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
     checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
+    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
+    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = ifdef_feedback_set_params,
     feedback_crashinfo      = ifdef_feedback_set_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index 937d47ba9..4a0381cf4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
+void
+wal_set_queue_max_size(int64_t size)
+{
+	assert(&wal_writer_singleton.base == current_journal);
+	journal_queue_set_max_size(size);
+}
+
+void
+wal_set_queue_max_len(int64_t len)
+{
+	assert(&wal_writer_singleton.base == current_journal);
+	journal_queue_set_max_len(len);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
diff --git a/src/box/wal.h b/src/box/wal.h
index ca43dc6eb..1db32f66f 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
+/**
+ * Set the pending write limit in bytes. Once the limit is reached, new
+ * writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_size(int64_t size);
+
+/**
+ * Set the pending write limit in journal entries. Once the limit is reached,
+ * new writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_len(int64_t len);
+
 /**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 16c5b01d2..7a224e50e 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -56,6 +56,8 @@ wal_dir:.
 wal_dir_rescan_delay:2
 wal_max_size:268435456
 wal_mode:write
+wal_queue_max_len:0
+wal_queue_max_size:0
 worker_pool_threads:4
 --
 -- Test insert from detached fiber
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index a577f023d..3276ddf64 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
 local fio = require('fio')
 local uuid = require('uuid')
 local msgpack = require('msgpack')
-test:plan(108)
+test:plan(110)
 
 --------------------------------------------------------------------------------
 -- Invalid values
@@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
 invalid('vinyl_run_size_ratio', 1)
 invalid('vinyl_bloom_fpr', 0)
 invalid('vinyl_bloom_fpr', 1.1)
+invalid('wal_queue_max_size', -1)
+invalid('wal_queue_max_len', -1)
 
 local function invalid_combinations(name, val)
     local status, result = pcall(box.cfg, val)
diff --git a/test/box/admin.result b/test/box/admin.result
index 05debe673..c818f4f9f 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -133,6 +133,10 @@ cfg_filter(box.cfg)
     - 268435456
   - - wal_mode
     - write
+  - - wal_queue_max_len
+    - 0
+  - - wal_queue_max_size
+    - 0
   - - worker_pool_threads
     - 4
 ...
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 22a720c2c..19f322e7d 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -121,6 +121,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
@@ -236,6 +240,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
new file mode 100644
index 000000000..f7799baa8
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+replication_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout=1000}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+ | ---
+ | ...
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+ | ---
+ | ...
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+for i = 1,10 do box.space.test:insert{i} end
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+ | ---
+ | - true
+ | ...
+require('fiber').sleep(0.5)
+ | ---
+ | ...
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+ | ---
+ | - true
+ | ...
+assert(box.space.test:count() == 10)
+ | ---
+ | - true
+ | ...
+assert(box.info.replication[1].upstream.status == 'follow')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
new file mode 100644
index 000000000..1e7d61ff7
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.test.lua
@@ -0,0 +1,58 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+replication_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout=1000}
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+test_run:switch('replica')
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+test_run:switch('default')
+
+for i = 1,10 do box.space.test:insert{i} end
+
+test_run:switch('replica')
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+require('fiber').sleep(0.5)
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+assert(box.space.test:count() == 10)
+assert(box.info.replication[1].upstream.status == 'follow')
+
+test_run:switch('default')
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index c80430afc..7e7004592 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -37,6 +37,7 @@
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
     "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
+    "gh-5536-wal-limit.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index e4812df37..89abfabff 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
-- 
2.24.3 (Apple Git-128)



* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
@ 2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
  2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-24 19:40 UTC (permalink / raw)
  To: gorcunov, v.shpilevoy; +Cc: tarantool-patches



24.02.2021 22:35, Serge Petrenko wrote:
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 937d47ba9..4a0381cf4 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold)
>   	fiber_set_cancellable(cancellable);
>   }
>   
> +void
> +wal_set_queue_max_size(int64_t size)
> +{
> +	assert(&wal_writer_singleton.base == current_journal);
> +	journal_queue_set_max_size(size);
> +}
> +
> +void
> +wal_set_queue_max_len(int64_t len)
> +{
> +	assert(&wal_writer_singleton.base == current_journal);
> +	journal_queue_set_max_len(len);
> +}
> +
A small fix:
these asserts aren't needed anymore.
So,

diff --git a/src/box/wal.c b/src/box/wal.c
index 4a0381cf4..328ab092d 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,14 +768,12 @@ wal_set_checkpoint_threshold(int64_t threshold)
  void
  wal_set_queue_max_size(int64_t size)
  {
-       assert(&wal_writer_singleton.base == current_journal);
         journal_queue_set_max_size(size);
  }

  void
  wal_set_queue_max_len(int64_t len)
  {
-       assert(&wal_writer_singleton.base == current_journal);
         journal_queue_set_max_len(len);
  }

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
  2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
@ 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
  2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
  2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
                   ` (2 subsequent siblings)
  4 siblings, 2 replies; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-02-25 13:05 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

* Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]:

Looks like a simple counting semaphore. I don't see why it has
to be specific to class journal, more like part of lib/core. 

https://en.cppreference.com/w/cpp/thread/counting_semaphore
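
For reference, a counting semaphore in its simplest non-blocking form could
be sketched as below. This is an illustration only: `sketch_sem` is a
hypothetical name, not an existing lib/core API, and a fiber-aware version
would yield instead of returning false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical counting semaphore; not an existing lib/core structure. */
struct sketch_sem {
	int64_t permits; /* permits currently available */
};

/* Take one permit if any is available. */
static bool
sketch_sem_try_acquire(struct sketch_sem *s)
{
	assert(s->permits >= 0);
	if (s->permits == 0)
		return false;
	s->permits--;
	return true;
}

/* Return one permit, e.g. when a write request completes. */
static void
sketch_sem_release(struct sketch_sem *s)
{
	s->permits++;
}
```

Here the number of permits would play the role of the queue length limit:
each submitted request takes a permit and each completed write returns one.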

I'd also question the place where you decided to put this gate.
The source of the issue is async requests, not WAL, which worked
fine in absence of async requests. So it's async requests that
should be gated, not WAL. Otherwise your overflow will just spill
out someplace else.

> Since the introduction of asynchronous commit, which doesn't wait for a
> WAL write to succeed, it's quite easy to clog WAL with huge amounts of
> write requests. For now, it's only possible from an applier, since it's
> the only user of async commit at the moment.
> 
> This happens when replica is syncing with master and reads new
> transactions at a pace higher than it can write them to WAL (see docbot
> request for detailed explanation).
> 
> To ameliorate such behavior, we need to introduce some limit on
> not-yet-finished WAL write requests. This is what this commit is trying
> to do.
> Two new counters are added to wal writer: queue_size (in bytes) and
> queue_len (in wal messages) together with configuration settings:
> `wal_queue_max_size` and `wal_queue_max_len`.
> Size and length are increased on every new submitted request, and are
> decreased once the tx receives a confirmation that a specific request
> was written.
> Actually, the limits are added to an abstract journal, but take effect
> only for wal writer.
> 
> Once size or length reach their maximum values, applier is blocked until
> some of the write requests are finished.
> 
> The size limit isn't strict, i.e. if there's at least one free byte, the
> whole write request fits and no blocking is involved.
> 
> The feature is ready for `box.commit{is_async=true}`. Once it's
> implemented, it should check whether the queue is full and let the user
> decide what to do next. Either wait or roll the tx back.
> 
> Part of #5536
> 
> @TarantoolBot document
> Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'
> 
> `wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount
> of concurrent write requests submitted to WAL.
> `wal_queue_max_size` is measured in number of bytes to be written (0
> means unlimited), and `wal_queue_max_len` is measured in number of
> transactions, 0 meaning unlimited.
> These options only affect replica behaviour at the moment, and default
> to 0. They limit the pace at which replica reads new transactions from
> master.
> 
> Here's when these options come in handy:
> Imagine such a situation: there are 2 servers, a master and a replica,
> and the replica is down for some period of time. While the replica is
> down, the master serves requests at a reasonable pace, possibly close to
> its WAL throughput limit. Once the replica reconnects, it has to receive
> all the data master has piled up. Now there's no limit in speed at which
> master sends the data to replica, and there's no limit at which
> replica's applier submits corresponding write requests to WAL. This
> leads to a situation when replica's WAL is never in time to serve the
> requests and the amount of pending requests is constantly growing.
> There's no limit for memory WAL write requests take, and this clogging
> of WAL write queue may even lead to replica using up all the available
> memory.
> 
> Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are
> set, appliers will stop reading new transactions once the limit is
> reached. This will let WAL process all the requests that have piled up
> and free all the excess memory.
> ---
> https://github.com/tarantool/tarantool/issues/5536
> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
> 
> Changes in v3:
>   - Extract queue internals into new struct journal_queue.
>   - Move is_ready flag to queue from queue entry.
>   - Minor refactoring.
> 
> Changes in v2:
>   - Move queue logic to journal.
> 
>  src/box/applier.cc                          |   9 ++
>  src/box/box.cc                              |  52 ++++++++
>  src/box/box.h                               |   2 +
>  src/box/journal.c                           |  71 ++++++++++
>  src/box/journal.h                           | 136 +++++++++++++++++++-
>  src/box/lua/cfg.cc                          |  18 +++
>  src/box/lua/load_cfg.lua                    |   6 +
>  src/box/wal.c                               |  14 ++
>  src/box/wal.h                               |  14 ++
>  test/app-tap/init_script.result             |   2 +
>  test/box-tap/cfg.test.lua                   |   4 +-
>  test/box/admin.result                       |   4 +
>  test/box/cfg.result                         |   8 ++
>  test/replication/gh-5536-wal-limit.result   | 132 +++++++++++++++++++
>  test/replication/gh-5536-wal-limit.test.lua |  58 +++++++++
>  test/replication/suite.cfg                  |   1 +
>  test/replication/suite.ini                  |   2 +-
>  17 files changed, 525 insertions(+), 8 deletions(-)
>  create mode 100644 test/replication/gh-5536-wal-limit.result
>  create mode 100644 test/replication/gh-5536-wal-limit.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..27ddd0f29 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		goto success;
>  	}
>  
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full() || journal_queue_has_waiters())
> +		journal_wait_queue();
> +
>  	/**
>  	 * Explicitly begin the transaction so that we can
>  	 * control fiber->gc life cycle and, in case of apply
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 26cbe8aab..9a3b092d0 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name)
>  	return (enum wal_mode) mode;
>  }
>  
> +static int64_t
> +box_check_wal_queue_max_len(void)
> +{
> +	int64_t len = cfg_geti64("wal_queue_max_len");
> +	if (len < 0) {
> +		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
> +			 "wal_queue_max_len must be >= 0");
> +	}
> +	/* Unlimited. */
> +	if (len == 0)
> +		len = INT64_MAX;
> +	return len;
> +}
> +
> +static int64_t
> +box_check_wal_queue_max_size(void)
> +{
> +	int64_t size = cfg_geti64("wal_queue_max_size");
> +	if (size < 0) {
> +		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
> +			 "wal_queue_max_size must be >= 0");
> +	}
> +	/* Unlimited. */
> +	if (size == 0)
> +		size = INT64_MAX;
> +	return size;
> +}
> +
>  static void
>  box_check_readahead(int readahead)
>  {
> @@ -875,6 +903,10 @@ box_check_config(void)
>  	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
>  	box_check_wal_max_size(cfg_geti64("wal_max_size"));
>  	box_check_wal_mode(cfg_gets("wal_mode"));
> +	if (box_check_wal_queue_max_size() < 0)
> +		diag_raise();
> +	if (box_check_wal_queue_max_len() < 0)
> +		diag_raise();
>  	if (box_check_memory_quota("memtx_memory") < 0)
>  		diag_raise();
>  	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
> @@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void)
>  	wal_set_checkpoint_threshold(threshold);
>  }
>  
> +int
> +box_set_wal_queue_max_size(void)
> +{
> +	int64_t size = box_check_wal_queue_max_size();
> +	if (size < 0)
> +		return -1;
> +	wal_set_queue_max_size(size);
> +	return 0;
> +}
> +
> +int
> +box_set_wal_queue_max_len(void)
> +{
> +	int64_t len = box_check_wal_queue_max_len();
> +	if (len < 0)
> +		return -1;
> +	wal_set_queue_max_len(len);
> +	return 0;
> +}
> +
>  void
>  box_set_vinyl_memory(void)
>  {
> diff --git a/src/box/box.h b/src/box/box.h
> index b68047a95..4f5b4b617 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -239,6 +239,8 @@ void box_set_readahead(void);
>  void box_set_checkpoint_count(void);
>  void box_set_checkpoint_interval(void);
>  void box_set_checkpoint_wal_threshold(void);
> +int box_set_wal_queue_max_size(void);
> +int box_set_wal_queue_max_len(void);
>  void box_set_memtx_memory(void);
>  void box_set_memtx_max_tuple_size(void);
>  void box_set_vinyl_memory(void);
> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..4c31a3dfe 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -34,6 +34,16 @@
>  
>  struct journal *current_journal = NULL;
>  
> +struct journal_queue journal_queue = {
> +	.max_size = INT64_MAX,
> +	.size = 0,
> +	.max_len = INT64_MAX,
> +	.len = 0,
> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> +	.is_awake = false,
> +	.is_ready = false,
> +};
> +
>  struct journal_entry *
>  journal_entry_new(size_t n_rows, struct region *region,
>  		  journal_write_async_f write_async_cb,
> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>  			     complete_data);
>  	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;
> +};
> +
> +/**
> + * Wake up the first waiter in the journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_first(void)
> +{
> +	struct journal_queue_entry *e;
> +	if (rlist_empty(&journal_queue.waiters))
> +		goto out;
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!journal_queue.is_ready && journal_queue_is_full())
> +		goto out;
> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> +			in_queue);
> +	fiber_wakeup(e->fiber);
> +	return;
> +out:
> +	journal_queue.is_awake = false;
> +	journal_queue.is_ready = false;
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&journal_queue.waiters));
> +	if (journal_queue.is_awake)
> +		return;
> +	journal_queue.is_awake = true;
> +	journal_queue.is_ready = force_ready;
> +	journal_queue_wakeup_first();
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +	};
> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	/*
> +	 * Will be waken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !journal_queue.is_ready)
> +		fiber_yield();
> +
> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> +	rlist_del(&entry.in_queue);
> +
> +	journal_queue_wakeup_first();
> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..8fec5b27e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region,
>  		  journal_write_async_f write_async_cb,
>  		  void *complete_data);
>  
> +struct journal_queue {
> +	/** Maximal size of entries enqueued in journal (in bytes). */
> +	int64_t max_size;
> +	/** Current approximate size of journal queue. */
> +	int64_t size;
> +	/** Maximal allowed length of journal queue, in entries. */
> +	int64_t max_len;
> +	/** Current journal queue length. */
> +	int64_t len;
> +	/**
> +	 * The fibers waiting for some space to free in journal queue.
> +	 * Once some space is freed they will be waken up in the same order they
> +	 * entered the queue.
> +	 */
> +	struct rlist waiters;
> +	/**
> +	 * Whether the queue is being woken or not. Used to avoid multiple
> +	 * concurrent wake-ups.
> +	 */
> +	bool is_awake;
> +	/**
> +	 * A flag used to tell the waiting fibers they may proceed even if the
> +	 * queue is full, i.e. force them to submit a write request.
> +	 */
> +	bool is_ready;
> +};
> +
> +/** A single queue for all journal instances. */
> +extern struct journal_queue journal_queue;
> +
>  /**
>   * An API for an abstract journal for all transactions of this
>   * instance, as well as for multiple instances in case of
> @@ -124,6 +154,82 @@ struct journal {
>  		     struct journal_entry *entry);
>  };
>  
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;
> +
> +/**
> + * Wake the journal queue up.
> + * @param force_ready whether waiters should proceed even if the queue is still
> + *                    full.
> + */
> +void
> +journal_queue_wakeup(bool force_ready);
> +
> +/**
> + * Check whether any of the queue size limits is reached.
> + * If the queue is full, we must wait for some of the entries to be written
> + * before proceeding with a new asynchronous write request.
> + */
> +static inline bool
> +journal_queue_is_full(void)
> +{
> +	return journal_queue.size > journal_queue.max_size ||
> +	       journal_queue.len > journal_queue.max_len;
> +}
> +
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> +	return !rlist_empty(&journal_queue.waiters);
> +}
> +
> +/** Yield until there's some space in the journal queue. */
> +void
> +journal_wait_queue(void);
> +
> +/** Set maximal journal queue size in bytes. */
> +static inline void
> +journal_queue_set_max_size(int64_t size)
> +{
> +	journal_queue.max_size = size;
> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
> +		journal_queue_wakeup(false);
> +}
> +
> +/** Set maximal journal queue length, in entries. */
> +static inline void
> +journal_queue_set_max_len(int64_t len)
> +{
> +	journal_queue.max_len = len;
> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
> +		journal_queue_wakeup(false);
> +}
> +
> +/** Increase queue size on a new write request. */
> +static inline void
> +journal_queue_on_append(struct journal_entry *entry)
> +{
> +	journal_queue.len++;
> +	journal_queue.size += entry->approx_len;
> +}
> +
> +/** Decrease queue size once write request is complete. */
> +static inline void
> +journal_queue_on_complete(struct journal_entry *entry)
> +{
> +	journal_queue.len--;
> +	journal_queue.size -= entry->approx_len;
> +	assert(journal_queue.len >= 0);
> +	assert(journal_queue.size >= 0);
> +}
> +
>  /**
>   * Complete asynchronous write.
>   */
> @@ -131,15 +237,14 @@ static inline void
>  journal_async_complete(struct journal_entry *entry)
>  {
>  	assert(entry->write_async_cb != NULL);
> +
> +	journal_queue_on_complete(entry);
> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
> +		journal_queue_wakeup(false);
> +
>  	entry->write_async_cb(entry);
>  }
>  
> -/**
> - * Depending on the step of recovery and instance configuration
> - * points at a concrete implementation of the journal.
> - */
> -extern struct journal *current_journal;
> -
>  /**
>   * Write a single entry to the journal in synchronous way.
>   *
> @@ -148,6 +253,18 @@ extern struct journal *current_journal;
>  static inline int
>  journal_write(struct journal_entry *entry)
>  {
> +	if (journal_queue_has_waiters()) {
> +		/*
> +		 * It's a synchronous write, so it's fine to wait a bit more for
> +		 * everyone else to be written. They'll wake us up back
> +		 * afterwards.
> +		 */
> +		journal_queue_wakeup(true);
> +		journal_wait_queue();
> +	}
> +
> +	journal_queue_on_append(entry);
> +
>  	return current_journal->write(current_journal, entry);
>  }
>  
> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.
> +	 */
> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	journal_queue_on_append(entry);
> +
>  	return current_journal->write_async(current_journal, entry);
>  }
>  
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index 2d3ccbf0e..35f410710 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
>  	return 0;
>  }
>  
> +static int
> +lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
> +{
> +	if (box_set_wal_queue_max_size() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
> +static int
> +lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
> +{
> +	if (box_set_wal_queue_max_len() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
>  static int
>  lbox_cfg_set_read_only(struct lua_State *L)
>  {
> @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
>  		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
>  		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
>  		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
> +		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
> +		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
>  		{"cfg_set_read_only", lbox_cfg_set_read_only},
>  		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
>  		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 574c8bef4..c11a9e103 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -71,6 +71,8 @@ local default_cfg = {
>      wal_mode            = "write",
>      wal_max_size        = 256 * 1024 * 1024,
>      wal_dir_rescan_delay= 2,
> +    wal_queue_max_size  = 0,
> +    wal_queue_max_len   = 0,
>      force_recovery      = false,
>      replication         = nil,
>      instance_uuid       = nil,
> @@ -163,6 +165,8 @@ local template_cfg = {
>      coredump            = 'boolean',
>      checkpoint_interval = 'number',
>      checkpoint_wal_threshold = 'number',
> +    wal_queue_max_size  = 'number',
> +    wal_queue_max_len   = 'number',
>      checkpoint_count    = 'number',
>      read_only           = 'boolean',
>      hot_standby         = 'boolean',
> @@ -277,6 +281,8 @@ local dynamic_cfg = {
>      checkpoint_count        = private.cfg_set_checkpoint_count,
>      checkpoint_interval     = private.cfg_set_checkpoint_interval,
>      checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
> +    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
> +    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
>      worker_pool_threads     = private.cfg_set_worker_pool_threads,
>      feedback_enabled        = ifdef_feedback_set_params,
>      feedback_crashinfo      = ifdef_feedback_set_params,
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 937d47ba9..4a0381cf4 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold)
>  	fiber_set_cancellable(cancellable);
>  }
>  
> +void
> +wal_set_queue_max_size(int64_t size)
> +{
> +	assert(&wal_writer_singleton.base == current_journal);
> +	journal_queue_set_max_size(size);
> +}
> +
> +void
> +wal_set_queue_max_len(int64_t len)
> +{
> +	assert(&wal_writer_singleton.base == current_journal);
> +	journal_queue_set_max_len(len);
> +}
> +
>  struct wal_gc_msg
>  {
>  	struct cbus_call_msg base;
> diff --git a/src/box/wal.h b/src/box/wal.h
> index ca43dc6eb..1db32f66f 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
>  void
>  wal_set_checkpoint_threshold(int64_t threshold);
>  
> +/**
> + * Set the pending write limit in bytes. Once the limit is reached, new
> + * writes are blocked until some previous writes succeed.
> + */
> +void
> +wal_set_queue_max_size(int64_t size);
> +
> +/**
> + * Set the pending write limit in journal entries. Once the limit is reached,
> + * new writes are blocked until some previous writes succeed.
> + */
> +void
> +wal_set_queue_max_len(int64_t len);
> +
>  /**
>   * Remove WAL files that are not needed by consumers reading
>   * rows at @vclock or newer.
> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
> index 16c5b01d2..7a224e50e 100644
> --- a/test/app-tap/init_script.result
> +++ b/test/app-tap/init_script.result
> @@ -56,6 +56,8 @@ wal_dir:.
>  wal_dir_rescan_delay:2
>  wal_max_size:268435456
>  wal_mode:write
> +wal_queue_max_len:0
> +wal_queue_max_size:0
>  worker_pool_threads:4
>  --
>  -- Test insert from detached fiber
> diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
> index a577f023d..3276ddf64 100755
> --- a/test/box-tap/cfg.test.lua
> +++ b/test/box-tap/cfg.test.lua
> @@ -6,7 +6,7 @@ local socket = require('socket')
>  local fio = require('fio')
>  local uuid = require('uuid')
>  local msgpack = require('msgpack')
> -test:plan(108)
> +test:plan(110)
>  
>  --------------------------------------------------------------------------------
>  -- Invalid values
> @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
>  invalid('vinyl_run_size_ratio', 1)
>  invalid('vinyl_bloom_fpr', 0)
>  invalid('vinyl_bloom_fpr', 1.1)
> +invalid('wal_queue_max_size', -1)
> +invalid('wal_queue_max_len', -1)
>  
>  local function invalid_combinations(name, val)
>      local status, result = pcall(box.cfg, val)
> diff --git a/test/box/admin.result b/test/box/admin.result
> index 05debe673..c818f4f9f 100644
> --- a/test/box/admin.result
> +++ b/test/box/admin.result
> @@ -133,6 +133,10 @@ cfg_filter(box.cfg)
>      - 268435456
>    - - wal_mode
>      - write
> +  - - wal_queue_max_len
> +    - 0
> +  - - wal_queue_max_size
> +    - 0
>    - - worker_pool_threads
>      - 4
>  ...
> diff --git a/test/box/cfg.result b/test/box/cfg.result
> index 22a720c2c..19f322e7d 100644
> --- a/test/box/cfg.result
> +++ b/test/box/cfg.result
> @@ -121,6 +121,10 @@ cfg_filter(box.cfg)
>   |     - 268435456
>   |   - - wal_mode
>   |     - write
> + |   - - wal_queue_max_len
> + |     - 0
> + |   - - wal_queue_max_size
> + |     - 0
>   |   - - worker_pool_threads
>   |     - 4
>   | ...
> @@ -236,6 +240,10 @@ cfg_filter(box.cfg)
>   |     - 268435456
>   |   - - wal_mode
>   |     - write
> + |   - - wal_queue_max_len
> + |     - 0
> + |   - - wal_queue_max_size
> + |     - 0
>   |   - - worker_pool_threads
>   |     - 4
>   | ...
> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
> new file mode 100644
> index 000000000..f7799baa8
> --- /dev/null
> +++ b/test/replication/gh-5536-wal-limit.result
> @@ -0,0 +1,132 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +fiber = require('fiber')
> + | ---
> + | ...
> +
> +--
> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
> +-- that appliers stop reading new transactions from master once the queue is
> +-- full.
> +--
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +_ = box.schema.space.create('test')
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +replication_timeout = box.cfg.replication_timeout
> + | ---
> + | ...
> +box.cfg{replication_timeout=1000}
> + | ---
> + | ...
> +
> +test_run:cmd('create server replica with rpl_master=default,\
> +              script="replication/replica.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica with wait=True, wait_load=True')
> + | ---
> + | - true
> + | ...
> +
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +-- Huge replication timeout to not cause reconnects while applier is blocked.
> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
> +box.cfg{wal_queue_max_size=1, replication_timeout=1000}
> + | ---
> + | ...
> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
> + | ---
> + | ...
> +-- Block WAL writes so that we may test queue overflow.
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> + | ---
> + | - ok
> + | ...
> +
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +
> +for i = 1,10 do box.space.test:insert{i} end
> + | ---
> + | ...
> +
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
> +-- WAL is blocked.
> +test_run:wait_cond(function()\
> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
> +end)
> + | ---
> + | - true
> + | ...
> +require('fiber').sleep(0.5)
> + | ---
> + | ...
> +-- Only one entry fits when the limit is small.
> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
> + | ---
> + | - true
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> + | ---
> + | - ok
> + | ...
> +
> +-- Once the block is removed everything is written.
> +test_run:wait_cond(function()\
> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
> +end)
> + | ---
> + | - true
> + | ...
> +assert(box.space.test:count() == 10)
> + | ---
> + | - true
> + | ...
> +assert(box.info.replication[1].upstream.status == 'follow')
> + | ---
> + | - true
> + | ...
> +
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +
> +-- Cleanup.
> +box.cfg{replication_timeout=replication_timeout}
> + | ---
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:drop()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> +
> diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
> new file mode 100644
> index 000000000..1e7d61ff7
> --- /dev/null
> +++ b/test/replication/gh-5536-wal-limit.test.lua
> @@ -0,0 +1,58 @@
> +test_run = require('test_run').new()
> +fiber = require('fiber')
> +
> +--
> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
> +-- that appliers stop reading new transactions from master once the queue is
> +-- full.
> +--
> +box.schema.user.grant('guest', 'replication')
> +_ = box.schema.space.create('test')
> +_ = box.space.test:create_index('pk')
> +
> +replication_timeout = box.cfg.replication_timeout
> +box.cfg{replication_timeout=1000}
> +
> +test_run:cmd('create server replica with rpl_master=default,\
> +              script="replication/replica.lua"')
> +test_run:cmd('start server replica with wait=True, wait_load=True')
> +
> +test_run:switch('replica')
> +-- Huge replication timeout to not cause reconnects while applier is blocked.
> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
> +box.cfg{wal_queue_max_size=1, replication_timeout=1000}
> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
> +-- Block WAL writes so that we may test queue overflow.
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> +
> +test_run:switch('default')
> +
> +for i = 1,10 do box.space.test:insert{i} end
> +
> +test_run:switch('replica')
> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
> +-- WAL is blocked.
> +test_run:wait_cond(function()\
> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
> +end)
> +require('fiber').sleep(0.5)
> +-- Only one entry fits when the limit is small.
> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> +
> +-- Once the block is removed everything is written.
> +test_run:wait_cond(function()\
> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
> +end)
> +assert(box.space.test:count() == 10)
> +assert(box.info.replication[1].upstream.status == 'follow')
> +
> +test_run:switch('default')
> +
> +-- Cleanup.
> +box.cfg{replication_timeout=replication_timeout}
> +test_run:cmd('stop server replica')
> +test_run:cmd('delete server replica')
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> +
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index c80430afc..7e7004592 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -37,6 +37,7 @@
>      "gh-4928-tx-boundaries.test.lua": {},
>      "gh-5440-qsync-ro.test.lua": {},
>      "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
> +    "gh-5536-wal-limit.test.lua": {},
>      "*": {
>          "memtx": {"engine": "memtx"},
>          "vinyl": {"engine": "vinyl"}
> diff --git a/test/replication/suite.ini b/test/replication/suite.ini
> index e4812df37..89abfabff 100644
> --- a/test/replication/suite.ini
> +++ b/test/replication/suite.ini
> @@ -3,7 +3,7 @@ core = tarantool
>  script =  master.lua
>  description = tarantool/box, replication
>  disabled = consistent.test.lua
> -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
> +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
>  config = suite.cfg
>  lua_libs = lua/fast_replica.lua lua/rlimit.lua
>  use_unix_sockets = True
> -- 
> 2.24.3 (Apple Git-128)

-- 
Konstantin Osipov, Moscow, Russia


* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
  2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
  2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
@ 2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-01 19:08   ` Serge Petrenko via Tarantool-patches
  2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26  0:56 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the patch!

See 8 comments below, and my diff at the end of the email and on
top of the branch.

On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
> Since the introduction of asynchronous commit, which doesn't wait for a
> WAL write to succeed, it's quite easy to clog WAL with huge amounts
> write requests. For now, it's only possible from an applier, since it's
> the only user of async commit at the moment.
> 
> This happens when replica is syncing with master and reads new
> transactions at a pace higher than it can write them to WAL (see docbot
> request for detailed explanation).
> 
> To ameliorate such behavior, we need to introduce some limit on
> not-yet-finished WAL write requests. This is what this commit is trying
> to do.
> Two new counters are added to wal writer: queue_size (in bytes) and
> queue_len (in wal messages) together with configuration settings:
> `wal_queue_max_size` and `wal_queue_max_len`.
> Size and length are increased on every new submitted request, and are
> decreased once the tx receives a confirmation that a specific request
> was written.
> Actually, the limits are added to an abstract journal, but take effect
> only for wal writer.

1. Now the limits affect any current journal, regardless of its type.
However, they really matter only for WAL, because AFAIK only the WAL
journal yields. The others just 'commit' immediately.

> ---
> https://github.com/tarantool/tarantool/issues/5536
> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..27ddd0f29 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		goto success;
>  	}
>  
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full() || journal_queue_has_waiters())
> +		journal_wait_queue();

2. Perhaps simply move both checks into journal_wait_queue(). This seems
like an internal detail of the queue's API.
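
A rough sketch of that shape, on a simplified model without fibers.
struct queue_model and queue_wait_if_needed() are made-up names used only
for illustration; a real journal_wait_queue() would yield at the place
marked in the comment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for struct journal_queue from the patch. */
struct queue_model {
	int64_t max_size;
	int64_t size;
	int64_t max_len;
	int64_t len;
	/* Count of waiting fibers; replaces the rlist of waiters. */
	int waiters;
};

static inline bool
queue_is_full(const struct queue_model *q)
{
	return q->size > q->max_size || q->len > q->max_len;
}

/**
 * Hypothetical entry point which the caller invokes unconditionally.
 * Returns false if the caller may proceed right away, true if it had
 * to be queued behind the limit or behind earlier waiters.
 */
static inline bool
queue_wait_if_needed(struct queue_model *q)
{
	if (!queue_is_full(q) && q->waiters == 0)
		return false;
	q->waiters++;
	/* Real code: yield here until woken up, then leave the list. */
	return true;
}
```

The applier would then call the function without checking anything
first, and the ordering rule (go after existing waiters) stays inside
the queue code.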

> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..4c31a3dfe 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -34,6 +34,16 @@
>  
>  struct journal *current_journal = NULL;
>  
> +struct journal_queue journal_queue = {
> +	.max_size = INT64_MAX,
> +	.size = 0,
> +	.max_len = INT64_MAX,
> +	.len = 0,
> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> +	.is_awake = false,
> +	.is_ready = false,
> +};

3. Kostja might be right about most of the queue's code being a good
candidate for an extraction to libcore. So the queue itself would
be the semaphore + queue size and len parameters. But up to you.

> +
>  struct journal_entry *
>  journal_entry_new(size_t n_rows, struct region *region,
>  		  journal_write_async_f write_async_cb,
> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>  			     complete_data);
>  	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;

4. I see fiber_cond uses fiber->state member. Can we do the same here?
Because the queue is not much different from fiber_cond. Both are
built on top of fiber API. Which means the 'state' might be usable in
the queue as well.

> +};
> +
> +/**
> + * Wake up the first waiter in the journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_first(void)
> +{
> +	struct journal_queue_entry *e;
> +	if (rlist_empty(&journal_queue.waiters))
> +		goto out;
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!journal_queue.is_ready && journal_queue_is_full())
> +		goto out;
> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> +			in_queue);

5. Why didn't rlist_first_entry() work?

> +	fiber_wakeup(e->fiber);
> +	return;
> +out:
> +	journal_queue.is_awake = false;
> +	journal_queue.is_ready = false;
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&journal_queue.waiters));
> +	if (journal_queue.is_awake)
> +		return;
> +	journal_queue.is_awake = true;
> +	journal_queue.is_ready = force_ready;
> +	journal_queue_wakeup_first();
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +	};
> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	/*
> +	 * Will be waken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !journal_queue.is_ready)
> +		fiber_yield();

6. You check for full anyway. So as I mentioned in the comment in
applier's code, you can move all the checks into there, and do
them before creating entry and adding it to the queue. As a
'fast path'. And it would make journal_wait_queue() easier to use.

> +
> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> +	rlist_del(&entry.in_queue);
> +
> +	journal_queue_wakeup_first();
> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..8fec5b27e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +154,82 @@ struct journal {
>  		     struct journal_entry *entry);
>  };
>  
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;

7. Now you don't need to move current_journal declaration.

> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.
> +	 */
> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	journal_queue_on_append(entry);

8. Probably must assert that waiters list is empty. Otherwise you
could go out of order.

> +
>  	return current_journal->write_async(current_journal, entry);
>  }
>  

I tried to do some fixes on top of your branch in order to delete
the flags and some branching.

Take a look at the diff below and on top of the branch in a separate
commit.

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 27ddd0f29..2586f6654 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	 * This is done before opening the transaction to avoid problems with
 	 * yielding inside it.
 	 */
-	if (journal_queue_is_full() || journal_queue_has_waiters())
-		journal_wait_queue();
+	journal_queue_wait();
 
 	/**
 	 * Explicitly begin the transaction so that we can
diff --git a/src/box/journal.c b/src/box/journal.c
index 4c31a3dfe..40a5f5b1a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -40,8 +40,7 @@ struct journal_queue journal_queue = {
 	.max_len = INT64_MAX,
 	.len = 0,
 	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
-	.is_awake = false,
-	.is_ready = false,
+	.waiter_count = 0,
 };
 
 struct journal_entry *
@@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region,
 	return entry;
 }
 
-struct journal_queue_entry {
-	/** The fiber waiting for queue space to free. */
-	struct fiber *fiber;
-	/** A link in all waiting fibers list. */
-	struct rlist in_queue;
-};
-
-/**
- * Wake up the first waiter in the journal queue.
- */
-static inline void
-journal_queue_wakeup_first(void)
-{
-	struct journal_queue_entry *e;
-	if (rlist_empty(&journal_queue.waiters))
-		goto out;
-	/*
-	 * When the queue isn't forcefully emptied, no need to wake everyone
-	 * else up until there's some free space.
-	 */
-	if (!journal_queue.is_ready && journal_queue_is_full())
-		goto out;
-	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
-			in_queue);
-	fiber_wakeup(e->fiber);
-	return;
-out:
-	journal_queue.is_awake = false;
-	journal_queue.is_ready = false;
-}
-
 void
-journal_queue_wakeup(bool force_ready)
+journal_queue_wakeup(void)
 {
-	assert(!rlist_empty(&journal_queue.waiters));
-	if (journal_queue.is_awake)
-		return;
-	journal_queue.is_awake = true;
-	journal_queue.is_ready = force_ready;
-	journal_queue_wakeup_first();
+	struct rlist *list = &journal_queue.waiters;
+	if (!rlist_empty(list) && !journal_queue_is_full())
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
 }
 
 void
-journal_wait_queue(void)
+journal_queue_wait(void)
 {
-	struct journal_queue_entry entry = {
-		.fiber = fiber(),
-	};
-	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
+	if (!journal_queue_is_full() && !journal_queue_has_waiters())
+		return;
+	++journal_queue.waiter_count;
+	rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
 	/*
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
-	while (journal_queue_is_full() && !journal_queue.is_ready)
+	do {
 		fiber_yield();
+	} while (journal_queue_is_full());
+	--journal_queue.waiter_count;
+	journal_queue_wakeup();
+}
 
-	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
-	rlist_del(&entry.in_queue);
-
-	journal_queue_wakeup_first();
+void
+journal_queue_flush(void)
+{
+	if (!journal_queue_has_waiters())
+		return;
+	struct rlist *list = &journal_queue.waiters;
+	while (!rlist_empty(list))
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+	journal_queue_wait();
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 8fec5b27e..3b93158cf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -128,17 +128,22 @@ struct journal_queue {
 	 * Whether the queue is being woken or not. Used to avoid multiple
 	 * concurrent wake-ups.
 	 */
-	bool is_awake;
-	/**
-	 * A flag used to tell the waiting fibers they may proceed even if the
-	 * queue is full, i.e. force them to submit a write request.
-	 */
-	bool is_ready;
+	bool waiter_count;
 };
 
 /** A single queue for all journal instances. */
 extern struct journal_queue journal_queue;
 
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+	return journal_queue.waiter_count != 0;
+}
====================

I moved this function unintentionally. It could be kept in
its old place.

====================
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
@@ -166,7 +171,7 @@ extern struct journal *current_journal;
  *                    full.
  */
 void
-journal_queue_wakeup(bool force_ready);
+journal_queue_wakeup(void);
 
 /**
  * Check whether any of the queue size limits is reached.
@@ -180,27 +185,19 @@ journal_queue_is_full(void)
 	       journal_queue.len > journal_queue.max_len;
 }
 
-/**
- * Check whether anyone is waiting for the journal queue to empty. If there are
- * other waiters we must go after them to preserve write order.
- */
-static inline bool
-journal_queue_has_waiters(void)
-{
-	return !rlist_empty(&journal_queue.waiters);
-}
-
 /** Yield until there's some space in the journal queue. */
 void
-journal_wait_queue(void);
+journal_queue_wait(void);
+
+void
+journal_queue_flush(void);
 
 /** Set maximal journal queue size in bytes. */
 static inline void
 journal_queue_set_max_size(int64_t size)
 {
 	journal_queue.max_size = size;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Set maximal journal queue length, in entries. */
@@ -208,8 +205,7 @@ static inline void
 journal_queue_set_max_len(int64_t len)
 {
 	journal_queue.max_len = len;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Increase queue size on a new write request. */
@@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry)
 	assert(entry->write_async_cb != NULL);
 
 	journal_queue_on_complete(entry);
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 
 	entry->write_async_cb(entry);
 }
@@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry)
 static inline int
 journal_write(struct journal_entry *entry)
 {
-	if (journal_queue_has_waiters()) {
-		/*
-		 * It's a synchronous write, so it's fine to wait a bit more for
-		 * everyone else to be written. They'll wake us up back
-		 * afterwards.
-		 */
-		journal_queue_wakeup(true);
-		journal_wait_queue();
-	}
-
+	journal_queue_flush();
 	journal_queue_on_append(entry);
 
 	return current_journal->write(current_journal, entry);
@@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry)
 	 * It's the job of the caller to check whether the queue is full prior
 	 * to submitting the request.
 	 */
-	assert(!journal_queue_is_full() || journal_queue.is_ready);
+	assert(journal_queue.waiter_count == 0);
 	journal_queue_on_append(entry);
 
 	return current_journal->write_async(current_journal, entry);


* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
@ 2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
  2021-02-26  7:18     ` Konstantin Osipov via Tarantool-patches
  2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26  0:57 UTC (permalink / raw)
  To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches

> I'd also question the place where you decided to put this gate.
> The source of the issue is async requests, not WAL, which worked
> fine in absence of async requests. So it's async requests that
> should be gated, not WAL.

In the commit message it is clearly stated why it is in the
journal's code, not just in the applier:

	The feature is ready for `box.commit{is_async=true}`. Once it's
	implemented, it should check whether the queue is full and let the user
	decide what to do next. Either wait or roll the tx back.

Async transactions will be exposed to 'userspace' to be able to reduce
latency for network requests ending with a transaction. They won't have
to wait for WAL write to end.

> Otherwise your overflow will just spill out someplace else.

On the contrary. Your proposal to do it in the applier would lead to
queue overflow in some other place - in userspace. When the queue is
for the entire WAL, it won't overflow.

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-02-26  7:18     ` Konstantin Osipov via Tarantool-patches
  2021-02-26 20:23       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-02-26  7:18 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 10:15]:
> > I'd also question the place where you decided to put this gate.
> > The source of the issue is async requests, not WAL, which worked
> > fine in absence of async requests. So it's async requests that
> > should be gated, not WAL.
> 
> In the commit message it is clearly stated why it is in the
> journal's code, not just in the applier:
> 
> 	The feature is ready for `box.commit{is_async=true}`. Once it's
> 	implemented, it should check whether the queue is full and let the user
> 	decide what to do next. Either wait or roll the tx back.
> 
> Async transactions will be exposed to 'userspace' to be able to reduce
> latency for network requests ending with a transaction. They won't have
> to wait for WAL write to end.

You did not understand my comment. I tried to say that a major
part of this code is generic and should reside in lib/core as a
counting semaphore abstraction. Async transactions simply use this
counting semaphore to throttle themselves. Then neither WAL nor
any other resource used by async transactions will be overloaded.

Otherwise, the system would be allowed to create async
transactions, and while WAL will not overflow, some other resource
(memory, transaction identifiers, whatever) may still overflow. 

> > Otherwise your overflow will just spill out someplace else.
> 
> On the contrary. Your proposal to do it in the applier would lead to
> queue overflow in some other place - in userspace. When the queue is
> for the entire WAL, it won't overflow.

I did not say it should be in the applier.

-- 
Konstantin Osipov, Moscow, Russia
https://scylladb.com

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26  7:18     ` Konstantin Osipov via Tarantool-patches
@ 2021-02-26 20:23       ` Vladislav Shpilevoy via Tarantool-patches
  2021-02-26 21:20         ` Konstantin Osipov via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 20:23 UTC (permalink / raw)
  To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches

On 26.02.2021 08:18, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 10:15]:
>>> I'd also question the place where you decided to put this gate.
>>> The source of the issue is async requests, not WAL, which worked
>>> fine in absence of async requests. So it's async requests that
>>> should be gated, not WAL.
>>
>> In the commit message it is clearly stated why it is in the
>> journal's code, not just in the applier:
>>
>> 	The feature is ready for `box.commit{is_async=true}`. Once it's
>> 	implemented, it should check whether the queue is full and let the user
>> 	decide what to do next. Either wait or roll the tx back.
>>
>> Async transactions will be exposed to 'userspace' to be able to reduce
>> latency for network requests ending with a transaction. They won't have
>> to wait for WAL write to end.
> 
> You did not understand my comment. I tried to say that a major
> part of this code is generic and should reside in lib/core as a
> counting semaphore abstraction. Async transaction simply use this
> counting semaphore to throttle themselves. Then neither WAL nor
> any other resource used by async transactions will be overloaded.
> 
> Otherwise, the system would be allowed to create async
> transactions, and while WAL will not overflow, some other resource
> (memory, transaction identifiers, whatever) may still overflow. 

Ok, now I understand. Yeah, I also think it is a good idea to move it to
libcore, provided nothing major changes in the patch for some reason.

Talking of the other limits - first we need to find out whether any of
them really overflows. Then yes, such a semaphore-thing could be applied
there too. But AFAIK, there are no other known similar bugs yet.

>>> Otherwise your overflow will just spill out someplace else.
>>
>> On the contrary. Your proposal to do it in the applier would lead to
>> queue overflow in some other place - in userspace. When the queue is
>> for the entire WAL, it won't overflow.
> 
> I did  not say it should be in the applier. 

It was a misunderstanding.

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26 20:23       ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-02-26 21:20         ` Konstantin Osipov via Tarantool-patches
  2021-02-26 22:44           ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-02-26 21:20 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]:
> Talking of the other limits - firstly we need to find if some of them
> really overflows. Then yes, such a semaphore-thing could be applied
> there too. But AFAIK, there are no other known similar bugs yet.

Exploring this rather theoretically, since there are no user async
transactions yet, I can imagine such transaction takes up memory
and then blocks on WAL semaphore. If there is no limit on the
number of async transactions, it can be a lot of memory. On the
other hand this can be limited by a yet another semaphore.

> >>> Otherwise your overflow will just spill out someplace else.
> >>
> >> On the contrary. Your proposal to do it in the applier would lead to
> >> queue overflow in some other place - in userspace. When the queue is
> >> for the entire WAL, it won't overflow.
> > 
> > I did  not say it should be in the applier. 
> 
> It was a misunderstanding.

-- 
Konstantin Osipov, Moscow, Russia

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26 21:20         ` Konstantin Osipov via Tarantool-patches
@ 2021-02-26 22:44           ` Vladislav Shpilevoy via Tarantool-patches
  2021-02-27 13:27             ` Konstantin Osipov via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 22:44 UTC (permalink / raw)
  To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches

On 26.02.2021 22:20, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]:
>> Talking of the other limits - firstly we need to find if some of them
>> really overflows. Then yes, such a semaphore-thing could be applied
>> there too. But AFAIK, there are no other known similar bugs yet.
> 
> Exploring this rather theoretically, since there are no user async
> transactions yet, I can imagine such transaction takes up memory
> and then blocks on WAL semaphore. If there is no limit on the
> number of async transactions, it can be a lot of memory. On the
> other hand this can be limited by a yet another semaphore.

Yes, such transactions will only occupy memory. But the plan is to return
an error from box.commit({is_async}) if the WAL queue is already full,
because we don't want to block the async commit in any way. Better to bail
out early and give the user a chance to call normal box.commit() if
necessary. Or introduce some kind of 'try_async' that blocks if the queue
is full and doesn't block otherwise, I don't know. We haven't thought
about the API yet.
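
The two options mentioned above can be sketched as a small decision
function. This is only an illustration under stated assumptions: the enum
names and commit_decide() are hypothetical, 'try_async' is just the working
name from this email, and none of this is an existing Tarantool API.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical commit flavours discussed in the thread. */
enum commit_mode {
	/* Never block: fail right away when the queue is full. */
	COMMIT_ASYNC,
	/* Block only when the queue is full, otherwise behave as async. */
	COMMIT_TRY_ASYNC,
};

/* Hypothetical outcomes of the admission check. */
enum commit_result {
	/* The request may be submitted without waiting. */
	COMMIT_SUBMITTED,
	/* The caller gets an error and may retry with a normal commit. */
	COMMIT_WOULD_BLOCK,
	/* The caller has to wait for free queue space first. */
	COMMIT_MUST_WAIT,
};

/*
 * Decide what to do with a commit given the current queue state.
 * The queue state is passed in as a flag to keep the sketch free of
 * any real journal code.
 */
static enum commit_result
commit_decide(bool queue_is_full, enum commit_mode mode)
{
	if (!queue_is_full)
		return COMMIT_SUBMITTED;
	if (mode == COMMIT_ASYNC)
		return COMMIT_WOULD_BLOCK;
	return COMMIT_MUST_WAIT;
}
```

In both modes the check happens before the request is queued, so a full
queue never receives more write requests from these callers.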

There will be a limit both on number of transactions and on their
total size. The limits are introduced right in this patch, and will
be used by async transactions in the future.

The main usage, if I understand correctly, will be for latency-sensitive
applications, when you send your transactions/calls/IPROTO commands via
network, send your data to WAL and return response to the client
immediately. You don't have to wait even until writev() completes.

Interestingly, it won't lead to loss of any guarantees, because completion
of writev() does not mean anything anyway. The power can still go off
before the data hits the disk, and the node can still fail, losing all
non-replicated data. So it seems like a good feature.

It should be especially good for synchronous transactions, where commit
duration can be quite long.

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26 22:44           ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-02-27 13:27             ` Konstantin Osipov via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-02-27 13:27 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/27 16:22]:
> On 26.02.2021 22:20, Konstantin Osipov wrote:
> > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]:
> >> Talking of the other limits - firstly we need to find if some of them
> >> really overflows. Then yes, such a semaphore-thing could be applied
> >> there too. But AFAIK, there are no other known similar bugs yet.
> > 
> > Exploring this rather theoretically, since there are no user async
> > transactions yet, I can imagine such transaction takes up memory
> > and then blocks on WAL semaphore. If there is no limit on the
> > number of async transactions, it can be a lot of memory. On the
> > other hand this can be limited by a yet another semaphore.
> 
> Yes, such transactions will only occupy memory. But the plan is return
> an error from box.commit({is_async}) if the WAL queue is full already.
> Because we don't want to block the async commit in anyway. Better bail
> out earlier and give the user a chance to call normal box.commit() if
> necessary. Or introduce some kind of 'try_async' to block if the queue
> is full, but no block if not full, I don't know. We didn't think on the
> API yet.

There is no point in doing parasitic work in this case -
performing a transaction and then rolling it back. 
> 

-- 
Konstantin Osipov, Moscow, Russia

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-01 19:08   ` Serge Petrenko via Tarantool-patches
  2021-03-01 22:05     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-01 19:08 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



On 26.02.2021 03:56, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
>
> See 8 comments below, and my diff in the end of the email and on
> top of the branch.

Thanks for the review & the fixes!

>
> On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
>> Since the introduction of asynchronous commit, which doesn't wait for a
>> WAL write to succeed, it's quite easy to clog WAL with huge amounts
>> write requests. For now, it's only possible from an applier, since it's
>> the only user of async commit at the moment.
>>
>> This happens when replica is syncing with master and reads new
>> transactions at a pace higher than it can write them to WAL (see docbot
>> request for detailed explanation).
>>
>> To ameliorate such behavior, we need to introduce some limit on
>> not-yet-finished WAL write requests. This is what this commit is trying
>> to do.
>> Two new counters are added to wal writer: queue_size (in bytes) and
>> queue_len (in wal messages) together with configuration settings:
>> `wal_queue_max_size` and `wal_queue_max_len`.
>> Size and length are increased on every new submitted request, and are
>> decreased once the tx receives a confirmation that a specific request
>> was written.
>> Actually, the limits are added to an abstract journal, but take effect
>> only for wal writer.
> 1. Now the limits affect any current journal, regardless of its type.
> Although they really work only for WAL, because only WAL journal
> yields AFAIK. Others just 'commit' immediately.

Correct.
I've reworded this piece.

>
>> ---
>> https://github.com/tarantool/tarantool/issues/5536
>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..27ddd0f29 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full() || journal_queue_has_waiters())
>> +		journal_wait_queue();
> 2. Perhaps simply move both checks into journal_wait_queue(). Seems like
> an internal thing for the queue's API.

Looks good.

>
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..4c31a3dfe 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -34,6 +34,16 @@
>>   
>>   struct journal *current_journal = NULL;
>>   
>> +struct journal_queue journal_queue = {
>> +	.max_size = INT64_MAX,
>> +	.size = 0,
>> +	.max_len = INT64_MAX,
>> +	.len = 0,
>> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
>> +	.is_awake = false,
>> +	.is_ready = false,
>> +};
> 3. Kostja might be right about most of the queue's code being a good
> candidate for an extraction to libcore. So the queue itself would
> be the semaphore + queue size and len parameters. But up to you.

I'm not sure I get it. It would be a counting semaphore if we had a single
limit, say, only entry count or only entry size. But we have both. So it's
not a "normal" counting semaphore. And if not, why extract it as a generic
primitive?

Moreover, the limits are now 'soft'. As discussed verbally, we'll wake the
entry up and let it proceed if we see that the queue isn't full at the time
of the wake-up. But it may be full again once the fiber is actually put to
execution, and we ignore this. So it's a "soft counting semaphore with 2
resources".
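
To illustrate the "soft" semantics described above, here is a minimal
sketch. The struct and function names (soft_limit, soft_limit_is_full() and
so on) are hypothetical and are not part of the patch. The strict '>'
comparison follows the len check visible in journal_queue_is_full();
applying the same comparison to size is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the journal queue counters. */
struct soft_limit {
	/* Maximal total size of in-flight requests, in bytes. */
	int64_t max_size;
	/* Current total size of in-flight requests, in bytes. */
	int64_t size;
	/* Maximal number of in-flight requests. */
	int64_t max_len;
	/* Current number of in-flight requests. */
	int64_t len;
};

/* The limit is reached once either of the two resources is exceeded. */
static inline bool
soft_limit_is_full(const struct soft_limit *l)
{
	return l->size > l->max_size || l->len > l->max_len;
}

/*
 * Account a new request. There is no check here: the caller looks at
 * soft_limit_is_full() once, before submitting, so one request may
 * push the counters past the limits.
 */
static inline void
soft_limit_on_append(struct soft_limit *l, int64_t bytes)
{
	l->size += bytes;
	l->len += 1;
}

/* Release the resources of a request that has been written. */
static inline void
soft_limit_on_complete(struct soft_limit *l, int64_t bytes)
{
	l->size -= bytes;
	l->len -= 1;
}
```

The append step never refuses a request, which is what makes the limit
soft: a request larger than the remaining space is still accepted as long
as the queue was not full when the check was made.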

>
>> +
>>   struct journal_entry *
>>   journal_entry_new(size_t n_rows, struct region *region,
>>   		  journal_write_async_f write_async_cb,
>> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>>   			     complete_data);
>>   	return entry;
>>   }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free. */
>> +	struct fiber *fiber;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
> 4. I see fiber_cond uses fiber->state member. Can we do the same here?
> Because the queue is not much different from fiber_cond. Both are
> built on top of fiber API. Which means the 'state' might be usable in
> the queue as well.

It's a good idea, thanks! Applied with minor changes, described below.

>
>> +};
>> +
>> +/**
>> + * Wake up the first waiter in the journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_first(void)
>> +{
>> +	struct journal_queue_entry *e;
>> +	if (rlist_empty(&journal_queue.waiters))
>> +		goto out;
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!journal_queue.is_ready && journal_queue_is_full())
>> +		goto out;
>> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
>> +			in_queue);
> 5. Why didn't rlist_first_entry() work?

It sure would work. Just a misprint, thanks for pointing this out!

>
>> +	fiber_wakeup(e->fiber);
>> +	return;
>> +out:
>> +	journal_queue.is_awake = false;
>> +	journal_queue.is_ready = false;
>> +}
>> +
>> +void
>> +journal_queue_wakeup(bool force_ready)
>> +{
>> +	assert(!rlist_empty(&journal_queue.waiters));
>> +	if (journal_queue.is_awake)
>> +		return;
>> +	journal_queue.is_awake = true;
>> +	journal_queue.is_ready = force_ready;
>> +	journal_queue_wakeup_first();
>> +}
>> +
>> +void
>> +journal_wait_queue(void)
>> +{
>> +	struct journal_queue_entry entry = {
>> +		.fiber = fiber(),
>> +	};
>> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
>> +	/*
>> +	 * Will be waken up by either queue emptying or a synchronous write.
>> +	 */
>> +	while (journal_queue_is_full() && !journal_queue.is_ready)
>> +		fiber_yield();
> 6. You check for full anyway. So as I mentioned in the comment in
> applier's code, you can move all the checks into there, and do
> them before creating entry and adding it to the queue. As a
> 'fast path'. And it would make journal_wait_queue() easier to use.

Ok

>
>> +
>> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
>> +	rlist_del(&entry.in_queue);
>> +
>> +	journal_queue_wakeup_first();
>> +}
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..8fec5b27e 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -124,6 +154,82 @@ struct journal {
>>   		     struct journal_entry *entry);
>>   };
>>   
>> +/**
>> + * Depending on the step of recovery and instance configuration
>> + * points at a concrete implementation of the journal.
>> + */
>> +extern struct journal *current_journal;
> 7. Now you don't need to move current_journal declaration.

Moved that back.

>
>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>   static inline int
>>   journal_write_async(struct journal_entry *entry)
>>   {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
>> +	 */
>> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
>> +	journal_queue_on_append(entry);
> 8. Probably must assert that waiters list is empty. Otherwise you
> could go out of order.

It's not empty by the time the first entry gets to 'journal_write_async'.
Everyone else is woken up, but not yet removed from the queue.

Looks like we cannot determine whether a write is called after waiting in
the queue or not.

>
>> +
>>   	return current_journal->write_async(current_journal, entry);
>>   }
>>   
> I tried to do some fixes on top of your branch in order to delete
> the flags and some branching.
>
> Take a look at the diff below and on top of the branch in a separate
> commit.

Thanks! Looks good, with my changes on top. I squashed everything into one
commit and updated the branch.

============================================

diff --git a/src/box/journal.c b/src/box/journal.c
index 40a5f5b1a..92a773684 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -83,9 +83,7 @@ journal_queue_wait(void)
      /*
       * Will be waken up by either queue emptying or a synchronous write.
       */
-    do {
-        fiber_yield();
-    } while (journal_queue_is_full());
+    fiber_yield();
      --journal_queue.waiter_count;
      journal_queue_wakeup();
  }
diff --git a/src/box/journal.h b/src/box/journal.h
index 3b93158cf..b5d587e3a 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -124,26 +124,13 @@ struct journal_queue {
       * entered the queue.
       */
      struct rlist waiters;
-    /**
-     * Whether the queue is being woken or not. Used to avoid multiple
-     * concurrent wake-ups.
-     */
-    bool waiter_count;
+    /** How many waiters there are in a queue. */
+    int waiter_count;
  };

  /** A single queue for all journal instances. */
  extern struct journal_queue journal_queue;

-/**
- * Check whether anyone is waiting for the journal queue to empty. If there are
- * other waiters we must go after them to preserve write order.
- */
-static inline bool
-journal_queue_has_waiters(void)
-{
-    return journal_queue.waiter_count != 0;
-}
-

============================================
Moved this function back to its original place.
============================================

  /**
   * An API for an abstract journal for all transactions of this
   * instance, as well as for multiple instances in case of
@@ -173,6 +160,16 @@ extern struct journal *current_journal;
  void
  journal_queue_wakeup(void);

+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+    return journal_queue.waiter_count != 0;
+}
+
  /**
   * Check whether any of the queue size limits is reached.
   * If the queue is full, we must wait for some of the entries to be written
@@ -235,7 +232,6 @@ journal_async_complete(struct journal_entry *entry)
      assert(entry->write_async_cb != NULL);

      journal_queue_on_complete(entry);
-    journal_queue_wakeup();

============================================
Let's wake the queue up once, after the whole batch is processed.
This way we avoid excess checks, and we don't wake up every waiter
in the queue when it is not full (each wakeup wakes the first waiter
and removes it from the list, so a wakeup per completed entry would
end up waking every waiter there is).
============================================

      entry->write_async_cb(entry);
  }
@@ -266,7 +262,6 @@ journal_write_async(struct journal_entry *entry)
       * It's the job of the caller to check whether the queue is full prior
       * to submitting the request.
       */
-    assert(journal_queue.waiter_count == 0);
      journal_queue_on_append(entry);

============================================
As I mentioned above, we cannot assert that the queue has no waiters here.
============================================

      return current_journal->write_async(current_journal, entry);
diff --git a/src/box/wal.c b/src/box/wal.c
index 328ab092d..7829ccc95 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -274,6 +274,8 @@ tx_schedule_queue(struct stailq *queue)
      struct journal_entry *req, *tmp;
      stailq_foreach_entry_safe(req, tmp, queue, fifo)
          journal_async_complete(req);
+
+    journal_queue_wakeup();
  }

  /**

============================================
>
> ====================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 27ddd0f29..2586f6654 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   	 * This is done before opening the transaction to avoid problems with
>   	 * yielding inside it.
>   	 */
> -	if (journal_queue_is_full() || journal_queue_has_waiters())
> -		journal_wait_queue();
> +	journal_queue_wait();
>   
>   	/**
>   	 * Explicitly begin the transaction so that we can
> diff --git a/src/box/journal.c b/src/box/journal.c
> index 4c31a3dfe..40a5f5b1a 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -40,8 +40,7 @@ struct journal_queue journal_queue = {
>   	.max_len = INT64_MAX,
>   	.len = 0,
>   	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> -	.is_awake = false,
> -	.is_ready = false,
> +	.waiter_count = 0,
>   };
>   
>   struct journal_entry *
> @@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region,
>   	return entry;
>   }
>   
> -struct journal_queue_entry {
> -	/** The fiber waiting for queue space to free. */
> -	struct fiber *fiber;
> -	/** A link in all waiting fibers list. */
> -	struct rlist in_queue;
> -};
> -
> -/**
> - * Wake up the first waiter in the journal queue.
> - */
> -static inline void
> -journal_queue_wakeup_first(void)
> -{
> -	struct journal_queue_entry *e;
> -	if (rlist_empty(&journal_queue.waiters))
> -		goto out;
> -	/*
> -	 * When the queue isn't forcefully emptied, no need to wake everyone
> -	 * else up until there's some free space.
> -	 */
> -	if (!journal_queue.is_ready && journal_queue_is_full())
> -		goto out;
> -	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> -			in_queue);
> -	fiber_wakeup(e->fiber);
> -	return;
> -out:
> -	journal_queue.is_awake = false;
> -	journal_queue.is_ready = false;
> -}
> -
>   void
> -journal_queue_wakeup(bool force_ready)
> +journal_queue_wakeup(void)
>   {
> -	assert(!rlist_empty(&journal_queue.waiters));
> -	if (journal_queue.is_awake)
> -		return;
> -	journal_queue.is_awake = true;
> -	journal_queue.is_ready = force_ready;
> -	journal_queue_wakeup_first();
> +	struct rlist *list = &journal_queue.waiters;
> +	if (!rlist_empty(list) && !journal_queue_is_full())
> +		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
>   }
>   
>   void
> -journal_wait_queue(void)
> +journal_queue_wait(void)
>   {
> -	struct journal_queue_entry entry = {
> -		.fiber = fiber(),
> -	};
> -	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	if (!journal_queue_is_full() && !journal_queue_has_waiters())
> +		return;
> +	++journal_queue.waiter_count;
> +	rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
>   	/*
>   	 * Will be waken up by either queue emptying or a synchronous write.
>   	 */
> -	while (journal_queue_is_full() && !journal_queue.is_ready)
> +	do {
>   		fiber_yield();
> +	} while (journal_queue_is_full());
> +	--journal_queue.waiter_count;
> +	journal_queue_wakeup();
> +}
>   
> -	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> -	rlist_del(&entry.in_queue);
> -
> -	journal_queue_wakeup_first();
> +void
> +journal_queue_flush(void)
> +{
> +	if (!journal_queue_has_waiters())
> +		return;
> +	struct rlist *list = &journal_queue.waiters;
> +	while (!rlist_empty(list))
> +		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +	journal_queue_wait();
>   }
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8fec5b27e..3b93158cf 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -128,17 +128,22 @@ struct journal_queue {
>   	 * Whether the queue is being woken or not. Used to avoid multiple
>   	 * concurrent wake-ups.
>   	 */
> -	bool is_awake;
> -	/**
> -	 * A flag used to tell the waiting fibers they may proceed even if the
> -	 * queue is full, i.e. force them to submit a write request.
> -	 */
> -	bool is_ready;
> +	bool waiter_count;
>   };
>   
>   /** A single queue for all journal instances. */
>   extern struct journal_queue journal_queue;
>   
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> +	return journal_queue.waiter_count != 0;
> +}
> ====================
>
> I didn't move this function on purpose. It could be kept in
> its old place.
>
> ====================
> +
>   /**
>    * An API for an abstract journal for all transactions of this
>    * instance, as well as for multiple instances in case of
> @@ -166,7 +171,7 @@ extern struct journal *current_journal;
>    *                    full.
>    */
>   void
> -journal_queue_wakeup(bool force_ready);
> +journal_queue_wakeup(void);
>   
>   /**
>    * Check whether any of the queue size limits is reached.
> @@ -180,27 +185,19 @@ journal_queue_is_full(void)
>   	       journal_queue.len > journal_queue.max_len;
>   }
>   
> -/**
> - * Check whether anyone is waiting for the journal queue to empty. If there are
> - * other waiters we must go after them to preserve write order.
> - */
> -static inline bool
> -journal_queue_has_waiters(void)
> -{
> -	return !rlist_empty(&journal_queue.waiters);
> -}
> -
>   /** Yield until there's some space in the journal queue. */
>   void
> -journal_wait_queue(void);
> +journal_queue_wait(void);
> +
> +void
> +journal_queue_flush(void);
>   
>   /** Set maximal journal queue size in bytes. */
>   static inline void
>   journal_queue_set_max_size(int64_t size)
>   {
>   	journal_queue.max_size = size;
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   }
>   
>   /** Set maximal journal queue length, in entries. */
> @@ -208,8 +205,7 @@ static inline void
>   journal_queue_set_max_len(int64_t len)
>   {
>   	journal_queue.max_len = len;
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   }
>   
>   /** Increase queue size on a new write request. */
> @@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry)
>   	assert(entry->write_async_cb != NULL);
>   
>   	journal_queue_on_complete(entry);
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   
>   	entry->write_async_cb(entry);
>   }
> @@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry)
>   static inline int
>   journal_write(struct journal_entry *entry)
>   {
> -	if (journal_queue_has_waiters()) {
> -		/*
> -		 * It's a synchronous write, so it's fine to wait a bit more for
> -		 * everyone else to be written. They'll wake us up back
> -		 * afterwards.
> -		 */
> -		journal_queue_wakeup(true);
> -		journal_wait_queue();
> -	}
> -
> +	journal_queue_flush();
>   	journal_queue_on_append(entry);
>   
>   	return current_journal->write(current_journal, entry);
> @@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry)
>   	 * It's the job of the caller to check whether the queue is full prior
>   	 * to submitting the request.
>   	 */
> -	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	assert(journal_queue.waiter_count == 0);
>   	journal_queue_on_append(entry);
>   
>   	return current_journal->write_async(current_journal, entry);

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
  2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
  2021-03-01 21:46     ` Konstantin Osipov via Tarantool-patches
  1 sibling, 1 reply; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-01 19:15 UTC (permalink / raw)
  To: Konstantin Osipov, gorcunov, v.shpilevoy, tarantool-patches



25.02.2021 16:05, Konstantin Osipov wrote:
> * Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]:

Hi! Thanks for the review!

>
> Looks like a simple counting semaphore. I don't see why it has
> to be specific to class journal, more like part of lib/core.
>
> https://en.cppreference.com/w/cpp/thread/counting_semaphore

Not completely. It has 2 limits instead of 1, size and len,
and the limits are 'soft': the resource is free at the moment
we wake the waiters up, but it may be occupied again by the time
the waiters actually run. Fibers scheduled before the woken ones
may have exhausted the limit already, but we don't care about that.

IMO this looks quite specialised. And there wouldn't be much use
of such a primitive in lib/core.
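
To illustrate the difference from a plain counting semaphore, here is a
minimal standalone sketch of a gate with two soft limits. It is not the
patch code: `struct soft_gate` and the `soft_gate_*` helpers are
hypothetical names, and fibers and the waiters list are left out. Only the
`>` comparison mirrors journal_queue_is_full() from the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the patch's struct journal_queue. */
struct soft_gate {
	int64_t max_size; /* Limit on in-flight bytes. */
	int64_t size;     /* Bytes currently in flight. */
	int64_t max_len;  /* Limit on in-flight entries. */
	int64_t len;      /* Entries currently in flight. */
};

/* The gate is full once either of the two limits is exceeded. */
static bool
soft_gate_is_full(const struct soft_gate *g)
{
	return g->size > g->max_size || g->len > g->max_len;
}

/*
 * Account for a new request. The caller is expected to have checked
 * soft_gate_is_full() beforehand. The request size is never compared
 * with the remaining room, which is what makes the limit soft.
 */
static void
soft_gate_on_append(struct soft_gate *g, int64_t bytes)
{
	g->len++;
	g->size += bytes;
}

/* Release the resources of a finished request. */
static void
soft_gate_on_complete(struct soft_gate *g, int64_t bytes)
{
	g->len--;
	g->size -= bytes;
	assert(g->len >= 0);
	assert(g->size >= 0);
}
```

With max_size = 100, a 150-byte request submitted to an empty gate is
admitted, and only after that does the gate report full. A strict
semaphore counting bytes would not let such a request through at all.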

>
> I'd also question the place where you decided to put this gate.
> The source of the issue is async requests, not WAL, which worked
> fine in absence of async requests. So it's async requests that
> should be gated, not WAL. Otherwise your overflow will just spill
> out someplace else.
>
>> Since the introduction of asynchronous commit, which doesn't wait for a
>> WAL write to succeed, it's quite easy to clog WAL with huge amounts of
>> write requests. For now, it's only possible from an applier, since it's
>> the only user of async commit at the moment.
>>
>> This happens when replica is syncing with master and reads new
>> transactions at a pace higher than it can write them to WAL (see docbot
>> request for detailed explanation).
>>
>> To ameliorate such behavior, we need to introduce some limit on
>> not-yet-finished WAL write requests. This is what this commit is trying
>> to do.
>> Two new counters are added to wal writer: queue_size (in bytes) and
>> queue_len (in wal messages) together with configuration settings:
>> `wal_queue_max_size` and `wal_queue_max_len`.
>> Size and length are increased on every new submitted request, and are
>> decreased once the tx receives a confirmation that a specific request
>> was written.
>> Actually, the limits are added to an abstract journal, but take effect
>> only for wal writer.
>>
>> Once size or length reach their maximum values, applier is blocked until
>> some of the write requests are finished.
>>
>> The size limit isn't strict, i.e. if there's at least one free byte, the
>> whole write request fits and no blocking is involved.
>>
>> The feature is ready for `box.commit{is_async=true}`. Once it's
>> implemented, it should check whether the queue is full and let the user
>> decide what to do next. Either wait or roll the tx back.
>>
>> Part of #5536
>>
>> @TarantoolBot document
>> Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'
>>
>> `wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount
>> of concurrent write requests submitted to WAL.
>> `wal_queue_max_size` is measured in number of bytes to be written (0
>> means unlimited), and `wal_queue_max_len` is measured in number of
>> transactions, 0 meaning unlimited.
>> These options only affect replica behaviour at the moment, and default
>> to 0. They limit the pace at which replica reads new transactions from
>> master.
>>
>> Here's when these options come in handy:
>> Imagine such a situation: there are 2 servers, a master and a replica,
>> and the replica is down for some period of time. While the replica is
>> down, the master serves requests at a reasonable pace, possibly close to
>> its WAL throughput limit. Once the replica reconnects, it has to receive
>> all the data master has piled up. Now there's no limit in speed at which
>> master sends the data to replica, and there's no limit at which
>> replica's applier submits corresponding write requests to WAL. This
>> leads to a situation when replica's WAL is never in time to serve the
>> requests and the amount of pending requests is constantly growing.
>> There's no limit for memory WAL write requests take, and this clogging
>> of WAL write queue may even lead to replica using up all the available
>> memory.
>>
>> Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are
>> set, appliers will stop reading new transactions once the limit is
>> reached. This will let WAL process all the requests that have piled up
>> and free all the excess memory.
>> ---
>> https://github.com/tarantool/tarantool/issues/5536
>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
>>
>> Changes in v3:
>>    - Extract queue internals into new struct journal_queue.
>>    - Move is_ready flag to queue from queue entry.
>>    - Minor refactoring.
>>
>> Changes in v2:
>>    - Move queue logic to journal.
>>
>>   src/box/applier.cc                          |   9 ++
>>   src/box/box.cc                              |  52 ++++++++
>>   src/box/box.h                               |   2 +
>>   src/box/journal.c                           |  71 ++++++++++
>>   src/box/journal.h                           | 136 +++++++++++++++++++-
>>   src/box/lua/cfg.cc                          |  18 +++
>>   src/box/lua/load_cfg.lua                    |   6 +
>>   src/box/wal.c                               |  14 ++
>>   src/box/wal.h                               |  14 ++
>>   test/app-tap/init_script.result             |   2 +
>>   test/box-tap/cfg.test.lua                   |   4 +-
>>   test/box/admin.result                       |   4 +
>>   test/box/cfg.result                         |   8 ++
>>   test/replication/gh-5536-wal-limit.result   | 132 +++++++++++++++++++
>>   test/replication/gh-5536-wal-limit.test.lua |  58 +++++++++
>>   test/replication/suite.cfg                  |   1 +
>>   test/replication/suite.ini                  |   2 +-
>>   17 files changed, 525 insertions(+), 8 deletions(-)
>>   create mode 100644 test/replication/gh-5536-wal-limit.result
>>   create mode 100644 test/replication/gh-5536-wal-limit.test.lua
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..27ddd0f29 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full() || journal_queue_has_waiters())
>> +		journal_wait_queue();
>> +
>>   	/**
>>   	 * Explicitly begin the transaction so that we can
>>   	 * control fiber->gc life cycle and, in case of apply
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 26cbe8aab..9a3b092d0 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name)
>>   	return (enum wal_mode) mode;
>>   }
>>   
>> +static int64_t
>> +box_check_wal_queue_max_len(void)
>> +{
>> +	int64_t len = cfg_geti64("wal_queue_max_len");
>> +	if (len < 0) {
>> +		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
>> +			 "wal_queue_max_len must be >= 0");
>> +	}
>> +	/* Unlimited. */
>> +	if (len == 0)
>> +		len = INT64_MAX;
>> +	return len;
>> +}
>> +
>> +static int64_t
>> +box_check_wal_queue_max_size(void)
>> +{
>> +	int64_t size = cfg_geti64("wal_queue_max_size");
>> +	if (size < 0) {
>> +		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
>> +			 "wal_queue_max_size must be >= 0");
>> +	}
>> +	/* Unlimited. */
>> +	if (size == 0)
>> +		size = INT64_MAX;
>> +	return size;
>> +}
>> +
>>   static void
>>   box_check_readahead(int readahead)
>>   {
>> @@ -875,6 +903,10 @@ box_check_config(void)
>>   	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
>>   	box_check_wal_max_size(cfg_geti64("wal_max_size"));
>>   	box_check_wal_mode(cfg_gets("wal_mode"));
>> +	if (box_check_wal_queue_max_size() < 0)
>> +		diag_raise();
>> +	if (box_check_wal_queue_max_len() < 0)
>> +		diag_raise();
>>   	if (box_check_memory_quota("memtx_memory") < 0)
>>   		diag_raise();
>>   	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
>> @@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void)
>>   	wal_set_checkpoint_threshold(threshold);
>>   }
>>   
>> +int
>> +box_set_wal_queue_max_size(void)
>> +{
>> +	int64_t size = box_check_wal_queue_max_size();
>> +	if (size < 0)
>> +		return -1;
>> +	wal_set_queue_max_size(size);
>> +	return 0;
>> +}
>> +
>> +int
>> +box_set_wal_queue_max_len(void)
>> +{
>> +	int64_t len = box_check_wal_queue_max_len();
>> +	if (len < 0)
>> +		return -1;
>> +	wal_set_queue_max_len(len);
>> +	return 0;
>> +}
>> +
>>   void
>>   box_set_vinyl_memory(void)
>>   {
>> diff --git a/src/box/box.h b/src/box/box.h
>> index b68047a95..4f5b4b617 100644
>> --- a/src/box/box.h
>> +++ b/src/box/box.h
>> @@ -239,6 +239,8 @@ void box_set_readahead(void);
>>   void box_set_checkpoint_count(void);
>>   void box_set_checkpoint_interval(void);
>>   void box_set_checkpoint_wal_threshold(void);
>> +int box_set_wal_queue_max_size(void);
>> +int box_set_wal_queue_max_len(void);
>>   void box_set_memtx_memory(void);
>>   void box_set_memtx_max_tuple_size(void);
>>   void box_set_vinyl_memory(void);
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..4c31a3dfe 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -34,6 +34,16 @@
>>   
>>   struct journal *current_journal = NULL;
>>   
>> +struct journal_queue journal_queue = {
>> +	.max_size = INT64_MAX,
>> +	.size = 0,
>> +	.max_len = INT64_MAX,
>> +	.len = 0,
>> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
>> +	.is_awake = false,
>> +	.is_ready = false,
>> +};
>> +
>>   struct journal_entry *
>>   journal_entry_new(size_t n_rows, struct region *region,
>>   		  journal_write_async_f write_async_cb,
>> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>>   			     complete_data);
>>   	return entry;
>>   }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free. */
>> +	struct fiber *fiber;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
>> +};
>> +
>> +/**
>> + * Wake up the first waiter in the journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_first(void)
>> +{
>> +	struct journal_queue_entry *e;
>> +	if (rlist_empty(&journal_queue.waiters))
>> +		goto out;
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!journal_queue.is_ready && journal_queue_is_full())
>> +		goto out;
>> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
>> +			in_queue);
>> +	fiber_wakeup(e->fiber);
>> +	return;
>> +out:
>> +	journal_queue.is_awake = false;
>> +	journal_queue.is_ready = false;
>> +}
>> +
>> +void
>> +journal_queue_wakeup(bool force_ready)
>> +{
>> +	assert(!rlist_empty(&journal_queue.waiters));
>> +	if (journal_queue.is_awake)
>> +		return;
>> +	journal_queue.is_awake = true;
>> +	journal_queue.is_ready = force_ready;
>> +	journal_queue_wakeup_first();
>> +}
>> +
>> +void
>> +journal_wait_queue(void)
>> +{
>> +	struct journal_queue_entry entry = {
>> +		.fiber = fiber(),
>> +	};
>> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
>> +	/*
>> +	 * Will be waken up by either queue emptying or a synchronous write.
>> +	 */
>> +	while (journal_queue_is_full() && !journal_queue.is_ready)
>> +		fiber_yield();
>> +
>> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
>> +	rlist_del(&entry.in_queue);
>> +
>> +	journal_queue_wakeup_first();
>> +}
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..8fec5b27e 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region,
>>   		  journal_write_async_f write_async_cb,
>>   		  void *complete_data);
>>   
>> +struct journal_queue {
>> +	/** Maximal size of entries enqueued in journal (in bytes). */
>> +	int64_t max_size;
>> +	/** Current approximate size of journal queue. */
>> +	int64_t size;
>> +	/** Maximal allowed length of journal queue, in entries. */
>> +	int64_t max_len;
>> +	/** Current journal queue length. */
>> +	int64_t len;
>> +	/**
>> +	 * The fibers waiting for some space to free in journal queue.
>> +	 * Once some space is freed they will be waken up in the same order they
>> +	 * entered the queue.
>> +	 */
>> +	struct rlist waiters;
>> +	/**
>> +	 * Whether the queue is being woken or not. Used to avoid multiple
>> +	 * concurrent wake-ups.
>> +	 */
>> +	bool is_awake;
>> +	/**
>> +	 * A flag used to tell the waiting fibers they may proceed even if the
>> +	 * queue is full, i.e. force them to submit a write request.
>> +	 */
>> +	bool is_ready;
>> +};
>> +
>> +/** A single queue for all journal instances. */
>> +extern struct journal_queue journal_queue;
>> +
>>   /**
>>    * An API for an abstract journal for all transactions of this
>>    * instance, as well as for multiple instances in case of
>> @@ -124,6 +154,82 @@ struct journal {
>>   		     struct journal_entry *entry);
>>   };
>>   
>> +/**
>> + * Depending on the step of recovery and instance configuration
>> + * points at a concrete implementation of the journal.
>> + */
>> +extern struct journal *current_journal;
>> +
>> +/**
>> + * Wake the journal queue up.
>> + * @param force_ready whether waiters should proceed even if the queue is still
>> + *                    full.
>> + */
>> +void
>> +journal_queue_wakeup(bool force_ready);
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(void)
>> +{
>> +	return journal_queue.size > journal_queue.max_size ||
>> +	       journal_queue.len > journal_queue.max_len;
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(void)
>> +{
>> +	return !rlist_empty(&journal_queue.waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>> +/** Set maximal journal queue size in bytes. */
>> +static inline void
>> +journal_queue_set_max_size(int64_t size)
>> +{
>> +	journal_queue.max_size = size;
>> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
>> +		journal_queue_wakeup(false);
>> +}
>> +
>> +/** Set maximal journal queue length, in entries. */
>> +static inline void
>> +journal_queue_set_max_len(int64_t len)
>> +{
>> +	journal_queue.max_len = len;
>> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
>> +		journal_queue_wakeup(false);
>> +}
>> +
>> +/** Increase queue size on a new write request. */
>> +static inline void
>> +journal_queue_on_append(struct journal_entry *entry)
>> +{
>> +	journal_queue.len++;
>> +	journal_queue.size += entry->approx_len;
>> +}
>> +
>> +/** Decrease queue size once write request is complete. */
>> +static inline void
>> +journal_queue_on_complete(struct journal_entry *entry)
>> +{
>> +	journal_queue.len--;
>> +	journal_queue.size -= entry->approx_len;
>> +	assert(journal_queue.len >= 0);
>> +	assert(journal_queue.size >= 0);
>> +}
>> +
>>   /**
>>    * Complete asynchronous write.
>>    */
>> @@ -131,15 +237,14 @@ static inline void
>>   journal_async_complete(struct journal_entry *entry)
>>   {
>>   	assert(entry->write_async_cb != NULL);
>> +
>> +	journal_queue_on_complete(entry);
>> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
>> +		journal_queue_wakeup(false);
>> +
>>   	entry->write_async_cb(entry);
>>   }
>>   
>> -/**
>> - * Depending on the step of recovery and instance configuration
>> - * points at a concrete implementation of the journal.
>> - */
>> -extern struct journal *current_journal;
>> -
>>   /**
>>    * Write a single entry to the journal in synchronous way.
>>    *
>> @@ -148,6 +253,18 @@ extern struct journal *current_journal;
>>   static inline int
>>   journal_write(struct journal_entry *entry)
>>   {
>> +	if (journal_queue_has_waiters()) {
>> +		/*
>> +		 * It's a synchronous write, so it's fine to wait a bit more for
>> +		 * everyone else to be written. They'll wake us up back
>> +		 * afterwards.
>> +		 */
>> +		journal_queue_wakeup(true);
>> +		journal_wait_queue();
>> +	}
>> +
>> +	journal_queue_on_append(entry);
>> +
>>   	return current_journal->write(current_journal, entry);
>>   }
>>   
>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>   static inline int
>>   journal_write_async(struct journal_entry *entry)
>>   {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
>> +	 */
>> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
>> +	journal_queue_on_append(entry);
>> +
>>   	return current_journal->write_async(current_journal, entry);
>>   }
>>   
>> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
>> index 2d3ccbf0e..35f410710 100644
>> --- a/src/box/lua/cfg.cc
>> +++ b/src/box/lua/cfg.cc
>> @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
>>   	return 0;
>>   }
>>   
>> +static int
>> +lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
>> +{
>> +	if (box_set_wal_queue_max_size() != 0)
>> +		luaT_error(L);
>> +	return 0;
>> +}
>> +
>> +static int
>> +lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
>> +{
>> +	if (box_set_wal_queue_max_len() != 0)
>> +		luaT_error(L);
>> +	return 0;
>> +}
>> +
>>   static int
>>   lbox_cfg_set_read_only(struct lua_State *L)
>>   {
>> @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
>>   		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
>>   		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
>>   		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
>> +		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
>> +		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
>>   		{"cfg_set_read_only", lbox_cfg_set_read_only},
>>   		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
>>   		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
>> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
>> index 574c8bef4..c11a9e103 100644
>> --- a/src/box/lua/load_cfg.lua
>> +++ b/src/box/lua/load_cfg.lua
>> @@ -71,6 +71,8 @@ local default_cfg = {
>>       wal_mode            = "write",
>>       wal_max_size        = 256 * 1024 * 1024,
>>       wal_dir_rescan_delay= 2,
>> +    wal_queue_max_size  = 0,
>> +    wal_queue_max_len   = 0,
>>       force_recovery      = false,
>>       replication         = nil,
>>       instance_uuid       = nil,
>> @@ -163,6 +165,8 @@ local template_cfg = {
>>       coredump            = 'boolean',
>>       checkpoint_interval = 'number',
>>       checkpoint_wal_threshold = 'number',
>> +    wal_queue_max_size  = 'number',
>> +    wal_queue_max_len   = 'number',
>>       checkpoint_count    = 'number',
>>       read_only           = 'boolean',
>>       hot_standby         = 'boolean',
>> @@ -277,6 +281,8 @@ local dynamic_cfg = {
>>       checkpoint_count        = private.cfg_set_checkpoint_count,
>>       checkpoint_interval     = private.cfg_set_checkpoint_interval,
>>       checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
>> +    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
>> +    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
>>       worker_pool_threads     = private.cfg_set_worker_pool_threads,
>>       feedback_enabled        = ifdef_feedback_set_params,
>>       feedback_crashinfo      = ifdef_feedback_set_params,
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 937d47ba9..4a0381cf4 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold)
>>   	fiber_set_cancellable(cancellable);
>>   }
>>   
>> +void
>> +wal_set_queue_max_size(int64_t size)
>> +{
>> +	assert(&wal_writer_singleton.base == current_journal);
>> +	journal_queue_set_max_size(size);
>> +}
>> +
>> +void
>> +wal_set_queue_max_len(int64_t len)
>> +{
>> +	assert(&wal_writer_singleton.base == current_journal);
>> +	journal_queue_set_max_len(len);
>> +}
>> +
>>   struct wal_gc_msg
>>   {
>>   	struct cbus_call_msg base;
>> diff --git a/src/box/wal.h b/src/box/wal.h
>> index ca43dc6eb..1db32f66f 100644
>> --- a/src/box/wal.h
>> +++ b/src/box/wal.h
>> @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
>>   void
>>   wal_set_checkpoint_threshold(int64_t threshold);
>>   
>> +/**
>> + * Set the pending write limit in bytes. Once the limit is reached, new
>> + * writes are blocked until some previous writes succeed.
>> + */
>> +void
>> +wal_set_queue_max_size(int64_t size);
>> +
>> +/**
>> + * Set the pending write limit in journal entries. Once the limit is reached,
>> + * new writes are blocked until some previous writes succeed.
>> + */
>> +void
>> +wal_set_queue_max_len(int64_t len);
>> +
>>   /**
>>    * Remove WAL files that are not needed by consumers reading
>>    * rows at @vclock or newer.
>> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
>> index 16c5b01d2..7a224e50e 100644
>> --- a/test/app-tap/init_script.result
>> +++ b/test/app-tap/init_script.result
>> @@ -56,6 +56,8 @@ wal_dir:.
>>   wal_dir_rescan_delay:2
>>   wal_max_size:268435456
>>   wal_mode:write
>> +wal_queue_max_len:0
>> +wal_queue_max_size:0
>>   worker_pool_threads:4
>>   --
>>   -- Test insert from detached fiber
>> diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
>> index a577f023d..3276ddf64 100755
>> --- a/test/box-tap/cfg.test.lua
>> +++ b/test/box-tap/cfg.test.lua
>> @@ -6,7 +6,7 @@ local socket = require('socket')
>>   local fio = require('fio')
>>   local uuid = require('uuid')
>>   local msgpack = require('msgpack')
>> -test:plan(108)
>> +test:plan(110)
>>   
>>   --------------------------------------------------------------------------------
>>   -- Invalid values
>> @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
>>   invalid('vinyl_run_size_ratio', 1)
>>   invalid('vinyl_bloom_fpr', 0)
>>   invalid('vinyl_bloom_fpr', 1.1)
>> +invalid('wal_queue_max_size', -1)
>> +invalid('wal_queue_max_len', -1)
>>   
>>   local function invalid_combinations(name, val)
>>       local status, result = pcall(box.cfg, val)
>> diff --git a/test/box/admin.result b/test/box/admin.result
>> index 05debe673..c818f4f9f 100644
>> --- a/test/box/admin.result
>> +++ b/test/box/admin.result
>> @@ -133,6 +133,10 @@ cfg_filter(box.cfg)
>>       - 268435456
>>     - - wal_mode
>>       - write
>> +  - - wal_queue_max_len
>> +    - 0
>> +  - - wal_queue_max_size
>> +    - 0
>>     - - worker_pool_threads
>>       - 4
>>   ...
>> diff --git a/test/box/cfg.result b/test/box/cfg.result
>> index 22a720c2c..19f322e7d 100644
>> --- a/test/box/cfg.result
>> +++ b/test/box/cfg.result
>> @@ -121,6 +121,10 @@ cfg_filter(box.cfg)
>>    |     - 268435456
>>    |   - - wal_mode
>>    |     - write
>> + |   - - wal_queue_max_len
>> + |     - 0
>> + |   - - wal_queue_max_size
>> + |     - 0
>>    |   - - worker_pool_threads
>>    |     - 4
>>    | ...
>> @@ -236,6 +240,10 @@ cfg_filter(box.cfg)
>>    |     - 268435456
>>    |   - - wal_mode
>>    |     - write
>> + |   - - wal_queue_max_len
>> + |     - 0
>> + |   - - wal_queue_max_size
>> + |     - 0
>>    |   - - worker_pool_threads
>>    |     - 4
>>    | ...
>> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
>> new file mode 100644
>> index 000000000..f7799baa8
>> --- /dev/null
>> +++ b/test/replication/gh-5536-wal-limit.result
>> @@ -0,0 +1,132 @@
>> +-- test-run result file version 2
>> +test_run = require('test_run').new()
>> + | ---
>> + | ...
>> +fiber = require('fiber')
>> + | ---
>> + | ...
>> +
>> +--
>> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
>> +-- that appliers stop reading new transactions from master once the queue is
>> +-- full.
>> +--
>> +box.schema.user.grant('guest', 'replication')
>> + | ---
>> + | ...
>> +_ = box.schema.space.create('test')
>> + | ---
>> + | ...
>> +_ = box.space.test:create_index('pk')
>> + | ---
>> + | ...
>> +
>> +replication_timeout = box.cfg.replication_timeout
>> + | ---
>> + | ...
>> +box.cfg{replication_timeout=1000}
>> + | ---
>> + | ...
>> +
>> +test_run:cmd('create server replica with rpl_master=default,\
>> +              script="replication/replica.lua"')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:cmd('start server replica with wait=True, wait_load=True')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +test_run:switch('replica')
>> + | ---
>> + | - true
>> + | ...
>> +-- Huge replication timeout to not cause reconnects while applier is blocked.
>> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
>> +box.cfg{wal_queue_max_size=1, replication_timeout=1000}
>> + | ---
>> + | ...
>> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
>> + | ---
>> + | ...
>> +-- Block WAL writes so that we may test queue overflow.
>> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
>> + | ---
>> + | - ok
>> + | ...
>> +
>> +test_run:switch('default')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +for i = 1,10 do box.space.test:insert{i} end
>> + | ---
>> + | ...
>> +
>> +test_run:switch('replica')
>> + | ---
>> + | - true
>> + | ...
>> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
>> +-- WAL is blocked.
>> +test_run:wait_cond(function()\
>> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
>> +end)
>> + | ---
>> + | - true
>> + | ...
>> +require('fiber').sleep(0.5)
>> + | ---
>> + | ...
>> +-- Only one entry fits when the limit is small.
>> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
>> + | ---
>> + | - true
>> + | ...
>> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
>> + | ---
>> + | - ok
>> + | ...
>> +
>> +-- Once the block is removed everything is written.
>> +test_run:wait_cond(function()\
>> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
>> +end)
>> + | ---
>> + | - true
>> + | ...
>> +assert(box.space.test:count() == 10)
>> + | ---
>> + | - true
>> + | ...
>> +assert(box.info.replication[1].upstream.status == 'follow')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +test_run:switch('default')
>> + | ---
>> + | - true
>> + | ...
>> +
>> +-- Cleanup.
>> +box.cfg{replication_timeout=replication_timeout}
>> + | ---
>> + | ...
>> +test_run:cmd('stop server replica')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:cmd('delete server replica')
>> + | ---
>> + | - true
>> + | ...
>> +box.space.test:drop()
>> + | ---
>> + | ...
>> +box.schema.user.revoke('guest', 'replication')
>> + | ---
>> + | ...
>> +
>> diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
>> new file mode 100644
>> index 000000000..1e7d61ff7
>> --- /dev/null
>> +++ b/test/replication/gh-5536-wal-limit.test.lua
>> @@ -0,0 +1,58 @@
>> +test_run = require('test_run').new()
>> +fiber = require('fiber')
>> +
>> +--
>> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
>> +-- that appliers stop reading new transactions from master once the queue is
>> +-- full.
>> +--
>> +box.schema.user.grant('guest', 'replication')
>> +_ = box.schema.space.create('test')
>> +_ = box.space.test:create_index('pk')
>> +
>> +replication_timeout = box.cfg.replication_timeout
>> +box.cfg{replication_timeout=1000}
>> +
>> +test_run:cmd('create server replica with rpl_master=default,\
>> +              script="replication/replica.lua"')
>> +test_run:cmd('start server replica with wait=True, wait_load=True')
>> +
>> +test_run:switch('replica')
>> +-- Huge replication timeout to not cause reconnects while applier is blocked.
>> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
>> +box.cfg{wal_queue_max_size=1, replication_timeout=1000}
>> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
>> +-- Block WAL writes so that we may test queue overflow.
>> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
>> +
>> +test_run:switch('default')
>> +
>> +for i = 1,10 do box.space.test:insert{i} end
>> +
>> +test_run:switch('replica')
>> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
>> +-- WAL is blocked.
>> +test_run:wait_cond(function()\
>> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
>> +end)
>> +require('fiber').sleep(0.5)
>> +-- Only one entry fits when the limit is small.
>> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
>> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
>> +
>> +-- Once the block is removed everything is written.
>> +test_run:wait_cond(function()\
>> +    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
>> +end)
>> +assert(box.space.test:count() == 10)
>> +assert(box.info.replication[1].upstream.status == 'follow')
>> +
>> +test_run:switch('default')
>> +
>> +-- Cleanup.
>> +box.cfg{replication_timeout=replication_timeout}
>> +test_run:cmd('stop server replica')
>> +test_run:cmd('delete server replica')
>> +box.space.test:drop()
>> +box.schema.user.revoke('guest', 'replication')
>> +
>> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
>> index c80430afc..7e7004592 100644
>> --- a/test/replication/suite.cfg
>> +++ b/test/replication/suite.cfg
>> @@ -37,6 +37,7 @@
>>       "gh-4928-tx-boundaries.test.lua": {},
>>       "gh-5440-qsync-ro.test.lua": {},
>>       "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
>> +    "gh-5536-wal-limit.test.lua": {},
>>       "*": {
>>           "memtx": {"engine": "memtx"},
>>           "vinyl": {"engine": "vinyl"}
>> diff --git a/test/replication/suite.ini b/test/replication/suite.ini
>> index e4812df37..89abfabff 100644
>> --- a/test/replication/suite.ini
>> +++ b/test/replication/suite.ini
>> @@ -3,7 +3,7 @@ core = tarantool
>>   script =  master.lua
>>   description = tarantool/box, replication
>>   disabled = consistent.test.lua
>> -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
>> +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
>>   config = suite.cfg
>>   lua_libs = lua/fast_replica.lua lua/rlimit.lua
>>   use_unix_sockets = True
>> -- 
>> 2.24.3 (Apple Git-128)

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
@ 2021-03-01 21:46     ` Konstantin Osipov via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-03-01 21:46 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

* Serge Petrenko <sergepetrenko@tarantool.org> [21/03/01 22:19]:
> 25.02.2021 16:05, Konstantin Osipov wrote:
> > * Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]:
> 
> Hi! Thanks for the review!
> 
> > 
> > Looks like a simple counting semaphore. I don't see why it has
> > to be specific to class journal, more like part of lib/core.
> > 
> > https://en.cppreference.com/w/cpp/thread/counting_semaphore
> 
> Not completely. It has 2 limits instead of 1, size and len,
> and the limits are 'soft', meaning the resource is free at the
> moment we wake the waiters up but it may be occupied again once
> the waiters actually wake up.

For problem 1, use two semaphores, if you need two limits. For
problem 2, I don't see any value in doing it this way. Do you?

> Some fibers put to execution before
> the waken ones may have exhausted the limit, but we don't care
> about that.

Then they don't take the semaphore?

> IMO this looks quite specialised. And there wouldn't be much use
> of such a primitive in lib/core.

There is inherent value in solving the problem with standard
primitives. Non-standard primitives should be justified over
standard ones, not vice versa.
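
To make the two-semaphore suggestion concrete, here is a minimal sketch of
how two limits could be expressed with two counting semaphores. It is only a
single-threaded model under stated assumptions: `count_sem`, `write_gate` and
all function names are hypothetical and do not exist in Tarantool or
lib/core, and the limits here are strict, unlike the soft byte limit
described in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical counting semaphore for a cooperative scheduler. */
struct count_sem {
	int64_t available;
};

static bool
count_sem_try_acquire(struct count_sem *sem, int64_t n)
{
	if (sem->available < n)
		return false;
	sem->available -= n;
	return true;
}

static void
count_sem_release(struct count_sem *sem, int64_t n)
{
	sem->available += n;
}

/* Two limits modelled as two semaphores: bytes and entries. */
struct write_gate {
	struct count_sem bytes;
	struct count_sem entries;
};

/* Take one entry slot and `size` bytes, or take nothing at all. */
static bool
write_gate_try_enter(struct write_gate *gate, int64_t size)
{
	if (!count_sem_try_acquire(&gate->entries, 1))
		return false;
	if (!count_sem_try_acquire(&gate->bytes, size)) {
		/* Roll back the entry slot so a failed attempt holds nothing. */
		count_sem_release(&gate->entries, 1);
		return false;
	}
	return true;
}

/* Return both resources once the write is confirmed. */
static void
write_gate_leave(struct write_gate *gate, int64_t size)
{
	count_sem_release(&gate->bytes, size);
	count_sem_release(&gate->entries, 1);
}
```

A caller that fails `write_gate_try_enter()` would have to wait and retry;
the waiting part is left out of the sketch on purpose.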


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-01 19:08   ` Serge Petrenko via Tarantool-patches
@ 2021-03-01 22:05     ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-02 17:51       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-01 22:05 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the fixes!

>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>>   static inline int
>>>   journal_write_async(struct journal_entry *entry)
>>>   {
>>> +    /*
>>> +     * It's the job of the caller to check whether the queue is full prior
>>> +     * to submitting the request.
>>> +     */
>>> +    assert(!journal_queue_is_full() || journal_queue.is_ready);
>>> +    journal_queue_on_append(entry);
>> 8. Probably must assert that waiters list is empty. Otherwise you
>> could go out of order.
> 
> It's not empty by the time the first entry gets to 'journal_write_async'.
> Everyone else is waken up, but not yet removed from the queue.
> 
> Looks like we cannot determine whether a write is called after waiting in queue
> or not.

It bothers me a lot, the rest looks good. We don't have any protection against
a reorder, not even an assertion. How about this? (I didn't test):

====================
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -69,8 +69,11 @@ void
 journal_queue_wakeup(void)
 {
 	struct rlist *list = &journal_queue.waiters;
-	if (!rlist_empty(list) && !journal_queue_is_full())
+	if (!rlist_empty(list) && !journal_queue_is_full()) {
 		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+		--journal_queue.waiter_count;
+		assert(journal_queue.waiter_count >= 0);
+	}
 }
 
 void
@@ -84,7 +87,6 @@ journal_queue_wait(void)
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
 	fiber_yield();
-	--journal_queue.waiter_count;
 	journal_queue_wakeup();
 }
 
@@ -96,5 +98,6 @@ journal_queue_flush(void)
 	struct rlist *list = &journal_queue.waiters;
 	while (!rlist_empty(list))
 		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+	journal_queue.waiter_count = 0;
 	journal_queue_wait();
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 5f0e0accd..ea56043e2 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -260,6 +260,7 @@ journal_write_async(struct journal_entry *entry)
 	 * to submitting the request.
 	 */
 	journal_queue_on_append(entry);
+	assert(journal_queue.waiter_count == 0);
 
 	return current_journal->write_async(current_journal, entry);
 }

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-01 22:05     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-02 17:51       ` Serge Petrenko via Tarantool-patches
  2021-03-03 20:59         ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-02 17:51 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



02.03.2021 01:05, Vladislav Shpilevoy wrote:
> Hi! Thanks for the fixes!
>
>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>>>    static inline int
>>>>    journal_write_async(struct journal_entry *entry)
>>>>    {
>>>> +    /*
>>>> +     * It's the job of the caller to check whether the queue is full prior
>>>> +     * to submitting the request.
>>>> +     */
>>>> +    assert(!journal_queue_is_full() || journal_queue.is_ready);
>>>> +    journal_queue_on_append(entry);
>>> 8. Probably must assert that waiters list is empty. Otherwise you
>>> could go out of order.
>> It's not empty by the time the first entry gets to 'journal_write_async'.
>> Everyone else is waken up, but not yet removed from the queue.
>>
>> Looks like we cannot determine whether a write is called after waiting in queue
>> or not.
> It bothers me a lot, the rest looks good. We don't have any protection against
> a reorder, even not an assertion. How about this? (I didn't test):

Thanks for thinking this through!
Yes, indeed, this is a problem. Unfortunately, your solution will allow
reordering. We needed the waiter count to prevent anyone from sliding in
front when the queue has already been woken up, but the fibers are not yet
scheduled.

We also discussed that a reorder may happen if the applier tries to apply
a vinyl tx, which may yield after it has waited in the queue.
In this case, if some tx is committed (synchronously) and added to the queue
while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in
the queue.

So, there shouldn't be any yields between journal_queue_wait() and
journal_write(_async)().

We cannot yield in memtx transactions, so the only solution is to put
journal_queue_wait() after txn_prepare() inside txn_commit_async().

Take a look at this patch (on top of the branch).
I decided to move journal_queue_wait() into journal_write_async() to be
consistent with journal_write(), which also checks the queue.

Now reordering is not a problem since waiting in queue is done right before
the write request is submitted. No yields in between.
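
The ordering invariant can be illustrated with a small model. This is a
sketch under assumptions, not the code from the branch: `queue_model` and the
`model_*` functions are hypothetical, integer ids stand in for journal
entries, and a woken waiter is appended immediately, so the model has no
scheduling gap between wakeup and write. The point it shows is that a
newcomer must wait whenever someone is already waiting, even if the queue has
free room.

```c
#include <assert.h>
#include <stdbool.h>

enum { MODEL_CAP = 16 };

/* Hypothetical model of the journal queue; ids replace real entries. */
struct queue_model {
	int max_len;            /* limit on in-flight writes */
	int in_flight;          /* submitted, not yet completed */
	int waiters[MODEL_CAP]; /* FIFO ring of blocked writers */
	int waiter_head;
	int waiter_count;
	int log[MODEL_CAP];     /* order in which entries reached the WAL */
	int log_len;
};

/* A newcomer waits if the queue is full OR someone is already waiting. */
static bool
model_must_wait(const struct queue_model *q)
{
	return q->in_flight >= q->max_len || q->waiter_count > 0;
}

static void
model_append(struct queue_model *q, int id)
{
	q->in_flight++;
	q->log[q->log_len++] = id;
}

/* Wait-or-write is one step: nothing can run between check and append. */
static void
model_submit(struct queue_model *q, int id)
{
	if (model_must_wait(q)) {
		int pos = (q->waiter_head + q->waiter_count) % MODEL_CAP;
		q->waiters[pos] = id;
		q->waiter_count++;
		return;
	}
	model_append(q, id);
}

/* One write is confirmed: free its slot and admit waiters in FIFO order. */
static void
model_complete_one(struct queue_model *q)
{
	q->in_flight--;
	while (q->waiter_count > 0 && q->in_flight < q->max_len) {
		int id = q->waiters[q->waiter_head];
		q->waiter_head = (q->waiter_head + 1) % MODEL_CAP;
		q->waiter_count--;
		model_append(q, id);
	}
}
```

Dropping the `waiter_count > 0` part of `model_must_wait()` would let a late
entry be written ahead of earlier ones that are still waiting, which is the
reorder discussed above.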

Also, I renamed journal_write_async() to journal_write_try_async() and
txn_commit_async() to txn_commit_try_async() to indicate that they may
yield occasionally now.

What do you think of this approach?

P.S. there's another thing I wanted to discuss: should we set the defaults for
wal_queue_max_size(len) to some finite value? If yes, then what value should
it be? Should wal_queue_max_size equal the default memtx_memory (256 MB)?
I don't know which value to choose for wal_queue_max_len at all.

===================================

Subject: [PATCH] [tosquash] journal_write and txn_commit: async -> try_async

Make journal_write_async() wait for journal queue. Rename it to
journal_write_try_async() to show that it might yield now (rarely).
Same for txn_commit_async(): rename it to txn_commit_try_async() since
it uses journal_write_try_async().
---
  src/box/applier.cc  | 12 ++----------
  src/box/box.cc      |  4 ++--
  src/box/journal.h   |  7 ++-----
  src/box/txn.c       |  4 ++--
  src/box/txn.h       |  3 ++-
  src/box/txn_limbo.h |  2 +-
  6 files changed, 11 insertions(+), 21 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index dff43795c..5a88a013e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -868,7 +868,7 @@ apply_synchro_row(struct xrow_header *row)
      if (entry == NULL)
          goto err;

-    if (journal_write_async(&entry->journal_entry) != 0) {
+    if (journal_write_try_async(&entry->journal_entry) != 0) {
          diag_set(ClientError, ER_WAL_IO);
          goto err;
      }
@@ -967,14 +967,6 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
          goto success;
      }

-    /*
-     * Do not spam WAL with excess write requests, let it process what's
-     * piled up first.
-     * This is done before opening the transaction to avoid problems with
-     * yielding inside it.
-     */
-    journal_queue_wait();
-
      /**
       * Explicitly begin the transaction so that we can
       * control fiber->gc life cycle and, in case of apply
@@ -1048,7 +1040,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
      trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
      txn_on_wal_write(txn, on_wal_write);

-    if (txn_commit_async(txn) < 0)
+    if (txn_commit_try_async(txn) < 0)
          goto fail;

  success:
diff --git a/src/box/box.cc b/src/box/box.cc
index 9a3b092d0..a8aa2663f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -246,12 +246,12 @@ box_process_rw(struct request *request, struct space *space,
           * synchronous tx it meets until confirm timeout
           * is reached and the tx is rolled back, yielding
           * an error.
-         * Moreover, txn_commit_async() doesn't hurt at
+         * Moreover, txn_commit_try_async() doesn't hurt at
           * all during local recovery, since journal_write
           * is faked at this stage and returns immediately.
           */
          if (is_local_recovery) {
-            res = txn_commit_async(txn);
+            res = txn_commit_try_async(txn);
          } else {
              res = txn_commit(txn);
          }
diff --git a/src/box/journal.h b/src/box/journal.h
index 5f0e0accd..3a945fa53 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -253,12 +253,9 @@ journal_write(struct journal_entry *entry)
   * @return 0 if write was queued to a backend or -1 in case of an error.
   */
  static inline int
-journal_write_async(struct journal_entry *entry)
+journal_write_try_async(struct journal_entry *entry)
  {
-    /*
-     * It's the job of the caller to check whether the queue is full prior
-     * to submitting the request.
-     */
+    journal_queue_wait();
      journal_queue_on_append(entry);

      return current_journal->write_async(current_journal, entry);
diff --git a/src/box/txn.c b/src/box/txn.c
index 71c89ce5f..40061ff09 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -789,7 +789,7 @@ txn_limbo_on_rollback(struct trigger *trig, void *event)
  }

  int
-txn_commit_async(struct txn *txn)
+txn_commit_try_async(struct txn *txn)
  {
      struct journal_entry *req;

@@ -859,7 +859,7 @@ txn_commit_async(struct txn *txn)
      }

      fiber_set_txn(fiber(), NULL);
-    if (journal_write_async(req) != 0) {
+    if (journal_write_try_async(req) != 0) {
          fiber_set_txn(fiber(), txn);
          diag_set(ClientError, ER_WAL_IO);
          diag_log();
diff --git a/src/box/txn.h b/src/box/txn.h
index 29fe6d5ce..a45518064 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -473,12 +473,13 @@ txn_rollback(struct txn *txn);
   * journal write completion. Note, the journal write may still fail.
   * To track transaction status, one is supposed to use on_commit and
   * on_rollback triggers.
+ * Note, this may yield occasionally, once journal queue gets full.
   *
   * On failure -1 is returned and the transaction is rolled back and
   * freed.
   */
  int
-txn_commit_async(struct txn *txn);
+txn_commit_try_async(struct txn *txn);

  /**
   * Most txns don't have triggers, and txn objects
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index c28b5666d..af0addf8d 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -229,7 +229,7 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
   * remote transactions. The function exists to be used in a
   * context, where a transaction is not known whether it is local
   * or not. For example, when a transaction is committed not bound
- * to any fiber (txn_commit_async()), it can be created by applier
+ * to any fiber (txn_commit_try_async()), it can be created by applier
   * (then it is remote) or by recovery (then it is local). Besides,
   * recovery can commit remote transactions as well, when works on
   * a replica - it will recover data received from master.
-- 
2.24.3 (Apple Git-128)

===================================


>
> ====================
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -69,8 +69,11 @@ void
>   journal_queue_wakeup(void)
>   {
>   	struct rlist *list = &journal_queue.waiters;
> -	if (!rlist_empty(list) && !journal_queue_is_full())
> +	if (!rlist_empty(list) && !journal_queue_is_full()) {
>   		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +		--journal_queue.waiter_count;
> +		assert(journal_queue.waiter_count >= 0);
> +	}
>   }
>   
>   void
> @@ -84,7 +87,6 @@ journal_queue_wait(void)
>   	 * Will be waken up by either queue emptying or a synchronous write.
>   	 */
>   	fiber_yield();
> -	--journal_queue.waiter_count;
>   	journal_queue_wakeup();
>   }
>   
> @@ -96,5 +98,6 @@ journal_queue_flush(void)
>   	struct rlist *list = &journal_queue.waiters;
>   	while (!rlist_empty(list))
>   		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +	journal_queue.waiter_count = 0;
>   	journal_queue_wait();
>   }
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5f0e0accd..ea56043e2 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -260,6 +260,7 @@ journal_write_async(struct journal_entry *entry)
>   	 * to submitting the request.
>   	 */
>   	journal_queue_on_append(entry);
> +	assert(journal_queue.waiter_count == 0);
>   
>   	return current_journal->write_async(current_journal, entry);
>   }

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-02 17:51       ` Serge Petrenko via Tarantool-patches
@ 2021-03-03 20:59         ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-09 15:10           ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-03 20:59 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

>>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>>>>    static inline int
>>>>>    journal_write_async(struct journal_entry *entry)
>>>>>    {
>>>>> +    /*
>>>>> +     * It's the job of the caller to check whether the queue is full prior
>>>>> +     * to submitting the request.
>>>>> +     */
>>>>> +    assert(!journal_queue_is_full() || journal_queue.is_ready);
>>>>> +    journal_queue_on_append(entry);
>>>> 8. Probably must assert that waiters list is empty. Otherwise you
>>>> could go out of order.
>>> It's not empty by the time the first entry gets to 'journal_write_async'.
>>> Everyone else is waken up, but not yet removed from the queue.
>>>
>>> Looks like we cannot determine whether a write is called after waiting in queue
>>> or not.
>> It bothers me a lot, the rest looks good. We don't have any protection against
>> a reorder, even not an assertion. How about this? (I didn't test):
> 
> Thanks for thinking this through!
> Yes, indeed, this is a problem. Unfortunately, your solution will allow reordering.
> We needed the waiter count to prevent anyone from sliding in front when the queue
> is already waken, but the fibers are not yet scheduled.
> 
> We also discussed that a reorder may happen if applier tries to apply
> a vinyl tx, which may yield after it has waited in queue.
> In this case, if some tx is committed (synchronously) and added to the queue
> while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in queue.
> 
> So, there shouldn't be any yields between journal_queue_wait() and
> journal_write(_async)().
> 
> We cannot yield in memtx transactions, so the only solution is to put
> journal_queue_wait() after txn_prepare() inside txn_commit_async().
> 
> Take a look at this patch (on top of the branch):
> I decided to move journal_queue_wait() to journal_write_async() to be
> consistent with journal_write(), which also checks queue.
> 
> Now reordering is not a problem since waiting in queue is done right before
> the write request is submitted. No yields in between.
> 
> Also, I renamed journal_write_async() to journal_write_try_async() and
> txn_commit_async() to txn_commit_try_async() to indicate that they may
> yield occasionally now.
> 
> What do you think of this approach?

Looks good.

> P.S. there's another thing I wanted to discuss: should we set defaults for
> wal_queue_max_size(len) to some finite value? If yes, then what value should
> it be? wal_queue_max_size=default memtx_memory (256 Mb)?
> Don't know which value to choose for wal_queue_max_len at all.

Probably we could try something sanely big. For instance, max_len = 1000000
on the assumption that you might replicate a million rows per second, and a WAL
write is likely to finish in less than a second. If the queue still grows
beyond a million entries, then something looks wrong. On the other hand, the
limit should give some protection against OOM, assuming the transactions
aren't big.

For max_size we could be a bit more brave than 256MB. Perhaps 5GB? 10GB? Just
to have some sane limit up to which it is still imaginable that the instance
really has that much memory. Better than infinity.

I don't know. We could also leave it unlimited, or ask Mons for good defaults,
since he has experience of configuring Tarantool in prod.

See 2 comments below.

> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
> new file mode 100644
> index 000000000..f7799baa8
> --- /dev/null
> +++ b/test/replication/gh-5536-wal-limit.result
> @@ -0,0 +1,132 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +fiber = require('fiber')
> + | ---
> + | ...
> +
> +--
> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
> +-- that appliers stop reading new transactions from master once the queue is
> +-- full.
> +--
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +_ = box.schema.space.create('test')
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +replication_timeout = box.cfg.replication_timeout

1. The WAL limit needs to be saved too, in case it stops defaulting
to 0 in the future, so that it is properly restored at the end of the test.

> + | ---
> + | ...
> +box.cfg{replication_timeout=1000}
> + | ---
> + | ...

2. It would also be good to have at least one test for the WAL row count
limit, not only for the byte size.

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-03 20:59         ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-09 15:10           ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-09 15:10 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



03.03.2021 23:59, Vladislav Shpilevoy wrote:
>>>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>>>>>     static inline int
>>>>>>     journal_write_async(struct journal_entry *entry)
>>>>>>     {
>>>>>> +    /*
>>>>>> +     * It's the job of the caller to check whether the queue is full prior
>>>>>> +     * to submitting the request.
>>>>>> +     */
>>>>>> +    assert(!journal_queue_is_full() || journal_queue.is_ready);
>>>>>> +    journal_queue_on_append(entry);
>>>>> 8. Probably must assert that waiters list is empty. Otherwise you
>>>>> could go out of order.
>>>> It's not empty by the time the first entry gets to 'journal_write_async'.
>>>> Everyone else is waken up, but not yet removed from the queue.
>>>>
>>>> Looks like we cannot determine whether a write is called after waiting in queue
>>>> or not.
>>> It bothers me a lot, the rest looks good. We don't have any protection against
>>> a reorder, even not an assertion. How about this? (I didn't test):
>> Thanks for thinking this through!
>> Yes, indeed, this is a problem. Unfortunately, your solution will allow reordering.
>> We needed the waiter count to prevent anyone from sliding in front when the queue
>> is already waken, but the fibers are not yet scheduled.
>>
>> We also discussed that a reorder may happen if applier tries to apply
>> a vinyl tx, which may yield after it has waited in queue.
>> In this case, if some tx is committed (synchronously) and added to the queue
>> while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in queue.
>>
>> So, there shouldn't be any yields between journal_queue_wait() and
>> journal_write(_async)().
>>
>> We cannot yield in memtx transactions, so the only solution is to put
>> journal_queue_wait() after txn_prepare() inside txn_commit_async().
>>
>> Take a look at this patch (on top of the branch):
>> I decided to move journal_queue_wait() to journal_write_async() to be
>> consistent with journal_write(), which also checks queue.
>>
>> Now reordering is not a problem since waiting in queue is done right before
>> the write request is submitted. No yields in between.
>>
>> Also, I renamed journal_write_async() to journal_write_try_async() and
>> txn_commit_async() to txn_commit_try_async() to indicate that they may
>> yield occasionally now.
>>
>> What do you think of this approach?
> Looks good.

Ok, squashed.
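
The ordering argument above (wait in the queue right before submitting,
with no yields in between) can be modelled in a few lines of plain C. This
is only a sketch: `struct wal_queue`, `wal_queue_wait()` and
`wal_write_try_async()` are hypothetical stand-ins, not the functions from
the branch, the "full" check assumes the non-strict `size >= max_size`
rule, and the fiber yield is replaced by synchronously completing the
oldest pending write.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { MAX_PENDING = 16 };

/* Hypothetical model of a journal entry: only id and size matter here. */
struct entry {
	int id;
	int64_t approx_len;
};

/* Hypothetical model of the WAL queue, not the real struct journal_queue. */
struct wal_queue {
	int64_t max_size;
	int64_t size;
	/* Submitted but not yet written entries, oldest first. */
	struct entry pending[MAX_PENDING];
	int pending_count;
	/* Entry ids in the order they were handed to the writer. */
	int submit_order[MAX_PENDING];
	int submit_count;
};

/* Assumption: the limit is not strict, one free byte admits any entry. */
static inline bool
wal_queue_is_full(const struct wal_queue *q)
{
	return q->size >= q->max_size;
}

/* Pretend the WAL thread has finished the oldest pending write. */
static inline void
wal_queue_complete_oldest(struct wal_queue *q)
{
	assert(q->pending_count > 0);
	q->size -= q->pending[0].approx_len;
	for (int i = 1; i < q->pending_count; i++)
		q->pending[i - 1] = q->pending[i];
	q->pending_count--;
}

/* Stand-in for the queue wait: a real fiber would yield here. */
static inline void
wal_queue_wait(struct wal_queue *q)
{
	while (wal_queue_is_full(q) && q->pending_count > 0)
		wal_queue_complete_oldest(q);
}

/* Wait and append happen back to back, so nobody can slip in between. */
static inline void
wal_write_try_async(struct wal_queue *q, struct entry e)
{
	wal_queue_wait(q);
	assert(q->pending_count < MAX_PENDING);
	assert(q->submit_count < MAX_PENDING);
	q->size += e.approx_len;
	q->pending[q->pending_count++] = e;
	q->submit_order[q->submit_count++] = e.id;
}
```

With `max_size = 1`, as in the test, at most one entry is pending at a
time and entries reach the writer in the order they were submitted.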

>
>> P.S. there's another thing I wanted to discuss: should we set defaults for
>> wal_queue_max_size(len) to some finite value? If yes, then what value should
>> it be? wal_queue_max_size=default memtx_memory (256 Mb)?
>> Don't know which value to choose for wal_queue_max_len at all.
> Probably we could try something sanely big. For instance, max_len = 1000000
> in assumption that you might replicate a million rows per second, and a WAL
> write is likely to end faster than a second. But if you still have a queue
> bigger than million, then it looks wrong. On the other hand, should give some
> bit of protection against OOM assuming the transactions aren't big.
>
> For max_size we could be a bit more brave than 256MB. Perhaps 5GB? 10GB? Just
> to have some sane limit up to which it is still imaginable that the instance
> really has that much memory. Better than infinity.
>
> Don't know. Also can leave unlimited, or ask Mons for good defaults since he
> has experience of configuring Tarantool in prod.

Asked Mons. He suggests 10-100k transactions and 10-50 Mb buffer.
Let's settle on 100k transactions and a 100Mb buffer, just because I think
bigger is better.
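
To see how the two proposed defaults interact, here is a small arithmetic
sketch. It assumes the queue counts as full once `size >= max_size` or
`len >= max_len`; the helper `first_limit_hit()` and the `DEFAULT_*` macros
are made up for illustration. With 100 Mb and 100k entries the break-even
average transaction size is roughly 1 Kb: smaller transactions hit the
length limit first, larger ones hit the size limit first.

```c
#include <assert.h>
#include <stdint.h>

/* Defaults proposed above; the macro names are hypothetical. */
#define DEFAULT_MAX_SIZE ((int64_t)100 * 1024 * 1024) /* bytes */
#define DEFAULT_MAX_LEN ((int64_t)100000)             /* entries */

enum limit_kind { LIMIT_LEN, LIMIT_SIZE };

/*
 * Which limit is reached first if every transaction takes avg_tx_bytes?
 * Assumes "full" means size >= max_size or len >= max_len.
 */
static inline enum limit_kind
first_limit_hit(int64_t avg_tx_bytes, int64_t max_size, int64_t max_len)
{
	assert(avg_tx_bytes > 0);
	/* Number of entries needed to reach the byte budget, rounded up. */
	int64_t entries_to_fill = (max_size + avg_tx_bytes - 1) / avg_tx_bytes;
	return entries_to_fill <= max_len ? LIMIT_SIZE : LIMIT_LEN;
}
```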

>
> See 2 comments below.
>
>> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
>> new file mode 100644
>> index 000000000..f7799baa8
>> --- /dev/null
>> +++ b/test/replication/gh-5536-wal-limit.result
>> @@ -0,0 +1,132 @@
>> +-- test-run result file version 2
>> +test_run = require('test_run').new()
>> + | ---
>> + | ...
>> +fiber = require('fiber')
>> + | ---
>> + | ...
>> +
>> +--
>> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
>> +-- that appliers stop reading new transactions from master once the queue is
>> +-- full.
>> +--
>> +box.schema.user.grant('guest', 'replication')
>> + | ---
>> + | ...
>> +_ = box.schema.space.create('test')
>> + | ---
>> + | ...
>> +_ = box.space.test:create_index('pk')
>> + | ---
>> + | ...
>> +
>> +replication_timeout = box.cfg.replication_timeout
> 1. Need to save the WAL limit in case it won't be defaulted
> to 0 in future. To properly restore it in the end of the test.

Actually, no. WAL limits are only set on the replica, which is destroyed
at the end of the test anyway.
>
>> + | ---
>> + | ...
>> +box.cfg{replication_timeout=1000}
>> + | ---
>> + | ...
> 2. Would be also good to have at least one test for WAL row count
> limit, not only for the byte size.

Sure. Here's the incremental diff (I've also added the changelog entry):

===============================

diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md
new file mode 100644
index 000000000..393932456
--- /dev/null
+++ b/changelogs/unreleased/wal-queue-limit.md
@@ -0,0 +1,9 @@
+## feature/core
+
+* Introduce the concept of WAL queue and 2 new configuration options:
+  `wal_queue_max_len`, measured in transactions, with 100k default and
+  `wal_queue_max_size`, measured in bytes, with 100 Mb default.
+  The options help limit the pace at which replica submits new transactions
+  to WAL: the limits are checked every time a transaction from master is
+  submitted to replica's WAL, and the space taken by a transaction is
+  considered empty once it's successfully written (gh-5536).
diff --git a/src/box/journal.c b/src/box/journal.c
index 92a773684..4319f8b33 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -35,9 +35,9 @@
  struct journal *current_journal = NULL;

  struct journal_queue journal_queue = {
-    .max_size = INT64_MAX,
+    .max_size = 100 * 1024 * 1024, /* 100 megabytes */
      .size = 0,
-    .max_len = INT64_MAX,
+    .max_len = 100000,  /* 100k journal entries */
      .len = 0,
      .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
      .waiter_count = 0,
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index c11a9e103..c16ebd290 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -71,8 +71,8 @@ local default_cfg = {
      wal_mode            = "write",
      wal_max_size        = 256 * 1024 * 1024,
      wal_dir_rescan_delay= 2,
-    wal_queue_max_size  = 0,
-    wal_queue_max_len   = 0,
+    wal_queue_max_size  = 100 * 1024 * 1024,
+    wal_queue_max_len   = 100000,
      force_recovery      = false,
      replication         = nil,
      instance_uuid       = nil,
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 7a224e50e..2964f21bb 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -56,8 +56,8 @@ wal_dir:.
  wal_dir_rescan_delay:2
  wal_max_size:268435456
  wal_mode:write
-wal_queue_max_len:0
-wal_queue_max_size:0
+wal_queue_max_len:100000
+wal_queue_max_size:104857600
  worker_pool_threads:4
  --
  -- Test insert from detached fiber
diff --git a/test/box/admin.result b/test/box/admin.result
index c818f4f9f..91b193d1e 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -134,9 +134,9 @@ cfg_filter(box.cfg)
    - - wal_mode
      - write
    - - wal_queue_max_len
-    - 0
+    - 100000
    - - wal_queue_max_size
-    - 0
+    - 104857600
    - - worker_pool_threads
      - 4
  ...
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 19f322e7d..3adfbd9df 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -122,9 +122,9 @@ cfg_filter(box.cfg)
   |   - - wal_mode
   |     - write
   |   - - wal_queue_max_len
- |     - 0
+ |     - 100000
   |   - - wal_queue_max_size
- |     - 0
+ |     - 104857600
   |   - - worker_pool_threads
   |     - 4
   | ...
@@ -241,9 +241,9 @@ cfg_filter(box.cfg)
   |   - - wal_mode
   |     - write
   |   - - wal_queue_max_len
- |     - 0
+ |     - 100000
   |   - - wal_queue_max_size
- |     - 0
+ |     - 104857600
   |   - - worker_pool_threads
   |     - 4
   | ...
diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
index f7799baa8..2f6e7644b 100644
--- a/test/replication/gh-5536-wal-limit.result
+++ b/test/replication/gh-5536-wal-limit.result
@@ -42,9 +42,19 @@ test_run:switch('replica')
   | ---
   | - true
   | ...
+
+queue_cfg_opt = test_run:get_cfg('cfg_option')
+ | ---
+ | ...
  -- Huge replication timeout to not cause reconnects while applier is blocked.
--- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
-box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+cfg_tbl = {replication_timeout=1000}
+ | ---
+ | ...
+-- Tiny queue size to allow exactly one queue entry at a time.
+cfg_tbl[queue_cfg_opt] = 1
+ | ---
+ | ...
+box.cfg(cfg_tbl)
   | ---
   | ...
  write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
index 1e7d61ff7..c32fbb08f 100644
--- a/test/replication/gh-5536-wal-limit.test.lua
+++ b/test/replication/gh-5536-wal-limit.test.lua
@@ -18,9 +18,13 @@ test_run:cmd('create server replica with rpl_master=default,\
  test_run:cmd('start server replica with wait=True, wait_load=True')

  test_run:switch('replica')
+
+queue_cfg_opt = test_run:get_cfg('cfg_option')
  -- Huge replication timeout to not cause reconnects while applier is blocked.
--- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
-box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+cfg_tbl = {replication_timeout=1000}
+-- Tiny queue size to allow exactly one queue entry at a time.
+cfg_tbl[queue_cfg_opt] = 1
+box.cfg(cfg_tbl)
  write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
  -- Block WAL writes so that we may test queue overflow.
  box.error.injection.set("ERRINJ_WAL_DELAY", true)
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 7e7004592..e7f660e47 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -37,7 +37,10 @@
      "gh-4928-tx-boundaries.test.lua": {},
      "gh-5440-qsync-ro.test.lua": {},
      "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
-    "gh-5536-wal-limit.test.lua": {},
+    "gh-5536-wal-limit.test.lua": {
+        "wal_queue_max_len":  {"cfg_option": "wal_queue_max_len"},
+        "wal_queue_max_size": {"cfg_option": "wal_queue_max_size"}
+    },
      "*": {
          "memtx": {"engine": "memtx"},
          "vinyl": {"engine": "vinyl"}


-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
                   ` (2 preceding siblings ...)
  2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-10  8:18   ` Konstantin Osipov via Tarantool-patches
  2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-09 19:49 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the patch!

LGTM.

> https://github.com/tarantool/tarantool/issues/5536
> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-10  8:18   ` Konstantin Osipov via Tarantool-patches
  2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-03-10  8:18 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]:
> Hi! Thanks for the patch!
> 
> LGTM.
> 
> > https://github.com/tarantool/tarantool/issues/5536
> > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom

I don't understand why you need two parameters, size and len. What
difference does it make how many requests are in the queue? The
queue is IO and memory bound, not CPU bound, at least not more cpu
bound than any other subsystem.

Pushing the problem to the user by forcing them to choose not one,
but two configuration values is poor design. How am I supposed to
know what is the right value for the option? 

I guess you don't know it either - the patch was not followed by
a benchmark which would prove the selected defaults are optimal.

Tarantool has an incoming throttling limit which makes the whole
system work like a clock: it's the fiber  pool size and net msg
size. They were carefully selected after days and weeks of
testing. 

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-10  8:18   ` Konstantin Osipov via Tarantool-patches
@ 2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
  2021-03-13 19:14       ` Konstantin Osipov via Tarantool-patches
  2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 2 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-12 17:10 UTC (permalink / raw)
  To: Konstantin Osipov, Vladislav Shpilevoy, gorcunov, tarantool-patches



10.03.2021 11:18, Konstantin Osipov wrote:
> * Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]:
>> Hi! Thanks for the patch!
>>
>> LGTM.
>>
>>> https://github.com/tarantool/tarantool/issues/5536
>>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
> I don't understand why you need two parameters, size and len. What
> difference does it make how many requests are in the queue? The
> queue is IO and memory bound, not CPU bound, at least not more cpu
> bound than any other subsystem.
>
> Pushing the problem to the user by forcing them to choose not one,
> but two configuration values is poor design. How am I supposed to
> know what is the right value for the option?

After some consideration, let's only leave size.
I've force pushed the changes on the branch.

I've also implemented some kind of a fiber semaphore in a separate 
commit on top.
I'm not sure I like how it turned out, so feel free to throw it away. Or 
keep it.


>
> I guess you don't know it either - the patch was not followed by
> a benchmark which would prove the selected defaults are optimal.
>
> Tarantool has an incoming throttling limit which makes the whole
> system work like a clock: it's the fiber  pool size and net msg
> size. They were carefully selected after days and weeks of
> testing.
>
-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
@ 2021-03-13 19:14       ` Konstantin Osipov via Tarantool-patches
  2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 0 replies; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-03-13 19:14 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, Vladislav Shpilevoy

* Serge Petrenko <sergepetrenko@tarantool.org> [21/03/12 20:14]:
> > * Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]:
> > > Hi! Thanks for the patch!
> > > 
> > > LGTM.
> > > 
> > > > https://github.com/tarantool/tarantool/issues/5536
> > > > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
> > I don't understand why you need two parameters, size and len. What
> > difference does it make how many requests are in the queue? The
> > queue is IO and memory bound, not CPU bound, at least not more cpu
> > bound than any other subsystem.
> > 
> > Pushing the problem to the user by forcing them to choose not one,
> > but two configuration values is poor design. How am I supposed to
> > know what is the right value for the option?
> 
> After some consideration, let's only leave size.
> I've force pushed the changes on the branch.
> 
> I've also implemented some kind of a fiber semaphore in a separate commit on
> top.
> I'm not sure I like how it turned out, so feel free to throw it away. Or
> keep it.

As long as you can benchmark it, showing that it has a
reasonable default (it should be pretty straightforward to
benchmark - send a huge xlog over 10Gb link to tarantool running
on a reasonably fresh SSD and find out optimal limit size) I will
be pretty happy with dropping the size.

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
  2021-03-13 19:14       ` Konstantin Osipov via Tarantool-patches
@ 2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
  2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
  1 sibling, 2 replies; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-15 23:42 UTC (permalink / raw)
  To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches

Hi! Thanks for the patch!

I must admit, it looks kind of ugly :D. The class we have now only remotely
looks like a semaphore.

Number of reasons, some of which you already listed in the header file:

- It is advisory. Can be bypassed easily if you forget to check wouldblock.
  But not a big issue really. An optional thing like 'try_take' is needed
  for box.commit({is_async = true}) anyway, not to block the fiber;

- You can take more amount of the resource than there is. Bearable as well,
  but still;

- sem_release() does not wakeup anybody. Quite counter-intuitive;

- The wouldblock check not only checks the resource being available, but also
  if there are any waiters. It wouldn't matter for a real semaphore, because
  it has nothing to do with ordering the waiters in FIFO. It is a detail of
  the journal which slipped into the general class.
  But maybe that is the only way to make it fair? Otherwise some fibers
  could be blocked forever due to starvation.

The last thing I am not sure is even an issue. Might be a feature.

The others probably can be fixed if we would rework journal_queue API. For
instance, not have journal_queue_wait() separated from journal_queue_on_append().
Then sem_take() could become blocking and obligatory.

You simply inline everything into journal_write() and journal_write_try_async(),
and you will see that you can always call take() and block inside of it.

But I don't know if it is worth doing TBH. It is used in a single place so far.
It is hard to define a fiber_sem API which would be suitable for future usages.
I would vote for not doing it now and see if we would need the semaphore in the
future.

Although the idea about removing journal_queue_wait() might be worth trying.
It is used either right before journal_queue_on_append(), or in
journal_queue_flush() which is also right before journal_queue_on_append().
Up to you. Anyway we need to return to this code for box.commit({is_async})
feature, which means the hard polishing might be not so useful.
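
The points listed above can be made concrete with a toy model. This is not
the fiber_sem code from the branch (it is not quoted in this thread); the
names `struct sem_model`, `sem_model_would_block()`, `sem_model_take()`
and `sem_model_release()` are invented for the sketch. It only shows the
advisory check that also looks at waiters, a take that can overshoot the
available amount, and a release that wakes nobody up.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of an advisory counting semaphore. */
struct sem_model {
	/* Remaining resource; may go negative after an oversized take. */
	int64_t avail;
	/* Number of fibers currently waiting for the resource. */
	int waiter_count;
};

/*
 * Advisory check. It blocks not only when the resource is exhausted but
 * also when someone is already waiting, which keeps the waiters in FIFO
 * order.
 */
static inline bool
sem_model_would_block(const struct sem_model *sem)
{
	return sem->avail <= 0 || sem->waiter_count > 0;
}

/* Nothing stops the caller from taking more than there is. */
static inline void
sem_model_take(struct sem_model *sem, int64_t amount)
{
	assert(amount > 0);
	sem->avail -= amount;
}

/* Returns the resource, but deliberately does not wake anyone up. */
static inline void
sem_model_release(struct sem_model *sem, int64_t amount)
{
	assert(amount > 0);
	sem->avail += amount;
}
```

A blocking and obligatory take() would fold the would-block check and the
wait into `sem_model_take()` itself, which is what inlining the queue wait
into journal_write() and journal_write_try_async() would allow.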

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
  2021-03-16 20:27           ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 30+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-03-16  6:45 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/03/16 09:16]:

BTW, one option is to change semantics of "async" to be "best
effort async". That is, if the queue is full, we don't drop the
transaction, we turn it into a waiting one.

Thoughts?

> Hi! Thanks for the patch!
> 
> I must admit, it looks kind of ugly :D. The class we have now only remotely
> looks like a semaphore.
> 
> Number of reasons, some of which you already listed in the header file:
> 
> - It is advisory. Can be bypassed easily if you forget to check wouldblock.
>   But not a big issue really. An optional thing like 'try_take' is needed
>   for box.commit({is_async = true}) anyway, not to block the fiber;
> 
> - You can take more amount of the resource than there is. Bearable as well,
>   but still;
> 
> - sem_release() does not wakeup anybody. Quite counter-intuitive;
> 
> - The wouldblock check not only checks the resource being available, but also
>   if there are any waiters. It wouldn't matter for a real semaphore, because
>   it has nothing to do with ordering the waiters in FIFO. It is a detail of
>   the journal which slipped into the general class.
>   But maybe that is the only way to make it fair? Otherwise some fibers
>   could be blocked forever due to starvation.
> 
> The last thing I am not sure is even an issue. Might be a feature.
> 
> The others probably can be fixed if we would rework journal_queue API. For
> instance, not have journal_queue_wait() separated from journal_queue_on_append().
> Then sem_take() could become blocking and obligatory.
> 
> You simply inline everything into journal_write() and journal_write_try_async(),
> and you will see that you can always call take() and block inside of it.
> 
> But I don't know if it is worth doing TBH. It is used in a single place so far.
> This is hard to define fiber_sem API which would be suitable for future usages.
> I would vote for not doing it now and see if we would need the semaphore in the
> future.
> 
> Although the idea about removing journal_queue_wait() might be worth trying.
> It is used either right before journal_queue_on_append(), or in
> journal_queue_flush() which is also right before journal_queue_on_append().
> Up to you. Anyway we need to return to this code for box.commit({is_async})
> feature, which means the hard polishing might be not so useful.



-- 
Konstantin Osipov, Moscow, Russia
https://scylladb.com

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
@ 2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
  2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 2 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-16 10:19 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches



16.03.2021 02:42, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!

Thanks for your answer!

>
> I must admit, it looks kind of ugly :D. The class we have now only remotely
> looks like a semaphore.
>
> Number of reasons, some of which you already listed in the header file:
>
> - It is advisory. Can be bypassed easily if you forget to check wouldblock.
>    But not a big issue really. An optional thing like 'try_take' is needed
>    for box.commit({is_async = true}) anyway, not to block the fiber;
>
> - You can take more amount of the resource than there is. Bearable as well,
>    but still;
>
> - sem_release() does not wakeup anybody. Quite counter-intuitive;
>
> - The wouldblock check not only checks the resource being available, but also
>    if there are any waiters. It wouldn't matter for a real semaphore, because
>    it has nothing to do with ordering the waiters in FIFO. It is a detail of
>    the journal which slipped into the general class.
>    But maybe that is the only way to make it fair? Otherwise some fibers
>    could be blocked forever due to starvation.
>
> The last thing I am not sure is even an issue. Might be a feature.
>
> The others probably can be fixed if we would rework journal_queue API. For
> instance, not have journal_queue_wait() separated from journal_queue_on_append().
> Then sem_take() could become blocking and obligatory.
>
> You simply inline everything into journal_write() and journal_write_try_async(),
> and you will see that you can always call take() and block inside of it.
>
> But I don't know if it is worth doing TBH. It is used in a single place so far.
> This is hard to define fiber_sem API which would be suitable for future usages.
> I would vote for not doing it now and see if we would need the semaphore in the
> future.
>
> Although the idea about removing journal_queue_wait() might be worth trying.
> It is used either right before journal_queue_on_append(), or in
> journal_queue_flush() which is also right before journal_queue_on_append().
> Up to you. Anyway we need to return to this code for box.commit({is_async})
> feature, which means the hard polishing might be not so useful.

Let's drop the semaphore thing then. I removed the commit introducing it and
pushed the patch on a new branch: sp/gh-5536-replica-oom-no-sem

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
@ 2021-03-16 20:27           ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-16 20:27 UTC (permalink / raw)
  To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches

> BTW, one option is to change semantics of "async" to be "best
> effort async". That is, if the queue is full, we don't drop the
> transaction, we turn it into a waiting one.
> 
> Thoughts?

If you are talking about internal API (txn_commit_async), Sergey did it
in this patch. It is now called txn_commit_try_async and it blocks the
fiber if the queue is full.

If you are talking about is_async option for box.commit(), then yeah,
this is a good idea. One of them. It is not decided yet what options and
with which names we will expose in box.commit(). There is no design so far.
You can leave comments here https://github.com/tarantool/tarantool/issues/67
with your proposals so they are not lost.

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
@ 2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-17 12:14             ` Serge Petrenko via Tarantool-patches
  2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-16 20:48 UTC (permalink / raw)
  To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches

Hi! Thanks for the fixes!

See 3 comments below.

>     Here's when the option comes in handy:
>     Imagine such a situation: there are 2 servers, a master and a replica,
>     and the replica is down for some period of time. While the replica is
>     down, the master serves requests at a reasonable pace, possibly close to
>     its WAL throughput limit. Once the replica reconnects, it has to receive
>     all the data master has piled up. Now there's no limit in speed at which
>     master sends the data to replica, and there's no limit at which
>     replica's applier submits corresponding write requests to WAL. This
>     leads to a situation when replica's WAL is never in time to serve the
>     requests and the amount of pending requests is constantly growing.
>     There's no limit for memory WAL write requests take, and this clogging
>     of WAL write queue may even lead to replica using up all the available
>     memory.
>     
>     Now, when `wal_queue_max_size` is set, appliers will stop reading new
>     transactions once the limit is reached. This will let WAL process all the
>     requests that have piled up and free all the excess memory.
>     
>     [tosquash] remove wal_queue_max_len

1. You forgot something, the last line. Also, while we are here, it probably
would be easier for the doc team if the old behaviour was described using a
past tense, while the new one using the present tense. Currently you use
'now' word both for the old and for the new behaviour. For instance, you say

	Now there's no limit in speed at which master sends the data to replica,
	and there's no limit at which replica's applier submits corresponding
	write requests to WAL

But >now< there is a limit. Even if 'wal_queue_max_size' is not set, it works
with the default value.

> diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md
> new file mode 100644
> index 000000000..393932456
> --- /dev/null
> +++ b/changelogs/unreleased/wal-queue-limit.md
> @@ -0,0 +1,9 @@
> +## feature/core
> +
> +* Introduce the concept of WAL queue and 2 new configuration options:
> +  `wal_queue_max_len`, measured in transactions, with 100k default and
> +  `wal_queue_max_size`, measured in bytes, with 100 Mb default.

2. There is 1 option now, not 2.

> +  The options help limit the pace at which replica submits new transactions
> +  to WAL: the limits are checked every time a transaction from master is
> +  submitted to replica's WAL, and the space taken by a transaction is
> +  considered empty once it's successfully written (gh-5536).
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..437257728 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +142,62 @@ struct journal {
> +
> +/** Set maximal journal queue size in bytes. */
> +static inline void
> +journal_queue_set_max_size(int64_t size)
> +{
> +	journal_queue.max_size = size;
> +	journal_queue_wakeup();
> +}
> +
> +/** Increase queue size on a new write request. */
> +static inline void
> +journal_queue_on_append(struct journal_entry *entry)

3. Since you will amend the patch anyway, you could also
make the entry 'const', the same in journal_queue_on_complete().

> +{
> +	journal_queue.size += entry->approx_len;
> +}
> +
> +/** Decrease queue size once write request is complete. */
> +static inline void
> +journal_queue_on_complete(struct journal_entry *entry)
> +{
> +	journal_queue.size -= entry->approx_len;
> +	assert(journal_queue.size >= 0);
> +}
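
Comment 3 could look roughly like this. The sketch uses standalone model
types (`struct entry_model`, `struct queue_model`) and passes the queue
explicitly instead of using the global `journal_queue`, so it is an
illustration of the const-qualified signatures rather than the actual
patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for struct journal_entry and journal_queue. */
struct entry_model {
	int64_t approx_len;
};

struct queue_model {
	int64_t max_size;
	int64_t size;
};

/* Increase queue size on a new write request; the entry is only read. */
static inline void
queue_model_on_append(struct queue_model *queue,
		      const struct entry_model *entry)
{
	queue->size += entry->approx_len;
}

/* Decrease queue size once the write request is complete. */
static inline void
queue_model_on_complete(struct queue_model *queue,
			const struct entry_model *entry)
{
	queue->size -= entry->approx_len;
	assert(queue->size >= 0);
}
```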

^ permalink raw reply	[flat|nested] 30+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-17 12:14             ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-17 12:14 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches



16.03.2021 23:48, Vladislav Shpilevoy wrote:
> Hi! Thanks for the fixes!

Thanks for the review!

> See 3 comments below.
>
>>      Here's when the option comes in handy:
>>      Imagine such a situation: there are 2 servers, a master and a replica,
>>      and the replica is down for some period of time. While the replica is
>>      down, the master serves requests at a reasonable pace, possibly close to
>>      its WAL throughput limit. Once the replica reconnects, it has to receive
>>      all the data master has piled up. Now there's no limit in speed at which
>>      master sends the data to replica, and there's no limit at which
>>      replica's applier submits corresponding write requests to WAL. This
>>      leads to a situation when replica's WAL is never in time to serve the
>>      requests and the amount of pending requests is constantly growing.
>>      There's no limit for memory WAL write requests take, and this clogging
>>      of WAL write queue may even lead to replica using up all the available
>>      memory.
>>      
>>      Now, when `wal_queue_max_size` is set, appliers will stop reading new
>>      transactions once the limit is reached. This will let WAL process all the
>>      requests that have piled up and free all the excess memory.
>>      
>>      [tosquash] remove wal_queue_max_len
> 1. You forgot something, the last line. Also, while we are here, it probably
> would be easier for the doc team if the old behaviour was described using a
> past tense, while the new one using the present tense. Currently you use
> 'now' word both for the old and for the new behaviour. For instance, you say
>
> 	Now there's no limit in speed at which master sends the data to replica,
> 	and there's no limit at which replica's applier submits corresponding
> 	write requests to WAL
>
> But >now< there is a limit. Even if 'wal_queue_max_size' is not set, it works
> with the default value.

Thanks! Take a look at the revised paragraph:


=====================================================
Here's when the option comes in handy:

Before this option was introduced, the following situation was possible:
there are 2 servers, a master and a replica, and the replica is down for
some period of time. While the replica is down, the master serves requests
at a reasonable pace, possibly close to its WAL throughput limit. Once the
replica reconnects, it has to receive all the data the master has piled up.
There is no limit on the speed at which the master sends the data to the
replica, and, without the option, there was no limit on the speed at which
the replica submitted the corresponding write requests to WAL.

This led to a situation where the replica's WAL could never keep up with
the requests, and the number of pending requests kept growing.
There was no limit on the memory taken by WAL write requests, and this
clogging of the WAL write queue could even lead to the replica using up all
the available memory.

Now, when `wal_queue_max_size` is set, appliers will stop reading new
transactions once the limit is reached. This will let WAL process all the
requests that have piled up and free all the excess memory.

=====================================================
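
To make the effect concrete, here is a minimal toy model in C of the
accounting described above. It is only a sketch: `toy_queue`,
`toy_queue_is_full()` and `toy_simulate()` are hypothetical names, not
Tarantool code, and the "full" condition (size >= max_size) is an assumption
based on the commit message wording that a request fits as long as at least
one byte is free.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy model of the journal queue (hypothetical, not Tarantool code). */
struct toy_queue {
	int64_t max_size; /* models wal_queue_max_size, in bytes */
	int64_t size;     /* bytes submitted but not yet written */
};

/* Assumed condition: the queue is full once no free byte is left. */
static bool
toy_queue_is_full(const struct toy_queue *q)
{
	return q->size >= q->max_size;
}

/*
 * Run `ticks` rounds. In each round the applier tries to submit
 * `in_per_tick` entries of `entry_len` bytes, then the WAL completes up
 * to `out_per_tick` entries. When `limited` is true, the applier stops
 * submitting as soon as the queue is full.
 * Returns the peak queue size in bytes.
 */
static int64_t
toy_simulate(struct toy_queue *q, int ticks, int in_per_tick,
	     int out_per_tick, int64_t entry_len, bool limited)
{
	int64_t peak = 0;
	for (int t = 0; t < ticks; t++) {
		for (int i = 0; i < in_per_tick; i++) {
			if (limited && toy_queue_is_full(q))
				break; /* the applier would block here */
			/* Same idea as journal_queue_on_append(). */
			q->size += entry_len;
			if (q->size > peak)
				peak = q->size;
		}
		for (int i = 0; i < out_per_tick && q->size > 0; i++) {
			/* Same idea as journal_queue_on_complete(). */
			q->size -= entry_len;
			assert(q->size >= 0);
		}
	}
	return peak;
}
```

For example, with 100-byte entries, a 1000-byte limit, 10 submissions and 5
completions per round, 100 rounds peak at 1000 bytes when the limit is
enabled, while the unlimited run keeps growing by 500 bytes per round.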

>> diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md
>> new file mode 100644
>> index 000000000..393932456
>> --- /dev/null
>> +++ b/changelogs/unreleased/wal-queue-limit.md
>> @@ -0,0 +1,9 @@
>> +## feature/core
>> +
>> +* Introduce the concept of WAL queue and 2 new configuration options:
>> +  `wal_queue_max_len`, measured in transactions, with 100k default and
>> +  `wal_queue_max_size`, measured in bytes, with 100 Mb default.
> 2. There is 1 option now, not 2.

Sorry for the inattention, fixed.

>
>> +  The options help limit the pace at which replica submits new transactions
>> +  to WAL: the limits are checked every time a transaction from master is
>> +  submitted to replica's WAL, and the space taken by a transaction is
>> +  considered empty once it's successfully written (gh-5536).
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..437257728 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -124,6 +142,62 @@ struct journal {
>> +
>> +/** Set maximal journal queue size in bytes. */
>> +static inline void
>> +journal_queue_set_max_size(int64_t size)
>> +{
>> +	journal_queue.max_size = size;
>> +	journal_queue_wakeup();
>> +}
>> +
>> +/** Increase queue size on a new write request. */
>> +static inline void
>> +journal_queue_on_append(struct journal_entry *entry)
> 3. Since you will amend the patch anyway, you could also
> make the entry 'const', the same in journal_queue_on_complete().

Sure. The diff's below.

>
>> +{
>> +	journal_queue.size += entry->approx_len;
>> +}
>> +
>> +/** Decrease queue size once write request is complete. */
>> +static inline void
>> +journal_queue_on_complete(struct journal_entry *entry)
>> +{
>> +	journal_queue.size -= entry->approx_len;
>> +	assert(journal_queue.size >= 0);
>> +}

=====================================================
diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md
index 393932456..1708e46e6 100644
--- a/changelogs/unreleased/wal-queue-limit.md
+++ b/changelogs/unreleased/wal-queue-limit.md
@@ -1,9 +1,8 @@
  ## feature/core

-* Introduce the concept of WAL queue and 2 new configuration options:
-  `wal_queue_max_len`, measured in transactions, with 100k default and
+* Introduce the concept of WAL queue and a new configuration option:
    `wal_queue_max_size`, measured in bytes, with 100 Mb default.
-  The options help limit the pace at which replica submits new transactions
-  to WAL: the limits are checked every time a transaction from master is
+  The option helps limit the pace at which replica submits new transactions
+  to WAL: the limit is checked every time a transaction from master is
    submitted to replica's WAL, and the space taken by a transaction is
    considered empty once it's successfully written (gh-5536).
diff --git a/src/box/journal.h b/src/box/journal.h
index 437257728..76c70c19f 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -185,14 +185,14 @@ journal_queue_set_max_size(int64_t size)

  /** Increase queue size on a new write request. */
  static inline void
-journal_queue_on_append(struct journal_entry *entry)
+journal_queue_on_append(const struct journal_entry *entry)
  {
         journal_queue.size += entry->approx_len;
  }

  /** Decrease queue size once write request is complete. */
  static inline void
-journal_queue_on_complete(struct journal_entry *entry)
+journal_queue_on_complete(const struct journal_entry *entry)
  {
         journal_queue.size -= entry->approx_len;
         assert(journal_queue.size >= 0);
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index 3276ddf64..8f21c5628 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
  local fio = require('fio')
  local uuid = require('uuid')
  local msgpack = require('msgpack')
-test:plan(110)
+test:plan(109)

  --------------------------------------------------------------------------------
  -- Invalid values
@@ -50,7 +50,6 @@ invalid('vinyl_run_size_ratio', 1)
  invalid('vinyl_bloom_fpr', 0)
  invalid('vinyl_bloom_fpr', 1.1)
  invalid('wal_queue_max_size', -1)
-invalid('wal_queue_max_len', -1)

  local function invalid_combinations(name, val)
      local status, result = pcall(box.cfg, val)
-- 

Serge Petrenko


* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
  2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-19 11:32             ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 30+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-17 21:02 UTC (permalink / raw)
  To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches

Hi! Thanks for the patch!

LGTM

> sp/gh-5536-replica-oom-no-sem

* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-19 11:32             ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 30+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-19 11:32 UTC (permalink / raw)
  To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches



18.03.2021 00:02, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
>
> LGTM
>
>> sp/gh-5536-replica-oom-no-sem

Hi! Updated the queue size to 16 Mb, as discussed with Mons and Vlad. No 
other changes.

Branch: sp/gh-5536-replica-oom-no-sem

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
  2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
                   ` (3 preceding siblings ...)
  2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 0 replies; 30+ messages in thread
From: Kirill Yukhin via Tarantool-patches @ 2021-03-19 15:36 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

Hello,

On 24 Feb 22:35, Serge Petrenko via Tarantool-patches wrote:
> Since the introduction of asynchronous commit, which doesn't wait for a
> WAL write to succeed, it's quite easy to clog WAL with huge amounts
> write requests. For now, it's only possible from an applier, since it's
> the only user of async commit at the moment.
> 
> This happens when replica is syncing with master and reads new
> transactions at a pace higher than it can write them to WAL (see docbot
> request for detailed explanation).
> 
> To ameliorate such behavior, we need to introduce some limit on
> not-yet-finished WAL write requests. This is what this commit is trying
> to do.
> Two new counters are added to wal writer: queue_size (in bytes) and
> queue_len (in wal messages) together with configuration settings:
> `wal_queue_max_size` and `wal_queue_max_len`.
> Size and length are increased on every new submitted request, and are
> decreased once the tx receives a confirmation that a specific request
> was written.
> Actually, the limits are added to an abstract journal, but take effect
> only for wal writer.
> 
> Once size or length reach their maximum values, applier is blocked until
> some of the write requests are finished.
> 
> The size limit isn't strict, i.e. if there's at least one free byte, the
> whole write request fits and no blocking is involved.
> 
> The feature is ready for `box.commit{is_async=true}`. Once it's
> implemented, it should check whether the queue is full and let the user
> decide what to do next. Either wait or roll the tx back.
> 
> Part of #5536
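
The non-strict size check from the quoted commit message can be sketched in
C as follows. This is an illustration under assumptions, not the patch
itself: `soft_queue`, `soft_queue_try_append()` and `soft_queue_complete()`
are hypothetical names, and the real code blocks the applier rather than
returning false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical byte-counted queue with a non-strict limit. */
struct soft_queue {
	int64_t max_size; /* limit in bytes */
	int64_t size;     /* bytes currently pending */
};

/*
 * Admit a request of `len` bytes if at least one byte is free.
 * The request is accepted whole, so `size` may overshoot `max_size`
 * by up to `len - 1` bytes. Returns false when the caller has to wait.
 */
static bool
soft_queue_try_append(struct soft_queue *q, int64_t len)
{
	if (q->size >= q->max_size)
		return false;
	q->size += len;
	return true;
}

/* Release the bytes of a request once it has been written. */
static void
soft_queue_complete(struct soft_queue *q, int64_t len)
{
	q->size -= len;
	assert(q->size >= 0);
}
```

With a 1000-byte limit and 999 bytes pending, a 500-byte request is still
admitted and brings the queue to 1499 bytes. Further requests are refused
until enough earlier requests complete to bring the size back under the
limit.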

I've checked your patch into 2.6, 2.7 and master.

--
Regards, Kirill Yukhin

end of thread, other threads:[~2021-03-19 15:36 UTC | newest]

Thread overview: 30+ messages
2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches
2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches
2021-02-26  0:57   ` Vladislav Shpilevoy via Tarantool-patches
2021-02-26  7:18     ` Konstantin Osipov via Tarantool-patches
2021-02-26 20:23       ` Vladislav Shpilevoy via Tarantool-patches
2021-02-26 21:20         ` Konstantin Osipov via Tarantool-patches
2021-02-26 22:44           ` Vladislav Shpilevoy via Tarantool-patches
2021-02-27 13:27             ` Konstantin Osipov via Tarantool-patches
2021-03-01 19:15   ` Serge Petrenko via Tarantool-patches
2021-03-01 21:46     ` Konstantin Osipov via Tarantool-patches
2021-02-26  0:56 ` Vladislav Shpilevoy via Tarantool-patches
2021-03-01 19:08   ` Serge Petrenko via Tarantool-patches
2021-03-01 22:05     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-02 17:51       ` Serge Petrenko via Tarantool-patches
2021-03-03 20:59         ` Vladislav Shpilevoy via Tarantool-patches
2021-03-09 15:10           ` Serge Petrenko via Tarantool-patches
2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches
2021-03-10  8:18   ` Konstantin Osipov via Tarantool-patches
2021-03-12 17:10     ` Serge Petrenko via Tarantool-patches
2021-03-13 19:14       ` Konstantin Osipov via Tarantool-patches
2021-03-15 23:42       ` Vladislav Shpilevoy via Tarantool-patches
2021-03-16  6:45         ` Konstantin Osipov via Tarantool-patches
2021-03-16 20:27           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-16 10:19         ` Serge Petrenko via Tarantool-patches
2021-03-16 20:48           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-17 12:14             ` Serge Petrenko via Tarantool-patches
2021-03-17 21:02           ` Vladislav Shpilevoy via Tarantool-patches
2021-03-19 11:32             ` Serge Petrenko via Tarantool-patches
2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches
