[Tarantool-patches] [PATCH] wal: introduce limits on simultaneous writes

Serge Petrenko sergepetrenko at tarantool.org
Wed Jan 27 13:11:19 MSK 2021


Since the introduction of asynchronous commit, which doesn't wait for a
WAL write to succeed, it's quite easy to clog WAL with huge amounts of
write requests. For now, it's only possible from an applier, since it's
the only user of async commit at the moment.

Imagine such a situation: there are 2 servers, a master and a replica,
and the replica is down for some period of time. While the replica is
down, the master serves requests at a reasonable pace, possibly close to
its WAL throughput limit. Once the replica reconnects, it has to receive
all the data master has piled up. Now there's no limit in speed at which
master sends the data to replica, and there's no limit at which
replica's applier submits corresponding write requests to WAL. This
leads to a situation when replica's WAL can never catch up with the
incoming requests and the amount of pending requests grows without bound.

To ameliorate such behavior, we need to introduce some limit on
not-yet-finished WAL write requests. This is what this commit is trying
to do.
Two new counters are added to wal writer: queue_size (in bytes) and
queue_len (in wal messages) together with configuration settings:
`wal_queue_max_size` and `wal_queue_max_len`.
Size and length are increased on every new submitted request, and are
decreased once the tx receives a confirmation that a specific request
was written.

Once either size or len reaches its maximum value, new write requests are
blocked (even for async writes) until the queue gets some free space.

The size limit isn't strict, i.e. if there's at least one free byte, the
whole write request will be added.

Part of #5536

@TarantoolBot document
Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'

`wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount
of concurrent write requests submitted to WAL.
`wal_queue_max_size` is measured in number of bytes to be written (0
means unlimited), and `wal_queue_max_len` is measured in number of wal
messages (correlates to number of bytes / 1024), 0 meaning unlimited.
These options only affect replica behaviour at the moment, and default
to 0. They limit the pace at which replica reads new transactions from
master.
---
https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
https://github.com/tarantool/tarantool/issues/5536

I didn't implement the `block_on_queue` option since it would always be `true`
for now.

 src/box/box.cc                              |  46 +++++++
 src/box/box.h                               |   2 +
 src/box/journal.h                           |   5 +
 src/box/lua/cfg.cc                          |  18 +++
 src/box/lua/load_cfg.lua                    |   6 +
 src/box/txn.c                               |  11 +-
 src/box/wal.c                               | 139 +++++++++++++++++++-
 src/box/wal.h                               |   8 ++
 test/app-tap/init_script.result             |   2 +
 test/box-tap/cfg.test.lua                   |   4 +-
 test/box/admin.result                       |   4 +
 test/box/cfg.result                         |   8 ++
 test/replication/gh-5536-wal-limit.result   | 132 +++++++++++++++++++
 test/replication/gh-5536-wal-limit.test.lua |  58 ++++++++
 test/replication/suite.cfg                  |   1 +
 test/replication/suite.ini                  |   2 +-
 16 files changed, 439 insertions(+), 7 deletions(-)
 create mode 100644 test/replication/gh-5536-wal-limit.result
 create mode 100644 test/replication/gh-5536-wal-limit.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 26cbe8aab..2b335599e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -754,6 +754,28 @@ box_check_wal_mode(const char *mode_name)
 	return (enum wal_mode) mode;
 }
 
+static int64_t
+box_check_wal_queue_max_len(void)
+{
+	int64_t len = cfg_geti64("wal_queue_max_len");
+	if (len < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
+			 "wal_queue_max_len must be >= 0");
+	}
+	return len;
+}
+
+static int64_t
+box_check_wal_queue_max_size(void)
+{
+	int64_t size = cfg_geti64("wal_queue_max_size");
+	if (size < 0) {
+		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
+			 "wal_queue_max_size must be >= 0");
+	}
+	return size;
+}
+
 static void
 box_check_readahead(int readahead)
 {
@@ -875,6 +897,10 @@ box_check_config(void)
 	box_check_checkpoint_count(cfg_geti("checkpoint_count"));
 	box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	box_check_wal_mode(cfg_gets("wal_mode"));
+	if (box_check_wal_queue_max_size() < 0)
+		diag_raise();
+	if (box_check_wal_queue_max_len() < 0)
+		diag_raise();
 	if (box_check_memory_quota("memtx_memory") < 0)
 		diag_raise();
 	box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size"));
@@ -1411,6 +1437,26 @@ box_set_checkpoint_wal_threshold(void)
 	wal_set_checkpoint_threshold(threshold);
 }
 
+int
+box_set_wal_queue_max_size(void)
+{
+	int64_t size = box_check_wal_queue_max_size();
+	if (size < 0)
+		return -1;
+	wal_set_queue_max_size(size);
+	return 0;
+}
+
+int
+box_set_wal_queue_max_len(void)
+{
+	int64_t len = box_check_wal_queue_max_len();
+	if (len < 0)
+		return -1;
+	wal_set_queue_max_len(len);
+	return 0;
+}
+
 void
 box_set_vinyl_memory(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index b68047a95..4f5b4b617 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -239,6 +239,8 @@ void box_set_readahead(void);
 void box_set_checkpoint_count(void);
 void box_set_checkpoint_interval(void);
 void box_set_checkpoint_wal_threshold(void);
+int box_set_wal_queue_max_size(void);
+int box_set_wal_queue_max_len(void);
 void box_set_memtx_memory(void);
 void box_set_memtx_max_tuple_size(void);
 void box_set_vinyl_memory(void);
diff --git a/src/box/journal.h b/src/box/journal.h
index 5d8d5a726..6e920c65d 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -140,6 +140,11 @@ journal_async_complete(struct journal_entry *entry)
  */
 extern struct journal *current_journal;
 
+enum {
+	JOURNAL_RC_ERROR = -1,
+	JOURNAL_RC_ROLLBACK = -2,
+};
+
 /**
  * Write a single entry to the journal in synchronous way.
  *
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 2d3ccbf0e..35f410710 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_wal_queue_max_size(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_size() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_wal_queue_max_len(struct lua_State *L)
+{
+	if (box_set_wal_queue_max_len() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_read_only(struct lua_State *L)
 {
@@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count},
 		{"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval},
 		{"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold},
+		{"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size},
+		{"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len},
 		{"cfg_set_read_only", lbox_cfg_set_read_only},
 		{"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory},
 		{"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 574c8bef4..c11a9e103 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -71,6 +71,8 @@ local default_cfg = {
     wal_mode            = "write",
     wal_max_size        = 256 * 1024 * 1024,
     wal_dir_rescan_delay= 2,
+    wal_queue_max_size  = 0,
+    wal_queue_max_len   = 0,
     force_recovery      = false,
     replication         = nil,
     instance_uuid       = nil,
@@ -163,6 +165,8 @@ local template_cfg = {
     coredump            = 'boolean',
     checkpoint_interval = 'number',
     checkpoint_wal_threshold = 'number',
+    wal_queue_max_size  = 'number',
+    wal_queue_max_len   = 'number',
     checkpoint_count    = 'number',
     read_only           = 'boolean',
     hot_standby         = 'boolean',
@@ -277,6 +281,8 @@ local dynamic_cfg = {
     checkpoint_count        = private.cfg_set_checkpoint_count,
     checkpoint_interval     = private.cfg_set_checkpoint_interval,
     checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold,
+    wal_queue_max_size      = private.cfg_set_wal_queue_max_size,
+    wal_queue_max_len       = private.cfg_set_wal_queue_max_len,
     worker_pool_threads     = private.cfg_set_worker_pool_threads,
     feedback_enabled        = ifdef_feedback_set_params,
     feedback_crashinfo      = ifdef_feedback_set_params,
diff --git a/src/box/txn.c b/src/box/txn.c
index a5edbfc60..d408d7112 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -860,10 +860,17 @@ txn_commit_async(struct txn *txn)
 	}
 
 	fiber_set_txn(fiber(), NULL);
-	if (journal_write_async(req) != 0) {
-		fiber_set_txn(fiber(), txn);
+	int rc;
+	if ((rc = journal_write_async(req)) != 0) {
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
+		/*
+		 * The transaction was rolled back and freed while waiting for
+		 * WAL queue.
+		 */
+		if (rc == JOURNAL_RC_ROLLBACK)
+			return -1;
+		fiber_set_txn(fiber(), txn);
 		goto rollback;
 	}
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 937d47ba9..e38ee8a8e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -144,6 +144,17 @@ struct wal_writer
 	 * time to trigger checkpointing.
 	 */
 	int64_t checkpoint_threshold;
+	/**
+	 * The maximum allowed amount of pending data to be written before
+	 * stalling new tx commits.
+	 */
+	int64_t queue_max_size;
+	/** Current pending data size, in bytes. */
+	int64_t queue_size;
+	/** The maximum amount of pending outgoing wal messages. */
+	int64_t queue_max_len;
+	/** Current amount of pending wal messages. */
+	int64_t queue_len;
 	/**
 	 * This flag is set if the WAL thread has notified TX that
 	 * the checkpoint threshold has been exceeded. It is cleared
@@ -166,6 +177,7 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	struct rlist waiters;
 };
 
 struct wal_msg {
@@ -183,6 +195,29 @@ struct wal_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Possible wal waiter states. There is no "SUCCESS" since the waiter decides
+ * whether it's succeeded or not on its own.
+ */
+enum wal_waiter_state {
+	WAL_WAITER_ROLLBACK = -1,
+	WAL_WAITER_PENDING = 0,
+};
+
+/**
+ * A journal entry waiting for the WAL queue to empty before submitting a write
+ * request.
+ */
+struct wal_waiter {
+	/* The waiting fiber. */
+	struct fiber *fiber;
+	/* The pending entry. Used for cascading rollback. */
+	struct journal_entry *entry;
+	enum wal_waiter_state state;
+	/* Link in waiter list. */
+	struct rlist in_list;
+};
+
 /**
  * Vinyl metadata log writer.
  */
@@ -332,6 +367,16 @@ tx_complete_rollback(void)
 			      fifo) != writer->last_entry)
 		return;
 	stailq_reverse(&writer->rollback);
+	/*
+	 * Every waiting entry came after any of the successfully submitted
+	 * entries, so it must be rolled back first to preserve correct order.
+	 */
+	struct wal_waiter *waiter;
+	rlist_foreach_entry(waiter, &writer->waiters, in_list) {
+		stailq_add_entry(&writer->rollback, waiter->entry, fifo);
+		waiter->state = WAL_WAITER_ROLLBACK;
+		fiber_wakeup(waiter->fiber);
+	}
 	tx_schedule_queue(&writer->rollback);
 	/* TX-thread can try sending transactions to WAL again. */
 	stailq_create(&writer->rollback);
@@ -343,6 +388,16 @@ tx_complete_rollback(void)
 	cpipe_push(&writer->wal_pipe, &msg);
 }
 
+static void
+wal_wakeup_waiters()
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_waiter *waiter;
+	rlist_foreach_entry(waiter, &writer->waiters, in_list)
+		fiber_wakeup(waiter->fiber);
+}
+
+
 /**
  * Complete execution of a batch of WAL write requests:
  * schedule all committed requests, and, should there
@@ -368,7 +423,15 @@ tx_complete_batch(struct cmsg *msg)
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
+	writer->queue_len--;
+	writer->queue_size -= batch->approx_len;
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
+	/*
+	 * Do not wake up waiters if we see there's a rollback planned.
+	 * We'll handle them together with other rolled back entries.
+	 */
+	if (stailq_empty(&writer->rollback))
+		wal_wakeup_waiters();
 }
 
 /**
@@ -442,6 +505,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->on_garbage_collection = on_garbage_collection;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 
+	rlist_create(&writer->waiters);
+	writer->queue_size = 0;
+	writer->queue_max_size = 0;
+	writer->queue_len = 0;
+	writer->queue_max_len = 0;
+
 	mempool_create(&writer->msg_pool, &cord()->slabc,
 		       sizeof(struct wal_msg));
 }
@@ -765,6 +834,27 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
+static inline bool
+wal_queue_is_full(void);
+
+void
+wal_set_queue_max_size(int64_t size)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	writer->queue_max_size = size;
+	if (!wal_queue_is_full() && stailq_empty(&writer->rollback))
+		wal_wakeup_waiters();
+}
+
+void
+wal_set_queue_max_len(int64_t len)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	writer->queue_max_len = len;
+	if (!wal_queue_is_full() && stailq_empty(&writer->rollback))
+		wal_wakeup_waiters();
+}
+
 struct wal_gc_msg
 {
 	struct cbus_call_msg base;
@@ -1218,6 +1308,40 @@ wal_writer_f(va_list ap)
 	return 0;
 }
 
+static inline bool
+wal_queue_is_full(void)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	return (writer->queue_max_len > 0 &&
+		writer->queue_len >= writer->queue_max_len) ||
+	       (writer->queue_max_size > 0 &&
+		writer->queue_size >= writer->queue_max_size);
+}
+
+static int
+wal_wait_queue(struct journal_entry *entry)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * To preserve correct order the tx must be put in queue when it's not
+	 * empty.
+	 */
+	if (!wal_queue_is_full() && rlist_empty(&writer->waiters))
+		return 0;
+	struct wal_waiter waiter = {
+		.fiber = fiber(),
+		.entry = entry,
+		.state = WAL_WAITER_PENDING,
+	};
+	rlist_add_tail_entry(&writer->waiters, &waiter, in_list);
+	do {
+		fiber_yield();
+	} while (wal_queue_is_full() &&
+		 waiter.state != WAL_WAITER_ROLLBACK);
+	rlist_del_entry(&waiter, in_list);
+	return waiter.state == WAL_WAITER_ROLLBACK ? -1 : 0;
+}
+
 /**
  * WAL writer main entry point: queue a single request
  * to be written to disk.
@@ -1226,6 +1350,7 @@ static int
 wal_write_async(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
+	int rc = -1;
 
 	ERROR_INJECT(ERRINJ_WAL_IO, {
 		goto fail;
@@ -1245,6 +1370,11 @@ wal_write_async(struct journal *journal, struct journal_entry *entry)
 		goto fail;
 	}
 
+	if (wal_wait_queue(entry) != 0) {
+		rc = JOURNAL_RC_ROLLBACK;
+		goto fail;
+	}
+
 	struct wal_msg *batch;
 	if (!stailq_empty(&writer->wal_pipe.input) &&
 	    (batch = wal_msg(stailq_first_entry(&writer->wal_pipe.input,
@@ -1259,6 +1389,7 @@ wal_write_async(struct journal *journal, struct journal_entry *entry)
 			goto fail;
 		}
 		wal_msg_create(batch);
+		writer->queue_len++;
 		/*
 		 * Sic: first add a request, then push the batch,
 		 * since cpipe_push() may pass the batch to WAL
@@ -1274,6 +1405,7 @@ wal_write_async(struct journal *journal, struct journal_entry *entry)
 	 */
 	writer->last_entry = entry;
 	batch->approx_len += entry->approx_len;
+	writer->queue_size += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 #ifndef NDEBUG
 	++errinj(ERRINJ_WAL_WRITE_COUNT, ERRINJ_INT)->iparam;
@@ -1283,7 +1415,7 @@ wal_write_async(struct journal *journal, struct journal_entry *entry)
 
 fail:
 	entry->res = -1;
-	return -1;
+	return rc;
 }
 
 static int
@@ -1293,8 +1425,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	 * We can reuse async WAL engine transparently
 	 * to the caller.
 	 */
-	if (wal_write_async(journal, entry) != 0)
-		return -1;
+	int rc;
+	if ((rc = wal_write_async(journal, entry)) != 0)
+		return rc;
 
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield();
diff --git a/src/box/wal.h b/src/box/wal.h
index ca43dc6eb..491335126 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -237,6 +237,14 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
+/** Set the maximum size of pending data in WAL queue in bytes. */
+void
+wal_set_queue_max_size(int64_t size);
+
+/** Set the maximum amount of outgoing WAL messages. */
+void
+wal_set_queue_max_len(int64_t len);
+
 /**
  * Remove WAL files that are not needed by consumers reading
  * rows at @vclock or newer.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 16c5b01d2..7a224e50e 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -56,6 +56,8 @@ wal_dir:.
 wal_dir_rescan_delay:2
 wal_max_size:268435456
 wal_mode:write
+wal_queue_max_len:0
+wal_queue_max_size:0
 worker_pool_threads:4
 --
 -- Test insert from detached fiber
diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua
index a577f023d..3276ddf64 100755
--- a/test/box-tap/cfg.test.lua
+++ b/test/box-tap/cfg.test.lua
@@ -6,7 +6,7 @@ local socket = require('socket')
 local fio = require('fio')
 local uuid = require('uuid')
 local msgpack = require('msgpack')
-test:plan(108)
+test:plan(110)
 
 --------------------------------------------------------------------------------
 -- Invalid values
@@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0)
 invalid('vinyl_run_size_ratio', 1)
 invalid('vinyl_bloom_fpr', 0)
 invalid('vinyl_bloom_fpr', 1.1)
+invalid('wal_queue_max_size', -1)
+invalid('wal_queue_max_len', -1)
 
 local function invalid_combinations(name, val)
     local status, result = pcall(box.cfg, val)
diff --git a/test/box/admin.result b/test/box/admin.result
index 05debe673..c818f4f9f 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -133,6 +133,10 @@ cfg_filter(box.cfg)
     - 268435456
   - - wal_mode
     - write
+  - - wal_queue_max_len
+    - 0
+  - - wal_queue_max_size
+    - 0
   - - worker_pool_threads
     - 4
 ...
diff --git a/test/box/cfg.result b/test/box/cfg.result
index 22a720c2c..19f322e7d 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -121,6 +121,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
@@ -236,6 +240,10 @@ cfg_filter(box.cfg)
  |     - 268435456
  |   - - wal_mode
  |     - write
+ |   - - wal_queue_max_len
+ |     - 0
+ |   - - wal_queue_max_size
+ |     - 0
  |   - - worker_pool_threads
  |     - 4
  | ...
diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result
new file mode 100644
index 000000000..f7799baa8
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+replication_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout=1000}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+ | ---
+ | ...
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+ | ---
+ | ...
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+for i = 1,10 do box.space.test:insert{i} end
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+ | ---
+ | - true
+ | ...
+require('fiber').sleep(0.5)
+ | ---
+ | ...
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+ | ---
+ | - true
+ | ...
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+ | ---
+ | - ok
+ | ...
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+ | ---
+ | - true
+ | ...
+assert(box.space.test:count() == 10)
+ | ---
+ | - true
+ | ...
+assert(box.info.replication[1].upstream.status == 'follow')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua
new file mode 100644
index 000000000..1e7d61ff7
--- /dev/null
+++ b/test/replication/gh-5536-wal-limit.test.lua
@@ -0,0 +1,58 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+--
+-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so
+-- that appliers stop reading new transactions from master once the queue is
+-- full.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+replication_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout=1000}
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+test_run:switch('replica')
+-- Huge replication timeout to not cause reconnects while applier is blocked.
+-- Tiny queue size (in bytes) to allow exactly one queue entry at a time.
+box.cfg{wal_queue_max_size=1, replication_timeout=1000}
+write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
+-- Block WAL writes so that we may test queue overflow.
+box.error.injection.set("ERRINJ_WAL_DELAY", true)
+
+test_run:switch('default')
+
+for i = 1,10 do box.space.test:insert{i} end
+
+test_run:switch('replica')
+-- Wait for replication. Cannot rely on lsn bump here. It won't happen while
+-- WAL is blocked.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\
+end)
+require('fiber').sleep(0.5)
+-- Only one entry fits when the limit is small.
+assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
+box.error.injection.set("ERRINJ_WAL_DELAY", false)
+
+-- Once the block is removed everything is written.
+test_run:wait_cond(function()\
+    return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\
+end)
+assert(box.space.test:count() == 10)
+assert(box.info.replication[1].upstream.status == 'follow')
+
+test_run:switch('default')
+
+-- Cleanup.
+box.cfg{replication_timeout=replication_timeout}
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index c80430afc..7e7004592 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -37,6 +37,7 @@
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
     "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
+    "gh-5536-wal-limit.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index e4812df37..89abfabff 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list