Date: Mon, 1 Mar 2021 22:08:06 +0300
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
To: Vladislav Shpilevoy, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
Message-ID: <17046c4d-b239-f3d8-f150-941026096b1e@tarantool.org>
In-Reply-To: <469ef629-699c-0180-facb-f166d6d3c073@tarantool.org>
References: <20210224193549.70017-1-sergepetrenko@tarantool.org> <469ef629-699c-0180-facb-f166d6d3c073@tarantool.org>
List-Id: Tarantool development patches
26.02.2021 03:56, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
>
> See 8 comments below, and my diff in the end of the email and on
> top of the branch.

Thanks for the review & the fixes!

> On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
>> Since the introduction of asynchronous commit, which doesn't wait for a
>> WAL write to succeed, it's quite easy to clog WAL with huge amounts of
>> write requests. For now, it's only possible from an applier, since it's
>> the only user of async commit at the moment.
>>
>> This happens when a replica is syncing with the master and reads new
>> transactions at a pace higher than it can write them to WAL (see the
>> docbot request for a detailed explanation).
>>
>> To ameliorate such behavior, we need to introduce some limit on
>> not-yet-finished WAL write requests. This is what this commit is trying
>> to do.
>> Two new counters are added to the wal writer: queue_size (in bytes) and
>> queue_len (in wal messages) together with configuration settings:
>> `wal_queue_max_size` and `wal_queue_max_len`.
>> Size and length are increased on every new submitted request, and are
>> decreased once the tx receives a confirmation that a specific request
>> was written.
>> Actually, the limits are added to an abstract journal, but take effect
>> only for the wal writer.
> 1. Now the limits affect any current journal, regardless of its type.
> Although they really work only for WAL, because only the WAL journal
> yields AFAIK. Others just 'commit' immediately.

Correct. I've reworded this piece.
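As a side note, for anyone skimming the thread: the accounting from the
commit message boils down to roughly the following header-style sketch.
The names (`queue_sketch_*`) are simplified and hypothetical; the real
code is in the patch quoted below.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Illustrative sketch of the two-counter queue accounting: a size
 * limit in bytes and a length limit in entries. Not the patch code.
 */
struct queue_sketch {
	int64_t max_size; /* wal_queue_max_size, in bytes */
	int64_t size;     /* bytes of not-yet-completed requests */
	int64_t max_len;  /* wal_queue_max_len, in entries */
	int64_t len;      /* count of not-yet-completed requests */
};

/* The queue is full when either of the two limits is exceeded. */
static inline bool
queue_sketch_is_full(const struct queue_sketch *q)
{
	return q->size > q->max_size || q->len > q->max_len;
}

/* Increase the counters when tx submits a new write request. */
static inline void
queue_sketch_on_append(struct queue_sketch *q, int64_t req_size)
{
	q->size += req_size;
	++q->len;
}

/* Decrease them when tx learns the request was written. */
static inline void
queue_sketch_on_complete(struct queue_sketch *q, int64_t req_size)
{
	q->size -= req_size;
	--q->len;
	assert(q->size >= 0 && q->len >= 0);
}
```

With `max_size = max_len = INT64_MAX` (the defaults) the queue never
reports full, which is why the limits are effectively off until the user
sets them.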
>
>> ---
>> https://github.com/tarantool/tarantool/issues/5536
>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..27ddd0f29 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>  		goto success;
>>  	}
>>
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full() || journal_queue_has_waiters())
>> +		journal_wait_queue();
> 2. Perhaps simply move both checks into journal_wait_queue(). Seems like
> an internal thing for the queue's API.

Looks good.

>
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..4c31a3dfe 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -34,6 +34,16 @@
>>
>>  struct journal *current_journal = NULL;
>>
>> +struct journal_queue journal_queue = {
>> +	.max_size = INT64_MAX,
>> +	.size = 0,
>> +	.max_len = INT64_MAX,
>> +	.len = 0,
>> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
>> +	.is_awake = false,
>> +	.is_ready = false,
>> +};
> 3. Kostja might be right about most of the queue's code being a good
> candidate for an extraction to libcore. So the queue itself would
> be the semaphore + queue size and len parameters. But up to you.

I'm not sure I get it. It would be a counting semaphore if we had a
single limit, say, only entry count or only entry size. But we have
both, so it's not a "normal" counting semaphore. And if it's not, why
extract it as a generic primitive?

Moreover, the limits are now 'soft'. As discussed verbally, we'll wake
the entry up and let it proceed if we see that the queue isn't full at
the time of wake-up. But it may be full again once the fiber is
actually put to execution, and we ignore this.
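To make the "soft" part concrete, here's a tiny standalone model of the
idea (single-threaded, hypothetical `soft_sem_*` names, no fibers; not
the patch code). The limit check happens only when deciding whether to
wake the first waiter; once woken, the waiter appends without
re-checking.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * A model of the soft limit: two resources (bytes and entries) and a
 * wakeup predicate that is evaluated only at wakeup time.
 */
struct soft_sem {
	int64_t size, max_size;  /* resource 1: bytes */
	int64_t len, max_len;    /* resource 2: entries */
	int waiter_count;        /* fibers parked in the queue */
};

static inline bool
soft_sem_is_full(const struct soft_sem *s)
{
	return s->size > s->max_size || s->len > s->max_len;
}

/* Should the first parked waiter be woken right now? */
static inline bool
soft_sem_should_wake(const struct soft_sem *s)
{
	return s->waiter_count > 0 && !soft_sem_is_full(s);
}

/*
 * The woken waiter appends unconditionally: no re-check, so the
 * limits may be overshot. That is exactly the "soft" part.
 */
static inline void
soft_sem_acquire(struct soft_sem *s, int64_t req_size)
{
	s->size += req_size;
	++s->len;
}
```

A hard limit would re-evaluate `soft_sem_is_full()` inside the woken
fiber and go back to sleep if the queue filled up again; the soft
variant deliberately skips that loop.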
So it's a "soft counting semaphore with 2 resources".

>
>> +
>>  struct journal_entry *
>>  journal_entry_new(size_t n_rows, struct region *region,
>>  		  journal_write_async_f write_async_cb,
>> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>>  			   complete_data);
>>  	return entry;
>>  }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free. */
>> +	struct fiber *fiber;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
> 4. I see fiber_cond uses fiber->state member. Can we do the same here?
> Because the queue is not much different from fiber_cond. Both are
> built on top of fiber API. Which means the 'state' might be usable in
> the queue as well.

It's a good idea, thanks! Applied with minor changes, described below.

>
>> +};
>> +
>> +/**
>> + * Wake up the first waiter in the journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_first(void)
>> +{
>> +	struct journal_queue_entry *e;
>> +	if (rlist_empty(&journal_queue.waiters))
>> +		goto out;
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!journal_queue.is_ready && journal_queue_is_full())
>> +		goto out;
>> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
>> +			in_queue);
> 5. Why didn't rlist_first_entry() work?

It sure would work. Just a misprint, thanks for pointing this out!
> >> + fiber_wakeup(e->fiber); >> + return; >> +out: >> + journal_queue.is_awake = false; >> + journal_queue.is_ready = false; >> +} >> + >> +void >> +journal_queue_wakeup(bool force_ready) >> +{ >> + assert(!rlist_empty(&journal_queue.waiters)); >> + if (journal_queue.is_awake) >> + return; >> + journal_queue.is_awake = true; >> + journal_queue.is_ready = force_ready; >> + journal_queue_wakeup_first(); >> +} >> + >> +void >> +journal_wait_queue(void) >> +{ >> + struct journal_queue_entry entry = { >> + .fiber = fiber(), >> + }; >> + rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); >> + /* >> + * Will be waken up by either queue emptying or a synchronous write. >> + */ >> + while (journal_queue_is_full() && !journal_queue.is_ready) >> + fiber_yield(); > 6. You check for full anyway. So as I mentioned in the comment in > applier's code, you can move all the checks into there, and do > them before creating entry and adding it to the queue. As a > 'fast path'. And it would make journal_wait_queue() easier to use. Ok > >> + >> + assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); >> + rlist_del(&entry.in_queue); >> + >> + journal_queue_wakeup_first(); >> +} >> diff --git a/src/box/journal.h b/src/box/journal.h >> index 5d8d5a726..8fec5b27e 100644 >> --- a/src/box/journal.h >> +++ b/src/box/journal.h >> @@ -124,6 +154,82 @@ struct journal { >> struct journal_entry *entry); >> }; >> >> +/** >> + * Depending on the step of recovery and instance configuration >> + * points at a concrete implementation of the journal. >> + */ >> +extern struct journal *current_journal; > 7. Now you don't need to move current_journal declaration. Moved that back. > >> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry) >> static inline int >> journal_write_async(struct journal_entry *entry) >> { >> + /* >> + * It's the job of the caller to check whether the queue is full prior >> + * to submitting the request. 
>> + */ >> + assert(!journal_queue_is_full() || journal_queue.is_ready); >> + journal_queue_on_append(entry); > 8. Probably must assert that waiters list is empty. Otherwise you > could go out of order. It's not empty by the time the first entry gets to 'journal_write_async'. Everyone else is waken up, but not yet removed from the queue. Looks like we cannot determine whether a write is called after waiting in queue or not. > >> + >> return current_journal->write_async(current_journal, entry); >> } >> > I tried to do some fixes on top of your branch in order to delete > the flags and some branching. > > Take a look at the diff below and on top of the branch in a separate > commit. Thanks! Looks good, with my changes on top. I squashed everything into one commit and updated the branch. ============================================ diff --git a/src/box/journal.c b/src/box/journal.c index 40a5f5b1a..92a773684 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -83,9 +83,7 @@ journal_queue_wait(void)      /*       * Will be waken up by either queue emptying or a synchronous write.       */ -    do { -        fiber_yield(); -    } while (journal_queue_is_full()); +    fiber_yield();      --journal_queue.waiter_count;      journal_queue_wakeup();  } diff --git a/src/box/journal.h b/src/box/journal.h index 3b93158cf..b5d587e3a 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -124,26 +124,13 @@ struct journal_queue {       * entered the queue.       */      struct rlist waiters; -    /** -     * Whether the queue is being woken or not. Used to avoid multiple -     * concurrent wake-ups. -     */ -    bool waiter_count; +    /** How many waiters there are in a queue. */ +    int waiter_count;  };  /** A single queue for all journal instances. */  extern struct journal_queue journal_queue; -/** - * Check whether anyone is waiting for the journal queue to empty. If there are - * other waiters we must go after them to preserve write order. 
- */ -static inline bool -journal_queue_has_waiters(void) -{ -    return journal_queue.waiter_count != 0; -} - ============================================ Returned this func back to its original place. ============================================  /**   * An API for an abstract journal for all transactions of this   * instance, as well as for multiple instances in case of @@ -173,6 +160,16 @@ extern struct journal *current_journal;  void  journal_queue_wakeup(void); +/** + * Check whether anyone is waiting for the journal queue to empty. If there are + * other waiters we must go after them to preserve write order. + */ +static inline bool +journal_queue_has_waiters(void) +{ +    return journal_queue.waiter_count != 0; +} +  /**   * Check whether any of the queue size limits is reached.   * If the queue is full, we must wait for some of the entries to be written @@ -235,7 +232,6 @@ journal_async_complete(struct journal_entry *entry)      assert(entry->write_async_cb != NULL);      journal_queue_on_complete(entry); -    journal_queue_wakeup(); ============================================ Let's wake the queue up once the whole batch is processed. This way we avoid excess checks and waking every waiter in queue up in case the queue is not full (each wakeup wakes the first entry up and removes it from the list, so a ton of wake-ups would wake up every entry there is). ============================================      entry->write_async_cb(entry);  } @@ -266,7 +262,6 @@ journal_write_async(struct journal_entry *entry)       * It's the job of the caller to check whether the queue is full prior       * to submitting the request.       */ -    assert(journal_queue.waiter_count == 0);      journal_queue_on_append(entry); ============================================ As I mentioned above, we cannot assert queue hasn't got waiters here. 
============================================      return current_journal->write_async(current_journal, entry); diff --git a/src/box/wal.c b/src/box/wal.c index 328ab092d..7829ccc95 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -274,6 +274,8 @@ tx_schedule_queue(struct stailq *queue)      struct journal_entry *req, *tmp;      stailq_foreach_entry_safe(req, tmp, queue, fifo)          journal_async_complete(req); + +    journal_queue_wakeup();  }  /** ============================================ > > ==================== > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 27ddd0f29..2586f6654 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) > * This is done before opening the transaction to avoid problems with > * yielding inside it. > */ > - if (journal_queue_is_full() || journal_queue_has_waiters()) > - journal_wait_queue(); > + journal_queue_wait(); > > /** > * Explicitly begin the transaction so that we can > diff --git a/src/box/journal.c b/src/box/journal.c > index 4c31a3dfe..40a5f5b1a 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -40,8 +40,7 @@ struct journal_queue journal_queue = { > .max_len = INT64_MAX, > .len = 0, > .waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters), > - .is_awake = false, > - .is_ready = false, > + .waiter_count = 0, > }; > > struct journal_entry * > @@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region, > return entry; > } > > -struct journal_queue_entry { > - /** The fiber waiting for queue space to free. */ > - struct fiber *fiber; > - /** A link in all waiting fibers list. */ > - struct rlist in_queue; > -}; > - > -/** > - * Wake up the first waiter in the journal queue. 
> - */ > -static inline void > -journal_queue_wakeup_first(void) > -{ > - struct journal_queue_entry *e; > - if (rlist_empty(&journal_queue.waiters)) > - goto out; > - /* > - * When the queue isn't forcefully emptied, no need to wake everyone > - * else up until there's some free space. > - */ > - if (!journal_queue.is_ready && journal_queue_is_full()) > - goto out; > - e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e), > - in_queue); > - fiber_wakeup(e->fiber); > - return; > -out: > - journal_queue.is_awake = false; > - journal_queue.is_ready = false; > -} > - > void > -journal_queue_wakeup(bool force_ready) > +journal_queue_wakeup(void) > { > - assert(!rlist_empty(&journal_queue.waiters)); > - if (journal_queue.is_awake) > - return; > - journal_queue.is_awake = true; > - journal_queue.is_ready = force_ready; > - journal_queue_wakeup_first(); > + struct rlist *list = &journal_queue.waiters; > + if (!rlist_empty(list) && !journal_queue_is_full()) > + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > } > > void > -journal_wait_queue(void) > +journal_queue_wait(void) > { > - struct journal_queue_entry entry = { > - .fiber = fiber(), > - }; > - rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue); > + if (!journal_queue_is_full() && !journal_queue_has_waiters()) > + return; > + ++journal_queue.waiter_count; > + rlist_add_tail_entry(&journal_queue.waiters, fiber(), state); > /* > * Will be waken up by either queue emptying or a synchronous write. 
> */ > - while (journal_queue_is_full() && !journal_queue.is_ready) > + do { > fiber_yield(); > + } while (journal_queue_is_full()); > + --journal_queue.waiter_count; > + journal_queue_wakeup(); > +} > > - assert(&entry.in_queue == rlist_first(&journal_queue.waiters)); > - rlist_del(&entry.in_queue); > - > - journal_queue_wakeup_first(); > +void > +journal_queue_flush(void) > +{ > + if (!journal_queue_has_waiters()) > + return; > + struct rlist *list = &journal_queue.waiters; > + while (!rlist_empty(list)) > + fiber_wakeup(rlist_first_entry(list, struct fiber, state)); > + journal_queue_wait(); > } > diff --git a/src/box/journal.h b/src/box/journal.h > index 8fec5b27e..3b93158cf 100644 > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -128,17 +128,22 @@ struct journal_queue { > * Whether the queue is being woken or not. Used to avoid multiple > * concurrent wake-ups. > */ > - bool is_awake; > - /** > - * A flag used to tell the waiting fibers they may proceed even if the > - * queue is full, i.e. force them to submit a write request. > - */ > - bool is_ready; > + bool waiter_count; > }; > > /** A single queue for all journal instances. */ > extern struct journal_queue journal_queue; > > +/** > + * Check whether anyone is waiting for the journal queue to empty. If there are > + * other waiters we must go after them to preserve write order. > + */ > +static inline bool > +journal_queue_has_waiters(void) > +{ > + return journal_queue.waiter_count != 0; > +} > ==================== > > I moved this function not on purpose. Cold be kept in > the old place. > > ==================== > + > /** > * An API for an abstract journal for all transactions of this > * instance, as well as for multiple instances in case of > @@ -166,7 +171,7 @@ extern struct journal *current_journal; > * full. > */ > void > -journal_queue_wakeup(bool force_ready); > +journal_queue_wakeup(void); > > /** > * Check whether any of the queue size limits is reached. 
> @@ -180,27 +185,19 @@ journal_queue_is_full(void) > journal_queue.len > journal_queue.max_len; > } > > -/** > - * Check whether anyone is waiting for the journal queue to empty. If there are > - * other waiters we must go after them to preserve write order. > - */ > -static inline bool > -journal_queue_has_waiters(void) > -{ > - return !rlist_empty(&journal_queue.waiters); > -} > - > /** Yield until there's some space in the journal queue. */ > void > -journal_wait_queue(void); > +journal_queue_wait(void); > + > +void > +journal_queue_flush(void); > > /** Set maximal journal queue size in bytes. */ > static inline void > journal_queue_set_max_size(int64_t size) > { > journal_queue.max_size = size; > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > } > > /** Set maximal journal queue length, in entries. */ > @@ -208,8 +205,7 @@ static inline void > journal_queue_set_max_len(int64_t len) > { > journal_queue.max_len = len; > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > } > > /** Increase queue size on a new write request. */ > @@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry) > assert(entry->write_async_cb != NULL); > > journal_queue_on_complete(entry); > - if (journal_queue_has_waiters() && !journal_queue_is_full()) > - journal_queue_wakeup(false); > + journal_queue_wakeup(); > > entry->write_async_cb(entry); > } > @@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry) > static inline int > journal_write(struct journal_entry *entry) > { > - if (journal_queue_has_waiters()) { > - /* > - * It's a synchronous write, so it's fine to wait a bit more for > - * everyone else to be written. They'll wake us up back > - * afterwards. 
> - */ > - journal_queue_wakeup(true); > - journal_wait_queue(); > - } > - > + journal_queue_flush(); > journal_queue_on_append(entry); > > return current_journal->write(current_journal, entry); > @@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry) > * It's the job of the caller to check whether the queue is full prior > * to submitting the request. > */ > - assert(!journal_queue_is_full() || journal_queue.is_ready); > + assert(journal_queue.waiter_count == 0); > journal_queue_on_append(entry); > > return current_journal->write_async(current_journal, entry); -- Serge Petrenko