* [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes @ 2021-02-24 19:35 Serge Petrenko via Tarantool-patches 2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches ` (4 more replies) 0 siblings, 5 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-02-24 19:35 UTC (permalink / raw) To: gorcunov, v.shpilevoy; +Cc: tarantool-patches

Since the introduction of asynchronous commit, which doesn't wait for a WAL write to succeed, it's quite easy to clog WAL with huge amounts of write requests. For now, it's only possible from an applier, since it's the only user of async commit at the moment. This happens when a replica is syncing with the master and reads new transactions at a pace higher than it can write them to WAL (see the docbot request for a detailed explanation).

To ameliorate such behavior, we need to introduce some limit on not-yet-finished WAL write requests. This is what this commit is trying to do. Two new counters are added to the wal writer: queue_size (in bytes) and queue_len (in wal messages), together with configuration settings: `wal_queue_max_size` and `wal_queue_max_len`. Size and length are increased on every new submitted request, and are decreased once the tx thread receives a confirmation that a specific request was written. Actually, the limits are added to an abstract journal, but take effect only for the wal writer.

Once size or length reach their maximum values, the applier is blocked until some of the write requests are finished. The size limit isn't strict, i.e. if there's at least one free byte, the whole write request fits and no blocking is involved.

The feature is ready for `box.commit{is_async=true}`. Once that is implemented, it should check whether the queue is full and let the user decide what to do next: either wait or roll the tx back.

Part of #5536

@TarantoolBot document
Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len'

`wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount of concurrent write requests submitted to WAL. `wal_queue_max_size` is measured in the number of bytes to be written (0 means unlimited), and `wal_queue_max_len` is measured in the number of transactions, 0 meaning unlimited. These options only affect replica behaviour at the moment, and default to 0. They limit the pace at which a replica reads new transactions from the master.

Here's when these options come in handy. Imagine the following situation: there are 2 servers, a master and a replica, and the replica is down for some period of time. While the replica is down, the master serves requests at a reasonable pace, possibly close to its WAL throughput limit. Once the replica reconnects, it has to receive all the data the master has piled up. Now there's no limit on the speed at which the master sends the data to the replica, and no limit on the pace at which the replica's applier submits the corresponding write requests to WAL. This leads to a situation where the replica's WAL never catches up with the incoming requests and the amount of pending requests keeps growing. There's no limit on the memory the pending WAL write requests take, and this clogging of the WAL write queue may even lead to the replica using up all the available memory.

Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are set, appliers will stop reading new transactions once the limit is reached. This will let WAL process all the requests that have piled up and free all the excess memory.
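As a purely illustrative sketch of how a replica could be configured with these options (the values below are arbitrary assumptions, not recommendations; both options are dynamic and 0 keeps the unlimited default):

    -- Hypothetical tuning example on a replica: cap pending WAL writes at
    -- roughly 16 MB or 10000 queued transactions, whichever limit is
    -- reached first. Both options default to 0, i.e. unlimited.
    box.cfg{
        wal_queue_max_size = 16 * 1024 * 1024,
        wal_queue_max_len  = 10000,
    }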
--- https://github.com/tarantool/tarantool/issues/5536 https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom Changes in v3: - Extract queue internals into new struct journal_queue. - Move is_ready flag to queue from queue entry. - Minor refactoring. Changes in v2: - Move queue logic to journal. src/box/applier.cc | 9 ++ src/box/box.cc | 52 ++++++++ src/box/box.h | 2 + src/box/journal.c | 71 ++++++++++ src/box/journal.h | 136 +++++++++++++++++++- src/box/lua/cfg.cc | 18 +++ src/box/lua/load_cfg.lua | 6 + src/box/wal.c | 14 ++ src/box/wal.h | 14 ++ test/app-tap/init_script.result | 2 + test/box-tap/cfg.test.lua | 4 +- test/box/admin.result | 4 + test/box/cfg.result | 8 ++ test/replication/gh-5536-wal-limit.result | 132 +++++++++++++++++++ test/replication/gh-5536-wal-limit.test.lua | 58 +++++++++ test/replication/suite.cfg | 1 + test/replication/suite.ini | 2 +- 17 files changed, 525 insertions(+), 8 deletions(-) create mode 100644 test/replication/gh-5536-wal-limit.result create mode 100644 test/replication/gh-5536-wal-limit.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 553db76fc..27ddd0f29 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) goto success; } + /* + * Do not spam WAL with excess write requests, let it process what's + * piled up first. + * This is done before opening the transaction to avoid problems with + * yielding inside it. + */ + if (journal_queue_is_full() || journal_queue_has_waiters()) + journal_wait_queue(); + /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply diff --git a/src/box/box.cc b/src/box/box.cc index 26cbe8aab..9a3b092d0 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name) return (enum wal_mode) mode; } +static int64_t +box_check_wal_queue_max_len(void) +{ + int64_t len = cfg_geti64("wal_queue_max_len"); + if (len < 0) { + diag_set(ClientError, ER_CFG, "wal_queue_max_len", + "wal_queue_max_len must be >= 0"); + } + /* Unlimited. */ + if (len == 0) + len = INT64_MAX; + return len; +} + +static int64_t +box_check_wal_queue_max_size(void) +{ + int64_t size = cfg_geti64("wal_queue_max_size"); + if (size < 0) { + diag_set(ClientError, ER_CFG, "wal_queue_max_size", + "wal_queue_max_size must be >= 0"); + } + /* Unlimited. 
*/ + if (size == 0) + size = INT64_MAX; + return size; +} + static void box_check_readahead(int readahead) { @@ -875,6 +903,10 @@ box_check_config(void) box_check_checkpoint_count(cfg_geti("checkpoint_count")); box_check_wal_max_size(cfg_geti64("wal_max_size")); box_check_wal_mode(cfg_gets("wal_mode")); + if (box_check_wal_queue_max_size() < 0) + diag_raise(); + if (box_check_wal_queue_max_len() < 0) + diag_raise(); if (box_check_memory_quota("memtx_memory") < 0) diag_raise(); box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size")); @@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void) wal_set_checkpoint_threshold(threshold); } +int +box_set_wal_queue_max_size(void) +{ + int64_t size = box_check_wal_queue_max_size(); + if (size < 0) + return -1; + wal_set_queue_max_size(size); + return 0; +} + +int +box_set_wal_queue_max_len(void) +{ + int64_t len = box_check_wal_queue_max_len(); + if (len < 0) + return -1; + wal_set_queue_max_len(len); + return 0; +} + void box_set_vinyl_memory(void) { diff --git a/src/box/box.h b/src/box/box.h index b68047a95..4f5b4b617 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -239,6 +239,8 @@ void box_set_readahead(void); void box_set_checkpoint_count(void); void box_set_checkpoint_interval(void); void box_set_checkpoint_wal_threshold(void); +int box_set_wal_queue_max_size(void); +int box_set_wal_queue_max_len(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); void box_set_vinyl_memory(void); diff --git a/src/box/journal.c b/src/box/journal.c index cb320b557..4c31a3dfe 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -34,6 +34,16 @@ struct journal *current_journal = NULL; +struct journal_queue journal_queue = { + .max_size = INT64_MAX, + .size = 0, + .max_len = INT64_MAX, + .len = 0, + .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), + .is_awake = false, + .is_ready = false, +}; + struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, journal_write_async_f write_async_cb, @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region, complete_data); return entry; } + +struct journal_queue_entry { + /** The fiber waiting for queue space to free. */ + struct fiber *fiber; + /** A link in all waiting fibers list. */ + struct rlist in_queue; +}; + +/** + * Wake up the first waiter in the journal queue. + */ +static inline void +journal_queue_wakeup_first(void) +{ + struct journal_queue_entry *e; + if (rlist_empty(&journal_queue.waiters)) + goto out; + /* + * When the queue isn't forcefully emptied, no need to wake everyone + * else up until there's some free space. + */ + if (!journal_queue.is_ready && journal_queue_is_full()) + goto out; + e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), + in_queue); + fiber_wakeup(e->fiber); + return; +out: + journal_queue.is_awake = false; + journal_queue.is_ready = false; +} + +void +journal_queue_wakeup(bool force_ready) +{ + assert(!rlist_empty(&journal_queue.waiters)); + if (journal_queue.is_awake) + return; + journal_queue.is_awake = true; + journal_queue.is_ready = force_ready; + journal_queue_wakeup_first(); +} + +void +journal_wait_queue(void) +{ + struct journal_queue_entry entry = { + .fiber = fiber(), + }; + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); + /* + * Will be waken up by either queue emptying or a synchronous write. 
+ */ + while (journal_queue_is_full() && !journal_queue.is_ready) + fiber_yield(); + + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); + rlist_del(&entry.in_queue); + + journal_queue_wakeup_first(); +} diff --git a/src/box/journal.h b/src/box/journal.h index 5d8d5a726..8fec5b27e 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region, journal_write_async_f write_async_cb, void *complete_data); +struct journal_queue { + /** Maximal size of entries enqueued in journal (in bytes). */ + int64_t max_size; + /** Current approximate size of journal queue. */ + int64_t size; + /** Maximal allowed length of journal queue, in entries. */ + int64_t max_len; + /** Current journal queue length. */ + int64_t len; + /** + * The fibers waiting for some space to free in journal queue. + * Once some space is freed they will be waken up in the same order they + * entered the queue. + */ + struct rlist waiters; + /** + * Whether the queue is being woken or not. Used to avoid multiple + * concurrent wake-ups. + */ + bool is_awake; + /** + * A flag used to tell the waiting fibers they may proceed even if the + * queue is full, i.e. force them to submit a write request. + */ + bool is_ready; +}; + +/** A single queue for all journal instances. */ +extern struct journal_queue journal_queue; + /** * An API for an abstract journal for all transactions of this * instance, as well as for multiple instances in case of @@ -124,6 +154,82 @@ struct journal { struct journal_entry *entry); }; +/** + * Depending on the step of recovery and instance configuration + * points at a concrete implementation of the journal. + */ +extern struct journal *current_journal; + +/** + * Wake the journal queue up. + * @param force_ready whether waiters should proceed even if the queue is still + * full. + */ +void +journal_queue_wakeup(bool force_ready); + +/** + * Check whether any of the queue size limits is reached. + * If the queue is full, we must wait for some of the entries to be written + * before proceeding with a new asynchronous write request. + */ +static inline bool +journal_queue_is_full(void) +{ + return journal_queue.size > journal_queue.max_size || + journal_queue.len > journal_queue.max_len; +} + +/** + * Check whether anyone is waiting for the journal queue to empty. If there are + * other waiters we must go after them to preserve write order. + */ +static inline bool +journal_queue_has_waiters(void) +{ + return !rlist_empty(&journal_queue.waiters); +} + +/** Yield until there's some space in the journal queue. */ +void +journal_wait_queue(void); + +/** Set maximal journal queue size in bytes. */ +static inline void +journal_queue_set_max_size(int64_t size) +{ + journal_queue.max_size = size; + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); +} + +/** Set maximal journal queue length, in entries. */ +static inline void +journal_queue_set_max_len(int64_t len) +{ + journal_queue.max_len = len; + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); +} + +/** Increase queue size on a new write request. */ +static inline void +journal_queue_on_append(struct journal_entry *entry) +{ + journal_queue.len++; + journal_queue.size += entry->approx_len; +} + +/** Decrease queue size once write request is complete. 
*/ +static inline void +journal_queue_on_complete(struct journal_entry *entry) +{ + journal_queue.len--; + journal_queue.size -= entry->approx_len; + assert(journal_queue.len >= 0); + assert(journal_queue.size >= 0); +} + /** * Complete asynchronous write. */ @@ -131,15 +237,14 @@ static inline void journal_async_complete(struct journal_entry *entry) { assert(entry->write_async_cb != NULL); + + journal_queue_on_complete(entry); + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); + entry->write_async_cb(entry); } -/** - * Depending on the step of recovery and instance configuration - * points at a concrete implementation of the journal. - */ -extern struct journal *current_journal; - /** * Write a single entry to the journal in synchronous way. * @@ -148,6 +253,18 @@ extern struct journal *current_journal; static inline int journal_write(struct journal_entry *entry) { + if (journal_queue_has_waiters()) { + /* + * It's a synchronous write, so it's fine to wait a bit more for + * everyone else to be written. They'll wake us up back + * afterwards. + */ + journal_queue_wakeup(true); + journal_wait_queue(); + } + + journal_queue_on_append(entry); + return current_journal->write(current_journal, entry); } @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) static inline int journal_write_async(struct journal_entry *entry) { + /* + * It's the job of the caller to check whether the queue is full prior + * to submitting the request. + */ + assert(!journal_queue_is_full() || journal_queue.is_ready); + journal_queue_on_append(entry); + return current_journal->write_async(current_journal, entry); } diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 2d3ccbf0e..35f410710 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L) return 0; } +static int +lbox_cfg_set_wal_queue_max_size(struct lua_State *L) +{ + if (box_set_wal_queue_max_size() != 0) + luaT_error(L); + return 0; +} + +static int +lbox_cfg_set_wal_queue_max_len(struct lua_State *L) +{ + if (box_set_wal_queue_max_len() != 0) + luaT_error(L); + return 0; +} + static int lbox_cfg_set_read_only(struct lua_State *L) { @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, + {"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size}, + {"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 574c8bef4..c11a9e103 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -71,6 +71,8 @@ local default_cfg = { wal_mode = "write", wal_max_size = 256 * 1024 * 1024, wal_dir_rescan_delay= 2, + wal_queue_max_size = 0, + wal_queue_max_len = 0, force_recovery = false, replication = nil, instance_uuid = nil, @@ -163,6 +165,8 @@ local template_cfg = { coredump = 'boolean', checkpoint_interval = 'number', checkpoint_wal_threshold = 'number', + wal_queue_max_size = 'number', + wal_queue_max_len = 'number', checkpoint_count = 'number', read_only = 'boolean', hot_standby = 'boolean', @@ -277,6 +281,8 @@ local dynamic_cfg = { checkpoint_count = 
private.cfg_set_checkpoint_count, checkpoint_interval = private.cfg_set_checkpoint_interval, checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold, + wal_queue_max_size = private.cfg_set_wal_queue_max_size, + wal_queue_max_len = private.cfg_set_wal_queue_max_len, worker_pool_threads = private.cfg_set_worker_pool_threads, feedback_enabled = ifdef_feedback_set_params, feedback_crashinfo = ifdef_feedback_set_params, diff --git a/src/box/wal.c b/src/box/wal.c index 937d47ba9..4a0381cf4 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold) fiber_set_cancellable(cancellable); } +void +wal_set_queue_max_size(int64_t size) +{ + assert(&wal_writer_singleton.base == current_journal); + journal_queue_set_max_size(size); +} + +void +wal_set_queue_max_len(int64_t len) +{ + assert(&wal_writer_singleton.base == current_journal); + journal_queue_set_max_len(len); +} + struct wal_gc_msg { struct cbus_call_msg base; diff --git a/src/box/wal.h b/src/box/wal.h index ca43dc6eb..1db32f66f 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint); void wal_set_checkpoint_threshold(int64_t threshold); +/** + * Set the pending write limit in bytes. Once the limit is reached, new + * writes are blocked until some previous writes succeed. + */ +void +wal_set_queue_max_size(int64_t size); + +/** + * Set the pending write limit in journal entries. Once the limit is reached, + * new writes are blocked until some previous writes succeeed. + */ +void +wal_set_queue_max_len(int64_t len); + /** * Remove WAL files that are not needed by consumers reading * rows at @vclock or newer. diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 16c5b01d2..7a224e50e 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -56,6 +56,8 @@ wal_dir:. wal_dir_rescan_delay:2 wal_max_size:268435456 wal_mode:write +wal_queue_max_len:0 +wal_queue_max_size:0 worker_pool_threads:4 -- -- Test insert from detached fiber diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua index a577f023d..3276ddf64 100755 --- a/test/box-tap/cfg.test.lua +++ b/test/box-tap/cfg.test.lua @@ -6,7 +6,7 @@ local socket = require('socket') local fio = require('fio') local uuid = require('uuid') local msgpack = require('msgpack') -test:plan(108) +test:plan(110) -------------------------------------------------------------------------------- -- Invalid values @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0) invalid('vinyl_run_size_ratio', 1) invalid('vinyl_bloom_fpr', 0) invalid('vinyl_bloom_fpr', 1.1) +invalid('wal_queue_max_size', -1) +invalid('wal_queue_max_len', -1) local function invalid_combinations(name, val) local status, result = pcall(box.cfg, val) diff --git a/test/box/admin.result b/test/box/admin.result index 05debe673..c818f4f9f 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -133,6 +133,10 @@ cfg_filter(box.cfg) - 268435456 - - wal_mode - write + - - wal_queue_max_len + - 0 + - - wal_queue_max_size + - 0 - - worker_pool_threads - 4 ... diff --git a/test/box/cfg.result b/test/box/cfg.result index 22a720c2c..19f322e7d 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -121,6 +121,10 @@ cfg_filter(box.cfg) | - 268435456 | - - wal_mode | - write + | - - wal_queue_max_len + | - 0 + | - - wal_queue_max_size + | - 0 | - - worker_pool_threads | - 4 | ... 
@@ -236,6 +240,10 @@ cfg_filter(box.cfg) | - 268435456 | - - wal_mode | - write + | - - wal_queue_max_len + | - 0 + | - - wal_queue_max_size + | - 0 | - - worker_pool_threads | - 4 | ... diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result new file mode 100644 index 000000000..f7799baa8 --- /dev/null +++ b/test/replication/gh-5536-wal-limit.result @@ -0,0 +1,132 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... +fiber = require('fiber') + | --- + | ... + +-- +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so +-- that appliers stop reading new transactions from master once the queue is +-- full. +-- +box.schema.user.grant('guest', 'replication') + | --- + | ... +_ = box.schema.space.create('test') + | --- + | ... +_ = box.space.test:create_index('pk') + | --- + | ... + +replication_timeout = box.cfg.replication_timeout + | --- + | ... +box.cfg{replication_timeout=1000} + | --- + | ... + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') + | --- + | - true + | ... +test_run:cmd('start server replica with wait=True, wait_load=True') + | --- + | - true + | ... + +test_run:switch('replica') + | --- + | - true + | ... +-- Huge replication timeout to not cause reconnects while applier is blocked. +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. +box.cfg{wal_queue_max_size=1, replication_timeout=1000} + | --- + | ... +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") + | --- + | ... +-- Block WAL writes so that we may test queue overflow. +box.error.injection.set("ERRINJ_WAL_DELAY", true) + | --- + | - ok + | ... + +test_run:switch('default') + | --- + | - true + | ... + +for i = 1,10 do box.space.test:insert{i} end + | --- + | ... + +test_run:switch('replica') + | --- + | - true + | ... +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while +-- WAL is blocked. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ +end) + | --- + | - true + | ... +require('fiber').sleep(0.5) + | --- + | ... +-- Only one entry fits when the limit is small. +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) + | --- + | - true + | ... +box.error.injection.set("ERRINJ_WAL_DELAY", false) + | --- + | - ok + | ... + +-- Once the block is removed everything is written. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ +end) + | --- + | - true + | ... +assert(box.space.test:count() == 10) + | --- + | - true + | ... +assert(box.info.replication[1].upstream.status == 'follow') + | --- + | - true + | ... + +test_run:switch('default') + | --- + | - true + | ... + +-- Cleanup. +box.cfg{replication_timeout=replication_timeout} + | --- + | ... +test_run:cmd('stop server replica') + | --- + | - true + | ... +test_run:cmd('delete server replica') + | --- + | - true + | ... +box.space.test:drop() + | --- + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... + diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua new file mode 100644 index 000000000..1e7d61ff7 --- /dev/null +++ b/test/replication/gh-5536-wal-limit.test.lua @@ -0,0 +1,58 @@ +test_run = require('test_run').new() +fiber = require('fiber') + +-- +-- gh-5536: out of memory on a joining replica. 
Introduce a WAL queue limit so +-- that appliers stop reading new transactions from master once the queue is +-- full. +-- +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test') +_ = box.space.test:create_index('pk') + +replication_timeout = box.cfg.replication_timeout +box.cfg{replication_timeout=1000} + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') +test_run:cmd('start server replica with wait=True, wait_load=True') + +test_run:switch('replica') +-- Huge replication timeout to not cause reconnects while applier is blocked. +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. +box.cfg{wal_queue_max_size=1, replication_timeout=1000} +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") +-- Block WAL writes so that we may test queue overflow. +box.error.injection.set("ERRINJ_WAL_DELAY", true) + +test_run:switch('default') + +for i = 1,10 do box.space.test:insert{i} end + +test_run:switch('replica') +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while +-- WAL is blocked. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ +end) +require('fiber').sleep(0.5) +-- Only one entry fits when the limit is small. +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) +box.error.injection.set("ERRINJ_WAL_DELAY", false) + +-- Once the block is removed everything is written. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ +end) +assert(box.space.test:count() == 10) +assert(box.info.replication[1].upstream.status == 'follow') + +test_run:switch('default') + +-- Cleanup. +box.cfg{replication_timeout=replication_timeout} +test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.space.test:drop() +box.schema.user.revoke('guest', 'replication') + diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index c80430afc..7e7004592 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -37,6 +37,7 @@ "gh-4928-tx-boundaries.test.lua": {}, "gh-5440-qsync-ro.test.lua": {}, "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, + "gh-5536-wal-limit.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} diff --git a/test/replication/suite.ini b/test/replication/suite.ini index e4812df37..89abfabff 100644 --- a/test/replication/suite.ini +++ b/test/replication/suite.ini @@ -3,7 +3,7 @@ core = tarantool script = master.lua description = tarantool/box, replication disabled = consistent.test.lua -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua 
gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua config = suite.cfg lua_libs = lua/fast_replica.lua lua/rlimit.lua use_unix_sockets = True -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 30+ messages in thread
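Condensed from the test added above, the throttling is observable on a debug build roughly as follows (master/replica setup elided; the full test also waits for replication to reach the replica before asserting):

    -- On the replica: let the queue hold a single entry at a time and
    -- stall WAL writes so that the applier has to block on the queue.
    box.cfg{wal_queue_max_size = 1, replication_timeout = 1000}
    write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT")
    box.error.injection.set("ERRINJ_WAL_DELAY", true)
    -- On the master: for i = 1, 10 do box.space.test:insert{i} end
    -- Back on the replica: while WAL is blocked only one request gets
    -- through to it; the applier waits with the rest.
    assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1)
    -- Releasing the block lets the remaining requests drain.
    box.error.injection.set("ERRINJ_WAL_DELAY", false)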
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches @ 2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches ` (3 subsequent siblings) 4 siblings, 0 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-02-24 19:40 UTC (permalink / raw) To: gorcunov, v.shpilevoy; +Cc: tarantool-patches 24.02.2021 22:35, Serge Petrenko пишет: > diff --git a/src/box/wal.c b/src/box/wal.c > index 937d47ba9..4a0381cf4 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold) > fiber_set_cancellable(cancellable); > } > > +void > +wal_set_queue_max_size(int64_t size) > +{ > + assert(&wal_writer_singleton.base == current_journal); > + journal_queue_set_max_size(size); > +} > + > +void > +wal_set_queue_max_len(int64_t len) > +{ > + assert(&wal_writer_singleton.base == current_journal); > + journal_queue_set_max_len(len); > +} > + A small fix: these asserts aren't needed anymore. So, diff --git a/src/box/wal.c b/src/box/wal.c index 4a0381cf4..328ab092d 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -768,14 +768,12 @@ wal_set_checkpoint_threshold(int64_t threshold) void wal_set_queue_max_size(int64_t size) { - assert(&wal_writer_singleton.base == current_journal); journal_queue_set_max_size(size); } void wal_set_queue_max_len(int64_t len) { - assert(&wal_writer_singleton.base == current_journal); journal_queue_set_max_len(len); } -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches 2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches @ 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches 2021-02-26 0:57 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-01 19:15 ` Serge Petrenko via Tarantool-patches 2021-02-26 0:56 ` Vladislav Shpilevoy via Tarantool-patches ` (2 subsequent siblings) 4 siblings, 2 replies; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-02-25 13:05 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy * Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]: Looks like a simple counting semaphore. I don't see why it has to be specific to class journal, more like part of lib/core. https://en.cppreference.com/w/cpp/thread/counting_semaphore I'd also question the place where you decided to put this gate. The source of the issue is async requests, not WAL, which worked fine in absence of async requests. So it's async requests that should be gated, not WAL. Otherwise your overflow will just spill out someplace else. > Since the introduction of asynchronous commit, which doesn't wait for a > WAL write to succeed, it's quite easy to clog WAL with huge amounts > write requests. For now, it's only possible from an applier, since it's > the only user of async commit at the moment. > > This happens when replica is syncing with master and reads new > transactions at a pace higher than it can write them to WAL (see docbot > request for detailed explanation). > > To ameliorate such behavior, we need to introduce some limit on > not-yet-finished WAL write requests. This is what this commit is trying > to do. > Two new counters are added to wal writer: queue_size (in bytes) and > queue_len (in wal messages) together with configuration settings: > `wal_queue_max_size` and `wal_queue_max_len`. > Size and length are increased on every new submitted request, and are > decreased once the tx receives a confirmation that a specific request > was written. > Actually, the limits are added to an abstract journal, but take effect > only for wal writer. > > Once size or length reach their maximum values, applier is blocked until > some of the write requests are finished. > > The size limit isn't strict, i.e. if there's at least one free byte, the > whole write request fits and no blocking is involved. > > The feature is ready for `box.commit{is_async=true}`. Once it's > implemented, it should check whether the queue is full and let the user > decide what to do next. Either wait or roll the tx back. > > Part of #5536 > > @TarantoolBot document > Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len' > > `wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount > of concurrent write requests submitted to WAL. > `wal_queue_max_size` is measured in number of bytes to be written (0 > means unlimited), and `wal_queue_max_len` is measured in number of > transactions, 0 meaning unlimited. > These options only affect replica behaviour at the moment, and default > to 0. They limit the pace at which replica reads new transactions from > master. > > Here's when these options come in handy: > Imagine such a situation: there are 2 servers, a master and a replica, > and the replica is down for some period of time. 
While the replica is > down, the master serves requests at a reasonable pace, possibly close to > its WAL throughput limit. Once the replica reconnects, it has to receive > all the data master has piled up. Now there's no limit in speed at which > master sends the data to replica, and there's no limit at which > replica's applier submits corresponding write requests to WAL. This > leads to a situation when replica's WAL is never in time to serve the > requests and the amount of pending requests is constantly growing. > There's no limit for memory WAL write requests take, and this clogging > of WAL write queue may even lead to replica using up all the available > memory. > > Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are > set, appliers will stop reading new transactions once the limit is > reached. This will let WAL process all the requests that have piled up > and free all the excess memory. > --- > https://github.com/tarantool/tarantool/issues/5536 > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom > > Changes in v3: > - Extract queue internals into new struct journal_queue. > - Move is_ready flag to queue from queue entry. > - Minor refactoring. > > Changes in v2: > - Move queue logic to journal. > > src/box/applier.cc | 9 ++ > src/box/box.cc | 52 ++++++++ > src/box/box.h | 2 + > src/box/journal.c | 71 ++++++++++ > src/box/journal.h | 136 +++++++++++++++++++- > src/box/lua/cfg.cc | 18 +++ > src/box/lua/load_cfg.lua | 6 + > src/box/wal.c | 14 ++ > src/box/wal.h | 14 ++ > test/app-tap/init_script.result | 2 + > test/box-tap/cfg.test.lua | 4 +- > test/box/admin.result | 4 + > test/box/cfg.result | 8 ++ > test/replication/gh-5536-wal-limit.result | 132 +++++++++++++++++++ > test/replication/gh-5536-wal-limit.test.lua | 58 +++++++++ > test/replication/suite.cfg | 1 + > test/replication/suite.ini | 2 +- > 17 files changed, 525 insertions(+), 8 deletions(-) > create mode 100644 test/replication/gh-5536-wal-limit.result > create mode 100644 test/replication/gh-5536-wal-limit.test.lua > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 553db76fc..27ddd0f29 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > goto success; > } > > + /* > + * Do not spam WAL with excess write requests, let it process what's > + * piled up first. > + * This is done before opening the transaction to avoid problems with > + * yielding inside it. > + */ > + if (journal_queue_is_full() || journal_queue_has_waiters()) > + journal_wait_queue(); > + > /** > * Explicitly begin the transaction so that we can > * control fiber->gc life cycle and, in case of apply > diff --git a/src/box/box.cc b/src/box/box.cc > index 26cbe8aab..9a3b092d0 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name) > return (enum wal_mode) mode; > } > > +static int64_t > +box_check_wal_queue_max_len(void) > +{ > + int64_t len = cfg_geti64("wal_queue_max_len"); > + if (len < 0) { > + diag_set(ClientError, ER_CFG, "wal_queue_max_len", > + "wal_queue_max_len must be >= 0"); > + } > + /* Unlimited. */ > + if (len == 0) > + len = INT64_MAX; > + return len; > +} > + > +static int64_t > +box_check_wal_queue_max_size(void) > +{ > + int64_t size = cfg_geti64("wal_queue_max_size"); > + if (size < 0) { > + diag_set(ClientError, ER_CFG, "wal_queue_max_size", > + "wal_queue_max_size must be >= 0"); > + } > + /* Unlimited. 
*/ > + if (size == 0) > + size = INT64_MAX; > + return size; > +} > + > static void > box_check_readahead(int readahead) > { > @@ -875,6 +903,10 @@ box_check_config(void) > box_check_checkpoint_count(cfg_geti("checkpoint_count")); > box_check_wal_max_size(cfg_geti64("wal_max_size")); > box_check_wal_mode(cfg_gets("wal_mode")); > + if (box_check_wal_queue_max_size() < 0) > + diag_raise(); > + if (box_check_wal_queue_max_len() < 0) > + diag_raise(); > if (box_check_memory_quota("memtx_memory") < 0) > diag_raise(); > box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size")); > @@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void) > wal_set_checkpoint_threshold(threshold); > } > > +int > +box_set_wal_queue_max_size(void) > +{ > + int64_t size = box_check_wal_queue_max_size(); > + if (size < 0) > + return -1; > + wal_set_queue_max_size(size); > + return 0; > +} > + > +int > +box_set_wal_queue_max_len(void) > +{ > + int64_t len = box_check_wal_queue_max_len(); > + if (len < 0) > + return -1; > + wal_set_queue_max_len(len); > + return 0; > +} > + > void > box_set_vinyl_memory(void) > { > diff --git a/src/box/box.h b/src/box/box.h > index b68047a95..4f5b4b617 100644 > --- a/src/box/box.h > +++ b/src/box/box.h > @@ -239,6 +239,8 @@ void box_set_readahead(void); > void box_set_checkpoint_count(void); > void box_set_checkpoint_interval(void); > void box_set_checkpoint_wal_threshold(void); > +int box_set_wal_queue_max_size(void); > +int box_set_wal_queue_max_len(void); > void box_set_memtx_memory(void); > void box_set_memtx_max_tuple_size(void); > void box_set_vinyl_memory(void); > diff --git a/src/box/journal.c b/src/box/journal.c > index cb320b557..4c31a3dfe 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -34,6 +34,16 @@ > > struct journal *current_journal = NULL; > > +struct journal_queue journal_queue = { > + .max_size = INT64_MAX, > + .size = 0, > + .max_len = INT64_MAX, > + .len = 0, > + .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), > + .is_awake = false, > + .is_ready = false, > +}; > + > struct journal_entry * > journal_entry_new(size_t n_rows, struct region *region, > journal_write_async_f write_async_cb, > @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region, > complete_data); > return entry; > } > + > +struct journal_queue_entry { > + /** The fiber waiting for queue space to free. */ > + struct fiber *fiber; > + /** A link in all waiting fibers list. */ > + struct rlist in_queue; > +}; > + > +/** > + * Wake up the first waiter in the journal queue. > + */ > +static inline void > +journal_queue_wakeup_first(void) > +{ > + struct journal_queue_entry *e; > + if (rlist_empty(&journal_queue.waiters)) > + goto out; > + /* > + * When the queue isn't forcefully emptied, no need to wake everyone > + * else up until there's some free space. 
> + */ > + if (!journal_queue.is_ready && journal_queue_is_full()) > + goto out; > + e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), > + in_queue); > + fiber_wakeup(e->fiber); > + return; > +out: > + journal_queue.is_awake = false; > + journal_queue.is_ready = false; > +} > + > +void > +journal_queue_wakeup(bool force_ready) > +{ > + assert(!rlist_empty(&journal_queue.waiters)); > + if (journal_queue.is_awake) > + return; > + journal_queue.is_awake = true; > + journal_queue.is_ready = force_ready; > + journal_queue_wakeup_first(); > +} > + > +void > +journal_wait_queue(void) > +{ > + struct journal_queue_entry entry = { > + .fiber = fiber(), > + }; > + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); > + /* > + * Will be waken up by either queue emptying or a synchronous write. > + */ > + while (journal_queue_is_full() && !journal_queue.is_ready) > + fiber_yield(); > + > + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); > + rlist_del(&entry.in_queue); > + > + journal_queue_wakeup_first(); > +} > diff --git a/src/box/journal.h b/src/box/journal.h > index 5d8d5a726..8fec5b27e 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region, > journal_write_async_f write_async_cb, > void *complete_data); > > +struct journal_queue { > + /** Maximal size of entries enqueued in journal (in bytes). */ > + int64_t max_size; > + /** Current approximate size of journal queue. */ > + int64_t size; > + /** Maximal allowed length of journal queue, in entries. */ > + int64_t max_len; > + /** Current journal queue length. */ > + int64_t len; > + /** > + * The fibers waiting for some space to free in journal queue. > + * Once some space is freed they will be waken up in the same order they > + * entered the queue. > + */ > + struct rlist waiters; > + /** > + * Whether the queue is being woken or not. Used to avoid multiple > + * concurrent wake-ups. > + */ > + bool is_awake; > + /** > + * A flag used to tell the waiting fibers they may proceed even if the > + * queue is full, i.e. force them to submit a write request. > + */ > + bool is_ready; > +}; > + > +/** A single queue for all journal instances. */ > +extern struct journal_queue journal_queue; > + > /** > * An API for an abstract journal for all transactions of this > * instance, as well as for multiple instances in case of > @@ -124,6 +154,82 @@ struct journal { > struct journal_entry *entry); > }; > > +/** > + * Depending on the step of recovery and instance configuration > + * points at a concrete implementation of the journal. > + */ > +extern struct journal *current_journal; > + > +/** > + * Wake the journal queue up. > + * @param force_ready whether waiters should proceed even if the queue is still > + * full. > + */ > +void > +journal_queue_wakeup(bool force_ready); > + > +/** > + * Check whether any of the queue size limits is reached. > + * If the queue is full, we must wait for some of the entries to be written > + * before proceeding with a new asynchronous write request. > + */ > +static inline bool > +journal_queue_is_full(void) > +{ > + return journal_queue.size > journal_queue.max_size || > + journal_queue.len > journal_queue.max_len; > +} > + > +/** > + * Check whether anyone is waiting for the journal queue to empty. If there are > + * other waiters we must go after them to preserve write order. 
> + */ > +static inline bool > +journal_queue_has_waiters(void) > +{ > + return !rlist_empty(&journal_queue.waiters); > +} > + > +/** Yield until there's some space in the journal queue. */ > +void > +journal_wait_queue(void); > + > +/** Set maximal journal queue size in bytes. */ > +static inline void > +journal_queue_set_max_size(int64_t size) > +{ > + journal_queue.max_size = size; > + if (journal_queue_has_waiters() && !journal_queue_is_full()) > + journal_queue_wakeup(false); > +} > + > +/** Set maximal journal queue length, in entries. */ > +static inline void > +journal_queue_set_max_len(int64_t len) > +{ > + journal_queue.max_len = len; > + if (journal_queue_has_waiters() && !journal_queue_is_full()) > + journal_queue_wakeup(false); > +} > + > +/** Increase queue size on a new write request. */ > +static inline void > +journal_queue_on_append(struct journal_entry *entry) > +{ > + journal_queue.len++; > + journal_queue.size += entry->approx_len; > +} > + > +/** Decrease queue size once write request is complete. */ > +static inline void > +journal_queue_on_complete(struct journal_entry *entry) > +{ > + journal_queue.len--; > + journal_queue.size -= entry->approx_len; > + assert(journal_queue.len >= 0); > + assert(journal_queue.size >= 0); > +} > + > /** > * Complete asynchronous write. > */ > @@ -131,15 +237,14 @@ static inline void > journal_async_complete(struct journal_entry *entry) > { > assert(entry->write_async_cb != NULL); > + > + journal_queue_on_complete(entry); > + if (journal_queue_has_waiters() && !journal_queue_is_full()) > + journal_queue_wakeup(false); > + > entry->write_async_cb(entry); > } > > -/** > - * Depending on the step of recovery and instance configuration > - * points at a concrete implementation of the journal. > - */ > -extern struct journal *current_journal; > - > /** > * Write a single entry to the journal in synchronous way. > * > @@ -148,6 +253,18 @@ extern struct journal *current_journal; > static inline int > journal_write(struct journal_entry *entry) > { > + if (journal_queue_has_waiters()) { > + /* > + * It's a synchronous write, so it's fine to wait a bit more for > + * everyone else to be written. They'll wake us up back > + * afterwards. > + */ > + journal_queue_wakeup(true); > + journal_wait_queue(); > + } > + > + journal_queue_on_append(entry); > + > return current_journal->write(current_journal, entry); > } > > @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) > static inline int > journal_write_async(struct journal_entry *entry) > { > + /* > + * It's the job of the caller to check whether the queue is full prior > + * to submitting the request. 
> + */ > + assert(!journal_queue_is_full() || journal_queue.is_ready); > + journal_queue_on_append(entry); > + > return current_journal->write_async(current_journal, entry); > } > > diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc > index 2d3ccbf0e..35f410710 100644 > --- a/src/box/lua/cfg.cc > +++ b/src/box/lua/cfg.cc > @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L) > return 0; > } > > +static int > +lbox_cfg_set_wal_queue_max_size(struct lua_State *L) > +{ > + if (box_set_wal_queue_max_size() != 0) > + luaT_error(L); > + return 0; > +} > + > +static int > +lbox_cfg_set_wal_queue_max_len(struct lua_State *L) > +{ > + if (box_set_wal_queue_max_len() != 0) > + luaT_error(L); > + return 0; > +} > + > static int > lbox_cfg_set_read_only(struct lua_State *L) > { > @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L) > {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, > {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, > {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, > + {"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size}, > + {"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len}, > {"cfg_set_read_only", lbox_cfg_set_read_only}, > {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, > {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, > diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua > index 574c8bef4..c11a9e103 100644 > --- a/src/box/lua/load_cfg.lua > +++ b/src/box/lua/load_cfg.lua > @@ -71,6 +71,8 @@ local default_cfg = { > wal_mode = "write", > wal_max_size = 256 * 1024 * 1024, > wal_dir_rescan_delay= 2, > + wal_queue_max_size = 0, > + wal_queue_max_len = 0, > force_recovery = false, > replication = nil, > instance_uuid = nil, > @@ -163,6 +165,8 @@ local template_cfg = { > coredump = 'boolean', > checkpoint_interval = 'number', > checkpoint_wal_threshold = 'number', > + wal_queue_max_size = 'number', > + wal_queue_max_len = 'number', > checkpoint_count = 'number', > read_only = 'boolean', > hot_standby = 'boolean', > @@ -277,6 +281,8 @@ local dynamic_cfg = { > checkpoint_count = private.cfg_set_checkpoint_count, > checkpoint_interval = private.cfg_set_checkpoint_interval, > checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold, > + wal_queue_max_size = private.cfg_set_wal_queue_max_size, > + wal_queue_max_len = private.cfg_set_wal_queue_max_len, > worker_pool_threads = private.cfg_set_worker_pool_threads, > feedback_enabled = ifdef_feedback_set_params, > feedback_crashinfo = ifdef_feedback_set_params, > diff --git a/src/box/wal.c b/src/box/wal.c > index 937d47ba9..4a0381cf4 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold) > fiber_set_cancellable(cancellable); > } > > +void > +wal_set_queue_max_size(int64_t size) > +{ > + assert(&wal_writer_singleton.base == current_journal); > + journal_queue_set_max_size(size); > +} > + > +void > +wal_set_queue_max_len(int64_t len) > +{ > + assert(&wal_writer_singleton.base == current_journal); > + journal_queue_set_max_len(len); > +} > + > struct wal_gc_msg > { > struct cbus_call_msg base; > diff --git a/src/box/wal.h b/src/box/wal.h > index ca43dc6eb..1db32f66f 100644 > --- a/src/box/wal.h > +++ b/src/box/wal.h > @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint); > void > wal_set_checkpoint_threshold(int64_t threshold); > > +/** > + * Set the pending write limit in bytes. 
Once the limit is reached, new > + * writes are blocked until some previous writes succeed. > + */ > +void > +wal_set_queue_max_size(int64_t size); > + > +/** > + * Set the pending write limit in journal entries. Once the limit is reached, > + * new writes are blocked until some previous writes succeeed. > + */ > +void > +wal_set_queue_max_len(int64_t len); > + > /** > * Remove WAL files that are not needed by consumers reading > * rows at @vclock or newer. > diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result > index 16c5b01d2..7a224e50e 100644 > --- a/test/app-tap/init_script.result > +++ b/test/app-tap/init_script.result > @@ -56,6 +56,8 @@ wal_dir:. > wal_dir_rescan_delay:2 > wal_max_size:268435456 > wal_mode:write > +wal_queue_max_len:0 > +wal_queue_max_size:0 > worker_pool_threads:4 > -- > -- Test insert from detached fiber > diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua > index a577f023d..3276ddf64 100755 > --- a/test/box-tap/cfg.test.lua > +++ b/test/box-tap/cfg.test.lua > @@ -6,7 +6,7 @@ local socket = require('socket') > local fio = require('fio') > local uuid = require('uuid') > local msgpack = require('msgpack') > -test:plan(108) > +test:plan(110) > > -------------------------------------------------------------------------------- > -- Invalid values > @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0) > invalid('vinyl_run_size_ratio', 1) > invalid('vinyl_bloom_fpr', 0) > invalid('vinyl_bloom_fpr', 1.1) > +invalid('wal_queue_max_size', -1) > +invalid('wal_queue_max_len', -1) > > local function invalid_combinations(name, val) > local status, result = pcall(box.cfg, val) > diff --git a/test/box/admin.result b/test/box/admin.result > index 05debe673..c818f4f9f 100644 > --- a/test/box/admin.result > +++ b/test/box/admin.result > @@ -133,6 +133,10 @@ cfg_filter(box.cfg) > - 268435456 > - - wal_mode > - write > + - - wal_queue_max_len > + - 0 > + - - wal_queue_max_size > + - 0 > - - worker_pool_threads > - 4 > ... > diff --git a/test/box/cfg.result b/test/box/cfg.result > index 22a720c2c..19f322e7d 100644 > --- a/test/box/cfg.result > +++ b/test/box/cfg.result > @@ -121,6 +121,10 @@ cfg_filter(box.cfg) > | - 268435456 > | - - wal_mode > | - write > + | - - wal_queue_max_len > + | - 0 > + | - - wal_queue_max_size > + | - 0 > | - - worker_pool_threads > | - 4 > | ... > @@ -236,6 +240,10 @@ cfg_filter(box.cfg) > | - 268435456 > | - - wal_mode > | - write > + | - - wal_queue_max_len > + | - 0 > + | - - wal_queue_max_size > + | - 0 > | - - worker_pool_threads > | - 4 > | ... > diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result > new file mode 100644 > index 000000000..f7799baa8 > --- /dev/null > +++ b/test/replication/gh-5536-wal-limit.result > @@ -0,0 +1,132 @@ > +-- test-run result file version 2 > +test_run = require('test_run').new() > + | --- > + | ... > +fiber = require('fiber') > + | --- > + | ... > + > +-- > +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so > +-- that appliers stop reading new transactions from master once the queue is > +-- full. > +-- > +box.schema.user.grant('guest', 'replication') > + | --- > + | ... > +_ = box.schema.space.create('test') > + | --- > + | ... > +_ = box.space.test:create_index('pk') > + | --- > + | ... > + > +replication_timeout = box.cfg.replication_timeout > + | --- > + | ... > +box.cfg{replication_timeout=1000} > + | --- > + | ... 
> + > +test_run:cmd('create server replica with rpl_master=default,\ > + script="replication/replica.lua"') > + | --- > + | - true > + | ... > +test_run:cmd('start server replica with wait=True, wait_load=True') > + | --- > + | - true > + | ... > + > +test_run:switch('replica') > + | --- > + | - true > + | ... > +-- Huge replication timeout to not cause reconnects while applier is blocked. > +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. > +box.cfg{wal_queue_max_size=1, replication_timeout=1000} > + | --- > + | ... > +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > + | --- > + | ... > +-- Block WAL writes so that we may test queue overflow. > +box.error.injection.set("ERRINJ_WAL_DELAY", true) > + | --- > + | - ok > + | ... > + > +test_run:switch('default') > + | --- > + | - true > + | ... > + > +for i = 1,10 do box.space.test:insert{i} end > + | --- > + | ... > + > +test_run:switch('replica') > + | --- > + | - true > + | ... > +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while > +-- WAL is blocked. > +test_run:wait_cond(function()\ > + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ > +end) > + | --- > + | - true > + | ... > +require('fiber').sleep(0.5) > + | --- > + | ... > +-- Only one entry fits when the limit is small. > +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) > + | --- > + | - true > + | ... > +box.error.injection.set("ERRINJ_WAL_DELAY", false) > + | --- > + | - ok > + | ... > + > +-- Once the block is removed everything is written. > +test_run:wait_cond(function()\ > + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ > +end) > + | --- > + | - true > + | ... > +assert(box.space.test:count() == 10) > + | --- > + | - true > + | ... > +assert(box.info.replication[1].upstream.status == 'follow') > + | --- > + | - true > + | ... > + > +test_run:switch('default') > + | --- > + | - true > + | ... > + > +-- Cleanup. > +box.cfg{replication_timeout=replication_timeout} > + | --- > + | ... > +test_run:cmd('stop server replica') > + | --- > + | - true > + | ... > +test_run:cmd('delete server replica') > + | --- > + | - true > + | ... > +box.space.test:drop() > + | --- > + | ... > +box.schema.user.revoke('guest', 'replication') > + | --- > + | ... > + > diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua > new file mode 100644 > index 000000000..1e7d61ff7 > --- /dev/null > +++ b/test/replication/gh-5536-wal-limit.test.lua > @@ -0,0 +1,58 @@ > +test_run = require('test_run').new() > +fiber = require('fiber') > + > +-- > +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so > +-- that appliers stop reading new transactions from master once the queue is > +-- full. > +-- > +box.schema.user.grant('guest', 'replication') > +_ = box.schema.space.create('test') > +_ = box.space.test:create_index('pk') > + > +replication_timeout = box.cfg.replication_timeout > +box.cfg{replication_timeout=1000} > + > +test_run:cmd('create server replica with rpl_master=default,\ > + script="replication/replica.lua"') > +test_run:cmd('start server replica with wait=True, wait_load=True') > + > +test_run:switch('replica') > +-- Huge replication timeout to not cause reconnects while applier is blocked. > +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. 
> +box.cfg{wal_queue_max_size=1, replication_timeout=1000} > +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > +-- Block WAL writes so that we may test queue overflow. > +box.error.injection.set("ERRINJ_WAL_DELAY", true) > + > +test_run:switch('default') > + > +for i = 1,10 do box.space.test:insert{i} end > + > +test_run:switch('replica') > +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while > +-- WAL is blocked. > +test_run:wait_cond(function()\ > + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ > +end) > +require('fiber').sleep(0.5) > +-- Only one entry fits when the limit is small. > +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) > +box.error.injection.set("ERRINJ_WAL_DELAY", false) > + > +-- Once the block is removed everything is written. > +test_run:wait_cond(function()\ > + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ > +end) > +assert(box.space.test:count() == 10) > +assert(box.info.replication[1].upstream.status == 'follow') > + > +test_run:switch('default') > + > +-- Cleanup. > +box.cfg{replication_timeout=replication_timeout} > +test_run:cmd('stop server replica') > +test_run:cmd('delete server replica') > +box.space.test:drop() > +box.schema.user.revoke('guest', 'replication') > + > diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg > index c80430afc..7e7004592 100644 > --- a/test/replication/suite.cfg > +++ b/test/replication/suite.cfg > @@ -37,6 +37,7 @@ > "gh-4928-tx-boundaries.test.lua": {}, > "gh-5440-qsync-ro.test.lua": {}, > "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, > + "gh-5536-wal-limit.test.lua": {}, > "*": { > "memtx": {"engine": "memtx"}, > "vinyl": {"engine": "vinyl"} > diff --git a/test/replication/suite.ini b/test/replication/suite.ini > index e4812df37..89abfabff 100644 > --- a/test/replication/suite.ini > +++ b/test/replication/suite.ini > @@ -3,7 +3,7 @@ core = tarantool > script = master.lua > description = tarantool/box, replication > disabled = consistent.test.lua > -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua > +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua > config = suite.cfg > lua_libs = lua/fast_replica.lua lua/rlimit.lua > use_unix_sockets = True > -- > 2.24.3 (Apple Git-128) -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
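To make the "counting semaphore" the review refers to concrete: it is a primitive with N permits, where acquire blocks once all permits are taken and release returns one. Purely for illustration it can be modelled in Lua on top of fiber.channel (the actual suggestion is a C abstraction in lib/core; the names below are made up):

    local fiber = require('fiber')

    -- Minimal counting-semaphore sketch: a channel with a buffer of
    -- `permits` slots. put() blocks when the buffer is full (all permits
    -- taken), get() frees one slot (returns a permit).
    local function semaphore_new(permits)
        local ch = fiber.channel(permits)
        return {
            acquire = function() ch:put(true) end, -- may block the fiber
            release = function() ch:get() end,
        }
    end

    -- Usage: local sem = semaphore_new(100); sem.acquire(); ...; sem.release()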
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches @ 2021-02-26 0:57 ` Vladislav Shpilevoy via Tarantool-patches 2021-02-26 7:18 ` Konstantin Osipov via Tarantool-patches 2021-03-01 19:15 ` Serge Petrenko via Tarantool-patches 1 sibling, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 0:57 UTC (permalink / raw) To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches > I'd also question the place where you decided to put this gate. > The source of the issue is async requests, not WAL, which worked > fine in absence of async requests. So it's async requests that > should be gated, not WAL. In the commit message it is clearly stated why it is in the journal's code, not just in the applier: The feature is ready for `box.commit{is_async=true}`. Once it's implemented, it should check whether the queue is full and let the user decide what to do next. Either wait or roll the tx back. Async transactions will be exposed to 'userspace' to be able to reduce latency for network requests ending with a transaction. They won't have to wait for WAL write to end. > Otherwise your overflow will just spill out someplace else. On the contrary. Your proposal to do it in the applier would lead to queue overflow in some other place - in userspace. When the queue is for the entire WAL, it won't overflow. ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 0:57 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 7:18 ` Konstantin Osipov via Tarantool-patches 2021-02-26 20:23 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-02-26 7:18 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 10:15]: > > I'd also question the place where you decided to put this gate. > > The source of the issue is async requests, not WAL, which worked > > fine in absence of async requests. So it's async requests that > > should be gated, not WAL. > > In the commit message it is clearly stated why it is in the > journal's code, not just in the applier: > > The feature is ready for `box.commit{is_async=true}`. Once it's > implemented, it should check whether the queue is full and let the user > decide what to do next. Either wait or roll the tx back. > > Async transactions will be exposed to 'userspace' to be able to reduce > latency for network requests ending with a transaction. They won't have > to wait for WAL write to end. You did not understand my comment. I tried to say that a major part of this code is generic and should reside in lib/core as a counting semaphore abstraction. Async transaction simply use this counting semaphore to throttle themselves. Then neither WAL nor any other resource used by async transactions will be overloaded. Otherwise, the system would be allowed to create async transactions, and while WAL will not overflow, some other resource (memory, transaction identifiers, whatever) may still overflow. > > Otherwise your overflow will just spill out someplace else. > > On the contrary. Your proposal to do it in the applier would lead to > queue overflow in some other place - in userspace. When the queue is > for the entire WAL, it won't overflow. I did not say it should be in the applier. -- Konstantin Osipov, Moscow, Russia https://scylladb.com ^ permalink raw reply [flat|nested] 30+ messages in thread
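To make the lib/core idea concrete, here is one possible shape of such a fiber-level counting semaphore. This is a minimal sketch, assuming only the fiber() / fiber_wakeup() / fiber_yield() and rlist primitives already used in the patch; the fiber_sem_* names are hypothetical and do not exist in lib/core today.

struct fiber_sem {
    /** How many units of the guarded resource are still free. */
    int64_t count;
    /** Fibers waiting for a free unit, served in FIFO order. */
    struct rlist waiters;
};

struct fiber_sem_waiter {
    /** The waiting fiber. */
    struct fiber *fiber;
    /** Link in fiber_sem::waiters. */
    struct rlist in_queue;
};

static void
fiber_sem_create(struct fiber_sem *sem, int64_t count)
{
    sem->count = count;
    rlist_create(&sem->waiters);
}

/** Take one unit, yielding while none is free. */
static void
fiber_sem_acquire(struct fiber_sem *sem)
{
    struct fiber_sem_waiter w = { .fiber = fiber() };
    /* Queue up even when a unit is free, to keep FIFO fairness. */
    rlist_add_tail_entry(&sem->waiters, &w, in_queue);
    while (sem->count == 0 ||
           rlist_first_entry(&sem->waiters, struct fiber_sem_waiter,
                             in_queue) != &w)
        fiber_yield();
    rlist_del(&w.in_queue);
    sem->count--;
    /* If units remain, let the next waiter re-check them. */
    if (sem->count > 0 && !rlist_empty(&sem->waiters))
        fiber_wakeup(rlist_first_entry(&sem->waiters,
                                       struct fiber_sem_waiter,
                                       in_queue)->fiber);
}

/** Return one unit and wake the next waiter, if any. */
static void
fiber_sem_release(struct fiber_sem *sem)
{
    sem->count++;
    if (!rlist_empty(&sem->waiters))
        fiber_wakeup(rlist_first_entry(&sem->waiters,
                                       struct fiber_sem_waiter,
                                       in_queue)->fiber);
}

With such a primitive the queue length limit maps directly onto acquire on submit and release in journal_async_complete(); the byte-size limit is the part that does not fit a plain counting semaphore, which is what the rest of the thread argues about.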
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 7:18 ` Konstantin Osipov via Tarantool-patches @ 2021-02-26 20:23 ` Vladislav Shpilevoy via Tarantool-patches 2021-02-26 21:20 ` Konstantin Osipov via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 20:23 UTC (permalink / raw) To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches On 26.02.2021 08:18, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 10:15]: >>> I'd also question the place where you decided to put this gate. >>> The source of the issue is async requests, not WAL, which worked >>> fine in absence of async requests. So it's async requests that >>> should be gated, not WAL. >> >> In the commit message it is clearly stated why it is in the >> journal's code, not just in the applier: >> >> The feature is ready for `box.commit{is_async=true}`. Once it's >> implemented, it should check whether the queue is full and let the user >> decide what to do next. Either wait or roll the tx back. >> >> Async transactions will be exposed to 'userspace' to be able to reduce >> latency for network requests ending with a transaction. They won't have >> to wait for WAL write to end. > > You did not understand my comment. I tried to say that a major > part of this code is generic and should reside in lib/core as a > counting semaphore abstraction. Async transaction simply use this > counting semaphore to throttle themselves. Then neither WAL nor > any other resource used by async transactions will be overloaded. > > Otherwise, the system would be allowed to create async > transactions, and while WAL will not overflow, some other resource > (memory, transaction identifiers, whatever) may still overflow. Ok, now I understand. Yeah, I also think it is a good idea to move it to libcore if nothing major changes in the patch for whatever reason. Talking of the other limits - first we need to find out if some of them really overflow. Then yes, such a semaphore-thing could be applied there too. But AFAIK, there are no other known similar bugs yet. >>> Otherwise your overflow will just spill out someplace else. >> >> On the contrary. Your proposal to do it in the applier would lead to >> queue overflow in some other place - in userspace. When the queue is >> for the entire WAL, it won't overflow. > > I did not say it should be in the applier. It was a misunderstanding. ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 20:23 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 21:20 ` Konstantin Osipov via Tarantool-patches 2021-02-26 22:44 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-02-26 21:20 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]: > Talking of the other limits - firstly we need to find if some of them > really overflows. Then yes, such a semaphone-thing could be applied > there too. But AFAIK, there are no other known similar bugs yet. Exploring this rather theoretically, since there are no user async transactions yet, I can imagine such transaction takes up memory and then blocks on WAL semaphore. If there is no limit on the number of async transactions, it can be a lot of memory. On the other hand this can be limited by a yet another semaphore. > >>> Otherwise your overflow will just spill out someplace else. > >> > >> On the contrary. Your proposal to do it in the applier would lead to > >> queue overflow in some other place - in userspace. When the queue is > >> for the entire WAL, it won't overflow. > > > > I did not say it should be in the applier. > > It was a misunderstanding. -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
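As an illustration of that last point, the 'yet another semaphore' could simply cap the number of in-flight user async transactions: take a unit before submitting the request and return it from the write completion callback. The sketch below reuses the hypothetical fiber_sem primitive from above; async_tx_limit and the *_sketch functions are made-up names, not something the patch introduces.

/* Illustration only: cap the number of pending user async transactions. */
static struct fiber_sem async_tx_limit;

static int
async_commit_sketch(struct journal_entry *entry)
{
    /* May yield until some earlier async transaction completes. */
    fiber_sem_acquire(&async_tx_limit);
    /* ... submit the entry to WAL the same way journal_write_async()
     * is used elsewhere ... */
    (void)entry;
    return 0;
}

static void
async_commit_done_sketch(struct journal_entry *entry)
{
    /* Runs in the completion path, next to where journal_async_complete()
     * already shrinks the queue counters. */
    (void)entry;
    fiber_sem_release(&async_tx_limit);
}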
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 21:20 ` Konstantin Osipov via Tarantool-patches @ 2021-02-26 22:44 ` Vladislav Shpilevoy via Tarantool-patches 2021-02-27 13:27 ` Konstantin Osipov via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 22:44 UTC (permalink / raw) To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches On 26.02.2021 22:20, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]: >> Talking of the other limits - firstly we need to find if some of them >> really overflows. Then yes, such a semaphone-thing could be applied >> there too. But AFAIK, there are no other known similar bugs yet. > > Exploring this rather theoretically, since there are no user async > transactions yet, I can imagine such transaction takes up memory > and then blocks on WAL semaphore. If there is no limit on the > number of async transactions, it can be a lot of memory. On the > other hand this can be limited by a yet another semaphore. Yes, such transactions will only occupy memory. But the plan is to return an error from box.commit({is_async}) if the WAL queue is full already. Because we don't want to block the async commit in any way. Better bail out earlier and give the user a chance to call normal box.commit() if necessary. Or introduce some kind of 'try_async' to block if the queue is full, but not block if not full, I don't know. We didn't think the API through yet. There will be a limit both on the number of transactions and on their total size. The limits are introduced right in this patch, and will be used by async transactions in the future. The main usage, if I understand correctly, will be for latency-sensitive applications, where you send your transactions/calls/IPROTO commands over the network, send your data to WAL and return a response to the client immediately. You don't even have to wait until writev() completes. Interestingly, it won't lead to the loss of any guarantees, because completion of writev() does not mean anything anyway. The machine can still power off before the data hits the disk, and the node can still fail, losing all non-replicated data. So it seems like a good feature. It should be especially good for synchronous transactions, where commit duration can be quite long. ^ permalink raw reply [flat|nested] 30+ messages in thread
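A rough sketch of the fail-fast flavour discussed here, built on journal_queue_is_full(), journal_queue_has_waiters() and journal_write_async() from this patch; the function name and the way the error is reported are invented for illustration, since the API is explicitly not designed yet.

static int
txn_commit_try_async_sketch(struct journal_entry *entry)
{
    /* Bail out if the queue is full or someone is already waiting. */
    if (journal_queue_is_full() || journal_queue_has_waiters()) {
        /* Report a "WAL queue is full" error to the user here and let
         * them fall back to a normal, blocking box.commit(). */
        return -1;
    }
    return journal_write_async(entry);
}

The blocking flavour would call journal_wait_queue() instead of returning an error, which is what the applier already does in this patch.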
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 22:44 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-02-27 13:27 ` Konstantin Osipov via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-02-27 13:27 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/27 16:22]: > On 26.02.2021 22:20, Konstantin Osipov wrote: > > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/02/26 23:24]: > >> Talking of the other limits - firstly we need to find if some of them > >> really overflows. Then yes, such a semaphone-thing could be applied > >> there too. But AFAIK, there are no other known similar bugs yet. > > > > Exploring this rather theoretically, since there are no user async > > transactions yet, I can imagine such transaction takes up memory > > and then blocks on WAL semaphore. If there is no limit on the > > number of async transactions, it can be a lot of memory. On the > > other hand this can be limited by a yet another semaphore. > > Yes, such transactions will only occupy memory. But the plan is return > an error from box.commit({is_async}) if the WAL queue is full already. > Because we don't want to block the async commit in anyway. Better bail > out earlier and give the user a chance to call normal box.commit() if > necessary. Or introduce some kind of 'try_async' to block if the queue > is full, but no block if not full, I don't know. We didn't think on the > API yet. There is no point in doing parasitic work in this case - performing a transaction and then rolling it back. > -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches 2021-02-26 0:57 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-01 19:15 ` Serge Petrenko via Tarantool-patches 2021-03-01 21:46 ` Konstantin Osipov via Tarantool-patches 1 sibling, 1 reply; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-01 19:15 UTC (permalink / raw) To: Konstantin Osipov, gorcunov, v.shpilevoy, tarantool-patches 25.02.2021 16:05, Konstantin Osipov пишет: > * Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]: Hi! Thanks for the review! > > Looks like a simple counting semaphore. I don't see why it has > to be specific to class journal, more like part of lib/core. > > https://en.cppreference.com/w/cpp/thread/counting_semaphore Not completely. It has 2 limits instead of 1, size and len, and the limits are 'soft', meaning the resource is free at the moment we wake the waiters up but it may be occupied again once the waiters actually wake up. Some fibers put to execution before the waken ones may have exhausted the limit, but we don't care about that. IMO this looks quite specialised. And there wouldn't be much use of such a primitive in lib/core. > > I'd also question the place where you decided to put this gate. > The source of the issue is async requests, not WAL, which worked > fine in absence of async requests. So it's async requests that > should be gated, not WAL. Otherwise your overflow will just spill > out someplace else. > >> Since the introduction of asynchronous commit, which doesn't wait for a >> WAL write to succeed, it's quite easy to clog WAL with huge amounts >> write requests. For now, it's only possible from an applier, since it's >> the only user of async commit at the moment. >> >> This happens when replica is syncing with master and reads new >> transactions at a pace higher than it can write them to WAL (see docbot >> request for detailed explanation). >> >> To ameliorate such behavior, we need to introduce some limit on >> not-yet-finished WAL write requests. This is what this commit is trying >> to do. >> Two new counters are added to wal writer: queue_size (in bytes) and >> queue_len (in wal messages) together with configuration settings: >> `wal_queue_max_size` and `wal_queue_max_len`. >> Size and length are increased on every new submitted request, and are >> decreased once the tx receives a confirmation that a specific request >> was written. >> Actually, the limits are added to an abstract journal, but take effect >> only for wal writer. >> >> Once size or length reach their maximum values, applier is blocked until >> some of the write requests are finished. >> >> The size limit isn't strict, i.e. if there's at least one free byte, the >> whole write request fits and no blocking is involved. >> >> The feature is ready for `box.commit{is_async=true}`. Once it's >> implemented, it should check whether the queue is full and let the user >> decide what to do next. Either wait or roll the tx back. >> >> Part of #5536 >> >> @TarantoolBot document >> Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len' >> >> `wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount >> of concurrent write requests submitted to WAL. >> `wal_queue_max_size` is measured in number of bytes to be written (0 >> means unlimited), and `wal_queue_max_len` is measured in number of >> transactions, 0 meaning unlimited. 
>> These options only affect replica behaviour at the moment, and default >> to 0. They limit the pace at which replica reads new transactions from >> master. >> >> Here's when these options come in handy: >> Imagine such a situation: there are 2 servers, a master and a replica, >> and the replica is down for some period of time. While the replica is >> down, the master serves requests at a reasonable pace, possibly close to >> its WAL throughput limit. Once the replica reconnects, it has to receive >> all the data master has piled up. Now there's no limit in speed at which >> master sends the data to replica, and there's no limit at which >> replica's applier submits corresponding write requests to WAL. This >> leads to a situation when replica's WAL is never in time to serve the >> requests and the amount of pending requests is constantly growing. >> There's no limit for memory WAL write requests take, and this clogging >> of WAL write queue may even lead to replica using up all the available >> memory. >> >> Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are >> set, appliers will stop reading new transactions once the limit is >> reached. This will let WAL process all the requests that have piled up >> and free all the excess memory. >> --- >> https://github.com/tarantool/tarantool/issues/5536 >> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom >> >> Changes in v3: >> - Extract queue internals into new struct journal_queue. >> - Move is_ready flag to queue from queue entry. >> - Minor refactoring. >> >> Changes in v2: >> - Move queue logic to journal. >> >> src/box/applier.cc | 9 ++ >> src/box/box.cc | 52 ++++++++ >> src/box/box.h | 2 + >> src/box/journal.c | 71 ++++++++++ >> src/box/journal.h | 136 +++++++++++++++++++- >> src/box/lua/cfg.cc | 18 +++ >> src/box/lua/load_cfg.lua | 6 + >> src/box/wal.c | 14 ++ >> src/box/wal.h | 14 ++ >> test/app-tap/init_script.result | 2 + >> test/box-tap/cfg.test.lua | 4 +- >> test/box/admin.result | 4 + >> test/box/cfg.result | 8 ++ >> test/replication/gh-5536-wal-limit.result | 132 +++++++++++++++++++ >> test/replication/gh-5536-wal-limit.test.lua | 58 +++++++++ >> test/replication/suite.cfg | 1 + >> test/replication/suite.ini | 2 +- >> 17 files changed, 525 insertions(+), 8 deletions(-) >> create mode 100644 test/replication/gh-5536-wal-limit.result >> create mode 100644 test/replication/gh-5536-wal-limit.test.lua >> >> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index 553db76fc..27ddd0f29 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) >> goto success; >> } >> >> + /* >> + * Do not spam WAL with excess write requests, let it process what's >> + * piled up first. >> + * This is done before opening the transaction to avoid problems with >> + * yielding inside it. 
>> + */ >> + if (journal_queue_is_full() || journal_queue_has_waiters()) >> + journal_wait_queue(); >> + >> /** >> * Explicitly begin the transaction so that we can >> * control fiber->gc life cycle and, in case of apply >> diff --git a/src/box/box.cc b/src/box/box.cc >> index 26cbe8aab..9a3b092d0 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -754,6 +754,34 @@ box_check_wal_mode(const char *mode_name) >> return (enum wal_mode) mode; >> } >> >> +static int64_t >> +box_check_wal_queue_max_len(void) >> +{ >> + int64_t len = cfg_geti64("wal_queue_max_len"); >> + if (len < 0) { >> + diag_set(ClientError, ER_CFG, "wal_queue_max_len", >> + "wal_queue_max_len must be >= 0"); >> + } >> + /* Unlimited. */ >> + if (len == 0) >> + len = INT64_MAX; >> + return len; >> +} >> + >> +static int64_t >> +box_check_wal_queue_max_size(void) >> +{ >> + int64_t size = cfg_geti64("wal_queue_max_size"); >> + if (size < 0) { >> + diag_set(ClientError, ER_CFG, "wal_queue_max_size", >> + "wal_queue_max_size must be >= 0"); >> + } >> + /* Unlimited. */ >> + if (size == 0) >> + size = INT64_MAX; >> + return size; >> +} >> + >> static void >> box_check_readahead(int readahead) >> { >> @@ -875,6 +903,10 @@ box_check_config(void) >> box_check_checkpoint_count(cfg_geti("checkpoint_count")); >> box_check_wal_max_size(cfg_geti64("wal_max_size")); >> box_check_wal_mode(cfg_gets("wal_mode")); >> + if (box_check_wal_queue_max_size() < 0) >> + diag_raise(); >> + if (box_check_wal_queue_max_len() < 0) >> + diag_raise(); >> if (box_check_memory_quota("memtx_memory") < 0) >> diag_raise(); >> box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size")); >> @@ -1411,6 +1443,26 @@ box_set_checkpoint_wal_threshold(void) >> wal_set_checkpoint_threshold(threshold); >> } >> >> +int >> +box_set_wal_queue_max_size(void) >> +{ >> + int64_t size = box_check_wal_queue_max_size(); >> + if (size < 0) >> + return -1; >> + wal_set_queue_max_size(size); >> + return 0; >> +} >> + >> +int >> +box_set_wal_queue_max_len(void) >> +{ >> + int64_t len = box_check_wal_queue_max_len(); >> + if (len < 0) >> + return -1; >> + wal_set_queue_max_len(len); >> + return 0; >> +} >> + >> void >> box_set_vinyl_memory(void) >> { >> diff --git a/src/box/box.h b/src/box/box.h >> index b68047a95..4f5b4b617 100644 >> --- a/src/box/box.h >> +++ b/src/box/box.h >> @@ -239,6 +239,8 @@ void box_set_readahead(void); >> void box_set_checkpoint_count(void); >> void box_set_checkpoint_interval(void); >> void box_set_checkpoint_wal_threshold(void); >> +int box_set_wal_queue_max_size(void); >> +int box_set_wal_queue_max_len(void); >> void box_set_memtx_memory(void); >> void box_set_memtx_max_tuple_size(void); >> void box_set_vinyl_memory(void); >> diff --git a/src/box/journal.c b/src/box/journal.c >> index cb320b557..4c31a3dfe 100644 >> --- a/src/box/journal.c >> +++ b/src/box/journal.c >> @@ -34,6 +34,16 @@ >> >> struct journal *current_journal = NULL; >> >> +struct journal_queue journal_queue = { >> + .max_size = INT64_MAX, >> + .size = 0, >> + .max_len = INT64_MAX, >> + .len = 0, >> + .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), >> + .is_awake = false, >> + .is_ready = false, >> +}; >> + >> struct journal_entry * >> journal_entry_new(size_t n_rows, struct region *region, >> journal_write_async_f write_async_cb, >> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region, >> complete_data); >> return entry; >> } >> + >> +struct journal_queue_entry { >> + /** The fiber waiting for queue space to free. 
*/ >> + struct fiber *fiber; >> + /** A link in all waiting fibers list. */ >> + struct rlist in_queue; >> +}; >> + >> +/** >> + * Wake up the first waiter in the journal queue. >> + */ >> +static inline void >> +journal_queue_wakeup_first(void) >> +{ >> + struct journal_queue_entry *e; >> + if (rlist_empty(&journal_queue.waiters)) >> + goto out; >> + /* >> + * When the queue isn't forcefully emptied, no need to wake everyone >> + * else up until there's some free space. >> + */ >> + if (!journal_queue.is_ready && journal_queue_is_full()) >> + goto out; >> + e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), >> + in_queue); >> + fiber_wakeup(e->fiber); >> + return; >> +out: >> + journal_queue.is_awake = false; >> + journal_queue.is_ready = false; >> +} >> + >> +void >> +journal_queue_wakeup(bool force_ready) >> +{ >> + assert(!rlist_empty(&journal_queue.waiters)); >> + if (journal_queue.is_awake) >> + return; >> + journal_queue.is_awake = true; >> + journal_queue.is_ready = force_ready; >> + journal_queue_wakeup_first(); >> +} >> + >> +void >> +journal_wait_queue(void) >> +{ >> + struct journal_queue_entry entry = { >> + .fiber = fiber(), >> + }; >> + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); >> + /* >> + * Will be waken up by either queue emptying or a synchronous write. >> + */ >> + while (journal_queue_is_full() && !journal_queue.is_ready) >> + fiber_yield(); >> + >> + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); >> + rlist_del(&entry.in_queue); >> + >> + journal_queue_wakeup_first(); >> +} >> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..8fec5b27e 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -109,6 +109,36 @@ journal_entry_new(size_t n_rows, struct region *region, >> journal_write_async_f write_async_cb, >> void *complete_data); >> >> +struct journal_queue { >> + /** Maximal size of entries enqueued in journal (in bytes). */ >> + int64_t max_size; >> + /** Current approximate size of journal queue. */ >> + int64_t size; >> + /** Maximal allowed length of journal queue, in entries. */ >> + int64_t max_len; >> + /** Current journal queue length. */ >> + int64_t len; >> + /** >> + * The fibers waiting for some space to free in journal queue. >> + * Once some space is freed they will be waken up in the same order they >> + * entered the queue. >> + */ >> + struct rlist waiters; >> + /** >> + * Whether the queue is being woken or not. Used to avoid multiple >> + * concurrent wake-ups. >> + */ >> + bool is_awake; >> + /** >> + * A flag used to tell the waiting fibers they may proceed even if the >> + * queue is full, i.e. force them to submit a write request. >> + */ >> + bool is_ready; >> +}; >> + >> +/** A single queue for all journal instances. */ >> +extern struct journal_queue journal_queue; >> + >> /** >> * An API for an abstract journal for all transactions of this >> * instance, as well as for multiple instances in case of >> @@ -124,6 +154,82 @@ struct journal { >> struct journal_entry *entry); >> }; >> >> +/** >> + * Depending on the step of recovery and instance configuration >> + * points at a concrete implementation of the journal. >> + */ >> +extern struct journal *current_journal; >> + >> +/** >> + * Wake the journal queue up. >> + * @param force_ready whether waiters should proceed even if the queue is still >> + * full. >> + */ >> +void >> +journal_queue_wakeup(bool force_ready); >> + >> +/** >> + * Check whether any of the queue size limits is reached. 
>> + * If the queue is full, we must wait for some of the entries to be written >> + * before proceeding with a new asynchronous write request. >> + */ >> +static inline bool >> +journal_queue_is_full(void) >> +{ >> + return journal_queue.size > journal_queue.max_size || >> + journal_queue.len > journal_queue.max_len; >> +} >> + >> +/** >> + * Check whether anyone is waiting for the journal queue to empty. If there are >> + * other waiters we must go after them to preserve write order. >> + */ >> +static inline bool >> +journal_queue_has_waiters(void) >> +{ >> + return !rlist_empty(&journal_queue.waiters); >> +} >> + >> +/** Yield until there's some space in the journal queue. */ >> +void >> +journal_wait_queue(void); >> + >> +/** Set maximal journal queue size in bytes. */ >> +static inline void >> +journal_queue_set_max_size(int64_t size) >> +{ >> + journal_queue.max_size = size; >> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >> + journal_queue_wakeup(false); >> +} >> + >> +/** Set maximal journal queue length, in entries. */ >> +static inline void >> +journal_queue_set_max_len(int64_t len) >> +{ >> + journal_queue.max_len = len; >> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >> + journal_queue_wakeup(false); >> +} >> + >> +/** Increase queue size on a new write request. */ >> +static inline void >> +journal_queue_on_append(struct journal_entry *entry) >> +{ >> + journal_queue.len++; >> + journal_queue.size += entry->approx_len; >> +} >> + >> +/** Decrease queue size once write request is complete. */ >> +static inline void >> +journal_queue_on_complete(struct journal_entry *entry) >> +{ >> + journal_queue.len--; >> + journal_queue.size -= entry->approx_len; >> + assert(journal_queue.len >= 0); >> + assert(journal_queue.size >= 0); >> +} >> + >> /** >> * Complete asynchronous write. >> */ >> @@ -131,15 +237,14 @@ static inline void >> journal_async_complete(struct journal_entry *entry) >> { >> assert(entry->write_async_cb != NULL); >> + >> + journal_queue_on_complete(entry); >> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >> + journal_queue_wakeup(false); >> + >> entry->write_async_cb(entry); >> } >> >> -/** >> - * Depending on the step of recovery and instance configuration >> - * points at a concrete implementation of the journal. >> - */ >> -extern struct journal *current_journal; >> - >> /** >> * Write a single entry to the journal in synchronous way. >> * >> @@ -148,6 +253,18 @@ extern struct journal *current_journal; >> static inline int >> journal_write(struct journal_entry *entry) >> { >> + if (journal_queue_has_waiters()) { >> + /* >> + * It's a synchronous write, so it's fine to wait a bit more for >> + * everyone else to be written. They'll wake us up back >> + * afterwards. >> + */ >> + journal_queue_wakeup(true); >> + journal_wait_queue(); >> + } >> + >> + journal_queue_on_append(entry); >> + >> return current_journal->write(current_journal, entry); >> } >> >> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >> static inline int >> journal_write_async(struct journal_entry *entry) >> { >> + /* >> + * It's the job of the caller to check whether the queue is full prior >> + * to submitting the request. 
>> + */ >> + assert(!journal_queue_is_full() || journal_queue.is_ready); >> + journal_queue_on_append(entry); >> + >> return current_journal->write_async(current_journal, entry); >> } >> >> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc >> index 2d3ccbf0e..35f410710 100644 >> --- a/src/box/lua/cfg.cc >> +++ b/src/box/lua/cfg.cc >> @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L) >> return 0; >> } >> >> +static int >> +lbox_cfg_set_wal_queue_max_size(struct lua_State *L) >> +{ >> + if (box_set_wal_queue_max_size() != 0) >> + luaT_error(L); >> + return 0; >> +} >> + >> +static int >> +lbox_cfg_set_wal_queue_max_len(struct lua_State *L) >> +{ >> + if (box_set_wal_queue_max_len() != 0) >> + luaT_error(L); >> + return 0; >> +} >> + >> static int >> lbox_cfg_set_read_only(struct lua_State *L) >> { >> @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L) >> {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, >> {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, >> {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, >> + {"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size}, >> + {"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len}, >> {"cfg_set_read_only", lbox_cfg_set_read_only}, >> {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, >> {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, >> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua >> index 574c8bef4..c11a9e103 100644 >> --- a/src/box/lua/load_cfg.lua >> +++ b/src/box/lua/load_cfg.lua >> @@ -71,6 +71,8 @@ local default_cfg = { >> wal_mode = "write", >> wal_max_size = 256 * 1024 * 1024, >> wal_dir_rescan_delay= 2, >> + wal_queue_max_size = 0, >> + wal_queue_max_len = 0, >> force_recovery = false, >> replication = nil, >> instance_uuid = nil, >> @@ -163,6 +165,8 @@ local template_cfg = { >> coredump = 'boolean', >> checkpoint_interval = 'number', >> checkpoint_wal_threshold = 'number', >> + wal_queue_max_size = 'number', >> + wal_queue_max_len = 'number', >> checkpoint_count = 'number', >> read_only = 'boolean', >> hot_standby = 'boolean', >> @@ -277,6 +281,8 @@ local dynamic_cfg = { >> checkpoint_count = private.cfg_set_checkpoint_count, >> checkpoint_interval = private.cfg_set_checkpoint_interval, >> checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold, >> + wal_queue_max_size = private.cfg_set_wal_queue_max_size, >> + wal_queue_max_len = private.cfg_set_wal_queue_max_len, >> worker_pool_threads = private.cfg_set_worker_pool_threads, >> feedback_enabled = ifdef_feedback_set_params, >> feedback_crashinfo = ifdef_feedback_set_params, >> diff --git a/src/box/wal.c b/src/box/wal.c >> index 937d47ba9..4a0381cf4 100644 >> --- a/src/box/wal.c >> +++ b/src/box/wal.c >> @@ -765,6 +765,20 @@ wal_set_checkpoint_threshold(int64_t threshold) >> fiber_set_cancellable(cancellable); >> } >> >> +void >> +wal_set_queue_max_size(int64_t size) >> +{ >> + assert(&wal_writer_singleton.base == current_journal); >> + journal_queue_set_max_size(size); >> +} >> + >> +void >> +wal_set_queue_max_len(int64_t len) >> +{ >> + assert(&wal_writer_singleton.base == current_journal); >> + journal_queue_set_max_len(len); >> +} >> + >> struct wal_gc_msg >> { >> struct cbus_call_msg base; >> diff --git a/src/box/wal.h b/src/box/wal.h >> index ca43dc6eb..1db32f66f 100644 >> --- a/src/box/wal.h >> +++ b/src/box/wal.h >> @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint); >> void >> 
wal_set_checkpoint_threshold(int64_t threshold); >> >> +/** >> + * Set the pending write limit in bytes. Once the limit is reached, new >> + * writes are blocked until some previous writes succeed. >> + */ >> +void >> +wal_set_queue_max_size(int64_t size); >> + >> +/** >> + * Set the pending write limit in journal entries. Once the limit is reached, >> + * new writes are blocked until some previous writes succeeed. >> + */ >> +void >> +wal_set_queue_max_len(int64_t len); >> + >> /** >> * Remove WAL files that are not needed by consumers reading >> * rows at @vclock or newer. >> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result >> index 16c5b01d2..7a224e50e 100644 >> --- a/test/app-tap/init_script.result >> +++ b/test/app-tap/init_script.result >> @@ -56,6 +56,8 @@ wal_dir:. >> wal_dir_rescan_delay:2 >> wal_max_size:268435456 >> wal_mode:write >> +wal_queue_max_len:0 >> +wal_queue_max_size:0 >> worker_pool_threads:4 >> -- >> -- Test insert from detached fiber >> diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua >> index a577f023d..3276ddf64 100755 >> --- a/test/box-tap/cfg.test.lua >> +++ b/test/box-tap/cfg.test.lua >> @@ -6,7 +6,7 @@ local socket = require('socket') >> local fio = require('fio') >> local uuid = require('uuid') >> local msgpack = require('msgpack') >> -test:plan(108) >> +test:plan(110) >> >> -------------------------------------------------------------------------------- >> -- Invalid values >> @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0) >> invalid('vinyl_run_size_ratio', 1) >> invalid('vinyl_bloom_fpr', 0) >> invalid('vinyl_bloom_fpr', 1.1) >> +invalid('wal_queue_max_size', -1) >> +invalid('wal_queue_max_len', -1) >> >> local function invalid_combinations(name, val) >> local status, result = pcall(box.cfg, val) >> diff --git a/test/box/admin.result b/test/box/admin.result >> index 05debe673..c818f4f9f 100644 >> --- a/test/box/admin.result >> +++ b/test/box/admin.result >> @@ -133,6 +133,10 @@ cfg_filter(box.cfg) >> - 268435456 >> - - wal_mode >> - write >> + - - wal_queue_max_len >> + - 0 >> + - - wal_queue_max_size >> + - 0 >> - - worker_pool_threads >> - 4 >> ... >> diff --git a/test/box/cfg.result b/test/box/cfg.result >> index 22a720c2c..19f322e7d 100644 >> --- a/test/box/cfg.result >> +++ b/test/box/cfg.result >> @@ -121,6 +121,10 @@ cfg_filter(box.cfg) >> | - 268435456 >> | - - wal_mode >> | - write >> + | - - wal_queue_max_len >> + | - 0 >> + | - - wal_queue_max_size >> + | - 0 >> | - - worker_pool_threads >> | - 4 >> | ... >> @@ -236,6 +240,10 @@ cfg_filter(box.cfg) >> | - 268435456 >> | - - wal_mode >> | - write >> + | - - wal_queue_max_len >> + | - 0 >> + | - - wal_queue_max_size >> + | - 0 >> | - - worker_pool_threads >> | - 4 >> | ... >> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result >> new file mode 100644 >> index 000000000..f7799baa8 >> --- /dev/null >> +++ b/test/replication/gh-5536-wal-limit.result >> @@ -0,0 +1,132 @@ >> +-- test-run result file version 2 >> +test_run = require('test_run').new() >> + | --- >> + | ... >> +fiber = require('fiber') >> + | --- >> + | ... >> + >> +-- >> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so >> +-- that appliers stop reading new transactions from master once the queue is >> +-- full. >> +-- >> +box.schema.user.grant('guest', 'replication') >> + | --- >> + | ... >> +_ = box.schema.space.create('test') >> + | --- >> + | ... 
>> +_ = box.space.test:create_index('pk') >> + | --- >> + | ... >> + >> +replication_timeout = box.cfg.replication_timeout >> + | --- >> + | ... >> +box.cfg{replication_timeout=1000} >> + | --- >> + | ... >> + >> +test_run:cmd('create server replica with rpl_master=default,\ >> + script="replication/replica.lua"') >> + | --- >> + | - true >> + | ... >> +test_run:cmd('start server replica with wait=True, wait_load=True') >> + | --- >> + | - true >> + | ... >> + >> +test_run:switch('replica') >> + | --- >> + | - true >> + | ... >> +-- Huge replication timeout to not cause reconnects while applier is blocked. >> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. >> +box.cfg{wal_queue_max_size=1, replication_timeout=1000} >> + | --- >> + | ... >> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") >> + | --- >> + | ... >> +-- Block WAL writes so that we may test queue overflow. >> +box.error.injection.set("ERRINJ_WAL_DELAY", true) >> + | --- >> + | - ok >> + | ... >> + >> +test_run:switch('default') >> + | --- >> + | - true >> + | ... >> + >> +for i = 1,10 do box.space.test:insert{i} end >> + | --- >> + | ... >> + >> +test_run:switch('replica') >> + | --- >> + | - true >> + | ... >> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while >> +-- WAL is blocked. >> +test_run:wait_cond(function()\ >> + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ >> +end) >> + | --- >> + | - true >> + | ... >> +require('fiber').sleep(0.5) >> + | --- >> + | ... >> +-- Only one entry fits when the limit is small. >> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) >> + | --- >> + | - true >> + | ... >> +box.error.injection.set("ERRINJ_WAL_DELAY", false) >> + | --- >> + | - ok >> + | ... >> + >> +-- Once the block is removed everything is written. >> +test_run:wait_cond(function()\ >> + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ >> +end) >> + | --- >> + | - true >> + | ... >> +assert(box.space.test:count() == 10) >> + | --- >> + | - true >> + | ... >> +assert(box.info.replication[1].upstream.status == 'follow') >> + | --- >> + | - true >> + | ... >> + >> +test_run:switch('default') >> + | --- >> + | - true >> + | ... >> + >> +-- Cleanup. >> +box.cfg{replication_timeout=replication_timeout} >> + | --- >> + | ... >> +test_run:cmd('stop server replica') >> + | --- >> + | - true >> + | ... >> +test_run:cmd('delete server replica') >> + | --- >> + | - true >> + | ... >> +box.space.test:drop() >> + | --- >> + | ... >> +box.schema.user.revoke('guest', 'replication') >> + | --- >> + | ... >> + >> diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua >> new file mode 100644 >> index 000000000..1e7d61ff7 >> --- /dev/null >> +++ b/test/replication/gh-5536-wal-limit.test.lua >> @@ -0,0 +1,58 @@ >> +test_run = require('test_run').new() >> +fiber = require('fiber') >> + >> +-- >> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so >> +-- that appliers stop reading new transactions from master once the queue is >> +-- full. 
>> +-- >> +box.schema.user.grant('guest', 'replication') >> +_ = box.schema.space.create('test') >> +_ = box.space.test:create_index('pk') >> + >> +replication_timeout = box.cfg.replication_timeout >> +box.cfg{replication_timeout=1000} >> + >> +test_run:cmd('create server replica with rpl_master=default,\ >> + script="replication/replica.lua"') >> +test_run:cmd('start server replica with wait=True, wait_load=True') >> + >> +test_run:switch('replica') >> +-- Huge replication timeout to not cause reconnects while applier is blocked. >> +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. >> +box.cfg{wal_queue_max_size=1, replication_timeout=1000} >> +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") >> +-- Block WAL writes so that we may test queue overflow. >> +box.error.injection.set("ERRINJ_WAL_DELAY", true) >> + >> +test_run:switch('default') >> + >> +for i = 1,10 do box.space.test:insert{i} end >> + >> +test_run:switch('replica') >> +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while >> +-- WAL is blocked. >> +test_run:wait_cond(function()\ >> + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ >> +end) >> +require('fiber').sleep(0.5) >> +-- Only one entry fits when the limit is small. >> +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) >> +box.error.injection.set("ERRINJ_WAL_DELAY", false) >> + >> +-- Once the block is removed everything is written. >> +test_run:wait_cond(function()\ >> + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ >> +end) >> +assert(box.space.test:count() == 10) >> +assert(box.info.replication[1].upstream.status == 'follow') >> + >> +test_run:switch('default') >> + >> +-- Cleanup. >> +box.cfg{replication_timeout=replication_timeout} >> +test_run:cmd('stop server replica') >> +test_run:cmd('delete server replica') >> +box.space.test:drop() >> +box.schema.user.revoke('guest', 'replication') >> + >> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg >> index c80430afc..7e7004592 100644 >> --- a/test/replication/suite.cfg >> +++ b/test/replication/suite.cfg >> @@ -37,6 +37,7 @@ >> "gh-4928-tx-boundaries.test.lua": {}, >> "gh-5440-qsync-ro.test.lua": {}, >> "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, >> + "gh-5536-wal-limit.test.lua": {}, >> "*": { >> "memtx": {"engine": "memtx"}, >> "vinyl": {"engine": "vinyl"} >> diff --git a/test/replication/suite.ini b/test/replication/suite.ini >> index e4812df37..89abfabff 100644 >> --- a/test/replication/suite.ini >> +++ b/test/replication/suite.ini >> @@ -3,7 +3,7 @@ core = tarantool >> script = master.lua >> description = tarantool/box, replication >> disabled = consistent.test.lua >> -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua >> +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua 
gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua >> config = suite.cfg >> lua_libs = lua/fast_replica.lua lua/rlimit.lua >> use_unix_sockets = True >> -- >> 2.24.3 (Apple Git-128) -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-01 19:15 ` Serge Petrenko via Tarantool-patches @ 2021-03-01 21:46 ` Konstantin Osipov via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-03-01 21:46 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy * Serge Petrenko <sergepetrenko@tarantool.org> [21/03/01 22:19]: > 25.02.2021 16:05, Konstantin Osipov пишет: > > * Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/02/24 22:39]: > > Hi! Thanks for the review! > > > > > Looks like a simple counting semaphore. I don't see why it has > > to be specific to class journal, more like part of lib/core. > > > > https://en.cppreference.com/w/cpp/thread/counting_semaphore > > Not completely. It has 2 limits instead of 1, size and len, > and the limits are 'soft', meaning the resource is free at the > moment we wake the waiters up but it may be occupied again once > the waiters actually wake up. For problem 1, use two semaphores, if you need two limits. For problem 2, I don't see any value in doing it this way. Do you? > Some fibers put to execution before > the waken ones may have exhausted the limit, but we don't care > about that. Then they don't take the semaphore? > IMO this looks quite specialised. And there wouldn't be much use > of such a primitive in lib/core. There is inherent value to solve the problem using standard primitives. Non-standard primitives should be justified over standard ones, not vice versa. -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
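To make the two-semaphore alternative concrete: with a weighted acquire (n units at a time) each limit becomes its own semaphore, one counting entries and one counting bytes. This is only an illustration; fiber_sem_acquire_n() / fiber_sem_release_n() are hypothetical weighted variants of the sketch earlier in the thread, while approx_len is the size estimate the patch already stores in journal_entry.

/* Illustration of the two-semaphore variant; all names are hypothetical. */
static struct fiber_sem wal_queue_len_sem;  /* units are journal entries */
static struct fiber_sem wal_queue_size_sem; /* units are bytes */

static void
wal_queue_enter_sketch(struct journal_entry *entry)
{
    /* A fixed acquisition order keeps concurrent waiters consistent. */
    fiber_sem_acquire_n(&wal_queue_len_sem, 1);
    fiber_sem_acquire_n(&wal_queue_size_sem, entry->approx_len);
}

static void
wal_queue_leave_sketch(struct journal_entry *entry)
{
    fiber_sem_release_n(&wal_queue_size_sem, entry->approx_len);
    fiber_sem_release_n(&wal_queue_len_sem, 1);
}

One visible difference: a strict byte semaphore can never admit an entry larger than the whole budget, while the patch's soft limit lets such an entry through as long as at least one byte is free - which is the 'problem 2' behaviour debated above.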
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches 2021-02-24 19:40 ` Serge Petrenko via Tarantool-patches 2021-02-25 13:05 ` Konstantin Osipov via Tarantool-patches @ 2021-02-26 0:56 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-01 19:08 ` Serge Petrenko via Tarantool-patches 2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches 4 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-02-26 0:56 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Hi! Thanks for the patch! See 8 comments below, and my diff in the end of the email and on top of the branch. On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote: > Since the introduction of asynchronous commit, which doesn't wait for a > WAL write to succeed, it's quite easy to clog WAL with huge amounts > write requests. For now, it's only possible from an applier, since it's > the only user of async commit at the moment. > > This happens when replica is syncing with master and reads new > transactions at a pace higher than it can write them to WAL (see docbot > request for detailed explanation). > > To ameliorate such behavior, we need to introduce some limit on > not-yet-finished WAL write requests. This is what this commit is trying > to do. > Two new counters are added to wal writer: queue_size (in bytes) and > queue_len (in wal messages) together with configuration settings: > `wal_queue_max_size` and `wal_queue_max_len`. > Size and length are increased on every new submitted request, and are > decreased once the tx receives a confirmation that a specific request > was written. > Actually, the limits are added to an abstract journal, but take effect > only for wal writer. 1. Now the limits affect any current journal, regardless of its type. Although they really work only for WAL, because only WAL journal yields AFAIK. Others just 'commit' immediately. > --- > https://github.com/tarantool/tarantool/issues/5536 > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 553db76fc..27ddd0f29 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > goto success; > } > > + /* > + * Do not spam WAL with excess write requests, let it process what's > + * piled up first. > + * This is done before opening the transaction to avoid problems with > + * yielding inside it. > + */ > + if (journal_queue_is_full() || journal_queue_has_waiters()) > + journal_wait_queue(); 2. Perhaps simply move both checks into journal_wait_queue(). Seems like an internal thing for the queue's API. > diff --git a/src/box/journal.c b/src/box/journal.c > index cb320b557..4c31a3dfe 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -34,6 +34,16 @@ > > struct journal *current_journal = NULL; > > +struct journal_queue journal_queue = { > + .max_size = INT64_MAX, > + .size = 0, > + .max_len = INT64_MAX, > + .len = 0, > + .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), > + .is_awake = false, > + .is_ready = false, > +}; 3. Kostja might be right about most of the queue's code being a good candidate for an extraction to libcore. 
So the queue itself would be the semaphore + queue size and len parameters. But up to you. > + > struct journal_entry * > journal_entry_new(size_t n_rows, struct region *region, > journal_write_async_f write_async_cb, > @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region, > complete_data); > return entry; > } > + > +struct journal_queue_entry { > + /** The fiber waiting for queue space to free. */ > + struct fiber *fiber; > + /** A link in all waiting fibers list. */ > + struct rlist in_queue; 4. I see fiber_cond uses fiber->state member. Can we do the same here? Because the queue is not much different from fiber_cond. Both are built on top of fiber API. Which means the 'state' might be usable in the queue as well. > +}; > + > +/** > + * Wake up the first waiter in the journal queue. > + */ > +static inline void > +journal_queue_wakeup_first(void) > +{ > + struct journal_queue_entry *e; > + if (rlist_empty(&journal_queue.waiters)) > + goto out; > + /* > + * When the queue isn't forcefully emptied, no need to wake everyone > + * else up until there's some free space. > + */ > + if (!journal_queue.is_ready && journal_queue_is_full()) > + goto out; > + e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), > + in_queue); 5. Why didn't rlist_first_entry() work? > + fiber_wakeup(e->fiber); > + return; > +out: > + journal_queue.is_awake = false; > + journal_queue.is_ready = false; > +} > + > +void > +journal_queue_wakeup(bool force_ready) > +{ > + assert(!rlist_empty(&journal_queue.waiters)); > + if (journal_queue.is_awake) > + return; > + journal_queue.is_awake = true; > + journal_queue.is_ready = force_ready; > + journal_queue_wakeup_first(); > +} > + > +void > +journal_wait_queue(void) > +{ > + struct journal_queue_entry entry = { > + .fiber = fiber(), > + }; > + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); > + /* > + * Will be waken up by either queue emptying or a synchronous write. > + */ > + while (journal_queue_is_full() && !journal_queue.is_ready) > + fiber_yield(); 6. You check for full anyway. So as I mentioned in the comment in applier's code, you can move all the checks into there, and do them before creating entry and adding it to the queue. As a 'fast path'. And it would make journal_wait_queue() easier to use. > + > + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); > + rlist_del(&entry.in_queue); > + > + journal_queue_wakeup_first(); > +} > diff --git a/src/box/journal.h b/src/box/journal.h > index 5d8d5a726..8fec5b27e 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -124,6 +154,82 @@ struct journal { > struct journal_entry *entry); > }; > > +/** > + * Depending on the step of recovery and instance configuration > + * points at a concrete implementation of the journal. > + */ > +extern struct journal *current_journal; 7. Now you don't need to move current_journal declaration. > @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) > static inline int > journal_write_async(struct journal_entry *entry) > { > + /* > + * It's the job of the caller to check whether the queue is full prior > + * to submitting the request. > + */ > + assert(!journal_queue_is_full() || journal_queue.is_ready); > + journal_queue_on_append(entry); 8. Probably must assert that waiters list is empty. Otherwise you could go out of order. > + > return current_journal->write_async(current_journal, entry); > } > I tried to do some fixes on top of your branch in order to delete the flags and some branching. 
Take a look at the diff below and on top of the branch in a separate commit. ==================== diff --git a/src/box/applier.cc b/src/box/applier.cc index 27ddd0f29..2586f6654 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) * This is done before opening the transaction to avoid problems with * yielding inside it. */ - if (journal_queue_is_full() || journal_queue_has_waiters()) - journal_wait_queue(); + journal_queue_wait(); /** * Explicitly begin the transaction so that we can diff --git a/src/box/journal.c b/src/box/journal.c index 4c31a3dfe..40a5f5b1a 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -40,8 +40,7 @@ struct journal_queue journal_queue = { .max_len = INT64_MAX, .len = 0, .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), - .is_awake = false, - .is_ready = false, + .waiter_count = 0, }; struct journal_entry * @@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region, return entry; } -struct journal_queue_entry { - /** The fiber waiting for queue space to free. */ - struct fiber *fiber; - /** A link in all waiting fibers list. */ - struct rlist in_queue; -}; - -/** - * Wake up the first waiter in the journal queue. - */ -static inline void -journal_queue_wakeup_first(void) -{ - struct journal_queue_entry *e; - if (rlist_empty(&journal_queue.waiters)) - goto out; - /* - * When the queue isn't forcefully emptied, no need to wake everyone - * else up until there's some free space. - */ - if (!journal_queue.is_ready && journal_queue_is_full()) - goto out; - e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), - in_queue); - fiber_wakeup(e->fiber); - return; -out: - journal_queue.is_awake = false; - journal_queue.is_ready = false; -} - void -journal_queue_wakeup(bool force_ready) +journal_queue_wakeup(void) { - assert(!rlist_empty(&journal_queue.waiters)); - if (journal_queue.is_awake) - return; - journal_queue.is_awake = true; - journal_queue.is_ready = force_ready; - journal_queue_wakeup_first(); + struct rlist *list = &journal_queue.waiters; + if (!rlist_empty(list) && !journal_queue_is_full()) + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); } void -journal_wait_queue(void) +journal_queue_wait(void) { - struct journal_queue_entry entry = { - .fiber = fiber(), - }; - rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); + if (!journal_queue_is_full() && !journal_queue_has_waiters()) + return; + ++journal_queue.waiter_count; + rlist_add_tail_entry(&journal_queue.waiters, fiber(), state); /* * Will be waken up by either queue emptying or a synchronous write. */ - while (journal_queue_is_full() && !journal_queue.is_ready) + do { fiber_yield(); + } while (journal_queue_is_full()); + --journal_queue.waiter_count; + journal_queue_wakeup(); +} - assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); - rlist_del(&entry.in_queue); - - journal_queue_wakeup_first(); +void +journal_queue_flush(void) +{ + if (!journal_queue_has_waiters()) + return; + struct rlist *list = &journal_queue.waiters; + while (!rlist_empty(list)) + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); + journal_queue_wait(); } diff --git a/src/box/journal.h b/src/box/journal.h index 8fec5b27e..3b93158cf 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -128,17 +128,22 @@ struct journal_queue { * Whether the queue is being woken or not. Used to avoid multiple * concurrent wake-ups. 
*/ - bool is_awake; - /** - * A flag used to tell the waiting fibers they may proceed even if the - * queue is full, i.e. force them to submit a write request. - */ - bool is_ready; + bool waiter_count; }; /** A single queue for all journal instances. */ extern struct journal_queue journal_queue; +/** + * Check whether anyone is waiting for the journal queue to empty. If there are + * other waiters we must go after them to preserve write order. + */ +static inline bool +journal_queue_has_waiters(void) +{ + return journal_queue.waiter_count != 0; +} ==================== I moved this function not on purpose. Cold be kept in the old place. ==================== + /** * An API for an abstract journal for all transactions of this * instance, as well as for multiple instances in case of @@ -166,7 +171,7 @@ extern struct journal *current_journal; * full. */ void -journal_queue_wakeup(bool force_ready); +journal_queue_wakeup(void); /** * Check whether any of the queue size limits is reached. @@ -180,27 +185,19 @@ journal_queue_is_full(void) journal_queue.len > journal_queue.max_len; } -/** - * Check whether anyone is waiting for the journal queue to empty. If there are - * other waiters we must go after them to preserve write order. - */ -static inline bool -journal_queue_has_waiters(void) -{ - return !rlist_empty(&journal_queue.waiters); -} - /** Yield until there's some space in the journal queue. */ void -journal_wait_queue(void); +journal_queue_wait(void); + +void +journal_queue_flush(void); /** Set maximal journal queue size in bytes. */ static inline void journal_queue_set_max_size(int64_t size) { journal_queue.max_size = size; - if (journal_queue_has_waiters() && !journal_queue_is_full()) - journal_queue_wakeup(false); + journal_queue_wakeup(); } /** Set maximal journal queue length, in entries. */ @@ -208,8 +205,7 @@ static inline void journal_queue_set_max_len(int64_t len) { journal_queue.max_len = len; - if (journal_queue_has_waiters() && !journal_queue_is_full()) - journal_queue_wakeup(false); + journal_queue_wakeup(); } /** Increase queue size on a new write request. */ @@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry) assert(entry->write_async_cb != NULL); journal_queue_on_complete(entry); - if (journal_queue_has_waiters() && !journal_queue_is_full()) - journal_queue_wakeup(false); + journal_queue_wakeup(); entry->write_async_cb(entry); } @@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry) static inline int journal_write(struct journal_entry *entry) { - if (journal_queue_has_waiters()) { - /* - * It's a synchronous write, so it's fine to wait a bit more for - * everyone else to be written. They'll wake us up back - * afterwards. - */ - journal_queue_wakeup(true); - journal_wait_queue(); - } - + journal_queue_flush(); journal_queue_on_append(entry); return current_journal->write(current_journal, entry); @@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry) * It's the job of the caller to check whether the queue is full prior * to submitting the request. */ - assert(!journal_queue_is_full() || journal_queue.is_ready); + assert(journal_queue.waiter_count == 0); journal_queue_on_append(entry); return current_journal->write_async(current_journal, entry); ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-26 0:56 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-01 19:08 ` Serge Petrenko via Tarantool-patches 2021-03-01 22:05 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-01 19:08 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 26.02.2021 03:56, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! > > See 8 comments below, and my diff in the end of the email and on > top of the branch. Thanks for the review & the fixes! > > On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote: >> Since the introduction of asynchronous commit, which doesn't wait for a >> WAL write to succeed, it's quite easy to clog WAL with huge amounts >> write requests. For now, it's only possible from an applier, since it's >> the only user of async commit at the moment. >> >> This happens when replica is syncing with master and reads new >> transactions at a pace higher than it can write them to WAL (see docbot >> request for detailed explanation). >> >> To ameliorate such behavior, we need to introduce some limit on >> not-yet-finished WAL write requests. This is what this commit is trying >> to do. >> Two new counters are added to wal writer: queue_size (in bytes) and >> queue_len (in wal messages) together with configuration settings: >> `wal_queue_max_size` and `wal_queue_max_len`. >> Size and length are increased on every new submitted request, and are >> decreased once the tx receives a confirmation that a specific request >> was written. >> Actually, the limits are added to an abstract journal, but take effect >> only for wal writer. > 1. Now the limits affect any current journal, regardless of its type. > Although they really work only for WAL, because only WAL journal > yields AFAIK. Others just 'commit' immediately. Correct. I've reworded this piece. > >> --- >> https://github.com/tarantool/tarantool/issues/5536 >> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom >> >> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index 553db76fc..27ddd0f29 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) >> goto success; >> } >> >> + /* >> + * Do not spam WAL with excess write requests, let it process what's >> + * piled up first. >> + * This is done before opening the transaction to avoid problems with >> + * yielding inside it. >> + */ >> + if (journal_queue_is_full() || journal_queue_has_waiters()) >> + journal_wait_queue(); > 2. Perhaps simply move both checks into journal_wait_queue(). Seems like > an internal thing for the queue's API. Looks good. > >> diff --git a/src/box/journal.c b/src/box/journal.c >> index cb320b557..4c31a3dfe 100644 >> --- a/src/box/journal.c >> +++ b/src/box/journal.c >> @@ -34,6 +34,16 @@ >> >> struct journal *current_journal = NULL; >> >> +struct journal_queue journal_queue = { >> + .max_size = INT64_MAX, >> + .size = 0, >> + .max_len = INT64_MAX, >> + .len = 0, >> + .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), >> + .is_awake = false, >> + .is_ready = false, >> +}; > 3. Kostja might be right about most of the queue's code being a good > candidate for an extraction to libcore. So the queue itself would > be the semaphore + queue size and len parameters. But up to you. I'm not sure I get it. 
It would be a counting semaphore, if we had a single limit, say, only entry count, or only entry size. But we have both. So it's not a "normal" counting semaphore. And if not, why extract it as a generic primitive? Moreover, the limits are now 'soft'. As discussed verbally, we'll wake the entry up and let it proceed, if we see, that the queue isn't full at the time of wake up. But it may be full again once the fiber is actually put to execution. And we ignore this. So it's a "soft counting semaphore with 2 resources". > >> + >> struct journal_entry * >> journal_entry_new(size_t n_rows, struct region *region, >> journal_write_async_f write_async_cb, >> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region, >> complete_data); >> return entry; >> } >> + >> +struct journal_queue_entry { >> + /** The fiber waiting for queue space to free. */ >> + struct fiber *fiber; >> + /** A link in all waiting fibers list. */ >> + struct rlist in_queue; > 4. I see fiber_cond uses fiber->state member. Can we do the same here? > Because the queue is not much different from fiber_cond. Both are > built on top of fiber API. Which means the 'state' might be usable in > the queue as well. It's a good idea, thanks! Applied with minor changes, described below. > >> +}; >> + >> +/** >> + * Wake up the first waiter in the journal queue. >> + */ >> +static inline void >> +journal_queue_wakeup_first(void) >> +{ >> + struct journal_queue_entry *e; >> + if (rlist_empty(&journal_queue.waiters)) >> + goto out; >> + /* >> + * When the queue isn't forcefully emptied, no need to wake everyone >> + * else up until there's some free space. >> + */ >> + if (!journal_queue.is_ready && journal_queue_is_full()) >> + goto out; >> + e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), >> + in_queue); > 5. Why didn't rlist_first_entry() work? It sure would work. Just a misprint, thanks for pointing this out! > >> + fiber_wakeup(e->fiber); >> + return; >> +out: >> + journal_queue.is_awake = false; >> + journal_queue.is_ready = false; >> +} >> + >> +void >> +journal_queue_wakeup(bool force_ready) >> +{ >> + assert(!rlist_empty(&journal_queue.waiters)); >> + if (journal_queue.is_awake) >> + return; >> + journal_queue.is_awake = true; >> + journal_queue.is_ready = force_ready; >> + journal_queue_wakeup_first(); >> +} >> + >> +void >> +journal_wait_queue(void) >> +{ >> + struct journal_queue_entry entry = { >> + .fiber = fiber(), >> + }; >> + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); >> + /* >> + * Will be waken up by either queue emptying or a synchronous write. >> + */ >> + while (journal_queue_is_full() && !journal_queue.is_ready) >> + fiber_yield(); > 6. You check for full anyway. So as I mentioned in the comment in > applier's code, you can move all the checks into there, and do > them before creating entry and adding it to the queue. As a > 'fast path'. And it would make journal_wait_queue() easier to use. Ok > >> + >> + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); >> + rlist_del(&entry.in_queue); >> + >> + journal_queue_wakeup_first(); >> +} >> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..8fec5b27e 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -124,6 +154,82 @@ struct journal { >> struct journal_entry *entry); >> }; >> >> +/** >> + * Depending on the step of recovery and instance configuration >> + * points at a concrete implementation of the journal. >> + */ >> +extern struct journal *current_journal; > 7. 
Now you don't need to move current_journal declaration. Moved that back. > >> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >> static inline int >> journal_write_async(struct journal_entry *entry) >> { >> + /* >> + * It's the job of the caller to check whether the queue is full prior >> + * to submitting the request. >> + */ >> + assert(!journal_queue_is_full() || journal_queue.is_ready); >> + journal_queue_on_append(entry); > 8. Probably must assert that waiters list is empty. Otherwise you > could go out of order. It's not empty by the time the first entry gets to 'journal_write_async'. Everyone else is waken up, but not yet removed from the queue. Looks like we cannot determine whether a write is called after waiting in queue or not. > >> + >> return current_journal->write_async(current_journal, entry); >> } >> > I tried to do some fixes on top of your branch in order to delete > the flags and some branching. > > Take a look at the diff below and on top of the branch in a separate > commit. Thanks! Looks good, with my changes on top. I squashed everything into one commit and updated the branch. ============================================ diff --git a/src/box/journal.c b/src/box/journal.c index 40a5f5b1a..92a773684 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -83,9 +83,7 @@ journal_queue_wait(void) /* * Will be waken up by either queue emptying or a synchronous write. */ - do { - fiber_yield(); - } while (journal_queue_is_full()); + fiber_yield(); --journal_queue.waiter_count; journal_queue_wakeup(); } diff --git a/src/box/journal.h b/src/box/journal.h index 3b93158cf..b5d587e3a 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -124,26 +124,13 @@ struct journal_queue { * entered the queue. */ struct rlist waiters; - /** - * Whether the queue is being woken or not. Used to avoid multiple - * concurrent wake-ups. - */ - bool waiter_count; + /** How many waiters there are in a queue. */ + int waiter_count; }; /** A single queue for all journal instances. */ extern struct journal_queue journal_queue; -/** - * Check whether anyone is waiting for the journal queue to empty. If there are - * other waiters we must go after them to preserve write order. - */ -static inline bool -journal_queue_has_waiters(void) -{ - return journal_queue.waiter_count != 0; -} - ============================================ Returned this func back to its original place. ============================================ /** * An API for an abstract journal for all transactions of this * instance, as well as for multiple instances in case of @@ -173,6 +160,16 @@ extern struct journal *current_journal; void journal_queue_wakeup(void); +/** + * Check whether anyone is waiting for the journal queue to empty. If there are + * other waiters we must go after them to preserve write order. + */ +static inline bool +journal_queue_has_waiters(void) +{ + return journal_queue.waiter_count != 0; +} + /** * Check whether any of the queue size limits is reached. * If the queue is full, we must wait for some of the entries to be written @@ -235,7 +232,6 @@ journal_async_complete(struct journal_entry *entry) assert(entry->write_async_cb != NULL); journal_queue_on_complete(entry); - journal_queue_wakeup(); ============================================ Let's wake the queue up once the whole batch is processed. 
This way we avoid excess checks and waking every waiter in queue up in case the queue is not full (each wakeup wakes the first entry up and removes it from the list, so a ton of wake-ups would wake up every entry there is). ============================================ entry->write_async_cb(entry); } @@ -266,7 +262,6 @@ journal_write_async(struct journal_entry *entry) * It's the job of the caller to check whether the queue is full prior * to submitting the request. */ - assert(journal_queue.waiter_count == 0); journal_queue_on_append(entry); ============================================ As I mentioned above, we cannot assert queue hasn't got waiters here. ============================================ return current_journal->write_async(current_journal, entry); diff --git a/src/box/wal.c b/src/box/wal.c index 328ab092d..7829ccc95 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -274,6 +274,8 @@ tx_schedule_queue(struct stailq *queue) struct journal_entry *req, *tmp; stailq_foreach_entry_safe(req, tmp, queue, fifo) journal_async_complete(req); + + journal_queue_wakeup(); } /** ============================================ > > ==================== > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 27ddd0f29..2586f6654 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > * This is done before opening the transaction to avoid problems with > * yielding inside it. > */ > - if (journal_queue_is_full() || journal_queue_has_waiters()) > - journal_wait_queue(); > + journal_queue_wait(); > > /** > * Explicitly begin the transaction so that we can > diff --git a/src/box/journal.c b/src/box/journal.c > index 4c31a3dfe..40a5f5b1a 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -40,8 +40,7 @@ struct journal_queue journal_queue = { > .max_len = INT64_MAX, > .len = 0, > .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), > - .is_awake = false, > - .is_ready = false, > + .waiter_count = 0, > }; > > struct journal_entry * > @@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region, > return entry; > } > > -struct journal_queue_entry { > - /** The fiber waiting for queue space to free. */ > - struct fiber *fiber; > - /** A link in all waiting fibers list. */ > - struct rlist in_queue; > -}; > - > -/** > - * Wake up the first waiter in the journal queue. > - */ > -static inline void > -journal_queue_wakeup_first(void) > -{ > - struct journal_queue_entry *e; > - if (rlist_empty(&journal_queue.waiters)) > - goto out; > - /* > - * When the queue isn't forcefully emptied, no need to wake everyone > - * else up until there's some free space. 
> - */ > - if (!journal_queue.is_ready && journal_queue_is_full()) > - goto out; > - e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), > - in_queue); > - fiber_wakeup(e->fiber); > - return; > -out: > - journal_queue.is_awake = false; > - journal_queue.is_ready = false; > -} > - > void > -journal_queue_wakeup(bool force_ready) > +journal_queue_wakeup(void) > { > - assert(!rlist_empty(&journal_queue.waiters)); > - if (journal_queue.is_awake) > - return; > - journal_queue.is_awake = true; > - journal_queue.is_ready = force_ready; > - journal_queue_wakeup_first(); > + struct rlist *list = &journal_queue.waiters; > + if (!rlist_empty(list) && !journal_queue_is_full()) > + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > } > > void > -journal_wait_queue(void) > +journal_queue_wait(void) > { > - struct journal_queue_entry entry = { > - .fiber = fiber(), > - }; > - rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); > + if (!journal_queue_is_full() && !journal_queue_has_waiters()) > + return; > + ++journal_queue.waiter_count; > + rlist_add_tail_entry(&journal_queue.waiters, fiber(), state); > /* > * Will be waken up by either queue emptying or a synchronous write. > */ > - while (journal_queue_is_full() && !journal_queue.is_ready) > + do { > fiber_yield(); > + } while (journal_queue_is_full()); > + --journal_queue.waiter_count; > + journal_queue_wakeup(); > +} > > - assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); > - rlist_del(&entry.in_queue); > - > - journal_queue_wakeup_first(); > +void > +journal_queue_flush(void) > +{ > + if (!journal_queue_has_waiters()) > + return; > + struct rlist *list = &journal_queue.waiters; > + while (!rlist_empty(list)) > + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > + journal_queue_wait(); > } > diff --git a/src/box/journal.h b/src/box/journal.h > index 8fec5b27e..3b93158cf 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -128,17 +128,22 @@ struct journal_queue { > * Whether the queue is being woken or not. Used to avoid multiple > * concurrent wake-ups. > */ > - bool is_awake; > - /** > - * A flag used to tell the waiting fibers they may proceed even if the > - * queue is full, i.e. force them to submit a write request. > - */ > - bool is_ready; > + bool waiter_count; > }; > > /** A single queue for all journal instances. */ > extern struct journal_queue journal_queue; > > +/** > + * Check whether anyone is waiting for the journal queue to empty. If there are > + * other waiters we must go after them to preserve write order. > + */ > +static inline bool > +journal_queue_has_waiters(void) > +{ > + return journal_queue.waiter_count != 0; > +} > ==================== > > I moved this function not on purpose. Cold be kept in > the old place. > > ==================== > + > /** > * An API for an abstract journal for all transactions of this > * instance, as well as for multiple instances in case of > @@ -166,7 +171,7 @@ extern struct journal *current_journal; > * full. > */ > void > -journal_queue_wakeup(bool force_ready); > +journal_queue_wakeup(void); > > /** > * Check whether any of the queue size limits is reached. > @@ -180,27 +185,19 @@ journal_queue_is_full(void) > journal_queue.len > journal_queue.max_len; > } > > -/** > - * Check whether anyone is waiting for the journal queue to empty. If there are > - * other waiters we must go after them to preserve write order. 
> - */ > -static inline bool > -journal_queue_has_waiters(void) > -{ > - return !rlist_empty(&journal_queue.waiters); > -} > - > /** Yield until there's some space in the journal queue. */ > void > -journal_wait_queue(void); > +journal_queue_wait(void); > + > +void > +journal_queue_flush(void); > > /** Set maximal journal queue size in bytes. */ > static inline void > journal_queue_set_max_size(int64_t size) > { > journal_queue.max_size = size; > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > } > > /** Set maximal journal queue length, in entries. */ > @@ -208,8 +205,7 @@ static inline void > journal_queue_set_max_len(int64_t len) > { > journal_queue.max_len = len; > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > } > > /** Increase queue size on a new write request. */ > @@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry) > assert(entry->write_async_cb != NULL); > > journal_queue_on_complete(entry); > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > > entry->write_async_cb(entry); > } > @@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry) > static inline int > journal_write(struct journal_entry *entry) > { > - if (journal_queue_has_waiters()) { > - /* > - * It's a synchronous write, so it's fine to wait a bit more for > - * everyone else to be written. They'll wake us up back > - * afterwards. > - */ > - journal_queue_wakeup(true); > - journal_wait_queue(); > - } > - > + journal_queue_flush(); > journal_queue_on_append(entry); > > return current_journal->write(current_journal, entry); > @@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry) > * It's the job of the caller to check whether the queue is full prior > * to submitting the request. > */ > - assert(!journal_queue_is_full() || journal_queue.is_ready); > + assert(journal_queue.waiter_count == 0); > journal_queue_on_append(entry); > > return current_journal->write_async(current_journal, entry); -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
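A deliberately simplified, self-contained Lua sketch of the two-limit gate discussed above (the "soft counting semaphore with 2 resources"). It uses Tarantool's fiber.cond() and made-up names, and unlike the patch it does not preserve FIFO order among waiters: a newcomer can slip past a woken fiber, which is exactly the waiter_count concern raised in this exchange.

====================
local fiber = require('fiber')

-- A gate with two limits: total pending size in bytes and entry count.
local gate = {
    size = 0, max_size = 100 * 1024 * 1024,
    len = 0, max_len = 100000,
    cond = fiber.cond(),
}

local function gate_is_full(g)
    -- '>' rather than '>=': the limits are soft, a whole entry is
    -- admitted as long as the limit is not already exceeded.
    return g.size > g.max_size or g.len > g.max_len
end

-- Block the calling fiber while the gate is over either limit, then
-- account for the new entry, e.g. gate_enter(gate, entry_size).
local function gate_enter(g, entry_size)
    while gate_is_full(g) do
        g.cond:wait()
    end
    g.size = g.size + entry_size
    g.len = g.len + 1
end

-- Called when a write completes: release the space and wake one waiter.
local function gate_leave(g, entry_size)
    g.size = g.size - entry_size
    g.len = g.len - 1
    if not gate_is_full(g) then
        g.cond:signal()
    end
end
====================

In the patch itself the waiters rlist together with waiter_count is what restores fairness: journal_queue_wait() blocks not only when the queue is full but also when there are waiters ahead, so write order is preserved.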
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-01 19:08 ` Serge Petrenko via Tarantool-patches @ 2021-03-01 22:05 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-02 17:51 ` Serge Petrenko via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-01 22:05 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Hi! Thanks for the fixes! >>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >>> static inline int >>> journal_write_async(struct journal_entry *entry) >>> { >>> + /* >>> + * It's the job of the caller to check whether the queue is full prior >>> + * to submitting the request. >>> + */ >>> + assert(!journal_queue_is_full() || journal_queue.is_ready); >>> + journal_queue_on_append(entry); >> 8. Probably must assert that waiters list is empty. Otherwise you >> could go out of order. > > It's not empty by the time the first entry gets to 'journal_write_async'. > Everyone else is waken up, but not yet removed from the queue. > > Looks like we cannot determine whether a write is called after waiting in queue > or not. It bothers me a lot, the rest looks good. We don't have any protection against a reorder, even not an assertion. How about this? (I didn't test): ==================== --- a/src/box/journal.c +++ b/src/box/journal.c @@ -69,8 +69,11 @@ void journal_queue_wakeup(void) { struct rlist *list = &journal_queue.waiters; - if (!rlist_empty(list) && !journal_queue_is_full()) + if (!rlist_empty(list) && !journal_queue_is_full()) { fiber_wakeup(rlist_first_entry(list, struct fiber, state)); + --journal_queue.waiter_count; + assert(journal_queue.waiter_count >= 0); + } } void @@ -84,7 +87,6 @@ journal_queue_wait(void) * Will be waken up by either queue emptying or a synchronous write. */ fiber_yield(); - --journal_queue.waiter_count; journal_queue_wakeup(); } @@ -96,5 +98,6 @@ journal_queue_flush(void) struct rlist *list = &journal_queue.waiters; while (!rlist_empty(list)) fiber_wakeup(rlist_first_entry(list, struct fiber, state)); + journal_queue.waiter_count = 0; journal_queue_wait(); } diff --git a/src/box/journal.h b/src/box/journal.h index 5f0e0accd..ea56043e2 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -260,6 +260,7 @@ journal_write_async(struct journal_entry *entry) * to submitting the request. */ journal_queue_on_append(entry); + assert(journal_queue.waiter_count == 0); return current_journal->write_async(current_journal, entry); } ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-01 22:05 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-02 17:51 ` Serge Petrenko via Tarantool-patches 2021-03-03 20:59 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-02 17:51 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 02.03.2021 01:05, Vladislav Shpilevoy пишет: > Hi! Thanks for the fixes! > >>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >>>> static inline int >>>> journal_write_async(struct journal_entry *entry) >>>> { >>>> + /* >>>> + * It's the job of the caller to check whether the queue is full prior >>>> + * to submitting the request. >>>> + */ >>>> + assert(!journal_queue_is_full() || journal_queue.is_ready); >>>> + journal_queue_on_append(entry); >>> 8. Probably must assert that waiters list is empty. Otherwise you >>> could go out of order. >> It's not empty by the time the first entry gets to 'journal_write_async'. >> Everyone else is waken up, but not yet removed from the queue. >> >> Looks like we cannot determine whether a write is called after waiting in queue >> or not. > It bothers me a lot, the rest looks good. We don't have any protection against > a reorder, even not an assertion. How about this? (I didn't test): Thanks for thinking this through! Yes, indeed, this is a problem. Unfortunately, your solution will allow reordering. We needed the waiter count to prevent anyone from sliding in front when the qeueue is already waken, but the fibers are not yet scheduled. We also discussed that a reorder may happen if applier tries to apply a vinyl tx, which may yield after it has waited in queue. In this case, if some tx is committed (synchronously) and added to the queue while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in queue. So, there shouldn't be any yields between journal_queue_wait() and journal_write(_async)(). We cannot yield in memtx transactions, so the only solution is to put journal_queue_wait() after txn_prepare() inside txn_commit_async(). Take a look at this patch (on top of the branch): I decided to move journal_queue_wait() to journal_write_async() to be consistent with journal_write(), which also checks queue. Now reordering is not a problem since waiting in queue is done right before the write request is submitted. No yields in between. Also, I renamed journal_write_async() to journal_write_try_async() and txn_commit_async() to txn_commit_try_async() to indicate that they may yield occasionally now. What do you think of this approach? P.S. there's another thing I wanted to discuss: should we set defaults for wal_queue_max_size(len) to some finite value? If yes, then what value should it be? wal_queue_max_size=default memtx_memory (256 Mb)? Don't know which value to choose for wal_queue_max_len at all. =================================== Subject: [PATCH] [tosquash] journal_write and txn_commit: async -> try_async Make journal_write_async() wait for journal queue. Rename it to journal_write_try_async() to show that it might yield now (rarely). Same for txn_commit_async(): rename it to txn_commit_try_async() since it uses journal_write_try_async(). 
--- src/box/applier.cc | 12 ++---------- src/box/box.cc | 4 ++-- src/box/journal.h | 7 ++----- src/box/txn.c | 4 ++-- src/box/txn.h | 3 ++- src/box/txn_limbo.h | 2 +- 6 files changed, 11 insertions(+), 21 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index dff43795c..5a88a013e 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -868,7 +868,7 @@ apply_synchro_row(struct xrow_header *row) if (entry == NULL) goto err; - if (journal_write_async(&entry->journal_entry) != 0) { + if (journal_write_try_async(&entry->journal_entry) != 0) { diag_set(ClientError, ER_WAL_IO); goto err; } @@ -967,14 +967,6 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) goto success; } - /* - * Do not spam WAL with excess write requests, let it process what's - * piled up first. - * This is done before opening the transaction to avoid problems with - * yielding inside it. - */ - journal_queue_wait(); - /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply @@ -1048,7 +1040,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL); txn_on_wal_write(txn, on_wal_write); - if (txn_commit_async(txn) < 0) + if (txn_commit_try_async(txn) < 0) goto fail; success: diff --git a/src/box/box.cc b/src/box/box.cc index 9a3b092d0..a8aa2663f 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -246,12 +246,12 @@ box_process_rw(struct request *request, struct space *space, * synchronous tx it meets until confirm timeout * is reached and the tx is rolled back, yielding * an error. - * Moreover, txn_commit_async() doesn't hurt at + * Moreover, txn_commit_try_async() doesn't hurt at * all during local recovery, since journal_write * is faked at this stage and returns immediately. */ if (is_local_recovery) { - res = txn_commit_async(txn); + res = txn_commit_try_async(txn); } else { res = txn_commit(txn); } diff --git a/src/box/journal.h b/src/box/journal.h index 5f0e0accd..3a945fa53 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -253,12 +253,9 @@ journal_write(struct journal_entry *entry) * @return 0 if write was queued to a backend or -1 in case of an error. */ static inline int -journal_write_async(struct journal_entry *entry) +journal_write_try_async(struct journal_entry *entry) { - /* - * It's the job of the caller to check whether the queue is full prior - * to submitting the request. - */ + journal_queue_wait(); journal_queue_on_append(entry); return current_journal->write_async(current_journal, entry); diff --git a/src/box/txn.c b/src/box/txn.c index 71c89ce5f..40061ff09 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -789,7 +789,7 @@ txn_limbo_on_rollback(struct trigger *trig, void *event) } int -txn_commit_async(struct txn *txn) +txn_commit_try_async(struct txn *txn) { struct journal_entry *req; @@ -859,7 +859,7 @@ txn_commit_async(struct txn *txn) } fiber_set_txn(fiber(), NULL); - if (journal_write_async(req) != 0) { + if (journal_write_try_async(req) != 0) { fiber_set_txn(fiber(), txn); diag_set(ClientError, ER_WAL_IO); diag_log(); diff --git a/src/box/txn.h b/src/box/txn.h index 29fe6d5ce..a45518064 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -473,12 +473,13 @@ txn_rollback(struct txn *txn); * journal write completion. Note, the journal write may still fail. * To track transaction status, one is supposed to use on_commit and * on_rollback triggers. + * Note, this may yield occasionally, once journal queue gets full. 
* * On failure -1 is returned and the transaction is rolled back and * freed. */ int -txn_commit_async(struct txn *txn); +txn_commit_try_async(struct txn *txn); /** * Most txns don't have triggers, and txn objects diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index c28b5666d..af0addf8d 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -229,7 +229,7 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo, * remote transactions. The function exists to be used in a * context, where a transaction is not known whether it is local * or not. For example, when a transaction is committed not bound - * to any fiber (txn_commit_async()), it can be created by applier + * to any fiber (txn_commit_try_async()), it can be created by applier * (then it is remote) or by recovery (then it is local). Besides, * recovery can commit remote transactions as well, when works on * a replica - it will recover data received from master. -- 2.24.3 (Apple Git-128) =================================== > > ==================== > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -69,8 +69,11 @@ void > journal_queue_wakeup(void) > { > struct rlist *list = &journal_queue.waiters; > - if (!rlist_empty(list) && !journal_queue_is_full()) > + if (!rlist_empty(list) && !journal_queue_is_full()) { > fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > + --journal_queue.waiter_count; > + assert(journal_queue.waiter_count >= 0); > + } > } > > void > @@ -84,7 +87,6 @@ journal_queue_wait(void) > * Will be waken up by either queue emptying or a synchronous write. > */ > fiber_yield(); > - --journal_queue.waiter_count; > journal_queue_wakeup(); > } > > @@ -96,5 +98,6 @@ journal_queue_flush(void) > struct rlist *list = &journal_queue.waiters; > while (!rlist_empty(list)) > fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > + journal_queue.waiter_count = 0; > journal_queue_wait(); > } > diff --git a/src/box/journal.h b/src/box/journal.h > index 5f0e0accd..ea56043e2 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -260,6 +260,7 @@ journal_write_async(struct journal_entry *entry) > * to submitting the request. > */ > journal_queue_on_append(entry); > + assert(journal_queue.waiter_count == 0); > > return current_journal->write_async(current_journal, entry); > } -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-02 17:51 ` Serge Petrenko via Tarantool-patches @ 2021-03-03 20:59 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-09 15:10 ` Serge Petrenko via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-03 20:59 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches >>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >>>>> static inline int >>>>> journal_write_async(struct journal_entry *entry) >>>>> { >>>>> + /* >>>>> + * It's the job of the caller to check whether the queue is full prior >>>>> + * to submitting the request. >>>>> + */ >>>>> + assert(!journal_queue_is_full() || journal_queue.is_ready); >>>>> + journal_queue_on_append(entry); >>>> 8. Probably must assert that waiters list is empty. Otherwise you >>>> could go out of order. >>> It's not empty by the time the first entry gets to 'journal_write_async'. >>> Everyone else is waken up, but not yet removed from the queue. >>> >>> Looks like we cannot determine whether a write is called after waiting in queue >>> or not. >> It bothers me a lot, the rest looks good. We don't have any protection against >> a reorder, even not an assertion. How about this? (I didn't test): > > Thanks for thinking this through! > Yes, indeed, this is a problem. Unfortunately, your solution will allow reordering. > We needed the waiter count to prevent anyone from sliding in front when the qeueue > is already waken, but the fibers are not yet scheduled. > > We also discussed that a reorder may happen if applier tries to apply > a vinyl tx, which may yield after it has waited in queue. > In this case, if some tx is committed (synchronously) and added to the queue > while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in queue. > > So, there shouldn't be any yields between journal_queue_wait() and > journal_write(_async)(). > > We cannot yield in memtx transactions, so the only solution is to put > journal_queue_wait() after txn_prepare() inside txn_commit_async(). > > Take a look at this patch (on top of the branch): > I decided to move journal_queue_wait() to journal_write_async() to be > consistent with journal_write(), which also checks queue. > > Now reordering is not a problem since waiting in queue is done right before > the write request is submitted. No yields in between. > > Also, I renamed journal_write_async() to journal_write_try_async() and > txn_commit_async() to txn_commit_try_async() to indicate that they may > yield occasionally now. > > What do you think of this approach? Looks good. > P.S. there's another thing I wanted to discuss: should we set defaults for > wal_queue_max_size(len) to some finite value? If yes, then what value should > it be? wal_queue_max_size=default memtx_memory (256 Mb)? > Don't know which value to choose for wal_queue_max_len at all. Probably we could try something sanely big. For instance, max_len = 1000000 in assumption that you might replicate a million rows per second, and a WAL write is likely to end faster than a second. But if you still have a queue bigger than million, then it looks wrong. On the other hand, should give some bit of protection against OOM assuming the transactions aren't big. For max_size we could be a bit more brave than 256MB. Perhaps 5GB? 10GB? Just to have some sane limit up to which it is still imaginable that the instance really has that much memory. Better than infinity. 
Don't know. Also can leave unlimited, or ask Mons for good defaults since he has experience of configuring Tarantool in prod. See 2 comments below. > diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result > new file mode 100644 > index 000000000..f7799baa8 > --- /dev/null > +++ b/test/replication/gh-5536-wal-limit.result > @@ -0,0 +1,132 @@ > +-- test-run result file version 2 > +test_run = require('test_run').new() > + | --- > + | ... > +fiber = require('fiber') > + | --- > + | ... > + > +-- > +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so > +-- that appliers stop reading new transactions from master once the queue is > +-- full. > +-- > +box.schema.user.grant('guest', 'replication') > + | --- > + | ... > +_ = box.schema.space.create('test') > + | --- > + | ... > +_ = box.space.test:create_index('pk') > + | --- > + | ... > + > +replication_timeout = box.cfg.replication_timeout 1. Need to save the WAL limit in case it won't be defaulted to 0 in future. To properly restore it in the end of the test. > + | --- > + | ... > +box.cfg{replication_timeout=1000} > + | --- > + | ...= 2. Would be also good to have at least one test for WAL row count limit, not only for the byte size. ^ permalink raw reply [flat|nested] 30+ messages in thread
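A quick back-of-the-envelope check of the max_len figure proposed above, written out as a Lua snippet; the average transaction size is an assumption made up for illustration, not a measured value.

====================
-- Rough upper bound on memory pinned by a full queue under the proposed
-- max_len default; avg_tx_size is a made-up figure.
local max_len = 1000 * 1000              -- entries, as proposed above
local avg_tx_size = 512                  -- bytes per transaction (assumption)
local worst_case = max_len * avg_tx_size -- bytes pinned by a full queue
print(math.floor(worst_case / 2^20) .. ' MB') -- 488 MB for these numbers
====================

The bound scales directly with transaction size, so the byte-size limit is the tighter guarantee against running out of memory.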
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-03 20:59 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-09 15:10 ` Serge Petrenko via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-09 15:10 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 03.03.2021 23:59, Vladislav Shpilevoy пишет: >>>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >>>>>> static inline int >>>>>> journal_write_async(struct journal_entry *entry) >>>>>> { >>>>>> + /* >>>>>> + * It's the job of the caller to check whether the queue is full prior >>>>>> + * to submitting the request. >>>>>> + */ >>>>>> + assert(!journal_queue_is_full() || journal_queue.is_ready); >>>>>> + journal_queue_on_append(entry); >>>>> 8. Probably must assert that waiters list is empty. Otherwise you >>>>> could go out of order. >>>> It's not empty by the time the first entry gets to 'journal_write_async'. >>>> Everyone else is waken up, but not yet removed from the queue. >>>> >>>> Looks like we cannot determine whether a write is called after waiting in queue >>>> or not. >>> It bothers me a lot, the rest looks good. We don't have any protection against >>> a reorder, even not an assertion. How about this? (I didn't test): >> Thanks for thinking this through! >> Yes, indeed, this is a problem. Unfortunately, your solution will allow reordering. >> We needed the waiter count to prevent anyone from sliding in front when the qeueue >> is already waken, but the fibers are not yet scheduled. >> >> We also discussed that a reorder may happen if applier tries to apply >> a vinyl tx, which may yield after it has waited in queue. >> In this case, if some tx is committed (synchronously) and added to the queue >> while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx in queue. >> >> So, there shouldn't be any yields between journal_queue_wait() and >> journal_write(_async)(). >> >> We cannot yield in memtx transactions, so the only solution is to put >> journal_queue_wait() after txn_prepare() inside txn_commit_async(). >> >> Take a look at this patch (on top of the branch): >> I decided to move journal_queue_wait() to journal_write_async() to be >> consistent with journal_write(), which also checks queue. >> >> Now reordering is not a problem since waiting in queue is done right before >> the write request is submitted. No yields in between. >> >> Also, I renamed journal_write_async() to journal_write_try_async() and >> txn_commit_async() to txn_commit_try_async() to indicate that they may >> yield occasionally now. >> >> What do you think of this approach? > Looks good. Ok, squashed. > >> P.S. there's another thing I wanted to discuss: should we set defaults for >> wal_queue_max_size(len) to some finite value? If yes, then what value should >> it be? wal_queue_max_size=default memtx_memory (256 Mb)? >> Don't know which value to choose for wal_queue_max_len at all. > Probably we could try something sanely big. For instance, max_len = 1000000 > in assumption that you might replicate a million rows per second, and a WAL > write is likely to end faster than a second. But if you still have a queue > bigger than million, then it looks wrong. On the other hand, should give some > bit of protection against OOM assuming the transactions aren't big. > > For max_size we could be a bit more brave than 256MB. Perhaps 5GB? 10GB? 
Just > to have some sane limit up to which it is still imaginable that the instance > really has that much memory. Better than infinity. > > Don't know. Also can leave unlimited, or ask Mons for good defaults since he > has experience of configuring Tarantool in prod. Asked Mons. He suggests 10-100k transactions and 10-50 Mb buffer. Let's settle with 100k transactions and 100Mb buffer, just cause I think bigger is better. > > See 2 comments below. > >> diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result >> new file mode 100644 >> index 000000000..f7799baa8 >> --- /dev/null >> +++ b/test/replication/gh-5536-wal-limit.result >> @@ -0,0 +1,132 @@ >> +-- test-run result file version 2 >> +test_run = require('test_run').new() >> + | --- >> + | ... >> +fiber = require('fiber') >> + | --- >> + | ... >> + >> +-- >> +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so >> +-- that appliers stop reading new transactions from master once the queue is >> +-- full. >> +-- >> +box.schema.user.grant('guest', 'replication') >> + | --- >> + | ... >> +_ = box.schema.space.create('test') >> + | --- >> + | ... >> +_ = box.space.test:create_index('pk') >> + | --- >> + | ... >> + >> +replication_timeout = box.cfg.replication_timeout > 1. Need to save the WAL limit in case it won't be defaulted > to 0 in future. To properly restore it in the end of the test. Actually, no. WAL limits are only set on replica. Which's destroyed in the end of the test anyway. > >> + | --- >> + | ... >> +box.cfg{replication_timeout=1000} >> + | --- >> + | ...= > 2. Would be also good to have at least one test for WAL row count > limit, not only for the byte size. Sure. Here's the incremental diff (I've also added the changelog entry): =============================== diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md new file mode 100644 index 000000000..393932456 --- /dev/null +++ b/changelogs/unreleased/wal-queue-limit.md @@ -0,0 +1,9 @@ +## feature/core + +* Introduce the concept of WAL queue and 2 new configuration options: + `wal_queue_max_len`, measured in transactions, with 100k default and + `wal_queue_max_size`, measured in bytes, with 100 Mb default. + The options help limit the pace at which replica submits new transactions + to WAL: the limits are checked every time a transaction from master is + submitted to replica's WAL, and the space taken by a transaction is + considered empty once it's successfully written (gh-5536). 
diff --git a/src/box/journal.c b/src/box/journal.c index 92a773684..4319f8b33 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -35,9 +35,9 @@ struct journal *current_journal = NULL; struct journal_queue journal_queue = { - .max_size = INT64_MAX, + .max_size = 100 * 1024 * 1024, /* 100 megabytes */ .size = 0, - .max_len = INT64_MAX, + .max_len = 100000, /* 100k journal entries */ .len = 0, .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), .waiter_count = 0, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index c11a9e103..c16ebd290 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -71,8 +71,8 @@ local default_cfg = { wal_mode = "write", wal_max_size = 256 * 1024 * 1024, wal_dir_rescan_delay= 2, - wal_queue_max_size = 0, - wal_queue_max_len = 0, + wal_queue_max_size = 100 * 1024 * 1024, + wal_queue_max_len = 100000, force_recovery = false, replication = nil, instance_uuid = nil, diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 7a224e50e..2964f21bb 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -56,8 +56,8 @@ wal_dir:. wal_dir_rescan_delay:2 wal_max_size:268435456 wal_mode:write -wal_queue_max_len:0 -wal_queue_max_size:0 +wal_queue_max_len:100000 +wal_queue_max_size:104857600 worker_pool_threads:4 -- -- Test insert from detached fiber diff --git a/test/box/admin.result b/test/box/admin.result index c818f4f9f..91b193d1e 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -134,9 +134,9 @@ cfg_filter(box.cfg) - - wal_mode - write - - wal_queue_max_len - - 0 + - 100000 - - wal_queue_max_size - - 0 + - 104857600 - - worker_pool_threads - 4 ... diff --git a/test/box/cfg.result b/test/box/cfg.result index 19f322e7d..3adfbd9df 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -122,9 +122,9 @@ cfg_filter(box.cfg) | - - wal_mode | - write | - - wal_queue_max_len - | - 0 + | - 100000 | - - wal_queue_max_size - | - 0 + | - 104857600 | - - worker_pool_threads | - 4 | ... @@ -241,9 +241,9 @@ cfg_filter(box.cfg) | - - wal_mode | - write | - - wal_queue_max_len - | - 0 + | - 100000 | - - wal_queue_max_size - | - 0 + | - 104857600 | - - worker_pool_threads | - 4 | ... diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result index f7799baa8..2f6e7644b 100644 --- a/test/replication/gh-5536-wal-limit.result +++ b/test/replication/gh-5536-wal-limit.result @@ -42,9 +42,19 @@ test_run:switch('replica') | --- | - true | ... + +queue_cfg_opt = test_run:get_cfg('cfg_option') + | --- + | ... -- Huge replication timeout to not cause reconnects while applier is blocked. --- Tiny queue size (in bytes) to allow exactly one queue entry at a time. -box.cfg{wal_queue_max_size=1, replication_timeout=1000} +cfg_tbl = {replication_timeout=1000} + | --- + | ... +-- Tiny queue size to allow exactly one queue entry at a time. +cfg_tbl[queue_cfg_opt] = 1 + | --- + | ... +box.cfg(cfg_tbl) | --- | ... 
write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua index 1e7d61ff7..c32fbb08f 100644 --- a/test/replication/gh-5536-wal-limit.test.lua +++ b/test/replication/gh-5536-wal-limit.test.lua @@ -18,9 +18,13 @@ test_run:cmd('create server replica with rpl_master=default,\ test_run:cmd('start server replica with wait=True, wait_load=True') test_run:switch('replica') + +queue_cfg_opt = test_run:get_cfg('cfg_option') -- Huge replication timeout to not cause reconnects while applier is blocked. --- Tiny queue size (in bytes) to allow exactly one queue entry at a time. -box.cfg{wal_queue_max_size=1, replication_timeout=1000} +cfg_tbl = {replication_timeout=1000} +-- Tiny queue size to allow exactly one queue entry at a time. +cfg_tbl[queue_cfg_opt] = 1 +box.cfg(cfg_tbl) write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") -- Block WAL writes so that we may test queue overflow. box.error.injection.set("ERRINJ_WAL_DELAY", true) diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 7e7004592..e7f660e47 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -37,7 +37,10 @@ "gh-4928-tx-boundaries.test.lua": {}, "gh-5440-qsync-ro.test.lua": {}, "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, - "gh-5536-wal-limit.test.lua": {}, + "gh-5536-wal-limit.test.lua": { + "wal_queue_max_len": {"cfg_option": "wal_queue_max_len"}, + "wal_queue_max_size": {"cfg_option": "wal_queue_max_size"} + }, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
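For reference, a usage sketch against this revision of the patch: the option names come from the diff above, the values are illustrative, and the options are assumed to be settable at runtime, as the load_cfg.lua changes suggest.

====================
-- Illustrative values; the defaults in this revision are 100 MB / 100k entries.
box.cfg{
    wal_queue_max_size = 50 * 1024 * 1024, -- bytes of not-yet-written transactions
    wal_queue_max_len  = 10000,            -- number of queued transactions
}
-- Read the effective values back:
print(box.cfg.wal_queue_max_size, box.cfg.wal_queue_max_len)
====================

Note that later in the thread the length option is dropped, so only the size limit may survive into the final version.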
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches ` (2 preceding siblings ...) 2021-02-26 0:56 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-10 8:18 ` Konstantin Osipov via Tarantool-patches 2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches 4 siblings, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-09 19:49 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Hi! Thanks for the patch! LGTM. > https://github.com/tarantool/tarantool/issues/5536 > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-10 8:18 ` Konstantin Osipov via Tarantool-patches 2021-03-12 17:10 ` Serge Petrenko via Tarantool-patches 0 siblings, 1 reply; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-03-10 8:18 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]: > Hi! Thanks for the patch! > > LGTM. > > > https://github.com/tarantool/tarantool/issues/5536 > > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom I don't understand why you need two parameters, size and len. What difference does it make how many requests are in the queue? The queue is IO and memory bound, not CPU bound, at least not more cpu bound than any other subsystem. Pushing the problem to the user by forcing them to choose not one, but two configuration values is poor design. How am I supposed to know what is the right value for the option? I guess you don't know it either - the patch was not followed by a benchmark which would prove the selected defaults are optimal. Tarantool has an incoming throttling limit which makes the whole system work like a clock: it's the fiber pool size and net msg size. They were carefully selected after days and weeks of testing. -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-10 8:18 ` Konstantin Osipov via Tarantool-patches @ 2021-03-12 17:10 ` Serge Petrenko via Tarantool-patches 2021-03-13 19:14 ` Konstantin Osipov via Tarantool-patches 2021-03-15 23:42 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 2 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-12 17:10 UTC (permalink / raw) To: Konstantin Osipov, Vladislav Shpilevoy, gorcunov, tarantool-patches 10.03.2021 11:18, Konstantin Osipov пишет: > * Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]: >> Hi! Thanks for the patch! >> >> LGTM. >> >>> https://github.com/tarantool/tarantool/issues/5536 >>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom > I don't understand why you need two parameters, size and len. What > difference does it make how many requests are in the queue? The > queue is IO and memory bound, not CPU bound, at least not more cpu > bound than any other subsystem. > > Pushing the problem to the user by forcing them to choose not one, > but two configuration values is poor design. How am I supposed to > know what is the right value for the option? After some consideration, let's only leave size. I've force pushed the changes on the branch. I've also implemented some kind of a fiber semaphore in a separate commit on top. I'm not sure I like how it turned out, so feel free to throw it away. Or keep it. > > I guess don't know it either - the patch was not followed by > a benchmark which would prove the selected defaults are optimal. > > Tarantool has an incoming throttling limit which makes the whole > system work like a clock: it's the fiber pool size and net msg > size. They were carefully selected after days and weeks of > testing. > -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-12 17:10 ` Serge Petrenko via Tarantool-patches @ 2021-03-13 19:14 ` Konstantin Osipov via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-03-13 19:14 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, Vladislav Shpilevoy * Serge Petrenko <sergepetrenko@tarantool.org> [21/03/12 20:14]: > > * Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/03/09 23:44]: > > > Hi! Thanks for the patch! > > > > > > LGTM. > > > > > > > https://github.com/tarantool/tarantool/issues/5536 > > > > https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom > > I don't understand why you need two parameters, size and len. What > > difference does it make how many requests are in the queue? The > > queue is IO and memory bound, not CPU bound, at least not more cpu > > bound than any other subsystem. > > > > Pushing the problem to the user by forcing them to choose not one, > > but two configuration values is poor design. How am I supposed to > > know what is the right value for the option? > > After some consideration, let's only leave size. > I've force pushed the changes on the branch. > > I've also implemented some kind of a fiber semaphore in a separate commit on > top. > I'm not sure I like how it turned out, so feel free to throw it away. Or > keep it. As long as you can benchmark it showing that it has a reasonable default (it should be pretty straightforward to benchmark - send a huge xlog over a 10Gb link to tarantool running on a reasonably fresh SSD and find out the optimal limit size) I will be pretty happy with dropping the size. -- Konstantin Osipov, Moscow, Russia ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-12 17:10 ` Serge Petrenko via Tarantool-patches 2021-03-13 19:14 ` Konstantin Osipov via Tarantool-patches @ 2021-03-15 23:42 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-16 6:45 ` Konstantin Osipov via Tarantool-patches 2021-03-16 10:19 ` Serge Petrenko via Tarantool-patches 1 sibling, 2 replies; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-15 23:42 UTC (permalink / raw) To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches Hi! Thanks for the patch! I must admit, it looks kind of ugly :D. The class we have now only remotely looks like a semaphore. A number of reasons, some of which you already listed in the header file: - It is advisory. Can be bypassed easily if you forget to check wouldblock. But not a big issue really. An optional thing like 'try_take' is needed for box.commit({is_async = true}) anyway, not to block the fiber; - You can take more of the resource than there is. Bearable as well, but still; - sem_release() does not wake anybody up. Quite counter-intuitive; - The wouldblock check not only checks that the resource is available, but also whether there are any waiters. It wouldn't matter for a real semaphore, because a semaphore has nothing to do with keeping the waiters in FIFO order. It is a detail of the journal which slipped into the general class. But maybe that is the only way to make it fair? Otherwise some fibers could be blocked forever due to starvation. I am not sure the last thing is even an issue. Might be a feature. The others can probably be fixed if we rework the journal_queue API. For instance, not have journal_queue_wait() separated from journal_queue_on_append(). Then sem_take() could become blocking and obligatory. You simply inline everything into journal_write() and journal_write_try_async(), and you will see that you can always call take() and block inside of it. But I don't know if it is worth doing TBH. It is used in a single place so far, and it is hard to define a fiber_sem API which would be suitable for future usages. I would vote for not doing it now and seeing if we need the semaphore in the future. Although the idea of removing journal_queue_wait() might be worth trying. It is used either right before journal_queue_on_append(), or in journal_queue_flush(), which is also right before journal_queue_on_append(). Up to you. Anyway, we need to return to this code for the box.commit({is_async}) feature, which means the hard polishing might not be so useful. ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-15 23:42 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-16 6:45 ` Konstantin Osipov via Tarantool-patches 2021-03-16 20:27 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-16 10:19 ` Serge Petrenko via Tarantool-patches 1 sibling, 1 reply; 30+ messages in thread From: Konstantin Osipov via Tarantool-patches @ 2021-03-16 6:45 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [21/03/16 09:16]: BTW, one option is to change semantics of "async" to be "best effort async". That is, if the queue is full, we don't drop the transaction, we turn it into a waiting one. Thoughts? > Hi! Thanks for the patch! > > I must admit, it looks kind of ugly :D. The class we have now only remotely > looks like a semaphore. > > Number of reasons, some of which you already listed in the header file: > > - It is advisory. Can be bypassed easily if you forget to check wouldblock. > But not a big issue really. An optional thing like 'try_take' is needed > for box.commit({is_async = true}) anyway, not to block the fiber; > > - You can take more amount of the resource than there is. Bearable as well, > but still; > > - sem_release() does not wakeup anybody. Quite counter-intuitive; > > - The wouldblock check not only checks the resource being available, but also > if there are any waiters. It wouldn't matter for a real semaphore, because > it has nothing to do with ordering the waiters in FIFO. It is a detail of > the journal which slipped into the general class. > But maybe that is the only way to make it fair? Otherwise some fibers > could be blocked forever due to starvation. > > The last thing I am not sure is even an issue. Might be a feature. > > The others probably can be fixed if we would rework journal_queue API. For > instance, not have journal_queue_wait() separated from journal_queue_on_append(). > Then sem_take() could become blocking and obligatory. > > You simply inline everything into journal_write() and journal_write_try_async(), > and you will see that you can always call take() and block inside of it. > > But I don't know if it is worth doing TBH. It is used in a single place so far. > This is hard to define fiber_sem API which would be suitable for future usages. > I would vote for not doing it now and see if we would need the semaphore in the > future. > > Although the idea about removing journal_queue_wait() might be worth trying. > It is used either right before journal_queue_on_append(), or in > journal_queue_flush() which is also right before journal_queue_on_append(). > Up to you. Anyway we need to return to this code for box.commit({is_async}) > feature, which means the hard polishing might be not so useful. -- Konstantin Osipov, Moscow, Russia https://scylladb.com ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-16 6:45 ` Konstantin Osipov via Tarantool-patches @ 2021-03-16 20:27 ` Vladislav Shpilevoy via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-16 20:27 UTC (permalink / raw) To: Konstantin Osipov, Serge Petrenko, gorcunov, tarantool-patches > BTW, one option is to change semantics of "async" to be "best > effort async". That is, if the queue is full, we don't drop the > transaction, we turn it into a waiting one. > > Thoughts? If you are talking about the internal API (txn_commit_async), Sergey did it in this patch. It is now called txn_commit_try_async and it blocks the fiber if the queue is full. If you are talking about the is_async option for box.commit(), then yeah, this is a good idea. One of the options. It is not decided yet which options, and under which names, we will expose in box.commit(). There is no design so far. You can leave comments here https://github.com/tarantool/tarantool/issues/67 with your proposals so they are not lost. ^ permalink raw reply [flat|nested] 30+ messages in thread
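For illustration, the "best effort async" behaviour settled on here boils down to something like the following on the submission path. This is only a sketch: the wrapper name is invented, journal_queue_would_block() is the assumed helper from the earlier sketch, and the exact signature and accounting done inside journal_write_try_async() in the patch may differ.

/*
 * "Best effort async": a full WAL queue turns an asynchronous commit
 * into a waiting one instead of an error -- the transaction is never
 * dropped, the calling fiber just blocks until the queue drains and
 * then submits the entry without waiting for the WAL write itself.
 */
static int
commit_best_effort_async(struct journal_entry *entry)
{
	if (journal_queue_would_block()) {
		/* Queue is full: wait rather than reject the commit. */
		journal_queue_wait();
	}
	return journal_write_try_async(entry);
}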
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-15 23:42 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-16 6:45 ` Konstantin Osipov via Tarantool-patches @ 2021-03-16 10:19 ` Serge Petrenko via Tarantool-patches 2021-03-16 20:48 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-17 21:02 ` Vladislav Shpilevoy via Tarantool-patches 1 sibling, 2 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-16 10:19 UTC (permalink / raw) To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches 16.03.2021 02:42, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! Thanks for your answer! > > I must admit, it looks kind of ugly :D. The class we have now only remotely > looks like a semaphore. > > Number of reasons, some of which you already listed in the header file: > > - It is advisory. Can be bypassed easily if you forget to check wouldblock. > But not a big issue really. An optional thing like 'try_take' is needed > for box.commit({is_async = true}) anyway, not to block the fiber; > > - You can take more amount of the resource than there is. Bearable as well, > but still; > > - sem_release() does not wakeup anybody. Quite counter-intuitive; > > - The wouldblock check not only checks the resource being available, but also > if there are any waiters. It wouldn't matter for a real semaphore, because > it has nothing to do with ordering the waiters in FIFO. It is a detail of > the journal which slipped into the general class. > But maybe that is the only way to make it fair? Otherwise some fibers > could be blocked forever due to starvation. > > The last thing I am not sure is even an issue. Might be a feature. > > The others probably can be fixed if we would rework journal_queue API. For > instance, not have journal_queue_wait() separated from journal_queue_on_append(). > Then sem_take() could become blocking and obligatory. > > You simply inline everything into journal_write() and journal_write_try_async(), > and you will see that you can always call take() and block inside of it. > > But I don't know if it is worth doing TBH. It is used in a single place so far. > This is hard to define fiber_sem API which would be suitable for future usages. > I would vote for not doing it now and see if we would need the semaphore in the > future. > > Although the idea about removing journal_queue_wait() might be worth trying. > It is used either right before journal_queue_on_append(), or in > journal_queue_flush() which is also right before journal_queue_on_append(). > Up to you. Anyway we need to return to this code for box.commit({is_async}) > feature, which means the hard polishing might be not so useful. Let's drop the semaphore thing then. I removed the commit introducing it and pushed the patch on a new branch: sp/gh-5536-replica-oom-no-sem -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-16 10:19 ` Serge Petrenko via Tarantool-patches @ 2021-03-16 20:48 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-17 12:14 ` Serge Petrenko via Tarantool-patches 2021-03-17 21:02 ` Vladislav Shpilevoy via Tarantool-patches 1 sibling, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-16 20:48 UTC (permalink / raw) To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches Hi! Thanks for the fixes! See 3 comments below. > Here's when the option comes in handy: > Imagine such a situation: there are 2 servers, a master and a replica, > and the replica is down for some period of time. While the replica is > down, the master serves requests at a reasonable pace, possibly close to > its WAL throughput limit. Once the replica reconnects, it has to receive > all the data master has piled up. Now there's no limit in speed at which > master sends the data to replica, and there's no limit at which > replica's applier submits corresponding write requests to WAL. This > leads to a situation when replica's WAL is never in time to serve the > requests and the amount of pending requests is constantly growing. > There's no limit for memory WAL write requests take, and this clogging > of WAL write queue may even lead to replica using up all the available > memory. > > Now, when `wal_queue_max_size` is set, appliers will stop reading new > transactions once the limit is reached. This will let WAL process all the > requests that have piled up and free all the excess memory. > > [tosquash] remove wal_queue_max_len 1. You forgot something, the last line. Also, while we are here, it probably would be easier for the doc team if the old behaviour was described using a past tense, while the new one using the present tense. Currently you use 'now' word both for the old and for the new behaviour. For instance, you say Now there's no limit in speed at which master sends the data to replica, and there's no limit at which replica's applier submits corresponding write requests to WAL But >now< there is a limit. Even if 'wal_queue_max_size' is not set, it works with the default value. > diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md > new file mode 100644 > index 000000000..393932456 > --- /dev/null > +++ b/changelogs/unreleased/wal-queue-limit.md > @@ -0,0 +1,9 @@ > +## feature/core > + > +* Introduce the concept of WAL queue and 2 new configuration options: > + `wal_queue_max_len`, measured in transactions, with 100k default and > + `wal_queue_max_size`, measured in bytes, with 100 Mb default. 2. There is 1 option now, not 2. > + The options help limit the pace at which replica submits new transactions > + to WAL: the limits are checked every time a transaction from master is > + submitted to replica's WAL, and the space taken by a transaction is > + considered empty once it's successfully written (gh-5536).> diff --git a/src/box/journal.h b/src/box/journal.h > index 5d8d5a726..437257728 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -124,6 +142,62 @@ struct journal { > + > +/** Set maximal journal queue size in bytes. */ > +static inline void > +journal_queue_set_max_size(int64_t size) > +{ > + journal_queue.max_size = size; > + journal_queue_wakeup(); > +} > + > +/** Increase queue size on a new write request. */ > +static inline void > +journal_queue_on_append(struct journal_entry *entry) 3. 
Since you will amend the patch anyway, you could also make the entry 'const', the same in journal_queue_on_complete(). > +{ > + journal_queue.size += entry->approx_len; > +} > + > +/** Decrease queue size once write request is complete. */ > +static inline void > +journal_queue_on_complete(struct journal_entry *entry) > +{ > + journal_queue.size -= entry->approx_len; > + assert(journal_queue.size >= 0); > +} ^ permalink raw reply [flat|nested] 30+ messages in thread
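For completeness, one way the completion side could drive the wakeup of fibers blocked on the queue is sketched below. The hunks above only show journal_queue_set_max_size() calling journal_queue_wakeup(), so treating journal_queue_on_complete() as another caller is an assumption of this sketch, not necessarily what the patch does; the const qualifier follows the review comment above.

/*
 * Decrease queue size once a write request is complete and let blocked
 * fibers continue as soon as the queue is back under the limit.
 * Calling journal_queue_wakeup() from here is an assumption of this
 * sketch.
 */
static inline void
journal_queue_on_complete(const struct journal_entry *entry)
{
	journal_queue.size -= entry->approx_len;
	assert(journal_queue.size >= 0);
	if (journal_queue.size < journal_queue.max_size)
		journal_queue_wakeup();
}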
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-16 20:48 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-17 12:14 ` Serge Petrenko via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-17 12:14 UTC (permalink / raw) To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches 16.03.2021 23:48, Vladislav Shpilevoy пишет: > Hi! Thanks for the fixes! Thanks for the review! > See 3 comments below. > >> Here's when the option comes in handy: >> Imagine such a situation: there are 2 servers, a master and a replica, >> and the replica is down for some period of time. While the replica is >> down, the master serves requests at a reasonable pace, possibly close to >> its WAL throughput limit. Once the replica reconnects, it has to receive >> all the data master has piled up. Now there's no limit in speed at which >> master sends the data to replica, and there's no limit at which >> replica's applier submits corresponding write requests to WAL. This >> leads to a situation when replica's WAL is never in time to serve the >> requests and the amount of pending requests is constantly growing. >> There's no limit for memory WAL write requests take, and this clogging >> of WAL write queue may even lead to replica using up all the available >> memory. >> >> Now, when `wal_queue_max_size` is set, appliers will stop reading new >> transactions once the limit is reached. This will let WAL process all the >> requests that have piled up and free all the excess memory. >> >> [tosquash] remove wal_queue_max_len > 1. You forgot something, the last line. Also, while we are here, it probably > would be easier for the doc team if the old behaviour was described using a > past tense, while the new one using the present tense. Currently you use > 'now' word both for the old and for the new behaviour. For instance, you say > > Now there's no limit in speed at which master sends the data to replica, > and there's no limit at which replica's applier submits corresponding > write requests to WAL > > But >now< there is a limit. Even if 'wal_queue_max_size' is not set, it works > with the default value. Thanks! Take a look at the revised paragraph: ===================================================== Here's when the option comes in handy: Before this option was introduced such a situation could be possible: there are 2 servers, a master and a replica, and the replica is down for some period of time. While the replica is down, master serves requests at a reasonable pace, possibly close to its WAL throughput limit. Once the replica reconnects, it has to receive all the data master has piled up and there's no limit in speed at which master sends the data to replica, and, without the option, there was no limit in speed at which replica submitted corresponding write requests to WAL. This lead to a situation when replica's WAL was never in time to serve the requests and the amount of pending requests was constantly growing. There was no limit for memory WAL write requests take, and this clogging of WAL write queue could even lead to replica using up all the available memory. Now, when `wal_queue_max_size` is set, appliers will stop reading new transactions once the limit is reached. This will let WAL process all the requests that have piled up and free all the excess memory. 
===================================================== >> diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md >> new file mode 100644 >> index 000000000..393932456 >> --- /dev/null >> +++ b/changelogs/unreleased/wal-queue-limit.md >> @@ -0,0 +1,9 @@ >> +## feature/core >> + >> +* Introduce the concept of WAL queue and 2 new configuration options: >> + `wal_queue_max_len`, measured in transactions, with 100k default and >> + `wal_queue_max_size`, measured in bytes, with 100 Mb default. > 2. There is 1 option now, not 2. Sorry for the inattention, fixed. > >> + The options help limit the pace at which replica submits new transactions >> + to WAL: the limits are checked every time a transaction from master is >> + submitted to replica's WAL, and the space taken by a transaction is >> + considered empty once it's successfully written (gh-5536).> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..437257728 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -124,6 +142,62 @@ struct journal { >> + >> +/** Set maximal journal queue size in bytes. */ >> +static inline void >> +journal_queue_set_max_size(int64_t size) >> +{ >> + journal_queue.max_size = size; >> + journal_queue_wakeup(); >> +} >> + >> +/** Increase queue size on a new write request. */ >> +static inline void >> +journal_queue_on_append(struct journal_entry *entry) > 3. Since you will amend the patch anyway, you could also > make the entry 'const', the same in journal_queue_on_complete(). Sure. The diff's below. > >> +{ >> + journal_queue.size += entry->approx_len; >> +} >> + >> +/** Decrease queue size once write request is complete. */ >> +static inline void >> +journal_queue_on_complete(struct journal_entry *entry) >> +{ >> + journal_queue.size -= entry->approx_len; >> + assert(journal_queue.size >= 0); >> +} =====================================================diff --git a/changelogs/unreleased/wal-queue-limit.md b/changelogs/unreleased/wal-queue-limit.md index 393932456..1708e46e6 100644 --- a/changelogs/unreleased/wal-queue-limit.md +++ b/changelogs/unreleased/wal-queue-limit.md @@ -1,9 +1,8 @@ ## feature/core -* Introduce the concept of WAL queue and 2 new configuration options: - `wal_queue_max_len`, measured in transactions, with 100k default and +* Introduce the concept of WAL queue and a new configuration option: `wal_queue_max_size`, measured in bytes, with 100 Mb default. - The options help limit the pace at which replica submits new transactions - to WAL: the limits are checked every time a transaction from master is + The option helps limit the pace at which replica submits new transactions + to WAL: the limit is checked every time a transaction from master is submitted to replica's WAL, and the space taken by a transaction is considered empty once it's successfully written (gh-5536). diff --git a/src/box/journal.h b/src/box/journal.h index 437257728..76c70c19f 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -185,14 +185,14 @@ journal_queue_set_max_size(int64_t size) /** Increase queue size on a new write request. */ static inline void -journal_queue_on_append(struct journal_entry *entry) +journal_queue_on_append(const struct journal_entry *entry) { journal_queue.size += entry->approx_len; } /** Decrease queue size once write request is complete. 
*/ static inline void -journal_queue_on_complete(struct journal_entry *entry) +journal_queue_on_complete(const struct journal_entry *entry) { journal_queue.size -= entry->approx_len; assert(journal_queue.size >= 0); diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua index 3276ddf64..8f21c5628 100755 --- a/test/box-tap/cfg.test.lua +++ b/test/box-tap/cfg.test.lua @@ -6,7 +6,7 @@ local socket = require('socket') local fio = require('fio') local uuid = require('uuid') local msgpack = require('msgpack') -test:plan(110) +test:plan(109) -------------------------------------------------------------------------------- -- Invalid values @@ -50,7 +50,6 @@ invalid('vinyl_run_size_ratio', 1) invalid('vinyl_bloom_fpr', 0) invalid('vinyl_bloom_fpr', 1.1) invalid('wal_queue_max_size', -1) -invalid('wal_queue_max_len', -1) local function invalid_combinations(name, val) local status, result = pcall(box.cfg, val) -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-16 10:19 ` Serge Petrenko via Tarantool-patches 2021-03-16 20:48 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-17 21:02 ` Vladislav Shpilevoy via Tarantool-patches 2021-03-19 11:32 ` Serge Petrenko via Tarantool-patches 1 sibling, 1 reply; 30+ messages in thread From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-17 21:02 UTC (permalink / raw) To: Serge Petrenko, Konstantin Osipov, gorcunov, tarantool-patches Hi! Thanks for the patch! LGTM > sp/gh-5536-replica-oom-no-sem ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-03-17 21:02 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-19 11:32 ` Serge Petrenko via Tarantool-patches 0 siblings, 0 replies; 30+ messages in thread From: Serge Petrenko via Tarantool-patches @ 2021-03-19 11:32 UTC (permalink / raw) To: Vladislav Shpilevoy, Konstantin Osipov, gorcunov, tarantool-patches 18.03.2021 00:02, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! > > LGTM > >> sp/gh-5536-replica-oom-no-sem Hi! Updated the queue size to 16 Mb, as discussed with Mons and Vlad. No other changes. Branch: sp/gh-5536-replica-oom-no-sem -- Serge Petrenko ^ permalink raw reply [flat|nested] 30+ messages in thread
* Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes 2021-02-24 19:35 [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes Serge Petrenko via Tarantool-patches ` (3 preceding siblings ...) 2021-03-09 19:49 ` Vladislav Shpilevoy via Tarantool-patches @ 2021-03-19 15:36 ` Kirill Yukhin via Tarantool-patches 4 siblings, 0 replies; 30+ messages in thread From: Kirill Yukhin via Tarantool-patches @ 2021-03-19 15:36 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy Hello, On 24 фев 22:35, Serge Petrenko via Tarantool-patches wrote: > Since the introduction of asynchronous commit, which doesn't wait for a > WAL write to succeed, it's quite easy to clog WAL with huge amounts > write requests. For now, it's only possible from an applier, since it's > the only user of async commit at the moment. > > This happens when replica is syncing with master and reads new > transactions at a pace higher than it can write them to WAL (see docbot > request for detailed explanation). > > To ameliorate such behavior, we need to introduce some limit on > not-yet-finished WAL write requests. This is what this commit is trying > to do. > Two new counters are added to wal writer: queue_size (in bytes) and > queue_len (in wal messages) together with configuration settings: > `wal_queue_max_size` and `wal_queue_max_len`. > Size and length are increased on every new submitted request, and are > decreased once the tx receives a confirmation that a specific request > was written. > Actually, the limits are added to an abstract journal, but take effect > only for wal writer. > > Once size or length reach their maximum values, applier is blocked until > some of the write requests are finished. > > The size limit isn't strict, i.e. if there's at least one free byte, the > whole write request fits and no blocking is involved. > > The feature is ready for `box.commit{is_async=true}`. Once it's > implemented, it should check whether the queue is full and let the user > decide what to do next. Either wait or roll the tx back. > > Part of #5536 I've checked your patch into 2.6, 2.7 and master. -- Regards, Kirill Yukhin ^ permalink raw reply [flat|nested] 30+ messages in thread