Tarantool development patches archive
* [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
@ 2021-02-11 12:17 Serge Petrenko via Tarantool-patches
  2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
  2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 2 replies; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-11 12:17 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Since the introduction of asynchronous commit, which doesn't wait for a
WAL write to succeed, it's quite easy to clog the WAL with huge amounts of
write requests. For now this is only possible from an applier, since it's
the only user of async commit at the moment.

This happens when a replica is syncing with the master and reads new
transactions at a pace higher than it can write them to WAL (see the
docbot request for a detailed explanation).

To ameliorate such behavior, we need to introduce a limit on
not-yet-finished WAL write requests. This is what this commit does.
Two new counters are added to the WAL writer: queue_size (in bytes) and
queue_len (in WAL messages), together with the configuration settings
`wal_queue_max_size` and `wal_queue_max_len`.
Size and length are increased on every newly submitted request, and are
decreased once the tx receives a confirmation that a specific request
was written.
In fact, the limits are added to an abstract journal, but they take
effect only for the WAL writer.

Once size or length reaches its maximum value, the applier is blocked
until some of the write requests are finished.

The size limit isn't strict: if there's at least one free byte, the
whole write request fits and no blocking is involved.
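
For instance, with `wal_queue_max_size = 1` any single entry still fits
on its own, so writes are serialized one entry at a time. A sketch of
this corner case (the regression test below relies on exactly this
behavior):

    -- illustrative only: a 1-byte limit admits exactly one queue entry
    -- at a time, since the limit is checked before each new entry
    box.cfg{wal_queue_max_size = 1}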

The feature is ready for `box.commit{is_async=true}`. Once that's
implemented, it should check whether the queue is full and let the user
decide what to do next: either wait or roll the tx back.

Part of #5536

@TarantoolBot document
Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'

`wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount
of concurrent write requests submitted to WAL.
`wal_queue_max_size` is measured in the number of bytes to be written (0
means unlimited), and `wal_queue_max_len` is measured in the number of
transactions (0 also means unlimited).
At the moment these options only affect replica behaviour, and both
default to 0. They limit the pace at which a replica reads new
transactions from the master.
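
A sketch of typical usage (the values below are illustrative, not
recommendations):

    box.cfg{
        wal_queue_max_size = 16 * 1024 * 1024, -- at most ~16 MB of pending writes
        wal_queue_max_len  = 10000,            -- at most 10000 pending transactions
    }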

Here's when these options come in handy.
Imagine the following situation: there are two servers, a master and a
replica, and the replica is down for some period of time. While the
replica is down, the master serves requests at a reasonable pace,
possibly close to its WAL throughput limit. Once the replica reconnects,
it has to receive all the data the master has piled up. Now there's no
limit on the speed at which the master sends the data to the replica,
and no limit on the pace at which the replica's applier submits the
corresponding write requests to WAL. This leads to a situation where the
replica's WAL never catches up with the incoming requests and the amount
of pending requests grows constantly. There's no limit on the memory WAL
write requests take, and this clogging of the WAL write queue may even
lead to the replica using up all the available memory.

Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are
set, appliers stop reading new transactions once the limit is reached.
This lets WAL process all the requests that have piled up and free all
the excess memory.
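
Both options are dynamic, so they can be tightened while a replica
catches up and lifted again afterwards. A sketch (values illustrative):

    -- bound the applier backlog for the duration of the resync ...
    box.cfg{wal_queue_max_len = 1000}
    -- ... and return to the unlimited default afterwards
    box.cfg{wal_queue_max_len = 0}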
---
https://github.com/tarantool/tarantool/issues/5536
https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom

Changes in v2:
  - Move queue logic to journal.

 src/box/applier.cc                          |   9 ++
 src/box/box.cc                              |  46 +++++++
 src/box/box.h                               |   2 +
 src/box/journal.c                           |  53 ++++++++
 src/box/journal.h                           | 103 ++++++++++++++-
 src/box/lua/cfg.cc                          |  18 +++
 src/box/lua/load_cfg.lua                    |   6 +
 src/box/wal.c                               |  18 +++
 src/box/wal.h                               |  14 +++
 test/app-tap/init_script.result             |   2 +
 test/box-tap/cfg.test.lua                   |   4 +-
 test/box/admin.result                       |   4 +
 test/box/cfg.result                         |   8 ++
 test/replication/gh-5536-wal-limit.result   | 132 ++++++++++++++++++++
 test/replication/gh-5536-wal-limit.test.lua |  58 +++++++++
 test/replication/suite.cfg                  |   1 +
 test/replication/suite.ini                  |   2 +-
 17 files changed, 472 insertions(+), 8 deletions(-)
 create mode 100644 test/replication/gh-5536-wal-limit.result
 create mode 100644 test/replication/gh-5536-wal-limit.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 553db76fc..06aaa0a79 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		goto success;
 	}
 
+	/*
+	 * Do not spam WAL with excess write requests, let it process what's
+	 * piled up first.
+	 * This is done before opening the transaction to avoid problems with
+	 * yielding inside it.
+	 */
+	if (journal_queue_is_full(current_journal))
+		journal_wait_queue();
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
diff --git a/src/box/box.cc b/src/box/box.cc
index 26cbe8aab..2b335599e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -754,6 +754,28 @@ box_check_wal_mode(const char *mode_name)
 	return (enum wal_mode) mode;
 }
 
+static int64_t
+box_check_wal_queue_max_len(void)
+{
+	int64_t len = cfg_geti64("wal_queue_max_len");
+	if (len < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
+			 "wal_queue_max_len must be >= 0");
+	}
+	return len;
+}
+
+static int64_t
+box_check_wal_queue_max_size(void)
+{
+	int64_t size = cfg_geti64("wal_queue_max_size");
+	if (size < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
+			 "wal_queue_max_size must be >= 0");
+	}
+	return size;
+}
+
 static void
 box_check_readahead(int readahead)
 {
@@ -875,6 +897,10 @@ box_check_config(void)
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
 	box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	box_check_wal_mode(cfg_gets("wal_mode"));
+	if (box_check_wal_queue_max_size() < 0)
+		diag_raise();
+	if (box_check_wal_queue_max_len() < 0)
+		diag_raise();
 	if (box_check_memory_quota("memtx_memory") < 0)
 		diag_raise();
 	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
@@ -1411,6 +1437,26 @@ box_set_checkpoint_wal_threshold(void)
 	wal_set_checkpoint_threshold(threshold);
 }
 
+int
+box_set_wal_queue_max_size(void)
+{
+	int64_t size = box_check_wal_queue_max_size();
+	if (size < 0)
+		return -1;
+	wal_set_queue_max_size(size);
+	return 0;
+}
+
+int
+box_set_wal_queue_max_len(void)
+{
+	int64_t len = box_check_wal_queue_max_len();
+	if (len < 0)
+		return -1;
+	wal_set_queue_max_len(len);
+	return 0;
+}
+
 void
 box_set_vinyl_memory(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index b68047a95..4f5b4b617 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -239,6 +239,8 @@ void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
 void box_set_checkpoint_wal_threshold(void);
+int box_set_wal_queue_max_size(void);
+int box_set_wal_queue_max_len(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/journal.c b/src/box/journal.c
index cb320b557..19a184580 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -55,3 +55,56 @@ journal_entry_new(size_t n_rows, struct region *region,
 			     complete_data);
 	return entry;
 }
+
+struct journal_queue_entry {
+	/** The fiber waiting for queue space to free. */
+	struct fiber *fiber;
+	/** Whether the fiber should be woken up regardless of queue size. */
+	bool is_ready;
+	/** A link in all waiting fibers list. */
+	struct rlist in_queue;
+};
+
+/**
+ * Wake up the next waiter in journal queue.
+ */
+void
+journal_queue_wakeup_next(struct rlist *link, bool force_ready)
+{
+	/* Empty queue or last entry in queue. */
+	if (link == rlist_last(&current_journal->waiters)) {
+		current_journal->queue_is_woken = false;
+		return;
+	}
+	/*
+	 * When the queue isn't forcefully emptied, no need to wake everyone
+	 * else up until there's some free space.
+	 */
+	if (journal_queue_is_full(current_journal) && !force_ready) {
+		current_journal->queue_is_woken = false;
+		return;
+	}
+	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
+						    in_queue);
+	e->is_ready |= force_ready;
+	fiber_wakeup(e->fiber);
+}
+
+void
+journal_wait_queue(void)
+{
+	struct journal_queue_entry entry = {
+		.fiber = fiber(),
+		.is_ready = false,
+	};
+	rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
+	/*
+	 * Will be woken up by either queue emptying or a synchronous write.
+	 */
+	while (journal_queue_is_full(current_journal) && !entry.is_ready)
+		fiber_yield();
+
+	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
+	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
+	rlist_del(&entry.in_queue);
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index 5d8d5a726..9c8af062a 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -115,6 +115,25 @@ journal_entry_new(size_t n_rows, struct region *region,
  * synchronous replication.
  */
 struct journal {
+	/** Maximal size of entries enqueued in journal (in bytes). */
+	int64_t queue_max_size;
+	/** Current approximate size of journal queue. */
+	int64_t queue_size;
+	/** Maximal allowed length of journal queue, in entries. */
+	int64_t queue_max_len;
+	/** Current journal queue length. */
+	int64_t queue_len;
+	/**
+	 * The fibers waiting for some space to free in journal queue.
+	 * Once some space is freed they will be woken up in the same order they
+	 * entered the queue.
+	 */
+	struct rlist waiters;
+	/**
+	 * Whether the queue is being woken or not. Used to avoid multiple
+	 * concurrent wake-ups.
+	 */
+	bool queue_is_woken;
 	/** Asynchronous write */
 	int (*write_async)(struct journal *journal,
 			   struct journal_entry *entry);
@@ -124,6 +143,55 @@ struct journal {
 		     struct journal_entry *entry);
 };
 
+/**
+ * Depending on the step of recovery and instance configuration
+ * points at a concrete implementation of the journal.
+ */
+extern struct journal *current_journal;
+
+void
+journal_queue_wakeup_next(struct rlist *link, bool force_ready);
+
+/** Wake the journal queue up. */
+static inline void
+journal_queue_wakeup(struct journal *j, bool force_ready)
+{
+	assert(j == current_journal);
+	assert(!rlist_empty(&j->waiters));
+	if (j->queue_is_woken)
+		return;
+	j->queue_is_woken = true;
+	journal_queue_wakeup_next(&j->waiters, force_ready);
+}
+
+/**
+ * Check whether any of the queue size limits is reached.
+ * If the queue is full, we must wait for some of the entries to be written
+ * before proceeding with a new asynchronous write request.
+ */
+static inline bool
+journal_queue_is_full(struct journal *j)
+{
+	assert(j == current_journal);
+	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
+	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
+}
+
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(struct journal *j)
+{
+	assert(j == current_journal);
+	return !rlist_empty(&j->waiters);
+}
+
+/** Yield until there's some space in the journal queue. */
+void
+journal_wait_queue(void);
+
 /**
  * Complete asynchronous write.
  */
@@ -131,15 +199,15 @@ static inline void
 journal_async_complete(struct journal_entry *entry)
 {
 	assert(entry->write_async_cb != NULL);
+	current_journal->queue_len--;
+	current_journal->queue_size -= entry->approx_len;
+	assert(current_journal->queue_len >= 0);
+	assert(current_journal->queue_size >= 0);
+	if (journal_queue_has_waiters(current_journal))
+		journal_queue_wakeup(current_journal, false);
 	entry->write_async_cb(entry);
 }
 
-/**
- * Depending on the step of recovery and instance configuration
- * points at a concrete implementation of the journal.
- */
-extern struct journal *current_journal;
-
 /**
  * Write a single entry to the journal in synchronous way.
  *
@@ -148,6 +216,17 @@ extern struct journal *current_journal;
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	if (journal_queue_has_waiters(current_journal)) {
+		/*
+		 * It's a synchronous write, so it's fine to wait a bit more for
+		 * everyone else to be written. They'll wake us up back
+		 * afterwards.
+		 */
+		journal_queue_wakeup(current_journal, true);
+		journal_wait_queue();
+	}
+	current_journal->queue_size += entry->approx_len;
+	current_journal->queue_len += 1;
 	return current_journal->write(current_journal, entry);
 }
 
@@ -159,6 +238,12 @@ journal_write(struct journal_entry *entry)
 static inline int
 journal_write_async(struct journal_entry *entry)
 {
+	/*
+	 * It's the job of the caller to check whether the queue is full prior
+	 * to submitting the request.
+	 */
+	current_journal->queue_size += entry->approx_len;
+	current_journal->queue_len += 1;
 	return current_journal->write_async(current_journal, entry);
 }
 
@@ -198,6 +283,12 @@ journal_create(struct journal *journal,
 {
 	journal->write_async	= write_async;
 	journal->write		= write;
+	journal->queue_size = 0;
+	journal->queue_max_size = 0;
+	journal->queue_len = 0;
+	journal->queue_max_len = 0;
+	journal->queue_is_woken = false;
+	rlist_create(&journal->waiters);
 }
 
 static inline bool
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 2d3ccbf0e..35f410710 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_size() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_len() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
@@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
+		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
+		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 574c8bef4..c11a9e103 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -71,6 +71,8 @@ local default_cfg = {
     wal_mode            = "write",
     wal_max_size        = 256 * 1024 * 1024,
     wal_dir_rescan_delay= 2,
+    wal_queue_max_size  = 0,
+    wal_queue_max_len   = 0,
     force_recovery      = false,
     replication         = nil,
     instance_uuid       = nil,
@@ -163,6 +165,8 @@ local template_cfg = {
     coredump            = 'boolean',
     checkpoint_interval = 'number',
     checkpoint_wal_threshold = 'number',
+    wal_queue_max_size  = 'number',
+    wal_queue_max_len   = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -277,6 +281,8 @@ local dynamic_cfg = {
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
     checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
+    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
+    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = ifdef_feedback_set_params,
     feedback_crashinfo      = ifdef_feedback_set_params,
diff --git a/src/box/wal.c b/src/box/wal.c
index 937d47ba9..9fff4220a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -765,6 +765,24 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
+void
+wal_set_queue_max_size(int64_t size)
+{
+	struct journal *base = &wal_writer_singleton.base;
+	base->queue_max_size = size;
+	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
+		journal_queue_wakeup(base, false);
+}
+
+void
+wal_set_queue_max_len(int64_t len)
+{
+	struct journal *base = &wal_writer_singleton.base;
+	base->queue_max_len = len;
+	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
+		journal_queue_wakeup(base, false);
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
diff --git a/src/box/wal.h b/src/box/wal.h
index ca43dc6eb..1db32f66f 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
+/**
+ * Set the pending write limit in bytes. Once the limit is reached, new
+ * writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_size(int64_t size);
+
+/**
+ * Set the pending write limit in journal entries. Once the limit is reached,
+ * new writes are blocked until some previous writes succeed.
+ */
+void
+wal_set_queue_max_len(int64_t len);
+
 /**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 16c5b01d2..7a224e50e 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -56,6 +56,8 @@ wal_dir:.
 wal_dir_rescan_delay:2
 wal_max_size:268435456
 wal_mode:write
+wal_queue_max_len:0
+wal_queue_max_size:0
 worker_pool_threads:4
 --
 -- Test insert from detached fiber
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index a577f023d..3276ddf64 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
 local fio = require('fio')
 local uuid = require('uuid')
 local msgpack = require('msgpack')
-test:plan(108)
+test:plan(110)
 
 --------------------------------------------------------------------------------
 -- Invalid values
@@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
 invalid('vinyl_run_size_ratio', 1)
 invalid('vinyl_bloom_fpr', 0)
 invalid('vinyl_bloom_fpr', 1.1)
+invalid('wal_queue_max_size', -1)
+invalid('wal_queue_max_len', -1)
 
 local function invalid_combinations(name, val)
     local status, result = pcall(box.cfg, val)
diff --git a/test/box/admin.result b/test/box/admin.result
index 05debe673..c818f4f9f 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -133,6 +133,10 @@ cfg_filter(box.cfg)
     - 268435456
   - - wal_mode
     - write
+  - - wal_queue_max_len
+    - 0
+  - - wal_queue_max_size
+    - 0
   - - worker_pool_threads
     - 4
 ...
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 22a720c2c..19f322e7d 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -121,6 +121,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
@@ -236,6 +240,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
new file mode 100644
index 000000000..f7799baa8
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+replication_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout=1000}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Huge replication timeout to avoid reconnects while the applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+ | ---
+ | ...
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+ | ---
+ | ...
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+for i = 1,10 do box.space.test:insert{i} end
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+ | ---
+ | - true
+ | ...
+require('fiber').sleep(0.5)
+ | ---
+ | ...
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+ | ---
+ | - true
+ | ...
+assert(box.space.test:count() == 10)
+ | ---
+ | - true
+ | ...
+assert(box.info.replication[1].upstream.status == 'follow')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
new file mode 100644
index 000000000..1e7d61ff7
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.test.lua
@@ -0,0 +1,58 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+replication_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout=1000}
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+test_run:switch('replica')
+-- Huge replication timeout to avoid reconnects while the applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+test_run:switch('default')
+
+for i = 1,10 do box.space.test:insert{i} end
+
+test_run:switch('replica')
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+require('fiber').sleep(0.5)
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+assert(box.space.test:count() == 10)
+assert(box.info.replication[1].upstream.status == 'follow')
+
+test_run:switch('default')
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index c80430afc..7e7004592 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -37,6 +37,7 @@
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
     "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
+    "gh-5536-wal-limit.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index e4812df37..89abfabff 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
-- 
2.24.3 (Apple Git-128)



* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-11 12:17 [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
@ 2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
  2021-02-16 12:47   ` Serge Petrenko via Tarantool-patches
  2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 1 reply; 9+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-02-15 11:17 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches

On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..06aaa0a79 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		goto success;
>  	}
>  
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full(current_journal))
> +		journal_wait_queue();
> +

Serge, just a few comments, feel free to ignore.

Maybe it would be better to pass current_journal to journal_wait_queue,
otherwise it looks somehow inconsistent, no?

	if (journal_queue_is_full(current_journal))
		journal_wait_queue(current_journal);

Actually I would name journal queue engine as plain journalq, this would
look like

	if (journalq_is_full(current_journal))
		journalq_wait(current_journal);

But it is very humble pov, lets stick with long `journal_queue`.
...
> +/** Wake the journal queue up. */
> +static inline void
> +journal_queue_wakeup(struct journal *j, bool force_ready)
> +{
> +	assert(j == current_journal);

Seems this assert is not needed. The overall idea of passing
journal as an argument is quite the reverse, ie to work with
any journal. This is not a blocker, could be cleaned up on top
or simply ignored.

> +	assert(!rlist_empty(&j->waiters));
> +	if (j->queue_is_woken)
> +		return;
> +	j->queue_is_woken = true;
> +	journal_queue_wakeup_next(&j->waiters, force_ready);
> +}
> +
> +/**
> + * Check whether any of the queue size limits is reached.
> + * If the queue is full, we must wait for some of the entries to be written
> + * before proceeding with a new asynchronous write request.
> + */
> +static inline bool
> +journal_queue_is_full(struct journal *j)
> +{
> +	assert(j == current_journal);

same, no need for assert()

> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
> +}
> +
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(struct journal *j)
> +{
> +	assert(j == current_journal);

same, no need for assert()

> +	return !rlist_empty(&j->waiters);
> +}
> +
> +/** Yield until there's some space in the journal queue. */
> +void
> +journal_wait_queue(void);
> +
>  /**
>   * Complete asynchronous write.
>   */
> @@ -131,15 +199,15 @@ static inline void
>  journal_async_complete(struct journal_entry *entry)
>  {
>  	assert(entry->write_async_cb != NULL);
> +	current_journal->queue_len--;
> +	current_journal->queue_size -= entry->approx_len;

Maybe it's worth wrapping the queue ops into some helper? Because
length and size can't be updated without a tangle. IOW, something
like

static inline void
journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
{
	j->queue_len--;
	j->queue_size -= entry->approx_len;
}

static inline void
journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
{
	j->queue_len++;
	j->queue_size += entry->approx_len;
}

Again, this is my pov, **free to ignore**. attr here stands for
attributes, because queue_len and queue_size are not the queue
itself but attributes which control when we need to wait for
data to be flushed.

> +	assert(current_journal->queue_len >= 0);
> +	assert(current_journal->queue_size >= 0);
> +	if (journal_queue_has_waiters(current_journal))
> +		journal_queue_wakeup(current_journal, false);
>  	entry->write_async_cb(entry);
>  }


* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
@ 2021-02-16 12:47   ` Serge Petrenko via Tarantool-patches
  2021-02-16 12:49     ` Cyrill Gorcunov via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-16 12:47 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: v.shpilevoy, tarantool-patches



15.02.2021 14:17, Cyrill Gorcunov writes:
> On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..06aaa0a79 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full(current_journal))
>> +		journal_wait_queue();
>> +
> Serge, just a few comments, feel free to ignore.

Hi! Thanks for the review!

>
> Maybe it would be better to pass current_journal to journal_wait_queue,
> otherwise it looks somehow inconsistent, no?
>
> 	if (journal_queue_is_full(current_journal))
> 		journal_wait_queue(current_journal);

I tried to remove parameters from almost all methods, as discussed in chat.

>
> Actually I would name journal queue engine as plain journalq, this would
> look like
>
> 	if (journalq_is_full(current_journal))
> 		journalq_wait(current_journal);
>
> But it is very humble pov, lets stick with long `journal_queue`.
> ...

To be honest, I like `journal_queue_` prefix more.

>> +/** Wake the journal queue up. */
>> +static inline void
>> +journal_queue_wakeup(struct journal *j, bool force_ready)
>> +{
>> +	assert(j == current_journal);
> Seems this assert is not needed. The overall idea of passing
> journal as an argument is quite the reverse, ie to work with
> any journal. This is not a blocker, could be cleaned up on top
> or simply ignored.

Same as above, discussed verbally.

>
>> +	assert(!rlist_empty(&j->waiters));
>> +	if (j->queue_is_woken)
>> +		return;
>> +	j->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&j->waiters, force_ready);
>> +}
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return !rlist_empty(&j->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>>   /**
>>    * Complete asynchronous write.
>>    */
>> @@ -131,15 +199,15 @@ static inline void
>>   journal_async_complete(struct journal_entry *entry)
>>   {
>>   	assert(entry->write_async_cb != NULL);
>> +	current_journal->queue_len--;
>> +	current_journal->queue_size -= entry->approx_len;
> Maybe it's worth wrapping the queue ops into some helper? Because
> length and size can't be updated without a tangle. IOW, something
> like
>
> static inline void
> journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len--;
> 	j->queue_size -= entry->approx_len;
> }
>
> static inline void
> journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len++;
> 	j->queue_size += entry->approx_len;
> }
>
> Again, this is my pov, **free to ignore**. attr here stands for
> attributes, because queue_len and queue_size are not the queue
> itself but attributes which control when we need to wait for
> data to be flushed.

Ok, sure. Introduced
`journal_queue_on_append(struct journal_entry *entry)`
and
`journal_queue_on_complete(struct journal_entry *entry)`

>
>> +	assert(current_journal->queue_len >= 0);
>> +	assert(current_journal->queue_size >= 0);
>> +	if (journal_queue_has_waiters(current_journal))
>> +		journal_queue_wakeup(current_journal, false);
>>   	entry->write_async_cb(entry);
>>   }

Here's an incremental diff. It's all pure refactoring with no functional
changes.
I've introduced `journal_queue_on_append` and `journal_queue_on_complete`
for increasing and decreasing the queue length and size, and tried to
remove the `struct journal` parameter from almost every new method,
except `journal_queue_set_max_size` and `journal_queue_set_max_len`.

====================================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 06aaa0a79..7c2452d2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
       * This is done before opening the transaction to avoid problems with
       * yielding inside it.
       */
-    if (journal_queue_is_full(current_journal))
+    if (journal_queue_is_full())
          journal_wait_queue();

      /**
diff --git a/src/box/journal.c b/src/box/journal.c
index 19a184580..49441e596 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ struct journal_queue_entry {
  /**
   * Wake up the next waiter in journal queue.
   */
-void
+static inline void
  journal_queue_wakeup_next(struct rlist *link, bool force_ready)
  {
      /* Empty queue or last entry in queue. */
@@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
       * When the queue isn't forcefully emptied, no need to wake everyone
       * else up until there's some free space.
       */
-    if (journal_queue_is_full(current_journal) && !force_ready) {
+    if (!force_ready && journal_queue_is_full()) {
          current_journal->queue_is_woken = false;
          return;
      }
    struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
                            in_queue);
-    e->is_ready |= force_ready;
+    e->is_ready = force_ready;
      fiber_wakeup(e->fiber);
  }

+void
+journal_queue_wakeup(bool force_ready)
+{
+    assert(!rlist_empty(&current_journal->waiters));
+    if (current_journal->queue_is_woken)
+        return;
+    current_journal->queue_is_woken = true;
+    journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+}
+
  void
  journal_wait_queue(void)
  {
@@ -101,7 +111,7 @@ journal_wait_queue(void)
      /*
       * Will be woken up by either queue emptying or a synchronous write.
       */
-    while (journal_queue_is_full(current_journal) && !entry.is_ready)
+    while (journal_queue_is_full() && !entry.is_ready)
          fiber_yield();

      journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
diff --git a/src/box/journal.h b/src/box/journal.h
index 9c8af062a..d295dfa4b 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -149,20 +149,9 @@ struct journal {
   */
  extern struct journal *current_journal;

-void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready);
-
  /** Wake the journal queue up. */
-static inline void
-journal_queue_wakeup(struct journal *j, bool force_ready)
-{
-    assert(j == current_journal);
-    assert(!rlist_empty(&j->waiters));
-    if (j->queue_is_woken)
-        return;
-    j->queue_is_woken = true;
-    journal_queue_wakeup_next(&j->waiters, force_ready);
-}
+void
+journal_queue_wakeup(bool force_ready);

  /**
   * Check whether any of the queue size limits is reached.
@@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool force_ready)
   * before proceeding with a new asynchronous write request.
   */
  static inline bool
-journal_queue_is_full(struct journal *j)
+journal_queue_is_full(void)
  {
-    assert(j == current_journal);
+    struct journal *j = current_journal;
      return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
             (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
  }
@@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j)
   * other waiters we must go after them to preserve write order.
   */
  static inline bool
-journal_queue_has_waiters(struct journal *j)
+journal_queue_has_waiters(void)
  {
-    assert(j == current_journal);
-    return !rlist_empty(&j->waiters);
+    return !rlist_empty(&current_journal->waiters);
  }

  /** Yield until there's some space in the journal queue. */
  void
  journal_wait_queue(void);

+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(struct journal *j, int64_t size)
+{
+    assert(j == current_journal);
+    j->queue_max_size = size;
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(struct journal *j, int64_t len)
+{
+    assert(j == current_journal);
+    j->queue_max_len = len;
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+    current_journal->queue_len++;
+    current_journal->queue_size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+    current_journal->queue_len--;
+    current_journal->queue_size -= entry->approx_len;
+    assert(current_journal->queue_len >= 0);
+    assert(current_journal->queue_size >= 0);
+}
+
  /**
   * Complete asynchronous write.
   */
@@ -199,12 +225,11 @@ static inline void
  journal_async_complete(struct journal_entry *entry)
  {
      assert(entry->write_async_cb != NULL);
-    current_journal->queue_len--;
-    current_journal->queue_size -= entry->approx_len;
-    assert(current_journal->queue_len >= 0);
-    assert(current_journal->queue_size >= 0);
-    if (journal_queue_has_waiters(current_journal))
-        journal_queue_wakeup(current_journal, false);
+
+    journal_queue_on_complete(entry);
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+
      entry->write_async_cb(entry);
  }

@@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry)
  static inline int
  journal_write(struct journal_entry *entry)
  {
-    if (journal_queue_has_waiters(current_journal)) {
+    if (journal_queue_has_waiters()) {
          /*
           * It's a synchronous write, so it's fine to wait a bit more for
           * everyone else to be written. They'll wake us up back
           * afterwards.
           */
-        journal_queue_wakeup(current_journal, true);
+        journal_queue_wakeup(true);
          journal_wait_queue();
      }
-    current_journal->queue_size += entry->approx_len;
-    current_journal->queue_len += 1;
+
+    journal_queue_on_append(entry);
+
      return current_journal->write(current_journal, entry);
  }

@@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry)
       * It's the job of the caller to check whether the queue is full prior
       * to submitting the request.
       */
-    current_journal->queue_size += entry->approx_len;
-    current_journal->queue_len += 1;
+    journal_queue_on_append(entry);
+
      return current_journal->write_async(current_journal, entry);
  }

diff --git a/src/box/wal.c b/src/box/wal.c
index 9fff4220a..5bc7a0685 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold)
  void
  wal_set_queue_max_size(int64_t size)
  {
-    struct journal *base = &wal_writer_singleton.base;
-    base->queue_max_size = size;
-    if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-        journal_queue_wakeup(base, false);
+    journal_queue_set_max_size(&wal_writer_singleton.base, size);
  }

  void
  wal_set_queue_max_len(int64_t len)
  {
-    struct journal *base = &wal_writer_singleton.base;
-    base->queue_max_len = len;
-    if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-        journal_queue_wakeup(base, false);
+    journal_queue_set_max_len(&wal_writer_singleton.base, len);
  }

  struct wal_gc_msg

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-16 12:47   ` Serge Petrenko via Tarantool-patches
@ 2021-02-16 12:49     ` Cyrill Gorcunov via Tarantool-patches
  0 siblings, 0 replies; 9+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-02-16 12:49 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches

On Tue, Feb 16, 2021 at 03:47:09PM +0300, Serge Petrenko wrote:
> 
> Here's an incremental diff. It's all pure refactoring with no functional
> changes.
> I've intdouced `journal_queue_on_append` and `journal_queue_on_complete` for
> increasing and
> decreasing queue length and size, and tried to remove `struct journal`
> parameter from almost
> every new method, except `journal_queue_set_max_size` and
> `journal_queue_set_max_len`

Great! Let's give this patch some time to spin around; maybe
we'll gather more comments. Looks OK to me.


* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-11 12:17 [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
  2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
@ 2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
  2021-02-18 20:06   ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-17 20:46 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the patch!

Now looks cool indeed.

Another raw idea on which I don't insist and not even sure it is
good. But just came to my mind: how about making a separate
object called 'journal_queue'? Or 'journal_ctl'?  Which is global
and is not inside of one journal. It can't be changed to another
queue/ctl, and is used by journal API.

So we wouldn't need to worry if we configured the correct journal
because now current_journal can change at runtime, but this ctl
thing - can't.

Another option - call this thing 'journal', and rename the old
'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab'
or something like this.

Another option - ignore this, since it does not matter much. But
just in case you would want to try to fit the solution into one
of these ideas.

See 8 comments below.

> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..49441e596 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>  			     complete_data);
>  	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** Whether the fiber should be woken up regardless of queue size. */
> +	bool is_ready;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;
> +};
> +
> +/**
> + * Wake up the next waiter in journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)

1. The flag is known in all usage places at compilation time. Is it
possible to split the function into force/normal versions? The same
for journal_queue_wakeup() from which this runtime uncertainty arises.

Also it is worth adding a comment on why the force mode is even needed.

> +{
> +	/* Empty queue or last entry in queue. */
> +	if (link == rlist_last(&current_journal->waiters)) {

2. I am not sure I understand what is happening here. Why is this
function in one place called with a pointer to the list itself,
and in another place with a pointer to one element?

> +		current_journal->queue_is_woken = false;
> +		return;
> +	}
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!force_ready && journal_queue_is_full()) {
> +		current_journal->queue_is_woken = false;

3. Maybe woken -> awake?

4. Why do you need the flag? Can you just remove the awake entries
from the queue right away? Then it wouldn't even be possible to make
a double wakeup. See comment 5.

> +		return;
> +	}
> +	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
> +						    in_queue);
> +	e->is_ready = force_ready;
> +	fiber_wakeup(e->fiber);
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&current_journal->waiters));
> +	if (current_journal->queue_is_woken)
> +		return;
> +	current_journal->queue_is_woken = true;
> +	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +		.is_ready = false,
> +	};
> +	rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
> +	/*
> +	 * Will be woken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !entry.is_ready)
> +		fiber_yield();
> +
> +	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
> +	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
> +	rlist_del(&entry.in_queue);

5. Can rlist_del be done along with fiber_wakeup()? Then you
wouldn't need is_woken maybe.

> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..d295dfa4b 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +143,81 @@ struct journal {
>  		     struct journal_entry *entry);
>  };
>  
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;
> +
> +/** Wake the journal queue up. */
> +void
> +journal_queue_wakeup(bool force_ready);
> +
> +/**
> + * Check whether any of the queue size limits is reached.
> + * If the queue is full, we must wait for some of the entries to be written
> + * before proceeding with a new asynchronous write request.
> + */
> +static inline bool
> +journal_queue_is_full(void)
> +{
> +	struct journal *j = current_journal;
> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);

6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX
when user passes 0. Then no need to check for != 0. The same for queue_max_len.

Option 2, which may be stupid (but can be combined with option 1): store a flag
'is_full' and update it when updating queue_size and queue_len and seeing they
exceeded the limit. But I am not sure it reduces the number of branches. Didn't
check.

> +}
> +
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> +	return !rlist_empty(&current_journal->waiters);
> +}
> +
> +/** Yield until there's some space in the journal queue. */
> +void
> +journal_wait_queue(void);
> +
> +/** Set maximal journal queue size in bytes. */
> +static inline void
> +journal_queue_set_max_size(struct journal *j, int64_t size)

7. Why do we have the journal parameter here, but don't have it in
the other functions? The same for journal_queue_set_max_len.

> +{
> +	assert(j == current_journal);
> +	j->queue_max_size = size;
> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
> +		journal_queue_wakeup(false);
> +}
> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.

8. Maybe add an assert though.

> +	 */
> +	journal_queue_on_append(entry);
> +
>  	return current_journal->write_async(current_journal, entry);
>  }



* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-02-18 20:06   ` Serge Petrenko via Tarantool-patches
  2021-02-23 22:19     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-18 20:06 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



17.02.2021 23:46, Vladislav Shpilevoy writes:
> Hi! Thanks for the patch!

Thanks for the review! Please find my answers inline and the
incremental diff below.

> Now looks cool indeed.
>
> Another raw idea on which I don't insist and not even sure it is
> good. But just came to my mind: how about making a separate
> object called 'journal_queue'? Or 'journal_ctl'?  Which is global
> and is not inside of one journal. It can't be changed to another
> queue/ctl, and is used by journal API.
>
> So we wouldn't need to worry if we configured the correct journal
> because now current_journal can change at runtime, but this ctl
> thing - can't.

Yes, this'd fix the problem which bothers me: whether we configure the
correct queue.
I don't want to do this TBH; it looks too complex for what
it's trying to achieve.

> Another option - call this thing 'journal', and rename the old
> 'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab'
> or something like this.
>
> Another option - ignore this, since it does not matter much. But
> just in case you would want to try to fit the solution into one
> of these ideas.
>
> See 8 comments below.
>
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..49441e596 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>>   			     complete_data);
>>   	return entry;
>>   }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free. */
>> +	struct fiber *fiber;
>> +	/** Whether the fiber should be woken up regardless of queue size. */
>> +	bool is_ready;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
>> +};
>> +
>> +/**
>> + * Wake up the next waiter in journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)
> 1. The flag is known in all usage places at compilation time. Is it
> possible to split the function into force/normal versions? The same
> for journal_queue_wakeup() from which this runtime uncertainty arises.

Actually, the parameter is not known at compile time when wakeup_next()
is called from journal_wait_queue(). For now wakeup_next() only has a single
check for force_ready, so moving the check outside would only increase the
number of branches.

journal_queue_wakeup() is called only once per whole queue wakeup, so
I suppose it doesn't hurt much that it has a compile-time-known parameter.

> Also it is worth adding a comment on why the force mode is even needed.

No problem.

>> +{
>> +	/* Empty queue or last entry in queue. */
>> +	if (link == rlist_last(&current_journal->waiters)) {
> 2. I am not sure I understand what is happening here. Why is this
> function in one place called with a pointer to the list itself,
> and in another place with a pointer to one element?

Well, <list head> -> next is the first list entry, right?
In queue_wakeup() I wake the first waiter up.

Once any waiter gets woken up, it wakes up the next waiter,
which is <in_queue> -> next.

That's why I have a common helper for these two cases.

>> +		current_journal->queue_is_woken = false;
>> +		return;
>> +	}
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!force_ready && journal_queue_is_full()) {
>> +		current_journal->queue_is_woken = false;
> 3. Maybe woken -> awake?

No problem.

> 4. Why do you need the flag? Can you just remove the awake entries
> from the queue right away? Then it wouldn't even be possible to make
> a double wakeup. See comment 5.

I think I can't. Please see answer to comment 5.

>> +		return;
>> +	}
>> +	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
>> +						    in_queue);
>> +	e->is_ready = force_ready;
>> +	fiber_wakeup(e->fiber);
>> +}
>> +
>> +void
>> +journal_queue_wakeup(bool force_ready)
>> +{
>> +	assert(!rlist_empty(&current_journal->waiters));
>> +	if (current_journal->queue_is_woken)
>> +		return;
>> +	current_journal->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
>> +}
>> +
>> +void
>> +journal_wait_queue(void)
>> +{
>> +	struct journal_queue_entry entry = {
>> +		.fiber = fiber(),
>> +		.is_ready = false,
>> +	};
>> +	rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
>> +	/*
>> +	 * Will be woken up by either queue emptying or a synchronous write.
>> +	 */
>> +	while (journal_queue_is_full() && !entry.is_ready)
>> +		fiber_yield();
>> +
>> +	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
>> +	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
>> +	rlist_del(&entry.in_queue);
> 5. Can rlist_del be done along with fiber_wakeup()? Then you
> wouldn't need is_woken maybe.

Looks like it can't.
Say we have only one waiter, and we remove it from the list on wakeup.
The list would become empty and there'd be no way to check whether the
journal has any waiters, so we might reorder the entries (put new ones
before the waiting one). This is not necessarily bad, because I put
entries into the queue before txn_begin(), but someone may call
journal_wait_queue() from inside the transaction, or right before
txn_commit(). Then it would be bad to put other transactions before
this one.

So while removing is_woken we would have to introduce a
queue_has_waiters flag for the sake of this single waiter.
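
To make the hazard concrete, here's the has-waiters check from the
patch with the failure scenario spelled out in comments (the scenario
is mine, the function is verbatim):

/*
 * Suppose the waiter were removed from the list at wakeup time:
 *
 *   1. Fiber A waits because the queue is full: waiters = [A].
 *   2. A WAL write completes; A is woken up and removed:
 *      waiters = [] -- but A hasn't run yet.
 *   3. Fiber B submits a new entry, sees no waiters, and goes to
 *      WAL before A: the write order has changed.
 *
 * Keeping A listed until it runs (rlist_del() inside
 * journal_wait_queue() itself) keeps this check truthful:
 */
static inline bool
journal_queue_has_waiters(void)
{
	return !rlist_empty(&current_journal->waiters);
}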



>> +}
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..d295dfa4b 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -124,6 +143,81 @@ struct journal {
>>   		     struct journal_entry *entry);
>>   };
>>   
>> +/**
>> + * Depending on the step of recovery and instance configuration
>> + * points at a concrete implementation of the journal.
>> + */
>> +extern struct journal *current_journal;
>> +
>> +/** Wake the journal queue up. */
>> +void
>> +journal_queue_wakeup(bool force_ready);
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(void)
>> +{
>> +	struct journal *j = current_journal;
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
> 6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX
> when the user passes 0. Then no need to check for != 0. The same for
> queue_max_len.

Sounds good, thanks for the suggestion!

> Option 2 which may be stupid (but combined with option 1): store a flag
> 'is_full' and update it when updating queue_size and queue_len and seeing
> they exceeded the limit. But I am not sure it reduces the number of
> branches. Didn't check.

Then we'd evaluate is_full() on every journal_confirm() and
journal_write(), i.e. for both sync and async writes, which happens more
often than the check is actually needed (only for async writes).
I think it's better to calculate is_full on demand rather than every
time it might change.

>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(void)
>> +{
>> +	return !rlist_empty(&current_journal->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>> +/** Set maximal journal queue size in bytes. */
>> +static inline void
>> +journal_queue_set_max_size(struct journal *j, int64_t size)
> 7. Why do we have a journal parameter here, but don't have it in
> the other functions? The same for journal_queue_set_max_len.

This is my attempt to make sure only wal_writer's journal has a queue.
I explicitly set queue_max_... parameters only for wal_writer's journal.
And then there's an assert that journal_queue_set_...() is only called with
the current journal.

>> +{
>> +	assert(j == current_journal);
>> +	j->queue_max_size = size;
>> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
>> +		journal_queue_wakeup(false);
>> +}
>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>>   static inline int
>>   journal_write_async(struct journal_entry *entry)
>>   {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
> 8. Maybe add an assert though.

I wanted to, but it's impossible.
The queue may be full when all the waiters are forcefully woken up by a
synchronous commit. And it's hard to tell whether it was a "force" wakeup
or not. So let's just hope no one misuses this API.

Or, even better, I can remove the is_ready field from queue entries and
add a new field to the journal: queue_is_ready or something, in addition
to queue_is_awake. Then every entry will check queue_is_ready instead of
entry.is_ready, and it'll be possible to add an assert here:
`!journal_queue_is_full() || journal_queue_is_ready`.
Looks like this'll also allow us to extract queue_wakeup_(next)_force,
like you suggested in paragraph 1.
What do you think?
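
Something along these lines (a rough, untested sketch; the
queue_is_ready name is tentative):

struct journal {
	/* ... */
	/**
	 * Set while a synchronous write forcefully drains the queue:
	 * waiters may proceed even though the queue is still full.
	 */
	bool queue_is_ready;
	/* ... */
};

static inline int
journal_write_async(struct journal_entry *entry)
{
	/*
	 * The caller either checked that the queue has space or was
	 * let through by a forceful drain.
	 */
	assert(!journal_queue_is_full() ||
	       current_journal->queue_is_ready);
	journal_queue_on_append(entry);
	return current_journal->write_async(current_journal, entry);
}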

>> +	 */
>> +	journal_queue_on_append(entry);
>> +
>>   	return current_journal->write_async(current_journal, entry);
>>   }


Incremental diff:

diff --git a/src/box/box.cc b/src/box/box.cc
index 2b335599e..9a3b092d0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -762,6 +762,9 @@ box_check_wal_queue_max_len(void)
          diag_set(ClientError, ER_CFG, "wal_queue_max_len",
               "wal_queue_max_len must be >= 0");
      }
+    /* Unlimited. */
+    if (len == 0)
+        len = INT64_MAX;
      return len;
  }

@@ -773,6 +776,9 @@ box_check_wal_queue_max_size(void)
          diag_set(ClientError, ER_CFG, "wal_queue_max_size",
               "wal_queue_max_size must be >= 0");
      }
+    /* Unlimited. */
+    if (size == 0)
+        size = INT64_MAX;
      return size;
  }

diff --git a/src/box/journal.c b/src/box/journal.c
index 49441e596..931797faf 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -73,7 +73,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
  {
      /* Empty queue or last entry in queue. */
      if (link == rlist_last(&current_journal->waiters)) {
-        current_journal->queue_is_woken = false;
+        current_journal->queue_is_awake = false;
          return;
      }
      /*
@@ -81,7 +81,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
       * else up until there's some free space.
       */
      if (!force_ready && journal_queue_is_full()) {
-        current_journal->queue_is_woken = false;
+        current_journal->queue_is_awake = false;
          return;
      }
      struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
@@ -94,9 +94,9 @@ void
  journal_queue_wakeup(bool force_ready)
  {
      assert(!rlist_empty(&current_journal->waiters));
-    if (current_journal->queue_is_woken)
+    if (current_journal->queue_is_awake)
          return;
-    current_journal->queue_is_woken = true;
+    current_journal->queue_is_awake = true;
      journal_queue_wakeup_next(&current_journal->waiters, force_ready);
  }

diff --git a/src/box/journal.h b/src/box/journal.h
index d295dfa4b..2caac4099 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -133,7 +133,7 @@ struct journal {
       * Whether the queue is being woken or not. Used to avoid multiple
       * concurrent wake-ups.
       */
-    bool queue_is_woken;
+    bool queue_is_awake;
      /** Asynchronous write */
      int (*write_async)(struct journal *journal,
                 struct journal_entry *entry);
@@ -149,7 +149,11 @@ struct journal {
   */
  extern struct journal *current_journal;

-/** Wake the journal queue up. */
+/**
+ * Wake the journal queue up.
+ * @param force_ready whether waiters should proceed even if the queue is still
+ *                    full.
+ */
  void
  journal_queue_wakeup(bool force_ready);

@@ -162,8 +166,8 @@ static inline bool
  journal_queue_is_full(void)
  {
      struct journal *j = current_journal;
-    return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
-           (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
+    return j->queue_size > j->queue_max_size ||
+           j->queue_len > j->queue_max_len;
  }

  /**
@@ -310,10 +314,10 @@ journal_create(struct journal *journal,
      journal->write_async    = write_async;
      journal->write        = write;
      journal->queue_size = 0;
-    journal->queue_max_size = 0;
+    journal->queue_max_size = INT64_MAX;
      journal->queue_len = 0;
-    journal->queue_max_len = 0;
-    journal->queue_is_woken = false;
+    journal->queue_max_len = INT64_MAX;
+    journal->queue_is_awake = false;
      rlist_create(&journal->waiters);
  }



-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-18 20:06   ` Serge Petrenko via Tarantool-patches
@ 2021-02-23 22:19     ` Vladislav Shpilevoy via Tarantool-patches
  2021-02-24 19:32       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-23 22:19 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the patch!

>>> diff --git a/src/box/journal.c b/src/box/journal.c
>>> index cb320b557..49441e596 100644
>>> --- a/src/box/journal.c
>>> +++ b/src/box/journal.c
>>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>>>                    complete_data);
>>>       return entry;
>>>   }
>>> +
>>> +struct journal_queue_entry {
>>> +    /** The fiber waiting for queue space to free. */
>>> +    struct fiber *fiber;
>>> +    /** Whether the fiber should be woken up regardless of queue size. */
>>> +    bool is_ready;
>>> +    /** A link in all waiting fibers list. */
>>> +    struct rlist in_queue;
>>> +};
>>> +
>>> +/**
>>> + * Wake up the next waiter in journal queue.
>>> + */
>>> +static inline void
>>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)
>> 1. The flag is known in all usage places at compilation time. Is it
>> possible to split the function into force/normal versions? The same
>> for journal_queue_wakeup() from which this runtime uncertainty arises.
> 
> Actually, the parameter is not known at compile time when wakeup_next()
> is called from journal_wait_queue(). For now wakeup_next() only has a single
> check for force_ready, so moving the check outside would only increase the
> number of branches.
> 
> journal_queue_wakeup() is called only once per whole queue wakeup, so
> I suppose it doesn't hurt much that it has a compile-time-known parameter.

Is it called once? Then why does it have the
`if (current_journal->queue_is_woken)` check?

>>> +{
>>> +    /* Empty queue or last entry in queue. */
>>> +    if (link == rlist_last(&current_journal->waiters)) {
>> 2. I am not sure I understand what is happening here. Why is this
>> function in one place called with a pointer to the list itself,
>> and in another place with a pointer to one element?
> 
> Well, <list head> -> next is the first list entry, right?

Perhaps. TBH, I don't remember, and when I see such tricky things in
the code, it takes time to understand them.

> In queue_wakeup() I wake the first waiter up.
> 
> Once any waiter gets woken up, it wakes up the next waiter.
> Which is <in_queue> -> next.
> 
> That's why I have a common helper for these two cases.

Ok, I see now. But it seems you could make it simpler, right?

====================
@@ -69,10 +69,10 @@ struct journal_queue_entry {
  * Wake up the next waiter in journal queue.
  */
 static inline void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready)
+journal_queue_wakeup_first(bool force_ready)
 {
 	/* Empty queue or last entry in queue. */
-	if (link == rlist_last(&current_journal->waiters)) {
+	if (rlist_empty(&current_journal->waiters)) {
 		current_journal->queue_is_woken = false;
 		return;
 	}
@@ -97,7 +97,7 @@ journal_queue_wakeup(bool force_ready)
 	if (current_journal->queue_is_woken)
 		return;
 	current_journal->queue_is_woken = true;
-	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+	journal_queue_wakeup_first(force_ready);
 }
 
 void
@@ -114,7 +114,7 @@ journal_wait_queue(void)
 	while (journal_queue_is_full() && !entry.is_ready)
 		fiber_yield();
 
-	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
 	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
 	rlist_del(&entry.in_queue);
+	journal_queue_wakeup_first(entry.is_ready);
 }
====================

(I didn't test it.)

>> 5. Can rlist_del be done along with fiber_wakeup()? Then you
>> wouldn't need is_woken maybe.
> 
> Looks like it can't.
> Say we have only one waiter. And remove it from the list on wakeup.
> The list would become empty and there'd be no way to check whether
> the journal has any waiters, and we may reorder the entries (put new ones before
> the waiting one). This is not necessarily bad, because I put entries into the queue
> before txn_begin(), but someone may call journal_wait_queue() from inside the
> transaction, or right before txn_commit(). Then it might be bad to put other
> transactions before this one.

Order change is definitely not acceptable.

> So while removing is_woken we would have to introduce a queue_has_waiters flag for
> the sake of this single waiter.

It would rather become a counter: the number of waiters, because there
can be many. But yeah, I see the problem.

>>> +}
>>> +
>>> +/**
>>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>>> + * other waiters we must go after them to preserve write order.
>>> + */
>>> +static inline bool
>>> +journal_queue_has_waiters(void)
>>> +{
>>> +    return !rlist_empty(&current_journal->waiters);
>>> +}
>>> +
>>> +/** Yield until there's some space in the journal queue. */
>>> +void
>>> +journal_wait_queue(void);
>>> +
>>> +/** Set maximal journal queue size in bytes. */
>>> +static inline void
>>> +journal_queue_set_max_size(struct journal *j, int64_t size)
>> 7. Why do we have a journal parameter here, but don't have it in
>> the other functions? The same for journal_queue_set_max_len.
> 
> This is my attempt to make sure only wal_writer's journal has a queue.
> I explicitly set queue_max_... parameters only for wal_writer's journal.
> And then there's an assert that journal_queue_set_...() is only called with
> the current journal.

Or the assertion could be done in the wal_set_queue_*() functions, to
keep the journal API consistent.

I just realized the journal can be easily unit-tested. It does not
depend on anything except the small/ and core/ libs. Although it seems
like a lot of work, so maybe not now. Probably later, for something more
complex and harder to test via functional tests. However, if you would
write tests now, it would be greatly appreciated.
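
For example, the accounting part (everything which doesn't yield) could
be covered by something as small as this sketch against the test/unit
harness (untested, assumes the patch's API and a no-op write callback):

#include "unit.h"
#include "box/journal.h"

static int
noop_write(struct journal *j, struct journal_entry *e)
{
	(void)j;
	(void)e;
	return 0;
}

int
main(void)
{
	plan(3);
	struct journal j;
	journal_create(&j, noop_write, noop_write);
	journal_set(&j);
	journal_queue_set_max_len(&j, 2);
	ok(!journal_queue_is_full(), "empty queue is not full");
	/* Pretend more entries are in flight than the limit allows. */
	j.queue_len = 3;
	ok(journal_queue_is_full(), "queue_max_len exceeded");
	journal_queue_set_max_len(&j, 100);
	ok(!journal_queue_is_full(), "limit raised, not full anymore");
	return check_plan();
}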

>>> +{
>>> +    assert(j == current_journal);
>>> +    j->queue_max_size = size;
>>> +    if (journal_queue_has_waiters() && !journal_queue_is_full())
>>> +        journal_queue_wakeup(false);
>>> +}
>>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>>>   static inline int
>>>   journal_write_async(struct journal_entry *entry)
>>>   {
>>> +    /*
>>> +     * It's the job of the caller to check whether the queue is full prior
>>> +     * to submitting the request.
>> 8. Maybe add an assert though.
> 
> I wanted to, but it's impossible.
> The queue may be full when all the waiters are forcefully woken up by a
> synchronous commit. And it's hard to tell whether it was a "force" wakeup
> or not. So let's just hope no one misuses this API.

Yeah, I see now.

> Or, even better, I can remove is_ready field from queue entries and add a new field
> to the journal: queue_is_ready or something, in addition to queue_is_awake.
> Then every entry will check queue_is_ready instead of entry.is_ready and
> it'll be possible to add an assert here: !journal_queue_is_full || journal_queue_is_ready
> Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you suggested
> in paragraph 1.
> What do you think?

Sounds good, worth doing.

See 2 comments below.

>>> +     */
>>> +    journal_queue_on_append(entry);
>>> +
>>>       return current_journal->write_async(current_journal, entry);
>>>   }> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..7c2452d2b 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		goto success;
>  	}
>  
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full())
> +		journal_wait_queue();

1. I just noticed you do the waiting before starting the
transaction. In case of Vinyl the transaction can yield. So
by the time you get to commit, the queue could be full.

Don't know what to do with this. We can't wait before
txn_commit_async() because it would kill the memtx transactions.

Maybe we could simply not care for now, because the overpopulation
will never exceed the number of appliers, which is tiny.

But when async transactions go to the public API, we
will face this issue anyway. I assume we will need to extract
txn_prepare to the "public" part of txn.h and use it separately
from writing to the journal. So in our code it would look like
this:

    sync:  txn_begin() ... txn_commit()
    async: txn_begin() ... txn_prepare() journal_wait() txn_persist()

or something similar. But don't know for sure.
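
In other words, something like this hypothetical split (txn_persist()
is a made-up name, only to illustrate the idea):

/* A sketch of a possible future box_commit_async(); names invented. */
int
box_commit_async(struct txn *txn)
{
	/* The WAL-independent part: triggers, conflict resolution. */
	if (txn_prepare(txn) != 0)
		return -1;
	/*
	 * The transaction is already prepared, so waiting (and
	 * yielding) here doesn't leave a half-open memtx txn.
	 */
	if (journal_queue_is_full())
		journal_wait_queue();
	/* The journal write itself; made-up name. */
	return txn_persist(txn);
}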

Summary: leave it as is if you don't want to tear commit_async()
and commit() apart now.

> +
>  	/**
>  	 * Explicitly begin the transaction so that we can
>  	 * control fiber->gc life cycle and, in case of apply
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..d295dfa4b 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.
> +	 */
> +	journal_queue_on_append(entry);
> +
>  	return current_journal->write_async(current_journal, entry);

2. What if write_async() is called by some applier when the queue is
not full, but also not empty? It seems it will bypass the existing
waiters and lead to a transaction order change. No?

I'm starting to think that we need to queue the journal_entry objects right
in the journal object. So if their queue is not empty,
journal_write_async() adds the entry to the queue and does not call
write_async().

Also would be cool to add a test how the applier can reorder WAL writes
in the current patch.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-23 22:19     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-02-24 19:32       ` Serge Petrenko via Tarantool-patches
  2021-02-26  0:58         ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-02-24 19:32 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



24.02.2021 01:19, Vladislav Shpilevoy writes:
> Hi! Thanks for the patch!

Thanks for the review!
Please find my answers below and v3 of the patch in your inbox.

>
>>>> diff --git a/src/box/journal.c b/src/box/journal.c
>>>> index cb320b557..49441e596 100644
>>>> --- a/src/box/journal.c
>>>> +++ b/src/box/journal.c
>>>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>>>>                     complete_data);
>>>>        return entry;
>>>>    }
>>>> +
>>>> +struct journal_queue_entry {
>>>> +    /** The fiber waiting for queue space to free. */
>>>> +    struct fiber *fiber;
>>>> +    /** Whether the fiber should be woken up regardless of queue size. */
>>>> +    bool is_ready;
>>>> +    /** A link in all waiting fibers list. */
>>>> +    struct rlist in_queue;
>>>> +};
>>>> +
>>>> +/**
>>>> + * Wake up the next waiter in journal queue.
>>>> + */
>>>> +static inline void
>>>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)
>>> 1. The flag is known in all usage places at compilation time. Is it
>>> possible to split the function into force/normal versions? The same
>>> for journal_queue_wakeup() from which this runtime uncertainty arises.
>> Actually, the parameter is not known at compile time when wakeup_next()
>> is called from journal_wait_queue(). For now wakeup_next() only has a single
>> check for force_ready, so moving the check outside would only increase the
>> number of branches.
>>
>> journal_queue_wakeup() is called only once per whole queue wakeup, so
>> I suppose it doesn't hurt much that it has a compile-time-known parameter.
> Is it called once? Then why does it have the
> `if (current_journal->queue_is_woken)` check?

I was trying to say 'it's called once per a bunch of wakeup_next()
calls'. Just ignore this, it's irrelevant.
Actually, no, it may be called multiple times, from every
journal_async_complete(). But while the queue is being cleared it's a
no-op for every subsequent call, except the first one.

>
>>>> +{
>>>> +    /* Empty queue or last entry in queue. */
>>>> +    if (link == rlist_last(&current_journal->waiters)) {
>>> 2. I am not sure I understand what is happening here. Why is this
>>> function in one place called with a pointer to the list itself,
>>> and in another place with a pointer to one element?
>> Well, <list head> -> next is the first list entry, right?
> Perhaps. TBH, I don't remember, and when I see such tricky things in
> the code, it takes time to understand them.
>
>> In queue_wakeup() I wake the first waiter up.
>>
>> Once any waiter gets woken up, it wakes up the next waiter.
>> Which is <in_queue> -> next.
>>
>> That's why I have a common helper for these two cases.
> Ok, I see now. But it seems you could make it simpler, right?
>
> ====================
> @@ -69,10 +69,10 @@ struct journal_queue_entry {
>    * Wake up the next waiter in journal queue.
>    */
>   static inline void
> -journal_queue_wakeup_next(struct rlist *link, bool force_ready)
> +journal_queue_wakeup_first(bool force_ready)
>   {
>   	/* Empty queue or last entry in queue. */
> -	if (link == rlist_last(&current_journal->waiters)) {
> +	if (rlist_empty(&current_journal->waiters)) {
>   		current_journal->queue_is_woken = false;
>   		return;
>   	}
> @@ -97,7 +97,7 @@ journal_queue_wakeup(bool force_ready)
>   	if (current_journal->queue_is_woken)
>   		return;
>   	current_journal->queue_is_woken = true;
> -	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
> +	journal_queue_wakeup_first(force_ready);
>   }
>   
>   void
> @@ -114,7 +114,7 @@ journal_wait_queue(void)
>   	while (journal_queue_is_full() && !entry.is_ready)
>   		fiber_yield();
>   
> -	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
>   	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
>   	rlist_del(&entry.in_queue);
> +	journal_queue_wakeup_first(entry.is_ready);
>   }
> ====================
>
> (I didn't test it.)

Yes, indeed. Thanks!
Applied with minor changes.

>>> 5. Can rlist_del be done along with fiber_wakeup()? Then you
>>> wouldn't need is_woken maybe.
>> Looks like it can't.
>> Say we have only one waiter. And remove it from the list on wakeup.
>> The list would become empty and there'd be no way to check whether
>> the journal has any waiters, and we may reorder the entries (put new ones before
>> the waiting one). This is not necessarily bad, because I put entries into the queue
>> before txn_begin(), but someone may call journal_wait_queue() from inside the
>> transaction, or right before txn_commit(). Then it might be bad to put other
>> transactions before this one.
> Order change is definitely not acceptable.
>
>> So while removing is_woken we would have to introduce a queue_has_waiters flag for
>> the sake of this single waiter.
> It would rather become a counter: the number of waiters, because there can
> be many. But yeah, I see the problem.
>
>>>> +}
>>>> +
>>>> +/**
>>>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>>>> + * other waiters we must go after them to preserve write order.
>>>> + */
>>>> +static inline bool
>>>> +journal_queue_has_waiters(void)
>>>> +{
>>>> +    return !rlist_empty(&current_journal->waiters);
>>>> +}
>>>> +
>>>> +/** Yield until there's some space in the journal queue. */
>>>> +void
>>>> +journal_wait_queue(void);
>>>> +
>>>> +/** Set maximal journal queue size in bytes. */
>>>> +static inline void
>>>> +journal_queue_set_max_size(struct journal *j, int64_t size)
>>> 7. Why do we have a journal parameter here, but don't have it in
>>> the other functions? The same for journal_queue_set_max_len.
>> This is my attempt to make sure only wal_writer's journal has a queue.
>> I explicitly set queue_max_... parameters only for wal_writer's journal.
>> And then there's an assert that journal_queue_set_...() is only called with
>> the current journal.
> Or the assertion could be done in the wal_set_queue_*() functions, to keep the
> journal API consistent.

Actually, struct journal has a ton of queue_* members now, so I'm
following your older advice and extracting everything related to queues
into a separate struct journal_queue.
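
Roughly like this (a sketch of the extraction; the field set follows
the patch, only the grouping into a separate struct is new):

/** Everything queue-related, moved out of struct journal. */
struct journal_queue {
	/** Bytes of entries awaiting write confirmation. */
	int64_t size;
	/** Maximal queue size in bytes before writers must wait. */
	int64_t max_size;
	/** Entries awaiting write confirmation. */
	int64_t len;
	/** Maximal queue length before writers must wait. */
	int64_t max_len;
	/** Fibers waiting for the queue to drain. */
	struct rlist waiters;
	/** Whether a wakeup chain is already in progress. */
	bool is_awake;
	/** Lets waiters through even though the queue is full. */
	bool is_ready;
};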

>
> I just realized the journal can be easily unit-tested. It does not
> depend on anything except the small/ and core/ libs. Although it seems
> like a lot of work, so maybe not now. Probably later, for something more
> complex and harder to test via functional tests. However, if you would
> write tests now, it would be greatly appreciated.
>
>>>> +{
>>>> +    assert(j == current_journal);
>>>> +    j->queue_max_size = size;
>>>> +    if (journal_queue_has_waiters() && !journal_queue_is_full())
>>>> +        journal_queue_wakeup(false);
>>>> +}
>>>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>>>>    static inline int
>>>>    journal_write_async(struct journal_entry *entry)
>>>>    {
>>>> +    /*
>>>> +     * It's the job of the caller to check whether the queue is full prior
>>>> +     * to submitting the request.
>>> 8. Maybe add an assert though.
>> I wanted to, but it's impossible.
>> The queue may be full when all the waiters are forcefully woken up by a
>> synchronous commit. And it's hard to tell whether it was a "force" wakeup
>> or not. So let's just hope no one misuses this API.
> Yeah, I see now.
>
>> Or, even better, I can remove is_ready field from queue entries and add a new field
>> to the journal: queue_is_ready or something, in addition to queue_is_awake.
>> Then every entry will check queue_is_ready instead of entry.is_ready and
>> it'll be possible to add an assert here: !journal_queue_is_full || journal_queue_is_ready
>> Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you suggested
>> in paragraph 1.
>> What do you think?
> Sounds good, worth doing.

I introduced queue_is_ready and removed entry.is_ready.

The code looks cleaner now, and together with your suggestion regarding
journal_queue_wakeup_first() it doesn't take parameters at all.
It does check queue_is_ready internally, but there's no point in
separating _force and normal versions: that would simply move the check
outside the function call.
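
So the wakeup boils down to this shape (my reconstruction of v3, not
the literal patch):

static void
journal_queue_wakeup_first(void)
{
	struct journal *j = current_journal;
	if (rlist_empty(&j->waiters)) {
		j->queue_is_awake = false;
		return;
	}
	/* The force/normal split folded into an internal check. */
	if (!j->queue_is_ready && journal_queue_is_full()) {
		j->queue_is_awake = false;
		return;
	}
	struct journal_queue_entry *e =
		rlist_first_entry(&j->waiters, typeof(*e), in_queue);
	fiber_wakeup(e->fiber);
}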

>
> See 2 comments below.
>
>>>> +     */
>>>> +    journal_queue_on_append(entry);
>>>> +
>>>>        return current_journal->write_async(current_journal, entry);
>>>>    }> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..7c2452d2b 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full())
>> +		journal_wait_queue();
> 1. I just noticed you do the waiting before starting the
> transaction. In case of Vinyl the transaction can yield. So
> by the time you get to commit, the queue could be full.
>
> Don't know what to do with this. We can't wait before
> txn_commit_async() because it would kill the memtx transactions.
>
> Maybe we could simply not care for now, because the overpopulation
> will never exceed the number of appliers, which is tiny.
>
> But when async transactions go to the public API, we
> will face this issue anyway. I assume we will need to extract
> txn_prepare to the "public" part of txn.h and use it separately
> from writing to the journal. So in our code it would look like
> this:
>
>      sync:  txn_begin() ... txn_commit()
>      async: txn_begin() ... txn_prepare() journal_wait() txn_persist()
>
> or something similar. But don't know for sure.
>
> Summary: leave it as is if you don't want to tear commit_async()
> and commit() apart now.

Let's leave it as is for now then.

>
>> +
>>   	/**
>>   	 * Explicitly begin the transaction so that we can
>>   	 * control fiber->gc life cycle and, in case of apply
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..d295dfa4b 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>>   static inline int
>>   journal_write_async(struct journal_entry *entry)
>>   {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
>> +	 */
>> +	journal_queue_on_append(entry);
>> +
>>   	return current_journal->write_async(current_journal, entry);
> 2. What if write_async() is called by some applier when the queue is
> not full, but also not empty? It seems it will bypass the existing
> waiters and lead to a transaction order change. No?

Yes, you're correct, and thanks for noticing this.
This is fixed simply:

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 7c2452d2b..27ddd0f29 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
          * This is done before opening the transaction to avoid problems with
          * yielding inside it.
          */
-       if (journal_queue_is_full())
+       if (journal_queue_is_full() || journal_queue_has_waiters())
                 journal_wait_queue();

         /**


With this fix applied, nothing else can go wrong here, AFAICS.

>
> I'm starting to think that we need to queue the journal_entry objects right
> in the journal object. So if their queue is not empty,
> journal_write_async() adds the entry to the queue and does not call
> write_async().

Why?

>
> Also would be cool to add a test how the applier can reorder WAL writes
> in the current patch.

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
  2021-02-24 19:32       ` Serge Petrenko via Tarantool-patches
@ 2021-02-26  0:58         ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26  0:58 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

>> I'm starting to think that we need to queue the journal_entry objects right
>> in the journal object. So if their queue is not empty,
>> journal_write_async() adds the entry to the queue and does not call
>> write_async().
> 
> Why?

Never mind, I probably didn't think it through enough at that moment.

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2021-02-26  0:58 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-02-11 12:17 [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches
2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
2021-02-16 12:47   ` Serge Petrenko via Tarantool-patches
2021-02-16 12:49     ` Cyrill Gorcunov via Tarantool-patches
2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
2021-02-18 20:06   ` Serge Petrenko via Tarantool-patches
2021-02-23 22:19     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-24 19:32       ` Serge Petrenko via Tarantool-patches
2021-02-26  0:58         ` Vladislav Shpilevoy via Tarantool-patches
