From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
Date: Thu, 18 Feb 2021 23:06:53 +0300
Message-ID: <e44edd9b-b352-5ac0-2a50-1697e612b7a6@tarantool.org>
In-Reply-To: <dd49c61a-913d-79b0-4409-000f0d1d08ab@tarantool.org>

17.02.2021 23:46, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!

Thanks for the review!

Please find my answers inline and the incremental diff below.

> Now looks cool indeed.
>
> Another raw idea on which I don't insist and not even sure it is
> good. But just came to my mind: how about making a separate
> object called 'journal_queue'? Or 'journal_ctl'? Which is global
> and is not inside of one journal. It can't be changed to another
> queue/ctl, and is used by journal API.
>
> So we wouldn't need to worry if we configured the correct journal
> because now current_journal can change at runtime, but this ctl
> thing - can't.

Yes, this'd fix the problem which bothers me: whether we configure the
correct queue.

I don't want to do this TBH, looks like it's too complex for what it's
trying to achieve.

> Another option - call this thing 'journal', and rename the old
> 'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab'
> or something like this.
>
> Another option - ignore this, since it does not matter much. But
> just in case you would want to try to fit the solution into one
> of these ideas.
>
> See 8 comments below.
>
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..49441e596 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>>  			     complete_data);
>>  	return entry;
>>  }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free.
>> +	 */
>> +	struct fiber *fiber;
>> +	/** Whether the fiber should be waken up regardless of queue size. */
>> +	bool is_ready;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
>> +};
>> +
>> +/**
>> + * Wake up the next waiter in journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)
> 1. The flag is known in all usage places at compilation time. Is it
> possible to split the function into force/normal versions? The same
> for journal_queue_wakeup() from which this runtime uncertainty arises.

Actually, the parameter is not known at compile time when wakeup_next()
is called from journal_wait_queue(). For now wakeup_next() only has a
single check for force_ready, so moving the check outside would only
increase the number of branches.

journal_queue_wakeup() is called only once per whole-queue wakeup, so
I suppose it doesn't hurt much that it has a compile-time known parameter.

> Also it is worth adding a comment why is force mode even needed.

No problem.

>> +{
>> +	/* Empty queue or last entry in queue. */
>> +	if (link == rlist_last(&current_journal->waiters)) {
> 2. I am not sure I understand what is happening here. Why is this
> function in one place called with the pointer at the list itself,
> and in another place with the pointer at one element?

Well, <list head> -> next is the first list entry, right?
In queue_wakeup() I wake the first waiter up.

Once any waiter gets woken up, it wakes up the next waiter.
Which is <in_queue> -> next.

That's why I have a common helper for these two cases.

>> +		current_journal->queue_is_woken = false;
>> +		return;
>> +	}
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!force_ready && journal_queue_is_full()) {
>> +		current_journal->queue_is_woken = false;
> 3. Maybe woken -> awake?

No problem.

> 4. Why do you need the flag?
> Can you just remove the awake entries
> from the queue right away? Then it wouldn't even be possible to make
> a double wakeup. See comment 5.

I think I can't. Please see answer to comment 5.

>> +		return;
>> +	}
>> +	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
>> +						    in_queue);
>> +	e->is_ready = force_ready;
>> +	fiber_wakeup(e->fiber);
>> +}
>> +
>> +void
>> +journal_queue_wakeup(bool force_ready)
>> +{
>> +	assert(!rlist_empty(&current_journal->waiters));
>> +	if (current_journal->queue_is_woken)
>> +		return;
>> +	current_journal->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
>> +}
>> +
>> +void
>> +journal_wait_queue(void)
>> +{
>> +	struct journal_queue_entry entry = {
>> +		.fiber = fiber(),
>> +		.is_ready = false,
>> +	};
>> +	rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
>> +	/*
>> +	 * Will be waken up by either queue emptying or a synchronous write.
>> +	 */
>> +	while (journal_queue_is_full() && !entry.is_ready)
>> +		fiber_yield();
>> +
>> +	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
>> +	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
>> +	rlist_del(&entry.in_queue);
> 5. Can rlist_del be done along with fiber_wakeup()? Then you
> wouldn't need is_woken maybe.

Looks like it can't.
Say we have only one waiter. And remove it from the list on wakeup.
The list would become empty and there'd be no way to check whether
journal has any waiters, and we may reorder the entries (put new ones before
the waiting one). This is not necessarily bad, because I put entries into queue
before txn_begin(), but someone may call journal_wait_queue() from inside the
transaction, or right before txn_commit(). Then it might be bad to put other
transactions before this one.

So while removing is_woken we would have to introduce queue_has_waiters flag for
the sake of this single waiter.
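
A minimal sketch of the ordering argument above, not the actual Tarantool code. It assumes a toy model in which two counters stand in for the rlist of waiters and for fibers that were woken but have not run yet; every toy_* name is hypothetical. A newly arriving writer can only look at the list, so if wakeup unlinks the single waiter, the newcomer sees an empty list and may get ahead of it.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the journal: only what the argument needs. */
struct toy_waiters {
	int n_linked;	/* waiters still linked into the list */
	int n_woken;	/* waiters woken up but not yet resumed */
};

/* Variant from the patch: a wakeup leaves the waiter linked. */
static void
toy_wakeup_keep_linked(struct toy_waiters *w)
{
	w->n_woken++;
}

/* Variant from comment 5: a wakeup unlinks the waiter right away. */
static void
toy_wakeup_unlink(struct toy_waiters *w)
{
	w->n_linked--;
	w->n_woken++;
}

/* The woken fiber finally runs; in the patch it unlinks itself here. */
static void
toy_resume(struct toy_waiters *w, bool unlink_now)
{
	w->n_woken--;
	if (unlink_now)
		w->n_linked--;
}

/* All a newly arriving writer can observe is the list itself. */
static bool
toy_has_waiters(const struct toy_waiters *w)
{
	return w->n_linked > 0;
}
```

With one waiter, the first variant keeps toy_has_waiters() true until the waiter resumes, while the second reports an empty list while the waiter is still pending, which is the window where reordering becomes possible.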
>> +}
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..d295dfa4b 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -124,6 +143,81 @@ struct journal {
>>  			   struct journal_entry *entry);
>>  };
>>  
>> +/**
>> + * Depending on the step of recovery and instance configuration
>> + * points at a concrete implementation of the journal.
>> + */
>> +extern struct journal *current_journal;
>> +
>> +/** Wake the journal queue up. */
>> +void
>> +journal_queue_wakeup(bool force_ready);
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(void)
>> +{
>> +	struct journal *j = current_journal;
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
> 6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX
> when user passes 0. Then no need to check for != 0. The same for queue_max_len.

Sounds good, thanks for the suggestion!

> Option 2 which may be stupid (but combined with option 1): store a flag
> 'is_full' and update it when update queue_size and queue_len and see they
> exceeded the limit. But I am not sure it reduces number of branches. Didn't
> check.

Then we'd evaluate is_full() on every journal_confirm() and journal_write():
for both sync and async writes, which happens more often than the check is
actually needed (only for async writes). I think it's better to calculate
is_full on demand rather than every time it might change.

>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(void)
>> +{
>> +	return !rlist_empty(&current_journal->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>> +/** Set maximal journal queue size in bytes. */
>> +static inline void
>> +journal_queue_set_max_size(struct journal *j, int64_t size)
> 7. Why do we have journal parameter here, but don't have it in
> the other functions? The same journal_queue_set_max_len.

This is my attempt to make sure only wal_writer's journal has a queue.
I explicitly set queue_max_... parameters only for wal_writer's journal.
And then there's an assert that journal_queue_set_...() is only called with
the current journal.

>> +{
>> +	assert(j == current_journal);
>> +	j->queue_max_size = size;
>> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
>> +		journal_queue_wakeup(false);
>> +}
>> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>>  static inline int
>>  journal_write_async(struct journal_entry *entry)
>>  {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
> 8. Maybe add an assert though.

I wanted to, but it's impossible.
The queue may be full when all the waiters are forcefully woken up by a
synchronous commit. And it's hard to tell whether it was a "force" wakeup or not.
So let's just hope no one misuses this API.

Or, even better, I can remove the is_ready field from queue entries and add a
new field to the journal: queue_is_ready or something. In addition to
queue_is_awake. Then every entry will check queue_is_ready instead of
entry.is_ready and it'll be possible to add an assert here:
!journal_queue_is_full || journal_queue_is_ready

Looks like this'll also allow us to extract queue_wakeup_(next)_force, like you
suggested in comment 1.
What do you think?
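
A rough sketch of the queue-wide flag idea above, under stated assumptions: the struct and all toy_* functions are hypothetical stand-ins rather than the real journal API, queue_is_ready is the tentatively named field from the proposal, and the fullness check uses the strict comparison from the incremental diff in this message.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical subset of struct journal relevant to the assert. */
struct toy_journal {
	int64_t queue_size;
	int64_t queue_max_size;
	int64_t queue_len;
	int64_t queue_max_len;
	/* Proposed: set by a forced wakeup, replaces per-entry is_ready. */
	bool queue_is_ready;
};

static bool
toy_queue_is_full(const struct toy_journal *j)
{
	return j->queue_size > j->queue_max_size ||
	       j->queue_len > j->queue_max_len;
}

/* The condition the proposed assert would check before an async write. */
static bool
toy_may_append(const struct toy_journal *j)
{
	return !toy_queue_is_full(j) || j->queue_is_ready;
}

/* Stand-in for journal_write_async(): account the entry in the queue. */
static void
toy_write_async(struct toy_journal *j, int64_t entry_size)
{
	assert(toy_may_append(j));
	j->queue_size += entry_size;
	j->queue_len++;
}
```

Because readiness lives in the journal rather than in each waiter's entry, the writer can tell a forced wakeup apart from a misuse of the API, which is what makes the assert expressible.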
>> +	 */
>> +	journal_queue_on_append(entry);
>> +
>>  	return current_journal->write_async(current_journal, entry);
>>  }

Incremental diff:

diff --git a/src/box/box.cc b/src/box/box.cc
index 2b335599e..9a3b092d0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -762,6 +762,9 @@ box_check_wal_queue_max_len(void)
 		diag_set(ClientError, ER_CFG, "wal_queue_max_len",
 			 "wal_queue_max_len must be >= 0");
 	}
+	/* Unlimited. */
+	if (len == 0)
+		len = INT64_MAX;
 	return len;
 }
 
@@ -773,6 +776,9 @@ box_check_wal_queue_max_size(void)
 		diag_set(ClientError, ER_CFG, "wal_queue_max_size",
 			 "wal_queue_max_size must be >= 0");
 	}
+	/* Unlimited. */
+	if (size == 0)
+		size = INT64_MAX;
 	return size;
 }
 
diff --git a/src/box/journal.c b/src/box/journal.c
index 49441e596..931797faf 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -73,7 +73,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 {
 	/* Empty queue or last entry in queue. */
 	if (link == rlist_last(&current_journal->waiters)) {
-		current_journal->queue_is_woken = false;
+		current_journal->queue_is_awake = false;
 		return;
 	}
 	/*
@@ -81,7 +81,7 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 	 * else up until there's some free space.
 	 */
 	if (!force_ready && journal_queue_is_full()) {
-		current_journal->queue_is_woken = false;
+		current_journal->queue_is_awake = false;
 		return;
 	}
 	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
@@ -94,9 +94,9 @@ void
 journal_queue_wakeup(bool force_ready)
 {
 	assert(!rlist_empty(&current_journal->waiters));
-	if (current_journal->queue_is_woken)
+	if (current_journal->queue_is_awake)
 		return;
-	current_journal->queue_is_woken = true;
+	current_journal->queue_is_awake = true;
 	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index d295dfa4b..2caac4099 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -133,7 +133,7 @@ struct journal {
 	 * Whether the queue is being woken or not.
 	 * Used to avoid multiple
 	 * concurrent wake-ups.
 	 */
-	bool queue_is_woken;
+	bool queue_is_awake;
 	/** Asynchronous write */
 	int (*write_async)(struct journal *journal,
 			   struct journal_entry *entry);
@@ -149,7 +149,11 @@ struct journal {
  */
 extern struct journal *current_journal;
 
-/** Wake the journal queue up. */
+/**
+ * Wake the journal queue up.
+ * @param force_ready whether waiters should proceed even if the queue is still
+ * full.
+ */
 void
 journal_queue_wakeup(bool force_ready);
 
@@ -162,8 +166,8 @@ static inline bool
 journal_queue_is_full(void)
 {
 	struct journal *j = current_journal;
-	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
-	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
+	return j->queue_size > j->queue_max_size ||
+	       j->queue_len > j->queue_max_len;
 }
 
 /**
@@ -310,10 +314,10 @@ journal_create(struct journal *journal,
 	journal->write_async = write_async;
 	journal->write = write;
 	journal->queue_size = 0;
-	journal->queue_max_size = 0;
+	journal->queue_max_size = INT64_MAX;
 	journal->queue_len = 0;
-	journal->queue_max_len = 0;
-	journal->queue_is_woken = false;
+	journal->queue_max_len = INT64_MAX;
+	journal->queue_is_awake = false;
 	rlist_create(&journal->waiters);
 }
 
-- 
Serge Petrenko
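
For reference, a standalone sketch of what the incremental diff does to the limit check. It assumes hypothetical toy_* helpers rather than the real box_check_wal_queue_max_*() and journal_queue_is_full(): a configured 0 is mapped to INT64_MAX once, so the hot-path check needs no "!= 0" branches.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: map the user-facing "0 = unlimited" to a sentinel. */
static int64_t
toy_normalize_limit(int64_t cfg_value)
{
	return cfg_value == 0 ? INT64_MAX : cfg_value;
}

/* Two plain comparisons; an INT64_MAX limit can never be exceeded. */
static bool
toy_is_full(int64_t size, int64_t max_size, int64_t len, int64_t max_len)
{
	return size > max_size || len > max_len;
}
```

Note that with the strict comparison a queue holding exactly the limit is not reported as full; it becomes full only once the limit is exceeded.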