From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
Date: Tue, 16 Feb 2021 15:47:09 +0300
Message-ID: <4050015c-bf24-eac3-a890-216efab8c635@tarantool.org>
In-Reply-To: <YCpYZ/K8V8ePZvSD@grain>

On 15.02.2021 14:17, Cyrill Gorcunov wrote:
> On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..06aaa0a79 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>  		goto success;
>>  	}
>>
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full(current_journal))
>> +		journal_wait_queue();
>> +
> Serge, just a few comments, feel free to ignore.

Hi! Thanks for the review!

>
> Maybe it would be better to pass current_journal to journal_wait_queue,
> otherwise it looks somehow inconsistent, no?
>
> 	if (journal_queue_is_full(current_journal))
> 		journal_wait_queue(current_journal);

I tried to remove parameters from almost all methods, as discussed in chat.

>
> Actually I would name journal queue engine as plain journalq, this would
> look like
>
> 	if (journalq_is_full(current_journal))
> 		journalq_wait(current_journal);
>
> But it is very humble pov, lets stick with long `journal_queue`.
> ...

To be honest, I like the `journal_queue_` prefix more.

>> +/** Wake the journal queue up. */
>> +static inline void
>> +journal_queue_wakeup(struct journal *j, bool force_ready)
>> +{
>> +	assert(j == current_journal);
> Seems this assert is not needed.
> The overall idea of passing
> journal as an argument is quite the reverse, ie to work with
> any journal. This is not a blocker, could be cleaned up on top
> or simply ignored.

Same as above, discussed verbally.

>
>> +	assert(!rlist_empty(&j->waiters));
>> +	if (j->queue_is_woken)
>> +		return;
>> +	j->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&j->waiters, force_ready);
>> +}
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return !rlist_empty(&j->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>>  /**
>>   * Complete asynchronous write.
>>   */
>> @@ -131,15 +199,15 @@ static inline void
>>  journal_async_complete(struct journal_entry *entry)
>>  {
>>  	assert(entry->write_async_cb != NULL);
>> +	current_journal->queue_len--;
>> +	current_journal->queue_size -= entry->approx_len;
> Maybe worth to make queue ops closed into some helper? Because
> length and size can't be updated without a tangle.
> IOW, something
> like
>
> static inline void
> journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len--;
> 	j->queue_size -= entry->approx_len;
> }
>
> static inline void
> journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len++;
> 	j->queue_size += entry->approx_len;
> }
>
> Again, this is my pov, **free to ignore**. attr here stands for
> attributes because queue_len and queue_size are not the queue
> itself but attributes which controls when we need to wait
> data to be flushed.

Ok, sure. Introduced `journal_queue_on_append(struct journal_entry *entry)`
and `journal_queue_on_complete(struct journal_entry *entry)`.

>
>> +	assert(current_journal->queue_len >= 0);
>> +	assert(current_journal->queue_size >= 0);
>> +	if (journal_queue_has_waiters(current_journal))
>> +		journal_queue_wakeup(current_journal, false);
>>  	entry->write_async_cb(entry);
>>  }

Here's an incremental diff. It's all pure refactoring with no functional changes.
I've introduced `journal_queue_on_append` and `journal_queue_on_complete`
for increasing and decreasing queue length and size, and tried to remove
the `struct journal` parameter from almost every new method, except
`journal_queue_set_max_size` and `journal_queue_set_max_len`.

====================================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 06aaa0a79..7c2452d2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	 * This is done before opening the transaction to avoid problems with
 	 * yielding inside it.
 	 */
-	if (journal_queue_is_full(current_journal))
+	if (journal_queue_is_full())
 		journal_wait_queue();
 
 	/**
diff --git a/src/box/journal.c b/src/box/journal.c
index 19a184580..49441e596 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ struct journal_queue_entry {
 /**
  * Wake up the next waiter in journal queue.
  */
-void
+static inline void
 journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 {
 	/* Empty queue or last entry in queue. */
@@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 	 * When the queue isn't forcefully emptied, no need to wake everyone
 	 * else up until there's some free space.
 	 */
-	if (journal_queue_is_full(current_journal) && !force_ready) {
+	if (!force_ready && journal_queue_is_full()) {
 		current_journal->queue_is_woken = false;
 		return;
 	}
 	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
 						    in_queue);
-	e->is_ready |= force_ready;
+	e->is_ready = force_ready;
 	fiber_wakeup(e->fiber);
 }
 
+void
+journal_queue_wakeup(bool force_ready)
+{
+	assert(!rlist_empty(&current_journal->waiters));
+	if (current_journal->queue_is_woken)
+		return;
+	current_journal->queue_is_woken = true;
+	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+}
+
 void
 journal_wait_queue(void)
 {
@@ -101,7 +111,7 @@ journal_wait_queue(void)
 	/*
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
-	while (journal_queue_is_full(current_journal) && !entry.is_ready)
+	while (journal_queue_is_full() && !entry.is_ready)
 		fiber_yield();
 
 	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
diff --git a/src/box/journal.h b/src/box/journal.h
index 9c8af062a..d295dfa4b 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -149,20 +149,9 @@ struct journal {
  */
 extern struct journal *current_journal;
 
-void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready);
-
 /** Wake the journal queue up. */
-static inline void
-journal_queue_wakeup(struct journal *j, bool force_ready)
-{
-	assert(j == current_journal);
-	assert(!rlist_empty(&j->waiters));
-	if (j->queue_is_woken)
-		return;
-	j->queue_is_woken = true;
-	journal_queue_wakeup_next(&j->waiters, force_ready);
-}
+void
+journal_queue_wakeup(bool force_ready);
 
 /**
  * Check whether any of the queue size limits is reached.
@@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool force_ready)
  * before proceeding with a new asynchronous write request.
  */
 static inline bool
-journal_queue_is_full(struct journal *j)
+journal_queue_is_full(void)
 {
-	assert(j == current_journal);
+	struct journal *j = current_journal;
 	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
 	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
 }
@@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j)
  * other waiters we must go after them to preserve write order.
  */
 static inline bool
-journal_queue_has_waiters(struct journal *j)
+journal_queue_has_waiters(void)
 {
-	assert(j == current_journal);
-	return !rlist_empty(&j->waiters);
+	return !rlist_empty(&current_journal->waiters);
 }
 
 /** Yield until there's some space in the journal queue. */
 void
 journal_wait_queue(void);
 
+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(struct journal *j, int64_t size)
+{
+	assert(j == current_journal);
+	j->queue_max_size = size;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(struct journal *j, int64_t len)
+{
+	assert(j == current_journal);
+	j->queue_max_len = len;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+	current_journal->queue_len++;
+	current_journal->queue_size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+	current_journal->queue_len--;
+	current_journal->queue_size -= entry->approx_len;
+	assert(current_journal->queue_len >= 0);
+	assert(current_journal->queue_size >= 0);
+}
+
 /**
  * Complete asynchronous write.
  */
@@ -199,12 +225,11 @@ static inline void
 journal_async_complete(struct journal_entry *entry)
 {
 	assert(entry->write_async_cb != NULL);
-	current_journal->queue_len--;
-	current_journal->queue_size -= entry->approx_len;
-	assert(current_journal->queue_len >= 0);
-	assert(current_journal->queue_size >= 0);
-	if (journal_queue_has_waiters(current_journal))
-		journal_queue_wakeup(current_journal, false);
+
+	journal_queue_on_complete(entry);
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+
 	entry->write_async_cb(entry);
 }
 
@@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry)
 static inline int
 journal_write(struct journal_entry *entry)
 {
-	if (journal_queue_has_waiters(current_journal)) {
+	if (journal_queue_has_waiters()) {
 		/*
 		 * It's a synchronous write, so it's fine to wait a bit more for
 		 * everyone else to be written. They'll wake us up back
 		 * afterwards.
 		 */
-		journal_queue_wakeup(current_journal, true);
+		journal_queue_wakeup(true);
 		journal_wait_queue();
 	}
-	current_journal->queue_size += entry->approx_len;
-	current_journal->queue_len += 1;
+
+	journal_queue_on_append(entry);
+
 	return current_journal->write(current_journal, entry);
 }
 
@@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry)
 	 * It's the job of the caller to check whether the queue is full prior
 	 * to submitting the request.
 	 */
-	current_journal->queue_size += entry->approx_len;
-	current_journal->queue_len += 1;
+	journal_queue_on_append(entry);
+
 	return current_journal->write_async(current_journal, entry);
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 9fff4220a..5bc7a0685 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold)
 void
 wal_set_queue_max_size(int64_t size)
 {
-	struct journal *base = &wal_writer_singleton.base;
-	base->queue_max_size = size;
-	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-		journal_queue_wakeup(base, false);
+	journal_queue_set_max_size(&wal_writer_singleton.base, size);
 }
 
 void
 wal_set_queue_max_len(int64_t len)
 {
-	struct journal *base = &wal_writer_singleton.base;
-	base->queue_max_len = len;
-	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-		journal_queue_wakeup(base, false);
+	journal_queue_set_max_len(&wal_writer_singleton.base, len);
 }
 
 struct wal_gc_msg

-- 
Serge Petrenko
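For illustration only, not part of the patch: a minimal standalone model of the accounting that this diff factors into `journal_queue_on_append()`, `journal_queue_on_complete()` and `journal_queue_is_full()`. The `model_*` names and the explicit journal argument are hypothetical simplifications; the real helpers work on `current_journal` and also drive the fiber wait queue, which this sketch does not model.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical, simplified stand-in for struct journal: only the queue
 * accounting fields from the patch are modeled, no waiters or fibers.
 */
struct model_journal {
	int64_t queue_max_size; /* Limit in bytes, 0 means no limit. */
	int64_t queue_size;     /* Bytes currently queued. */
	int64_t queue_max_len;  /* Limit in entries, 0 means no limit. */
	int64_t queue_len;      /* Entries currently queued. */
};

/* Hypothetical stand-in for struct journal_entry. */
struct model_entry {
	int64_t approx_len; /* Approximate entry size in bytes. */
};

/* Same condition as journal_queue_is_full() in the diff. */
static inline bool
model_queue_is_full(const struct model_journal *j)
{
	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
}

/* Mirrors journal_queue_on_append(): length and size move together. */
static inline void
model_queue_on_append(struct model_journal *j, const struct model_entry *e)
{
	j->queue_len++;
	j->queue_size += e->approx_len;
}

/* Mirrors journal_queue_on_complete(), including its sanity checks. */
static inline void
model_queue_on_complete(struct model_journal *j, const struct model_entry *e)
{
	j->queue_len--;
	j->queue_size -= e->approx_len;
	assert(j->queue_len >= 0);
	assert(j->queue_size >= 0);
}
```

With `queue_max_size = 100` and no length limit, appending a 60-byte entry leaves the queue below the limit, appending a further 50-byte entry makes it full (110 >= 100), and completing the 60-byte entry brings it back to 50 bytes. Updating both counters only through one pair of helpers keeps them from drifting apart, which is the point of the refactoring discussed above.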