Since the introduction of asynchronous commit, which doesn't wait for a WAL write to succeed, it's quite easy to clog WAL with huge amounts write requests. For now, it's only possible from an applier, since it's the only user of async commit at the moment. This happens when replica is syncing with master and reads new transactions at a pace higher than it can write them to WAL (see docbot request for detailed explanation). To ameliorate such behavior, we need to introduce some limit on not-yet-finished WAL write requests. This is what this commit is trying to do. Two new counters are added to wal writer: queue_size (in bytes) and queue_len (in wal messages) together with configuration settings: `wal_queue_max_size` and `wal_queue_max_len`. Size and length are increased on every new submitted request, and are decreased once the tx receives a confirmation that a specific request was written. Actually, the limits are added to an abstract journal, but take effect only for wal writer. Once size or length reach their maximum values, applier is blocked until some of the write requests are finished. The size limit isn't strict, i.e. if there's at least one free byte, the whole write request fits and no blocking is involved. The feature is ready for `box.commit{is_async=true}`. Once it's implemented, it should check whether the queue is full and let the user decide what to do next. Either wait or roll the tx back. Part of #5536 @TarantoolBot document Title: new configuration options: 'wal_queue_max_size', 'wal_queue_max_len' `wal_queue_max_size` and `wal_queue_max_len` put a limit on the amount of concurrent write requests submitted to WAL. `wal_queue_max_size` is measured in number of bytes to be written (0 means unlimited), and `wal_queue_max_len` is measured in number of transactions, 0 meaning unlimited. These options only affect replica behaviour at the moment, and default to 0. They limit the pace at which replica reads new transactions from master. Here's when these options come in handy: Imagine such a situation: there are 2 servers, a master and a replica, and the replica is down for some period of time. While the replica is down, the master serves requests at a reasonable pace, possibly close to its WAL throughput limit. Once the replica reconnects, it has to receive all the data master has piled up. Now there's no limit in speed at which master sends the data to replica, and there's no limit at which replica's applier submits corresponding write requests to WAL. This leads to a situation when replica's WAL is never in time to serve the requests and the amount of pending requests is constantly growing. There's no limit for memory WAL write requests take, and this clogging of WAL write queue may even lead to replica using up all the available memory. Now, when either `wal_queue_max_size` or `wal_queue_max_len` or both are set, appliers will stop reading new transactions once the limit is reached. This will let WAL process all the requests that have piled up and free all the excess memory. --- https://github.com/tarantool/tarantool/issues/5536 https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom Changes in v2: - Move queue logic to journal. src/box/applier.cc | 9 ++ src/box/box.cc | 46 +++++++ src/box/box.h | 2 + src/box/journal.c | 53 ++++++++ src/box/journal.h | 103 ++++++++++++++- src/box/lua/cfg.cc | 18 +++ src/box/lua/load_cfg.lua | 6 + src/box/wal.c | 18 +++ src/box/wal.h | 14 +++ test/app-tap/init_script.result | 2 + test/box-tap/cfg.test.lua | 4 +- test/box/admin.result | 4 + test/box/cfg.result | 8 ++ test/replication/gh-5536-wal-limit.result | 132 ++++++++++++++++++++ test/replication/gh-5536-wal-limit.test.lua | 58 +++++++++ test/replication/suite.cfg | 1 + test/replication/suite.ini | 2 +- 17 files changed, 472 insertions(+), 8 deletions(-) create mode 100644 test/replication/gh-5536-wal-limit.result create mode 100644 test/replication/gh-5536-wal-limit.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 553db76fc..06aaa0a79 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) goto success; } + /* + * Do not spam WAL with excess write requests, let it process what's + * piled up first. + * This is done before opening the transaction to avoid problems with + * yielding inside it. + */ + if (journal_queue_is_full(current_journal)) + journal_wait_queue(); + /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply diff --git a/src/box/box.cc b/src/box/box.cc index 26cbe8aab..2b335599e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -754,6 +754,28 @@ box_check_wal_mode(const char *mode_name) return (enum wal_mode) mode; } +static int64_t +box_check_wal_queue_max_len(void) +{ + int64_t len = cfg_geti64("wal_queue_max_len"); + if (len < 0) { + diag_set(ClientError, ER_CFG, "wal_queue_max_len", + "wal_queue_max_len must be >= 0"); + } + return len; +} + +static int64_t +box_check_wal_queue_max_size(void) +{ + int64_t size = cfg_geti64("wal_queue_max_size"); + if (size < 0) { + diag_set(ClientError, ER_CFG, "wal_queue_max_size", + "wal_queue_max_size must be >= 0"); + } + return size; +} + static void box_check_readahead(int readahead) { @@ -875,6 +897,10 @@ box_check_config(void) box_check_checkpoint_count(cfg_geti("checkpoint_count")); box_check_wal_max_size(cfg_geti64("wal_max_size")); box_check_wal_mode(cfg_gets("wal_mode")); + if (box_check_wal_queue_max_size() < 0) + diag_raise(); + if (box_check_wal_queue_max_len() < 0) + diag_raise(); if (box_check_memory_quota("memtx_memory") < 0) diag_raise(); box_check_memtx_min_tuple_size(cfg_geti64("memtx_min_tuple_size")); @@ -1411,6 +1437,26 @@ box_set_checkpoint_wal_threshold(void) wal_set_checkpoint_threshold(threshold); } +int +box_set_wal_queue_max_size(void) +{ + int64_t size = box_check_wal_queue_max_size(); + if (size < 0) + return -1; + wal_set_queue_max_size(size); + return 0; +} + +int +box_set_wal_queue_max_len(void) +{ + int64_t len = box_check_wal_queue_max_len(); + if (len < 0) + return -1; + wal_set_queue_max_len(len); + return 0; +} + void box_set_vinyl_memory(void) { diff --git a/src/box/box.h b/src/box/box.h index b68047a95..4f5b4b617 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -239,6 +239,8 @@ void box_set_readahead(void); void box_set_checkpoint_count(void); void box_set_checkpoint_interval(void); void box_set_checkpoint_wal_threshold(void); +int box_set_wal_queue_max_size(void); +int box_set_wal_queue_max_len(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); void box_set_vinyl_memory(void); diff --git a/src/box/journal.c b/src/box/journal.c index cb320b557..19a184580 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -55,3 +55,56 @@ journal_entry_new(size_t n_rows, struct region *region, complete_data); return entry; } + +struct journal_queue_entry { + /** The fiber waiting for queue space to free. */ + struct fiber *fiber; + /** Whether the fiber should be waken up regardless of queue size. */ + bool is_ready; + /** A link in all waiting fibers list. */ + struct rlist in_queue; +}; + +/** + * Wake up the next waiter in journal queue. + */ +void +journal_queue_wakeup_next(struct rlist *link, bool force_ready) +{ + /* Empty queue or last entry in queue. */ + if (link == rlist_last(¤t_journal->waiters)) { + current_journal->queue_is_woken = false; + return; + } + /* + * When the queue isn't forcefully emptied, no need to wake everyone + * else up until there's some free space. + */ + if (journal_queue_is_full(current_journal) && !force_ready) { + current_journal->queue_is_woken = false; + return; + } + struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e), + in_queue); + e->is_ready |= force_ready; + fiber_wakeup(e->fiber); +} + +void +journal_wait_queue(void) +{ + struct journal_queue_entry entry = { + .fiber = fiber(), + .is_ready = false, + }; + rlist_add_tail_entry(¤t_journal->waiters, &entry, in_queue); + /* + * Will be waken up by either queue emptying or a synchronous write. + */ + while (journal_queue_is_full(current_journal) && !entry.is_ready) + fiber_yield(); + + journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); + assert(&entry.in_queue == rlist_first(¤t_journal->waiters)); + rlist_del(&entry.in_queue); +} diff --git a/src/box/journal.h b/src/box/journal.h index 5d8d5a726..9c8af062a 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -115,6 +115,25 @@ journal_entry_new(size_t n_rows, struct region *region, * synchronous replication. */ struct journal { + /** Maximal size of entries enqueued in journal (in bytes). */ + int64_t queue_max_size; + /** Current approximate size of journal queue. */ + int64_t queue_size; + /** Maximal allowed length of journal queue, in entries. */ + int64_t queue_max_len; + /** Current journal queue length. */ + int64_t queue_len; + /** + * The fibers waiting for some space to free in journal queue. + * Once some space is freed they will be waken up in the same order they + * entered the queue. + */ + struct rlist waiters; + /** + * Whether the queue is being woken or not. Used to avoid multiple + * concurrent wake-ups. + */ + bool queue_is_woken; /** Asynchronous write */ int (*write_async)(struct journal *journal, struct journal_entry *entry); @@ -124,6 +143,55 @@ struct journal { struct journal_entry *entry); }; +/** + * Depending on the step of recovery and instance configuration + * points at a concrete implementation of the journal. + */ +extern struct journal *current_journal; + +void +journal_queue_wakeup_next(struct rlist *link, bool force_ready); + +/** Wake the journal queue up. */ +static inline void +journal_queue_wakeup(struct journal *j, bool force_ready) +{ + assert(j == current_journal); + assert(!rlist_empty(&j->waiters)); + if (j->queue_is_woken) + return; + j->queue_is_woken = true; + journal_queue_wakeup_next(&j->waiters, force_ready); +} + +/** + * Check whether any of the queue size limits is reached. + * If the queue is full, we must wait for some of the entries to be written + * before proceeding with a new asynchronous write request. + */ +static inline bool +journal_queue_is_full(struct journal *j) +{ + assert(j == current_journal); + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); +} + +/** + * Check whether anyone is waiting for the journal queue to empty. If there are + * other waiters we must go after them to preserve write order. + */ +static inline bool +journal_queue_has_waiters(struct journal *j) +{ + assert(j == current_journal); + return !rlist_empty(&j->waiters); +} + +/** Yield until there's some space in the journal queue. */ +void +journal_wait_queue(void); + /** * Complete asynchronous write. */ @@ -131,15 +199,15 @@ static inline void journal_async_complete(struct journal_entry *entry) { assert(entry->write_async_cb != NULL); + current_journal->queue_len--; + current_journal->queue_size -= entry->approx_len; + assert(current_journal->queue_len >= 0); + assert(current_journal->queue_size >= 0); + if (journal_queue_has_waiters(current_journal)) + journal_queue_wakeup(current_journal, false); entry->write_async_cb(entry); } -/** - * Depending on the step of recovery and instance configuration - * points at a concrete implementation of the journal. - */ -extern struct journal *current_journal; - /** * Write a single entry to the journal in synchronous way. * @@ -148,6 +216,17 @@ extern struct journal *current_journal; static inline int journal_write(struct journal_entry *entry) { + if (journal_queue_has_waiters(current_journal)) { + /* + * It's a synchronous write, so it's fine to wait a bit more for + * everyone else to be written. They'll wake us up back + * afterwards. + */ + journal_queue_wakeup(current_journal, true); + journal_wait_queue(); + } + current_journal->queue_size += entry->approx_len; + current_journal->queue_len += 1; return current_journal->write(current_journal, entry); } @@ -159,6 +238,12 @@ journal_write(struct journal_entry *entry) static inline int journal_write_async(struct journal_entry *entry) { + /* + * It's the job of the caller to check whether the queue is full prior + * to submitting the request. + */ + current_journal->queue_size += entry->approx_len; + current_journal->queue_len += 1; return current_journal->write_async(current_journal, entry); } @@ -198,6 +283,12 @@ journal_create(struct journal *journal, { journal->write_async = write_async; journal->write = write; + journal->queue_size = 0; + journal->queue_max_size = 0; + journal->queue_len = 0; + journal->queue_max_len = 0; + journal->queue_is_woken = false; + rlist_create(&journal->waiters); } static inline bool diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 2d3ccbf0e..35f410710 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -164,6 +164,22 @@ lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L) return 0; } +static int +lbox_cfg_set_wal_queue_max_size(struct lua_State *L) +{ + if (box_set_wal_queue_max_size() != 0) + luaT_error(L); + return 0; +} + +static int +lbox_cfg_set_wal_queue_max_len(struct lua_State *L) +{ + if (box_set_wal_queue_max_len() != 0) + luaT_error(L); + return 0; +} + static int lbox_cfg_set_read_only(struct lua_State *L) { @@ -399,6 +415,8 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, + {"cfg_set_wal_queue_max_size", lbox_cfg_set_wal_queue_max_size}, + {"cfg_set_wal_queue_max_len", lbox_cfg_set_wal_queue_max_len}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 574c8bef4..c11a9e103 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -71,6 +71,8 @@ local default_cfg = { wal_mode = "write", wal_max_size = 256 * 1024 * 1024, wal_dir_rescan_delay= 2, + wal_queue_max_size = 0, + wal_queue_max_len = 0, force_recovery = false, replication = nil, instance_uuid = nil, @@ -163,6 +165,8 @@ local template_cfg = { coredump = 'boolean', checkpoint_interval = 'number', checkpoint_wal_threshold = 'number', + wal_queue_max_size = 'number', + wal_queue_max_len = 'number', checkpoint_count = 'number', read_only = 'boolean', hot_standby = 'boolean', @@ -277,6 +281,8 @@ local dynamic_cfg = { checkpoint_count = private.cfg_set_checkpoint_count, checkpoint_interval = private.cfg_set_checkpoint_interval, checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold, + wal_queue_max_size = private.cfg_set_wal_queue_max_size, + wal_queue_max_len = private.cfg_set_wal_queue_max_len, worker_pool_threads = private.cfg_set_worker_pool_threads, feedback_enabled = ifdef_feedback_set_params, feedback_crashinfo = ifdef_feedback_set_params, diff --git a/src/box/wal.c b/src/box/wal.c index 937d47ba9..9fff4220a 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -765,6 +765,24 @@ wal_set_checkpoint_threshold(int64_t threshold) fiber_set_cancellable(cancellable); } +void +wal_set_queue_max_size(int64_t size) +{ + struct journal *base = &wal_writer_singleton.base; + base->queue_max_size = size; + if (journal_queue_has_waiters(base) && !journal_queue_is_full(base)) + journal_queue_wakeup(base, false); +} + +void +wal_set_queue_max_len(int64_t len) +{ + struct journal *base = &wal_writer_singleton.base; + base->queue_max_len = len; + if (journal_queue_has_waiters(base) && !journal_queue_is_full(base)) + journal_queue_wakeup(base, false); +} + struct wal_gc_msg { struct cbus_call_msg base; diff --git a/src/box/wal.h b/src/box/wal.h index ca43dc6eb..1db32f66f 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -237,6 +237,20 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint); void wal_set_checkpoint_threshold(int64_t threshold); +/** + * Set the pending write limit in bytes. Once the limit is reached, new + * writes are blocked until some previous writes succeed. + */ +void +wal_set_queue_max_size(int64_t size); + +/** + * Set the pending write limit in journal entries. Once the limit is reached, + * new writes are blocked until some previous writes succeeed. + */ +void +wal_set_queue_max_len(int64_t len); + /** * Remove WAL files that are not needed by consumers reading * rows at @vclock or newer. diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index 16c5b01d2..7a224e50e 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -56,6 +56,8 @@ wal_dir:. wal_dir_rescan_delay:2 wal_max_size:268435456 wal_mode:write +wal_queue_max_len:0 +wal_queue_max_size:0 worker_pool_threads:4 -- -- Test insert from detached fiber diff --git a/test/box-tap/cfg.test.lua b/test/box-tap/cfg.test.lua index a577f023d..3276ddf64 100755 --- a/test/box-tap/cfg.test.lua +++ b/test/box-tap/cfg.test.lua @@ -6,7 +6,7 @@ local socket = require('socket') local fio = require('fio') local uuid = require('uuid') local msgpack = require('msgpack') -test:plan(108) +test:plan(110) -------------------------------------------------------------------------------- -- Invalid values @@ -49,6 +49,8 @@ invalid('vinyl_run_count_per_level', 0) invalid('vinyl_run_size_ratio', 1) invalid('vinyl_bloom_fpr', 0) invalid('vinyl_bloom_fpr', 1.1) +invalid('wal_queue_max_size', -1) +invalid('wal_queue_max_len', -1) local function invalid_combinations(name, val) local status, result = pcall(box.cfg, val) diff --git a/test/box/admin.result b/test/box/admin.result index 05debe673..c818f4f9f 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -133,6 +133,10 @@ cfg_filter(box.cfg) - 268435456 - - wal_mode - write + - - wal_queue_max_len + - 0 + - - wal_queue_max_size + - 0 - - worker_pool_threads - 4 ... diff --git a/test/box/cfg.result b/test/box/cfg.result index 22a720c2c..19f322e7d 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -121,6 +121,10 @@ cfg_filter(box.cfg) | - 268435456 | - - wal_mode | - write + | - - wal_queue_max_len + | - 0 + | - - wal_queue_max_size + | - 0 | - - worker_pool_threads | - 4 | ... @@ -236,6 +240,10 @@ cfg_filter(box.cfg) | - 268435456 | - - wal_mode | - write + | - - wal_queue_max_len + | - 0 + | - - wal_queue_max_size + | - 0 | - - worker_pool_threads | - 4 | ... diff --git a/test/replication/gh-5536-wal-limit.result b/test/replication/gh-5536-wal-limit.result new file mode 100644 index 000000000..f7799baa8 --- /dev/null +++ b/test/replication/gh-5536-wal-limit.result @@ -0,0 +1,132 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... +fiber = require('fiber') + | --- + | ... + +-- +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so +-- that appliers stop reading new transactions from master once the queue is +-- full. +-- +box.schema.user.grant('guest', 'replication') + | --- + | ... +_ = box.schema.space.create('test') + | --- + | ... +_ = box.space.test:create_index('pk') + | --- + | ... + +replication_timeout = box.cfg.replication_timeout + | --- + | ... +box.cfg{replication_timeout=1000} + | --- + | ... + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') + | --- + | - true + | ... +test_run:cmd('start server replica with wait=True, wait_load=True') + | --- + | - true + | ... + +test_run:switch('replica') + | --- + | - true + | ... +-- Huge replication timeout to not cause reconnects while applier is blocked. +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. +box.cfg{wal_queue_max_size=1, replication_timeout=1000} + | --- + | ... +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") + | --- + | ... +-- Block WAL writes so that we may test queue overflow. +box.error.injection.set("ERRINJ_WAL_DELAY", true) + | --- + | - ok + | ... + +test_run:switch('default') + | --- + | - true + | ... + +for i = 1,10 do box.space.test:insert{i} end + | --- + | ... + +test_run:switch('replica') + | --- + | - true + | ... +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while +-- WAL is blocked. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ +end) + | --- + | - true + | ... +require('fiber').sleep(0.5) + | --- + | ... +-- Only one entry fits when the limit is small. +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) + | --- + | - true + | ... +box.error.injection.set("ERRINJ_WAL_DELAY", false) + | --- + | - ok + | ... + +-- Once the block is removed everything is written. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ +end) + | --- + | - true + | ... +assert(box.space.test:count() == 10) + | --- + | - true + | ... +assert(box.info.replication[1].upstream.status == 'follow') + | --- + | - true + | ... + +test_run:switch('default') + | --- + | - true + | ... + +-- Cleanup. +box.cfg{replication_timeout=replication_timeout} + | --- + | ... +test_run:cmd('stop server replica') + | --- + | - true + | ... +test_run:cmd('delete server replica') + | --- + | - true + | ... +box.space.test:drop() + | --- + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... + diff --git a/test/replication/gh-5536-wal-limit.test.lua b/test/replication/gh-5536-wal-limit.test.lua new file mode 100644 index 000000000..1e7d61ff7 --- /dev/null +++ b/test/replication/gh-5536-wal-limit.test.lua @@ -0,0 +1,58 @@ +test_run = require('test_run').new() +fiber = require('fiber') + +-- +-- gh-5536: out of memory on a joining replica. Introduce a WAL queue limit so +-- that appliers stop reading new transactions from master once the queue is +-- full. +-- +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test') +_ = box.space.test:create_index('pk') + +replication_timeout = box.cfg.replication_timeout +box.cfg{replication_timeout=1000} + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') +test_run:cmd('start server replica with wait=True, wait_load=True') + +test_run:switch('replica') +-- Huge replication timeout to not cause reconnects while applier is blocked. +-- Tiny queue size (in bytes) to allow exactly one queue entry at a time. +box.cfg{wal_queue_max_size=1, replication_timeout=1000} +write_cnt = box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") +-- Block WAL writes so that we may test queue overflow. +box.error.injection.set("ERRINJ_WAL_DELAY", true) + +test_run:switch('default') + +for i = 1,10 do box.space.test:insert{i} end + +test_run:switch('replica') +-- Wait for replication. Cannot rely on lsn bump here. It won't happen while +-- WAL is blocked. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") > write_cnt\ +end) +require('fiber').sleep(0.5) +-- Only one entry fits when the limit is small. +assert(box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 1) +box.error.injection.set("ERRINJ_WAL_DELAY", false) + +-- Once the block is removed everything is written. +test_run:wait_cond(function()\ + return box.error.injection.get("ERRINJ_WAL_WRITE_COUNT") == write_cnt + 10\ +end) +assert(box.space.test:count() == 10) +assert(box.info.replication[1].upstream.status == 'follow') + +test_run:switch('default') + +-- Cleanup. +box.cfg{replication_timeout=replication_timeout} +test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.space.test:drop() +box.schema.user.revoke('guest', 'replication') + diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index c80430afc..7e7004592 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -37,6 +37,7 @@ "gh-4928-tx-boundaries.test.lua": {}, "gh-5440-qsync-ro.test.lua": {}, "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, + "gh-5536-wal-limit.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} diff --git a/test/replication/suite.ini b/test/replication/suite.ini index e4812df37..89abfabff 100644 --- a/test/replication/suite.ini +++ b/test/replication/suite.ini @@ -3,7 +3,7 @@ core = tarantool script = master.lua description = tarantool/box, replication disabled = consistent.test.lua -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua config = suite.cfg lua_libs = lua/fast_replica.lua lua/rlimit.lua use_unix_sockets = True -- 2.24.3 (Apple Git-128)
On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote: > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 553db76fc..06aaa0a79 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > goto success; > } > > + /* > + * Do not spam WAL with excess write requests, let it process what's > + * piled up first. > + * This is done before opening the transaction to avoid problems with > + * yielding inside it. > + */ > + if (journal_queue_is_full(current_journal)) > + journal_wait_queue(); > + Serge, just a few comments, feel free to ignore. Maybe it would be better to pass current_journal to journal_wait_queue, otherwise it looks somehow inconsistent, no? if (journal_queue_is_full(current_journal)) journal_wait_queue(current_journal); Actually I would name journal queue engine as plain journalq, this would look like if (journalq_is_full(current_journal)) journalq_wait(current_journal); But it is very humble pov, lets stick with long `journal_queue`. ... > +/** Wake the journal queue up. */ > +static inline void > +journal_queue_wakeup(struct journal *j, bool force_ready) > +{ > + assert(j == current_journal); Seems this assert is not needed. The overall idea of passing journal as an argument is quite the reverse, ie to work with any journal. This is not a blocker, could be cleaned up on top or simply ignored. > + assert(!rlist_empty(&j->waiters)); > + if (j->queue_is_woken) > + return; > + j->queue_is_woken = true; > + journal_queue_wakeup_next(&j->waiters, force_ready); > +} > + > +/** > + * Check whether any of the queue size limits is reached. > + * If the queue is full, we must wait for some of the entries to be written > + * before proceeding with a new asynchronous write request. > + */ > +static inline bool > +journal_queue_is_full(struct journal *j) > +{ > + assert(j == current_journal); same, no need for assert() > + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || > + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); > +} > + > +/** > + * Check whether anyone is waiting for the journal queue to empty. If there are > + * other waiters we must go after them to preserve write order. > + */ > +static inline bool > +journal_queue_has_waiters(struct journal *j) > +{ > + assert(j == current_journal); same, no need for assert() > + return !rlist_empty(&j->waiters); > +} > + > +/** Yield until there's some space in the journal queue. */ > +void > +journal_wait_queue(void); > + > /** > * Complete asynchronous write. > */ > @@ -131,15 +199,15 @@ static inline void > journal_async_complete(struct journal_entry *entry) > { > assert(entry->write_async_cb != NULL); > + current_journal->queue_len--; > + current_journal->queue_size -= entry->approx_len; Myabe worth to make queue ops closed into some helper? Because length and size can't be updated without a tangle. IOW, something like static inline void journal_queue_attr_dec(struct journal *j, struct journal_entry *entry) { j->queue_len--; j->queue_size -= entry->approx_len; } static inline void journal_queue_attr_inc(struct journal *j, struct journal_entry *entry) { j->queue_len++; j->queue_size += entry->approx_len; } Again, this is my pov, **free to ignore**. attr here stands for attributes because queue_len and queue_size are not the queue itself but attributes which controls when we need to wait data to be flushed. > + assert(current_journal->queue_len >= 0); > + assert(current_journal->queue_size >= 0); > + if (journal_queue_has_waiters(current_journal)) > + journal_queue_wakeup(current_journal, false); > entry->write_async_cb(entry); > }
15.02.2021 14:17, Cyrill Gorcunov пишет: > On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote: >> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index 553db76fc..06aaa0a79 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) >> goto success; >> } >> >> + /* >> + * Do not spam WAL with excess write requests, let it process what's >> + * piled up first. >> + * This is done before opening the transaction to avoid problems with >> + * yielding inside it. >> + */ >> + if (journal_queue_is_full(current_journal)) >> + journal_wait_queue(); >> + > Serge, just a few comments, feel free to ignore. Hi! Thanks for the review! > > Maybe it would be better to pass current_journal to journal_wait_queue, > otherwise it looks somehow inconsistent, no? > > if (journal_queue_is_full(current_journal)) > journal_wait_queue(current_journal); I tried to remove parameters from almost all methods, as dicsussed in chat. > > Actually I would name journal queue engine as plain journalq, this would > look like > > if (journalq_is_full(current_journal)) > journalq_wait(current_journal); > > But it is very humble pov, lets stick with long `journal_queue`. > ... To be honest, I like `journal_queue_` prefix more. >> +/** Wake the journal queue up. */ >> +static inline void >> +journal_queue_wakeup(struct journal *j, bool force_ready) >> +{ >> + assert(j == current_journal); > Seems this assert is not needed. The overall idea of passing > journal as an argument is quite the reverse, ie to work with > any journal. This is not a blocker, could be cleaned up on top > or simply ignored. Same as above, discussed verbally. > >> + assert(!rlist_empty(&j->waiters)); >> + if (j->queue_is_woken) >> + return; >> + j->queue_is_woken = true; >> + journal_queue_wakeup_next(&j->waiters, force_ready); >> +} >> + >> +/** >> + * Check whether any of the queue size limits is reached. >> + * If the queue is full, we must wait for some of the entries to be written >> + * before proceeding with a new asynchronous write request. >> + */ >> +static inline bool >> +journal_queue_is_full(struct journal *j) >> +{ >> + assert(j == current_journal); > same, no need for assert() > >> + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || >> + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); >> +} >> + >> +/** >> + * Check whether anyone is waiting for the journal queue to empty. If there are >> + * other waiters we must go after them to preserve write order. >> + */ >> +static inline bool >> +journal_queue_has_waiters(struct journal *j) >> +{ >> + assert(j == current_journal); > same, no need for assert() > >> + return !rlist_empty(&j->waiters); >> +} >> + >> +/** Yield until there's some space in the journal queue. */ >> +void >> +journal_wait_queue(void); >> + >> /** >> * Complete asynchronous write. >> */ >> @@ -131,15 +199,15 @@ static inline void >> journal_async_complete(struct journal_entry *entry) >> { >> assert(entry->write_async_cb != NULL); >> + current_journal->queue_len--; >> + current_journal->queue_size -= entry->approx_len; > Myabe worth to make queue ops closed into some helper? Because > length and size can't be updated without a tangle. IOW, something > like > > static inline void > journal_queue_attr_dec(struct journal *j, struct journal_entry *entry) > { > j->queue_len--; > j->queue_size -= entry->approx_len; > } > > static inline void > journal_queue_attr_inc(struct journal *j, struct journal_entry *entry) > { > j->queue_len++; > j->queue_size += entry->approx_len; > } > > Again, this is my pov, **free to ignore**. attr here stands for > attributes because queue_len and queue_size are not the queue > itself but attributes which controls when we need to wait > data to be flushed. Ok, sure. Introduced `journal_queue_on_append(struct journal_entry *entry)` and `journal_queue_on_complete(struct journal_entry *entry)` > >> + assert(current_journal->queue_len >= 0); >> + assert(current_journal->queue_size >= 0); >> + if (journal_queue_has_waiters(current_journal)) >> + journal_queue_wakeup(current_journal, false); >> entry->write_async_cb(entry); >> } Here's an incremental diff. It's all pure refactoring with no functional changes. I've intdouced `journal_queue_on_append` and `journal_queue_on_complete` for increasing and decreasing queue length and size, and tried to remove `struct journal` parameter from almost every new method, except `journal_queue_set_max_size` and `journal_queue_set_max_len` ==================================================== diff --git a/src/box/applier.cc b/src/box/applier.cc index 06aaa0a79..7c2452d2b 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) * This is done before opening the transaction to avoid problems with * yielding inside it. */ - if (journal_queue_is_full(current_journal)) + if (journal_queue_is_full()) journal_wait_queue(); /** diff --git a/src/box/journal.c b/src/box/journal.c index 19a184580..49441e596 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -68,7 +68,7 @@ struct journal_queue_entry { /** * Wake up the next waiter in journal queue. */ -void +static inline void journal_queue_wakeup_next(struct rlist *link, bool force_ready) { /* Empty queue or last entry in queue. */ @@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready) * When the queue isn't forcefully emptied, no need to wake everyone * else up until there's some free space. */ - if (journal_queue_is_full(current_journal) && !force_ready) { + if (!force_ready && journal_queue_is_full()) { current_journal->queue_is_woken = false; return; } struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e), in_queue); - e->is_ready |= force_ready; + e->is_ready = force_ready; fiber_wakeup(e->fiber); } +void +journal_queue_wakeup(bool force_ready) +{ + assert(!rlist_empty(¤t_journal->waiters)); + if (current_journal->queue_is_woken) + return; + current_journal->queue_is_woken = true; + journal_queue_wakeup_next(¤t_journal->waiters, force_ready); +} + void journal_wait_queue(void) { @@ -101,7 +111,7 @@ journal_wait_queue(void) /* * Will be waken up by either queue emptying or a synchronous write. */ - while (journal_queue_is_full(current_journal) && !entry.is_ready) + while (journal_queue_is_full() && !entry.is_ready) fiber_yield(); journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); diff --git a/src/box/journal.h b/src/box/journal.h index 9c8af062a..d295dfa4b 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -149,20 +149,9 @@ struct journal { */ extern struct journal *current_journal; -void -journal_queue_wakeup_next(struct rlist *link, bool force_ready); - /** Wake the journal queue up. */ -static inline void -journal_queue_wakeup(struct journal *j, bool force_ready) -{ - assert(j == current_journal); - assert(!rlist_empty(&j->waiters)); - if (j->queue_is_woken) - return; - j->queue_is_woken = true; - journal_queue_wakeup_next(&j->waiters, force_ready); -} +void +journal_queue_wakeup(bool force_ready); /** * Check whether any of the queue size limits is reached. @@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool force_ready) * before proceeding with a new asynchronous write request. */ static inline bool -journal_queue_is_full(struct journal *j) +journal_queue_is_full(void) { - assert(j == current_journal); + struct journal *j = current_journal; return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); } @@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j) * other waiters we must go after them to preserve write order. */ static inline bool -journal_queue_has_waiters(struct journal *j) +journal_queue_has_waiters(void) { - assert(j == current_journal); - return !rlist_empty(&j->waiters); + return !rlist_empty(¤t_journal->waiters); } /** Yield until there's some space in the journal queue. */ void journal_wait_queue(void); +/** Set maximal journal queue size in bytes. */ +static inline void +journal_queue_set_max_size(struct journal *j, int64_t size) +{ + assert(j == current_journal); + j->queue_max_size = size; + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); +} + +/** Set maximal journal queue length, in entries. */ +static inline void +journal_queue_set_max_len(struct journal *j, int64_t len) +{ + assert(j == current_journal); + j->queue_max_len = len; + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); +} + +/** Increase queue size on a new write request. */ +static inline void +journal_queue_on_append(struct journal_entry *entry) +{ + current_journal->queue_len++; + current_journal->queue_size += entry->approx_len; +} + +/** Decrease queue size once write request is complete. */ +static inline void +journal_queue_on_complete(struct journal_entry *entry) +{ + current_journal->queue_len--; + current_journal->queue_size -= entry->approx_len; + assert(current_journal->queue_len >= 0); + assert(current_journal->queue_size >= 0); +} + /** * Complete asynchronous write. */ @@ -199,12 +225,11 @@ static inline void journal_async_complete(struct journal_entry *entry) { assert(entry->write_async_cb != NULL); - current_journal->queue_len--; - current_journal->queue_size -= entry->approx_len; - assert(current_journal->queue_len >= 0); - assert(current_journal->queue_size >= 0); - if (journal_queue_has_waiters(current_journal)) - journal_queue_wakeup(current_journal, false); + + journal_queue_on_complete(entry); + if (journal_queue_has_waiters() && !journal_queue_is_full()) + journal_queue_wakeup(false); + entry->write_async_cb(entry); } @@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry) static inline int journal_write(struct journal_entry *entry) { - if (journal_queue_has_waiters(current_journal)) { + if (journal_queue_has_waiters()) { /* * It's a synchronous write, so it's fine to wait a bit more for * everyone else to be written. They'll wake us up back * afterwards. */ - journal_queue_wakeup(current_journal, true); + journal_queue_wakeup(true); journal_wait_queue(); } - current_journal->queue_size += entry->approx_len; - current_journal->queue_len += 1; + + journal_queue_on_append(entry); + return current_journal->write(current_journal, entry); } @@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry) * It's the job of the caller to check whether the queue is full prior * to submitting the request. */ - current_journal->queue_size += entry->approx_len; - current_journal->queue_len += 1; + journal_queue_on_append(entry); + return current_journal->write_async(current_journal, entry); } diff --git a/src/box/wal.c b/src/box/wal.c index 9fff4220a..5bc7a0685 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold) void wal_set_queue_max_size(int64_t size) { - struct journal *base = &wal_writer_singleton.base; - base->queue_max_size = size; - if (journal_queue_has_waiters(base) && !journal_queue_is_full(base)) - journal_queue_wakeup(base, false); + journal_queue_set_max_size(&wal_writer_singleton.base, size); } void wal_set_queue_max_len(int64_t len) { - struct journal *base = &wal_writer_singleton.base; - base->queue_max_len = len; - if (journal_queue_has_waiters(base) && !journal_queue_is_full(base)) - journal_queue_wakeup(base, false); + journal_queue_set_max_len(&wal_writer_singleton.base, len); } struct wal_gc_msg -- Serge Petrenko
On Tue, Feb 16, 2021 at 03:47:09PM +0300, Serge Petrenko wrote:
>
> Here's an incremental diff. It's all pure refactoring with no functional
> changes.
> I've intdouced `journal_queue_on_append` and `journal_queue_on_complete` for
> increasing and
> decreasing queue length and size, and tried to remove `struct journal`
> parameter from almost
> every new method, except `journal_queue_set_max_size` and
> `journal_queue_set_max_len`
Great! Lets give this patch some time to spin around maybe
we gather more comments. Looks OK to me.
Hi! Thanks for the patch! Now looks cool indeed. Another raw idea on which I don't insist and not even sure it is good. But just came to my mind: how about making a separate object called 'journal_queue'? Or 'journal_ctl'? Which is global and is not inside of one journal. It can't be changed to another queue/ctl, and is used by journal API. So we wouldn't need to worry if we configured the correct journal because now current_journal can change at runtime, but this ctl thing - can't. Another option - call this thing 'journal', and rename the old 'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab' or something like this. Another option - ignore this, since it does not matter much. But just in case you would want to try to fit the solution into one of these ideas. See 8 comments below. > diff --git a/src/box/journal.c b/src/box/journal.c > index cb320b557..49441e596 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region, > complete_data); > return entry; > } > + > +struct journal_queue_entry { > + /** The fiber waiting for queue space to free. */ > + struct fiber *fiber; > + /** Whether the fiber should be waken up regardless of queue size. */ > + bool is_ready; > + /** A link in all waiting fibers list. */ > + struct rlist in_queue; > +}; > + > +/** > + * Wake up the next waiter in journal queue. > + */ > +static inline void > +journal_queue_wakeup_next(struct rlist *link, bool force_ready) 1. The flag is known in all usage places at compilation time. Is it possible to split the function into force/normal versions? The same for journal_queue_wakeup() from which this runtime uncertainty arises. Also it is worth adding a comment why is force mode even needed. > +{ > + /* Empty queue or last entry in queue. */ > + if (link == rlist_last(¤t_journal->waiters)) { 2. I am not sure I understand what is happening here. Why is this function in one place called with the pointer at the list itself, and in another place with the pointer at one element? > + current_journal->queue_is_woken = false; > + return; > + } > + /* > + * When the queue isn't forcefully emptied, no need to wake everyone > + * else up until there's some free space. > + */ > + if (!force_ready && journal_queue_is_full()) { > + current_journal->queue_is_woken = false; 3. Maybe woken -> awake? 4. Why do you need the flag? Can you just remove the awake entries from the queue right away? Then it wouldn't even be possible to make a double wakeup. See comment 5. > + return; > + } > + struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e), > + in_queue); > + e->is_ready = force_ready; > + fiber_wakeup(e->fiber); > +} > + > +void > +journal_queue_wakeup(bool force_ready) > +{ > + assert(!rlist_empty(¤t_journal->waiters)); > + if (current_journal->queue_is_woken) > + return; > + current_journal->queue_is_woken = true; > + journal_queue_wakeup_next(¤t_journal->waiters, force_ready); > +} > + > +void > +journal_wait_queue(void) > +{ > + struct journal_queue_entry entry = { > + .fiber = fiber(), > + .is_ready = false, > + }; > + rlist_add_tail_entry(¤t_journal->waiters, &entry, in_queue); > + /* > + * Will be waken up by either queue emptying or a synchronous write. > + */ > + while (journal_queue_is_full() && !entry.is_ready) > + fiber_yield(); > + > + journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); > + assert(&entry.in_queue == rlist_first(¤t_journal->waiters)); > + rlist_del(&entry.in_queue); 5. Can rlist_del be done along with fiber_wakeup()? Then you wouldn't need is_woken maybe. > +} > diff --git a/src/box/journal.h b/src/box/journal.h > index 5d8d5a726..d295dfa4b 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -124,6 +143,81 @@ struct journal { > struct journal_entry *entry); > }; > > +/** > + * Depending on the step of recovery and instance configuration > + * points at a concrete implementation of the journal. > + */ > +extern struct journal *current_journal; > + > +/** Wake the journal queue up. */ > +void > +journal_queue_wakeup(bool force_ready); > + > +/** > + * Check whether any of the queue size limits is reached. > + * If the queue is full, we must wait for some of the entries to be written > + * before proceeding with a new asynchronous write request. > + */ > +static inline bool > +journal_queue_is_full(void) > +{ > + struct journal *j = current_journal; > + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || > + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); 6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX when user passes 0. Then no need to check for != 0. The same for queue_max_len. Option 2 which may be stupid (but combined with option 1): store a flag 'is_full' and update it when update queue_size and queue_len and see they exceeded the limit. But I am not sure it reduces number of branches. Didn't check. > +} > + > +/** > + * Check whether anyone is waiting for the journal queue to empty. If there are > + * other waiters we must go after them to preserve write order. > + */ > +static inline bool > +journal_queue_has_waiters(void) > +{ > + return !rlist_empty(¤t_journal->waiters); > +} > + > +/** Yield until there's some space in the journal queue. */ > +void > +journal_wait_queue(void); > + > +/** Set maximal journal queue size in bytes. */ > +static inline void > +journal_queue_set_max_size(struct journal *j, int64_t size) 7. Why do we have journal parameter here, but don't have it in the other functions? The same journal_queue_set_max_len. > +{ > + assert(j == current_journal); > + j->queue_max_size = size; > + if (journal_queue_has_waiters() && !journal_queue_is_full()) > + journal_queue_wakeup(false); > +} > @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) > static inline int > journal_write_async(struct journal_entry *entry) > { > + /* > + * It's the job of the caller to check whether the queue is full prior > + * to submitting the request. 8. Maybe add an assert though. > + */ > + journal_queue_on_append(entry); > + > return current_journal->write_async(current_journal, entry); > }
17.02.2021 23:46, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! Thanks for the review! Please find my answers inline and the incremental diff below. > Now looks cool indeed. > > Another raw idea on which I don't insist and not even sure it is > good. But just came to my mind: how about making a separate > object called 'journal_queue'? Or 'journal_ctl'? Which is global > and is not inside of one journal. It can't be changed to another > queue/ctl, and is used by journal API. > > So we wouldn't need to worry if we configured the correct journal > because now current_journal can change at runtime, but this ctl > thing - can't. Yes, this'd fix the problem which bothers me: whether we configure the correct queue. I don't want to do this TBH, looks like it's too complex for what it's trying to achieve. > Another option - call this thing 'journal', and rename the old > 'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab' > or something like this. > > Another option - ignore this, since it does not matter much. But > just in case you would want to try to fit the solution into one > of these ideas. > > See 8 comments below. > >> diff --git a/src/box/journal.c b/src/box/journal.c >> index cb320b557..49441e596 100644 >> --- a/src/box/journal.c >> +++ b/src/box/journal.c >> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region, >> complete_data); >> return entry; >> } >> + >> +struct journal_queue_entry { >> + /** The fiber waiting for queue space to free. */ >> + struct fiber *fiber; >> + /** Whether the fiber should be waken up regardless of queue size. */ >> + bool is_ready; >> + /** A link in all waiting fibers list. */ >> + struct rlist in_queue; >> +}; >> + >> +/** >> + * Wake up the next waiter in journal queue. >> + */ >> +static inline void >> +journal_queue_wakeup_next(struct rlist *link, bool force_ready) > 1. The flag is known in all usage places at compilation time. Is it > possible to split the function into force/normal versions? The same > for journal_queue_wakeup() from which this runtime uncertainty arises. Actually, the parameter is not known at compile time when wakeup_next() is called from journal_wait_queue(). For now wakeup_next() only has a single check for force_ready, so moving the check outside would only increase the number of branches. journal_queue_wakeup() is called only once per a whole queue wakeup, so I suppose it doesn't hurt much it has a compile-time known parameter. > Also it is worth adding a comment why is force mode even needed. No problem. >> +{ >> + /* Empty queue or last entry in queue. */ >> + if (link == rlist_last(¤t_journal->waiters)) { > 2. I am not sure I understand what is happening here. Why is this > function in one place called with the pointer at the list itself, > and in another place with the pointer at one element? Well, <list head> -> next is the fist list entry, right? In queue_wakeup() I wake the first waiter up. Once any waiter gets woken up, it wakes up the next waiter. Which is <in_queue> -> next. That's why I have a common helper for these two cases. >> + current_journal->queue_is_woken = false; >> + return; >> + } >> + /* >> + * When the queue isn't forcefully emptied, no need to wake everyone >> + * else up until there's some free space. >> + */ >> + if (!force_ready && journal_queue_is_full()) { >> + current_journal->queue_is_woken = false; > 3. Maybe woken -> awake? No problem. > 4. Why do you need the flag? Can you just remove the awake entries > from the queue right away? Then it wouldn't even be possible to make > a double wakeup. See comment 5. I think I can't. Please see answer to comment 5. >> + return; >> + } >> + struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e), >> + in_queue); >> + e->is_ready = force_ready; >> + fiber_wakeup(e->fiber); >> +} >> + >> +void >> +journal_queue_wakeup(bool force_ready) >> +{ >> + assert(!rlist_empty(¤t_journal->waiters)); >> + if (current_journal->queue_is_woken) >> + return; >> + current_journal->queue_is_woken = true; >> + journal_queue_wakeup_next(¤t_journal->waiters, force_ready); >> +} >> + >> +void >> +journal_wait_queue(void) >> +{ >> + struct journal_queue_entry entry = { >> + .fiber = fiber(), >> + .is_ready = false, >> + }; >> + rlist_add_tail_entry(¤t_journal->waiters, &entry, in_queue); >> + /* >> + * Will be waken up by either queue emptying or a synchronous write. >> + */ >> + while (journal_queue_is_full() && !entry.is_ready) >> + fiber_yield(); >> + >> + journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); >> + assert(&entry.in_queue == rlist_first(¤t_journal->waiters)); >> + rlist_del(&entry.in_queue); > 5. Can rlist_del be done along with fiber_wakeup()? Then you > wouldn't need is_woken maybe. Looks like it can't. Say we have only one waiter. And remove it from the list on wakeup. The list would become empty and there'd be no way to check whether journal has any waiters, and we may reorder the entries (put new ones before the waiting one). This is not necessarily bad, because I put entries into queue before txn_begin(), but someone may call journal_wait_queue() from inside the transaction, or right before txn_commit(). Then it might be bad to put other transactions before this one. So while removing is_woken we would have to introduce queue_has_waiters flag for the sake of this single waiter. >> +} >> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..d295dfa4b 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -124,6 +143,81 @@ struct journal { >> struct journal_entry *entry); >> }; >> >> +/** >> + * Depending on the step of recovery and instance configuration >> + * points at a concrete implementation of the journal. >> + */ >> +extern struct journal *current_journal; >> + >> +/** Wake the journal queue up. */ >> +void >> +journal_queue_wakeup(bool force_ready); >> + >> +/** >> + * Check whether any of the queue size limits is reached. >> + * If the queue is full, we must wait for some of the entries to be written >> + * before proceeding with a new asynchronous write request. >> + */ >> +static inline bool >> +journal_queue_is_full(void) >> +{ >> + struct journal *j = current_journal; >> + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || >> + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); > 6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX > when user passes 0. Then no need to check for != 0. The same for queue_max_len. Sounds good, thanks for the suggestion! > Option 2 which may be stupid (but combined with option 1): store a flag > 'is_full' and update it when update queue_size and queue_len and see they > exceeded the limit. But I am not sure it reduces number of branches. Didn't > check. Then we'd evaluate is_full() every journal_confirm() and journal_write: for both sync and async writes, which happens more often than the actual check is needed (only for async writes). I think it's better to calculate is_full on demand rather than every time it might change. >> +} >> + >> +/** >> + * Check whether anyone is waiting for the journal queue to empty. If there are >> + * other waiters we must go after them to preserve write order. >> + */ >> +static inline bool >> +journal_queue_has_waiters(void) >> +{ >> + return !rlist_empty(¤t_journal->waiters); >> +} >> + >> +/** Yield until there's some space in the journal queue. */ >> +void >> +journal_wait_queue(void); >> + >> +/** Set maximal journal queue size in bytes. */ >> +static inline void >> +journal_queue_set_max_size(struct journal *j, int64_t size) > 7. Why do we have journal parameter here, but don't have it in > the other functions? The same journal_queue_set_max_len. This is my attempt to make sure only wal_writer's journal has a queue. I explicitly set queue_max_... parameters only for wal_writer's journal. And then there's an assert that journal_queue_set_...() is only called with the current journal. >> +{ >> + assert(j == current_journal); >> + j->queue_max_size = size; >> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >> + journal_queue_wakeup(false); >> +} >> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) >> static inline int >> journal_write_async(struct journal_entry *entry) >> { >> + /* >> + * It's the job of the caller to check whether the queue is full prior >> + * to submitting the request. > 8. Maybe add an assert though. I wanted to, but it's impossible. The queue may be full when all the waiters are forcefully waken up by a synchronous commit. And it's hard to tell whether it was a "force" wakeup or not. So let's just hope noone misuses this API. Or, even better, I can remove is_ready field from queue entries and add a new field to the journal: queue_is_ready or something. And addition to queue_is_awake. Then every entry will check queue_is_ready instead of entry.is_ready and it'll be possible to add an assert here: !journal_queue_is_full || journal_queue_is_ready Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you suggested in paragraph 1. What do you think ? >> + */ >> + journal_queue_on_append(entry); >> + >> return current_journal->write_async(current_journal, entry); >> } Incremental diff: diff --git a/src/box/box.cc b/src/box/box.cc index 2b335599e..9a3b092d0 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -762,6 +762,9 @@ box_check_wal_queue_max_len(void) diag_set(ClientError, ER_CFG, "wal_queue_max_len", "wal_queue_max_len must be >= 0"); } + /* Unlimited. */ + if (len == 0) + len = INT64_MAX; return len; } @@ -773,6 +776,9 @@ box_check_wal_queue_max_size(void) diag_set(ClientError, ER_CFG, "wal_queue_max_size", "wal_queue_max_size must be >= 0"); } + /* Unlimited. */ + if (size == 0) + size = INT64_MAX; return size; } diff --git a/src/box/journal.c b/src/box/journal.c index 49441e596..931797faf 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -73,7 +73,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready) { /* Empty queue or last entry in queue. */ if (link == rlist_last(¤t_journal->waiters)) { - current_journal->queue_is_woken = false; + current_journal->queue_is_awake = false; return; } /* @@ -81,7 +81,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready) * else up until there's some free space. */ if (!force_ready && journal_queue_is_full()) { - current_journal->queue_is_woken = false; + current_journal->queue_is_awake = false; return; } struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e), @@ -94,9 +94,9 @@ void journal_queue_wakeup(bool force_ready) { assert(!rlist_empty(¤t_journal->waiters)); - if (current_journal->queue_is_woken) + if (current_journal->queue_is_awake) return; - current_journal->queue_is_woken = true; + current_journal->queue_is_awake = true; journal_queue_wakeup_next(¤t_journal->waiters, force_ready); } diff --git a/src/box/journal.h b/src/box/journal.h index d295dfa4b..2caac4099 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -133,7 +133,7 @@ struct journal { * Whether the queue is being woken or not. Used to avoid multiple * concurrent wake-ups. */ - bool queue_is_woken; + bool queue_is_awake; /** Asynchronous write */ int (*write_async)(struct journal *journal, struct journal_entry *entry); @@ -149,7 +149,11 @@ struct journal { */ extern struct journal *current_journal; -/** Wake the journal queue up. */ +/** + * Wake the journal queue up. + * @param force_ready whether waiters should proceed even if the queue is still + * full. + */ void journal_queue_wakeup(bool force_ready); @@ -162,8 +166,8 @@ static inline bool journal_queue_is_full(void) { struct journal *j = current_journal; - return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) || - (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len); + return j->queue_size > j->queue_max_size || + j->queue_len > j->queue_max_len; } /** @@ -310,10 +314,10 @@ journal_create(struct journal *journal, journal->write_async = write_async; journal->write = write; journal->queue_size = 0; - journal->queue_max_size = 0; + journal->queue_max_size = INT64_MAX; journal->queue_len = 0; - journal->queue_max_len = 0; - journal->queue_is_woken = false; + journal->queue_max_len = INT64_MAX; + journal->queue_is_awake = false; rlist_create(&journal->waiters); } -- Serge Petrenko
Hi! Thanks for the patch! >>> diff --git a/src/box/journal.c b/src/box/journal.c >>> index cb320b557..49441e596 100644 >>> --- a/src/box/journal.c >>> +++ b/src/box/journal.c >>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region, >>> complete_data); >>> return entry; >>> } >>> + >>> +struct journal_queue_entry { >>> + /** The fiber waiting for queue space to free. */ >>> + struct fiber *fiber; >>> + /** Whether the fiber should be waken up regardless of queue size. */ >>> + bool is_ready; >>> + /** A link in all waiting fibers list. */ >>> + struct rlist in_queue; >>> +}; >>> + >>> +/** >>> + * Wake up the next waiter in journal queue. >>> + */ >>> +static inline void >>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready) >> 1. The flag is known in all usage places at compilation time. Is it >> possible to split the function into force/normal versions? The same >> for journal_queue_wakeup() from which this runtime uncertainty arises. > > Actually, the parameter is not known at compile time when wakeup_next() > is called from journal_wait_queue(). For now wakeup_next() only has a single > check for force_ready, so moving the check outside would only increase the > number of branches. > > journal_queue_wakeup() is called only once per a whole queue wakeup, so > I suppose it doesn't hurt much it has a compile-time known parameter. Is it called once? Then why does it have `if (current_journal->queue_is_woken)` check? >>> +{ >>> + /* Empty queue or last entry in queue. */ >>> + if (link == rlist_last(¤t_journal->waiters)) { >> 2. I am not sure I understand what is happening here. Why is this >> function in one place called with the pointer at the list itself, >> and in another place with the pointer at one element? > > Well, <list head> -> next is the fist list entry, right? Perhaps. TBH, I don't remember and when see such tricky things in the code, it takes time to understand it. > In queue_wakeup() I wake the first waiter up. > > Once any waiter gets woken up, it wakes up the next waiter. > Which is <in_queue> -> next. > > That's why I have a common helper for these two cases. Ok, I see now. But it seems you could make it simpler, right? ==================== @@ -69,10 +69,10 @@ struct journal_queue_entry { * Wake up the next waiter in journal queue. */ static inline void -journal_queue_wakeup_next(struct rlist *link, bool force_ready) +journal_queue_wakeup_first(bool force_ready) { /* Empty queue or last entry in queue. */ - if (link == rlist_last(¤t_journal->waiters)) { + if (rlist_empty(¤t_journal->waiters)) { current_journal->queue_is_woken = false; return; } @@ -97,7 +97,7 @@ journal_queue_wakeup(bool force_ready) if (current_journal->queue_is_woken) return; current_journal->queue_is_woken = true; - journal_queue_wakeup_next(¤t_journal->waiters, force_ready); + journal_queue_wakeup_first(force_ready); } void @@ -114,7 +114,7 @@ journal_wait_queue(void) while (journal_queue_is_full() && !entry.is_ready) fiber_yield(); - journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); assert(&entry.in_queue == rlist_first(¤t_journal->waiters)); rlist_del(&entry.in_queue); + journal_queue_wakeup_first(entry.is_ready); } ==================== (I didn't test it.) >> 5. Can rlist_del be done along with fiber_wakeup()? Then you >> wouldn't need is_woken maybe. > > Looks like it can't. > Say we have only one waiter. And remove it from the list on wakeup. > The list would become empty and there'd be no way to check whether > journal has any waiters, and we may reorder the entries (put new ones before > the waiting one). This is not necessarily bad, because I put entries into queue > before txn_begin(), but someone may call journal_wait_queue() from inside the > transaction, or right before txn_commit(). Then it might be bad to put other > transactions before this one. Order change is definitely not acceptable. > So while removing is_woken we would have to introduce queue_has_waiters flag for > the sake of this single waiter. It would rather become a counter - number of waiters. Because there can be many. But yeah, I see the problem. >>> +} >>> + >>> +/** >>> + * Check whether anyone is waiting for the journal queue to empty. If there are >>> + * other waiters we must go after them to preserve write order. >>> + */ >>> +static inline bool >>> +journal_queue_has_waiters(void) >>> +{ >>> + return !rlist_empty(¤t_journal->waiters); >>> +} >>> + >>> +/** Yield until there's some space in the journal queue. */ >>> +void >>> +journal_wait_queue(void); >>> + >>> +/** Set maximal journal queue size in bytes. */ >>> +static inline void >>> +journal_queue_set_max_size(struct journal *j, int64_t size) >> 7. Why do we have journal parameter here, but don't have it in >> the other functions? The same journal_queue_set_max_len. > > This is my attempt to make sure only wal_writer's journal has a queue. > I explicitly set queue_max_... parameters only for wal_writer's journal. > And then there's an assert that journal_queue_set_...() is only called with > the current journal. Or the assertion could be done in wal_set_queue_*() functions. To keep the journal API consistent. I just realized, journal can be easily unit-tested. It does not depend on anything except small/ and core/ libs. Although seems like a lot of work so maybe not now. Probably later, for something more complex and harder to test via functional tests. However if you would write tests now, it would be greatly appreciated. >>> +{ >>> + assert(j == current_journal); >>> + j->queue_max_size = size; >>> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >>> + journal_queue_wakeup(false); >>> +} >>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) >>> static inline int >>> journal_write_async(struct journal_entry *entry) >>> { >>> + /* >>> + * It's the job of the caller to check whether the queue is full prior >>> + * to submitting the request. >> 8. Maybe add an assert though. > > I wanted to, but it's impossible. > The queue may be full when all the waiters are forcefully waken up by a > synchronous commit. And it's hard to tell whether it was a "force" wakeup > or not. So let's just hope noone misuses this API. Yeah, I see now. > Or, even better, I can remove is_ready field from queue entries and add a new field > to the journal: queue_is_ready or something. And addition to queue_is_awake. > Then every entry will check queue_is_ready instead of entry.is_ready and > it'll be possible to add an assert here: !journal_queue_is_full || journal_queue_is_ready > Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you suggested > in paragraph 1. > What do you think ? Sounds good, worth doing. See 2 comments below. >>> + */ >>> + journal_queue_on_append(entry); >>> + >>> return current_journal->write_async(current_journal, entry); >>> }> diff --git a/src/box/applier.cc b/src/box/applier.cc > index 553db76fc..7c2452d2b 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > goto success; > } > > + /* > + * Do not spam WAL with excess write requests, let it process what's > + * piled up first. > + * This is done before opening the transaction to avoid problems with > + * yielding inside it. > + */ > + if (journal_queue_is_full()) > + journal_wait_queue(); 1. I just noticed you do the waiting before starting the transaction. In case of Vinyl the transaction can yield. So by the time you get to commit, the queue could be full. Don't know what to do with this. We can't wait before txn_commit_async() because it would kill the memtx transactions. Maybe we could not to care now. Because overpopulation never will exceed number of appliers, which is tiny. But when async transactions will go to the public API, we will face this issue anyway. I assume we will need to extract txn_prepare to the "public" part of txn.h and use it separately from writing to the journal. So in our code it would look like this: sync: txn_begin() ... txn_commit() async: txn_begin() ... txn_prepare() journal_wait() txn_persist() or something similar. But don't know for sure. Summary: leave it as is if don't want to tear commit_async() and commit() up into parts now. > + > /** > * Explicitly begin the transaction so that we can > * control fiber->gc life cycle and, in case of apply > diff --git a/src/box/journal.h b/src/box/journal.h > index 5d8d5a726..d295dfa4b 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) > static inline int > journal_write_async(struct journal_entry *entry) > { > + /* > + * It's the job of the caller to check whether the queue is full prior > + * to submitting the request. > + */ > + journal_queue_on_append(entry); > + > return current_journal->write_async(current_journal, entry); 2. What if write_async() is called by some applier when the queue is not full, but also not empty? It seems it will bypass the existing waiters and lead to the transaction order change. No? I start thinking that we need to queue the journal_entry objects right in the journal object. So if their queue is not empty, journal_write_async() adds the entry to the queue and does not call write_async(). Also would be cool to add a test how the applier can reorder WAL writes in the current patch.
24.02.2021 01:19, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! Thanks for the review! Please find my answers below and v3 of the patch in your inbox. > >>>> diff --git a/src/box/journal.c b/src/box/journal.c >>>> index cb320b557..49441e596 100644 >>>> --- a/src/box/journal.c >>>> +++ b/src/box/journal.c >>>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region, >>>> complete_data); >>>> return entry; >>>> } >>>> + >>>> +struct journal_queue_entry { >>>> + /** The fiber waiting for queue space to free. */ >>>> + struct fiber *fiber; >>>> + /** Whether the fiber should be waken up regardless of queue size. */ >>>> + bool is_ready; >>>> + /** A link in all waiting fibers list. */ >>>> + struct rlist in_queue; >>>> +}; >>>> + >>>> +/** >>>> + * Wake up the next waiter in journal queue. >>>> + */ >>>> +static inline void >>>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready) >>> 1. The flag is known in all usage places at compilation time. Is it >>> possible to split the function into force/normal versions? The same >>> for journal_queue_wakeup() from which this runtime uncertainty arises. >> Actually, the parameter is not known at compile time when wakeup_next() >> is called from journal_wait_queue(). For now wakeup_next() only has a single >> check for force_ready, so moving the check outside would only increase the >> number of branches. >> >> journal_queue_wakeup() is called only once per a whole queue wakeup, so >> I suppose it doesn't hurt much it has a compile-time known parameter. > Is it called once? Then why does it have `if (current_journal->queue_is_woken)` > check? I was trying to say 'it's called once per a bunch of wakeup_next() calls' Just ignore this. This is irrelevant. Actually, no, it may be called multiple times, from every journal_async_complete(). But it is a no-op for each consequent call, except the first one. (while the queue is being cleared). > >>>> +{ >>>> + /* Empty queue or last entry in queue. */ >>>> + if (link == rlist_last(¤t_journal->waiters)) { >>> 2. I am not sure I understand what is happening here. Why is this >>> function in one place called with the pointer at the list itself, >>> and in another place with the pointer at one element? >> Well, <list head> -> next is the fist list entry, right? > Perhaps. TBH, I don't remember and when see such tricky things in > the code, it takes time to understand it. > >> In queue_wakeup() I wake the first waiter up. >> >> Once any waiter gets woken up, it wakes up the next waiter. >> Which is <in_queue> -> next. >> >> That's why I have a common helper for these two cases. > Ok, I see now. But it seems you could make it simpler, right? > > ==================== > @@ -69,10 +69,10 @@ struct journal_queue_entry { > * Wake up the next waiter in journal queue. > */ > static inline void > -journal_queue_wakeup_next(struct rlist *link, bool force_ready) > +journal_queue_wakeup_first(bool force_ready) > { > /* Empty queue or last entry in queue. */ > - if (link == rlist_last(¤t_journal->waiters)) { > + if (rlist_empty(¤t_journal->waiters)) { > current_journal->queue_is_woken = false; > return; > } > @@ -97,7 +97,7 @@ journal_queue_wakeup(bool force_ready) > if (current_journal->queue_is_woken) > return; > current_journal->queue_is_woken = true; > - journal_queue_wakeup_next(¤t_journal->waiters, force_ready); > + journal_queue_wakeup_first(force_ready); > } > > void > @@ -114,7 +114,7 @@ journal_wait_queue(void) > while (journal_queue_is_full() && !entry.is_ready) > fiber_yield(); > > - journal_queue_wakeup_next(&entry.in_queue, entry.is_ready); > assert(&entry.in_queue == rlist_first(¤t_journal->waiters)); > rlist_del(&entry.in_queue); > + journal_queue_wakeup_first(entry.is_ready); > } > ==================== > > (I didn't test it.) Yes, indeed. Thanks! Applied with minor changes. >>> 5. Can rlist_del be done along with fiber_wakeup()? Then you >>> wouldn't need is_woken maybe. >> Looks like it can't. >> Say we have only one waiter. And remove it from the list on wakeup. >> The list would become empty and there'd be no way to check whether >> journal has any waiters, and we may reorder the entries (put new ones before >> the waiting one). This is not necessarily bad, because I put entries into queue >> before txn_begin(), but someone may call journal_wait_queue() from inside the >> transaction, or right before txn_commit(). Then it might be bad to put other >> transactions before this one. > Order change is definitely not acceptable. > >> So while removing is_woken we would have to introduce queue_has_waiters flag for >> the sake of this single waiter. > It would rather become a counter - number of waiters. Because there can > be many. But yeah, I see the problem. > >>>> +} >>>> + >>>> +/** >>>> + * Check whether anyone is waiting for the journal queue to empty. If there are >>>> + * other waiters we must go after them to preserve write order. >>>> + */ >>>> +static inline bool >>>> +journal_queue_has_waiters(void) >>>> +{ >>>> + return !rlist_empty(¤t_journal->waiters); >>>> +} >>>> + >>>> +/** Yield until there's some space in the journal queue. */ >>>> +void >>>> +journal_wait_queue(void); >>>> + >>>> +/** Set maximal journal queue size in bytes. */ >>>> +static inline void >>>> +journal_queue_set_max_size(struct journal *j, int64_t size) >>> 7. Why do we have journal parameter here, but don't have it in >>> the other functions? The same journal_queue_set_max_len. >> This is my attempt to make sure only wal_writer's journal has a queue. >> I explicitly set queue_max_... parameters only for wal_writer's journal. >> And then there's an assert that journal_queue_set_...() is only called with >> the current journal. > Or the assertion could be done in wal_set_queue_*() functions. To keep the > journal API consistent. Actually, struct journal has a ton queue_* members now, so I'm following your older advice and extracting everything related to queues into struct journal_queue. > > I just realized, journal can be easily unit-tested. It does not depend on > anything except small/ and core/ libs. Although seems like a lot of work so > maybe not now. Probably later, for something more complex and harder to test > via functional tests. However if you would write tests now, it would be > greatly appreciated. > >>>> +{ >>>> + assert(j == current_journal); >>>> + j->queue_max_size = size;c >>>> + if (journal_queue_has_waiters() && !journal_queue_is_full()) >>>> + journal_queue_wakeup(false); >>>> +} >>>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) >>>> static inline int >>>> journal_write_async(struct journal_entry *entry) >>>> { >>>> + /* >>>> + * It's the job of the caller to check whether the queue is full prior >>>> + * to submitting the request. >>> 8. Maybe add an assert though. >> I wanted to, but it's impossible. >> The queue may be full when all the waiters are forcefully waken up by a >> synchronous commit. And it's hard to tell whether it was a "force" wakeup >> or not. So let's just hope noone misuses this API. > Yeah, I see now. > >> Or, even better, I can remove is_ready field from queue entries and add a new field >> to the journal: queue_is_ready or something. And addition to queue_is_awake. >> Then every entry will check queue_is_ready instead of entry.is_ready and >> it'll be possible to add an assert here: !journal_queue_is_full || journal_queue_is_ready >> Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you suggested >> in paragraph 1. >> What do you think ? > Sounds good, worth doing. I introduced queue_is_ready and removed entry.is_ready. The code looks cleaner now and together with your suggestion regarding journal_queue_wakeup_first(), now it doesn't have parameters at all. It does have a check for queue_is_ready internally, but there's no point in separating _force and normal versions. This would simply move the check outside the function call. > > See 2 comments below. > >>>> + */ >>>> + journal_queue_on_append(entry); >>>> + >>>> return current_journal->write_async(current_journal, entry); >>>> }> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index 553db76fc..7c2452d2b 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) >> goto success; >> } >> >> + /* >> + * Do not spam WAL with excess write requests, let it process what's >> + * piled up first. >> + * This is done before opening the transaction to avoid problems with >> + * yielding inside it. >> + */ >> + if (journal_queue_is_full()) >> + journal_wait_queue(); > 1. I just noticed you do the waiting before starting the > transaction. In case of Vinyl the transaction can yield. So > by the time you get to commit, the queue could be full. > > Don't know what to do with this. We can't wait before > txn_commit_async() because it would kill the memtx transactions. > > Maybe we could not to care now. Because overpopulation never > will exceed number of appliers, which is tiny. > > But when async transactions will go to the public API, we > will face this issue anyway. I assume we will need to extract > txn_prepare to the "public" part of txn.h and use it separately > from writing to the journal. So in our code it would look like > this: > > sync: txn_begin() ... txn_commit() > async: txn_begin() ... txn_prepare() journal_wait() txn_persist() > > or something similar. But don't know for sure. > > Summary: leave it as is if don't want to tear commit_async() > and commit() up into parts now. Let's leave it as is for now then. > >> + >> /** >> * Explicitly begin the transaction so that we can >> * control fiber->gc life cycle and, in case of apply >> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..d295dfa4b 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry) >> static inline int >> journal_write_async(struct journal_entry *entry) >> { >> + /* >> + * It's the job of the caller to check whether the queue is full prior >> + * to submitting the request. >> + */ >> + journal_queue_on_append(entry); >> + >> return current_journal->write_async(current_journal, entry); > 2. What if write_async() is called by some applier when the queue is > not full, but also not empty? It seems it will bypass the existing > waiters and lead to the transaction order change. No? Yes, you're correct, and thanks for noticing this. This is fixed simply: diff --git a/src/box/applier.cc b/src/box/applier.cc index 7c2452d2b..27ddd0f29 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) * This is done before opening the transaction to avoid problems with * yielding inside it. */ - if (journal_queue_is_full()) + if (journal_queue_is_full() || journal_queue_has_waiters()) journal_wait_queue(); /** Having this fix applied, nothing else could go wrong here AFAICS. > > I start thinking that we need to queue the journal_entry objects right > in the journal object. So if their queue is not empty, > journal_write_async() adds the entry to the queue and does not call > write_async(). Why? > > Also would be cool to add a test how the applier can reorder WAL writes > in the current patch. -- Serge Petrenko
>> I start thinking that we need to queue the journal_entry objects right
>> in the journal object. So if their queue is not empty,
>> journal_write_async() adds the entry to the queue and does not call
>> write_async().
>
> Why?
Nevermind, I probably didn't think it through enough at that moment.