To: Serge Petrenko, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
References: <20210224193549.70017-1-sergepetrenko@tarantool.org>
In-Reply-To: <20210224193549.70017-1-sergepetrenko@tarantool.org>
Message-ID: <469ef629-699c-0180-facb-f166d6d3c073@tarantool.org>
Date: Fri, 26 Feb 2021 01:56:51 +0100
Subject: Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
From: Vladislav Shpilevoy via Tarantool-patches
Reply-To: Vladislav Shpilevoy

Hi! Thanks for the patch!

See 8 comments below, and my diff at the end of the email and on top of
the branch.

On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
> Since the introduction of asynchronous commit, which doesn't wait for a
> WAL write to succeed, it's quite easy to clog WAL with huge amounts of
> write requests. For now, it's only possible from an applier, since it's
> the only user of async commit at the moment.
>
> This happens when a replica is syncing with the master and reads new
> transactions at a pace higher than it can write them to WAL (see the
> docbot request for a detailed explanation).
>
> To ameliorate such behavior, we need to introduce some limit on
> not-yet-finished WAL write requests. This is what this commit is trying
> to do.
> Two new counters are added to the wal writer: queue_size (in bytes) and
> queue_len (in wal messages) together with configuration settings:
> `wal_queue_max_size` and `wal_queue_max_len`.
> Size and length are increased on every new submitted request, and are
> decreased once the tx thread receives a confirmation that a specific
> request was written.
> Actually, the limits are added to an abstract journal, but take effect
> only for the wal writer.

1. Now the limits affect any current journal, regardless of its type.
Although they really work only for WAL, because only the WAL journal
yields, AFAIK. Others just 'commit' immediately.

> ---
> https://github.com/tarantool/tarantool/issues/5536
> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..27ddd0f29 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
> 		goto success;
> 	}
>
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full() || journal_queue_has_waiters())
> +		journal_wait_queue();

2. Perhaps simply move both checks into journal_wait_queue(). That looks
like an internal detail of the queue's API.

> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..4c31a3dfe 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -34,6 +34,16 @@
>
>  struct journal *current_journal = NULL;
>
> +struct journal_queue journal_queue = {
> +	.max_size = INT64_MAX,
> +	.size = 0,
> +	.max_len = INT64_MAX,
> +	.len = 0,
> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> +	.is_awake = false,
> +	.is_ready = false,
> +};

3. Kostja might be right about most of the queue's code being a good
candidate for extraction into libcore. The queue itself would then be a
semaphore plus the queue size and len parameters. But up to you.
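Something along these lines is what I have in mind - a rough, untested
sketch, not a patch. All the journal_sem_* names are made up, and the
semantics is not exactly what the patch does (the patch lets a request
overshoot the limit), but it shows the shape:

====================
/*
 * Illustration only. Reuses fiber()->state as the wait list link,
 * like fiber_cond does. Header paths assume Tarantool's core tree.
 */
#include "fiber.h"
#include "small/rlist.h"

struct journal_sem {
	/** How much of the resource (bytes or entries) is left. */
	int64_t remaining;
	/** Fibers waiting for the resource, in arrival order. */
	struct rlist waiters;
};

static inline void
journal_sem_create(struct journal_sem *sem, int64_t count)
{
	sem->remaining = count;
	rlist_create(&sem->waiters);
}

/** Take @a n units of the resource, yield until they are available. */
static inline void
journal_sem_take(struct journal_sem *sem, int64_t n)
{
	if (rlist_empty(&sem->waiters) && sem->remaining >= n) {
		/* Fast path - no one ahead of us and there is room. */
		sem->remaining -= n;
		return;
	}
	rlist_add_tail_entry(&sem->waiters, fiber(), state);
	/* Wait until we are first in line and there is enough room. */
	while (rlist_first_entry(&sem->waiters, struct fiber, state) !=
	       fiber() || sem->remaining < n)
		fiber_yield();
	rlist_del_entry(fiber(), state);
	sem->remaining -= n;
	/* Give the next waiter a chance if there is still room left. */
	if (!rlist_empty(&sem->waiters))
		fiber_wakeup(rlist_first_entry(&sem->waiters, struct fiber,
					       state));
}

/** Return @a n units of the resource and wake the first waiter. */
static inline void
journal_sem_put(struct journal_sem *sem, int64_t n)
{
	sem->remaining += n;
	if (!rlist_empty(&sem->waiters))
		fiber_wakeup(rlist_first_entry(&sem->waiters, struct fiber,
					       state));
}
====================

Then the journal queue would be just two such semaphores - one for bytes
and one for entries - plus the setters, and it could live in libcore next
to fiber_cond.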
> +
>  struct journal_entry *
>  journal_entry_new(size_t n_rows, struct region *region,
> 		  journal_write_async_f write_async_cb,
> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
> 			   complete_data);
> 	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;

4. I see fiber_cond uses the fiber->state member. Can we do the same here?
The queue is not much different from fiber_cond: both are built on top of
the fiber API, so 'state' might be usable in the queue as well (there is a
small sketch of this right before the diff below).

> +};
> +
> +/**
> + * Wake up the first waiter in the journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_first(void)
> +{
> +	struct journal_queue_entry *e;
> +	if (rlist_empty(&journal_queue.waiters))
> +		goto out;
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!journal_queue.is_ready && journal_queue_is_full())
> +		goto out;
> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> +			in_queue);

5. Why didn't rlist_first_entry() work here?

> +	fiber_wakeup(e->fiber);
> +	return;
> +out:
> +	journal_queue.is_awake = false;
> +	journal_queue.is_ready = false;
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&journal_queue.waiters));
> +	if (journal_queue.is_awake)
> +		return;
> +	journal_queue.is_awake = true;
> +	journal_queue.is_ready = force_ready;
> +	journal_queue_wakeup_first();
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +	};
> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	/*
> +	 * Will be waken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !journal_queue.is_ready)
> +		fiber_yield();

6. You check for fullness here anyway. So, as I mentioned in the comment on
the applier's code, you can move all the checks in here and do them before
creating the entry and adding it to the queue, as a 'fast path'. It would
also make journal_wait_queue() easier to use.

> +
> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> +	rlist_del(&entry.in_queue);
> +
> +	journal_queue_wakeup_first();
> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..8fec5b27e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +154,82 @@ struct journal {
> 			  struct journal_entry *entry);
>  };
>
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;

7. Now you don't need to move the current_journal declaration.

> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.
> +	 */
> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	journal_queue_on_append(entry);

8. This should probably also assert that the waiters list is empty.
Otherwise you could go out of order.

> +
> +	return current_journal->write_async(current_journal, entry);
>  }
>

I tried to do some fixes on top of your branch in order to delete the flags
and some branching. Take a look at the diff below and on top of the branch
in a separate commit.
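And before the diff - a tiny illustration of what I mean in comment 4, with
the waiting expressed via fiber_cond instead of a hand-rolled waiter list.
It is untested, ignores for brevity the 'go after the existing waiters'
ordering concern, assumes the cond is created at bootstrap, and
journal_queue_is_full() is the same helper as in the patch:

====================
/* Assumes src/lib/core/fiber_cond.h from the core library. */
#include "fiber_cond.h"

/* fiber_cond_create(&journal_queue_cond) is assumed to run at bootstrap. */
static struct fiber_cond journal_queue_cond;

/** Yield until there is space in the queue. */
static void
journal_queue_wait(void)
{
	while (journal_queue_is_full())
		fiber_cond_wait(&journal_queue_cond);
}

/** Called when the queue shrinks or when the limits grow. */
static void
journal_queue_wakeup(void)
{
	if (!journal_queue_is_full())
		fiber_cond_signal(&journal_queue_cond);
}
====================

AFAIK fiber_cond wakes its waiters up in FIFO order, so the order among the
waiters themselves would still be kept.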
====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 27ddd0f29..2586f6654 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	 * This is done before opening the transaction to avoid problems with
 	 * yielding inside it.
 	 */
-	if (journal_queue_is_full() || journal_queue_has_waiters())
-		journal_wait_queue();
+	journal_queue_wait();
 
 	/**
 	 * Explicitly begin the transaction so that we can
diff --git a/src/box/journal.c b/src/box/journal.c
index 4c31a3dfe..40a5f5b1a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -40,8 +40,7 @@ struct journal_queue journal_queue = {
 	.max_len = INT64_MAX,
 	.len = 0,
 	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
-	.is_awake = false,
-	.is_ready = false,
+	.waiter_count = 0,
 };
 
 struct journal_entry *
@@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region,
 	return entry;
 }
 
-struct journal_queue_entry {
-	/** The fiber waiting for queue space to free. */
-	struct fiber *fiber;
-	/** A link in all waiting fibers list. */
-	struct rlist in_queue;
-};
-
-/**
- * Wake up the first waiter in the journal queue.
- */
-static inline void
-journal_queue_wakeup_first(void)
-{
-	struct journal_queue_entry *e;
-	if (rlist_empty(&journal_queue.waiters))
-		goto out;
-	/*
-	 * When the queue isn't forcefully emptied, no need to wake everyone
-	 * else up until there's some free space.
-	 */
-	if (!journal_queue.is_ready && journal_queue_is_full())
-		goto out;
-	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
-			in_queue);
-	fiber_wakeup(e->fiber);
-	return;
-out:
-	journal_queue.is_awake = false;
-	journal_queue.is_ready = false;
-}
-
 void
-journal_queue_wakeup(bool force_ready)
+journal_queue_wakeup(void)
 {
-	assert(!rlist_empty(&journal_queue.waiters));
-	if (journal_queue.is_awake)
-		return;
-	journal_queue.is_awake = true;
-	journal_queue.is_ready = force_ready;
-	journal_queue_wakeup_first();
+	struct rlist *list = &journal_queue.waiters;
+	if (!rlist_empty(list) && !journal_queue_is_full())
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
 }
 
 void
-journal_wait_queue(void)
+journal_queue_wait(void)
 {
-	struct journal_queue_entry entry = {
-		.fiber = fiber(),
-	};
-	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
+	if (!journal_queue_is_full() && !journal_queue_has_waiters())
+		return;
+	++journal_queue.waiter_count;
+	rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
 	/*
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
-	while (journal_queue_is_full() && !journal_queue.is_ready)
+	do {
 		fiber_yield();
+	} while (journal_queue_is_full());
+	--journal_queue.waiter_count;
+	journal_queue_wakeup();
+}
 
-	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
-	rlist_del(&entry.in_queue);
-
-	journal_queue_wakeup_first();
+void
+journal_queue_flush(void)
+{
+	if (!journal_queue_has_waiters())
+		return;
+	struct rlist *list = &journal_queue.waiters;
+	while (!rlist_empty(list))
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+	journal_queue_wait();
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 8fec5b27e..3b93158cf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -128,17 +128,22 @@ struct journal_queue {
 	 * Whether the queue is being woken or not. Used to avoid multiple
 	 * concurrent wake-ups.
 	 */
-	bool is_awake;
-	/**
-	 * A flag used to tell the waiting fibers they may proceed even if the
-	 * queue is full, i.e. force them to submit a write request.
-	 */
-	bool is_ready;
+	int64_t waiter_count;
 };
 
 /** A single queue for all journal instances. */
 extern struct journal_queue journal_queue;
 
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+	return journal_queue.waiter_count != 0;
+}
====================

I moved this function not on purpose. It could be kept in the old place.

====================
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
@@ -166,7 +171,7 @@ extern struct journal *current_journal;
  * full.
  */
 void
-journal_queue_wakeup(bool force_ready);
+journal_queue_wakeup(void);
 
 /**
  * Check whether any of the queue size limits is reached.
@@ -180,27 +185,19 @@ journal_queue_is_full(void)
 	       journal_queue.len > journal_queue.max_len;
 }
 
-/**
- * Check whether anyone is waiting for the journal queue to empty. If there are
- * other waiters we must go after them to preserve write order.
- */
-static inline bool
-journal_queue_has_waiters(void)
-{
-	return !rlist_empty(&journal_queue.waiters);
-}
-
 /** Yield until there's some space in the journal queue. */
 void
-journal_wait_queue(void);
+journal_queue_wait(void);
+
+void
+journal_queue_flush(void);
 
 /** Set maximal journal queue size in bytes. */
 static inline void
 journal_queue_set_max_size(int64_t size)
 {
 	journal_queue.max_size = size;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Set maximal journal queue length, in entries. */
@@ -208,8 +205,7 @@ static inline void
 journal_queue_set_max_len(int64_t len)
 {
 	journal_queue.max_len = len;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Increase queue size on a new write request. */
@@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry)
 	assert(entry->write_async_cb != NULL);
 
 	journal_queue_on_complete(entry);
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 
 	entry->write_async_cb(entry);
 }
@@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry)
 static inline int
 journal_write(struct journal_entry *entry)
 {
-	if (journal_queue_has_waiters()) {
-		/*
-		 * It's a synchronous write, so it's fine to wait a bit more for
-		 * everyone else to be written. They'll wake us up back
-		 * afterwards.
-		 */
-		journal_queue_wakeup(true);
-		journal_wait_queue();
-	}
-
+	journal_queue_flush();
 	journal_queue_on_append(entry);
 
 	return current_journal->write(current_journal, entry);
@@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry)
 	 * It's the job of the caller to check whether the queue is full prior
 	 * to submitting the request.
 	 */
-	assert(!journal_queue_is_full() || journal_queue.is_ready);
+	assert(journal_queue.waiter_count == 0);
 	journal_queue_on_append(entry);
 
 	return current_journal->write_async(current_journal, entry);
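And just for the record - this is how I read the journal_queue_on_append() /
journal_queue_on_complete() helpers the hunks above rely on. It is
reconstructed from the commit message, not copied from the patch, and the
entry size field name (approx_len) is my assumption:

====================
/* Assumes the declarations from src/box/journal.h above. */
static inline void
journal_queue_on_append(struct journal_entry *entry)
{
	/* Account the new request in both limits. */
	journal_queue.len++;
	journal_queue.size += entry->approx_len;
}

static inline void
journal_queue_on_complete(struct journal_entry *entry)
{
	/* The write is finished - release its share of the limits. */
	journal_queue.len--;
	journal_queue.size -= entry->approx_len;
	assert(journal_queue.len >= 0);
	assert(journal_queue.size >= 0);
}
====================

If that is right, journal_queue_is_full() is simply these two counters
compared against the configured maximums.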