From mboxrd@z Thu Jan 1 00:00:00 1970
To: Cyrill Gorcunov
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org
References: <20210211121750.46298-1-sergepetrenko@tarantool.org>
Message-ID: <4050015c-bf24-eac3-a890-216efab8c635@tarantool.org>
Date: Tue, 16 Feb 2021 15:47:09 +0300
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8; format=flowed
Subject: Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
15.02.2021 14:17, Cyrill Gorcunov wrote:
> On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..06aaa0a79 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>  		goto success;
>>  	}
>>  
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full(current_journal))
>> +		journal_wait_queue();
>> +
> Serge, just a few comments, feel free to ignore.

Hi! Thanks for the review!

> Maybe it would be better to pass current_journal to journal_wait_queue,
> otherwise it looks somehow inconsistent, no?
>
> 	if (journal_queue_is_full(current_journal))
> 		journal_wait_queue(current_journal);

I tried to remove the parameters from almost all the methods, as discussed
in chat.

> Actually I would name the journal queue engine plain journalq, this would
> look like
>
> 	if (journalq_is_full(current_journal))
> 		journalq_wait(current_journal);
>
> But it is a very humble pov, let's stick with the long `journal_queue`.
> ...

To be honest, I like the `journal_queue_` prefix more.

>> +/** Wake the journal queue up. */
>> +static inline void
>> +journal_queue_wakeup(struct journal *j, bool force_ready)
>> +{
>> +	assert(j == current_journal);
> Seems this assert is not needed. The overall idea of passing the
> journal as an argument is quite the reverse, i.e. to work with
> any journal. This is not a blocker, could be cleaned up on top
> or simply ignored.

Same as above, discussed verbally.

>> +	assert(!rlist_empty(&j->waiters));
>> +	if (j->queue_is_woken)
>> +		return;
>> +	j->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&j->waiters, force_ready);
>> +}
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return !rlist_empty(&j->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>>  /**
>>   * Complete asynchronous write.
>>   */
>> @@ -131,15 +199,15 @@ static inline void
>>  journal_async_complete(struct journal_entry *entry)
>>  {
>>  	assert(entry->write_async_cb != NULL);
>> +	current_journal->queue_len--;
>> +	current_journal->queue_size -= entry->approx_len;
> Maybe worth making the queue ops closed into some helper? Because
> the length and size can't be updated without a tangle. IOW, something
> like
>
> 	static inline void
> 	journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
> 	{
> 		j->queue_len--;
> 		j->queue_size -= entry->approx_len;
> 	}
>
> 	static inline void
> 	journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
> 	{
> 		j->queue_len++;
> 		j->queue_size += entry->approx_len;
> 	}
>
> Again, this is my pov, **free to ignore**.
> attr here stands for
> attributes because queue_len and queue_size are not the queue
> itself but attributes which control when we need to wait for the
> data to be flushed.

Ok, sure. Introduced `journal_queue_on_append(struct journal_entry *entry)`
and `journal_queue_on_complete(struct journal_entry *entry)`.

>> +	assert(current_journal->queue_len >= 0);
>> +	assert(current_journal->queue_size >= 0);
>> +	if (journal_queue_has_waiters(current_journal))
>> +		journal_queue_wakeup(current_journal, false);
>>  	entry->write_async_cb(entry);
>>  }

Here's an incremental diff. It's all pure refactoring with no functional
changes. I've introduced `journal_queue_on_append` and
`journal_queue_on_complete` for increasing and decreasing the queue length
and size, and tried to remove the `struct journal` parameter from almost
every new method, except `journal_queue_set_max_size` and
`journal_queue_set_max_len`.

====================================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 06aaa0a79..7c2452d2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	 * This is done before opening the transaction to avoid problems with
 	 * yielding inside it.
 	 */
-	if (journal_queue_is_full(current_journal))
+	if (journal_queue_is_full())
 		journal_wait_queue();
 
 	/**
diff --git a/src/box/journal.c b/src/box/journal.c
index 19a184580..49441e596 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ struct journal_queue_entry {
 /**
  * Wake up the next waiter in journal queue.
  */
-void
+static inline void
 journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 {
 	/* Empty queue or last entry in queue. */
@@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool force_ready)
 	 * When the queue isn't forcefully emptied, no need to wake everyone
 	 * else up until there's some free space.
 	 */
-	if (journal_queue_is_full(current_journal) && !force_ready) {
+	if (!force_ready && journal_queue_is_full()) {
 		current_journal->queue_is_woken = false;
 		return;
 	}
 	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
 						    in_queue);
-	e->is_ready |= force_ready;
+	e->is_ready = force_ready;
 	fiber_wakeup(e->fiber);
 }
 
+void
+journal_queue_wakeup(bool force_ready)
+{
+	assert(!rlist_empty(&current_journal->waiters));
+	if (current_journal->queue_is_woken)
+		return;
+	current_journal->queue_is_woken = true;
+	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+}
+
 void
 journal_wait_queue(void)
 {
@@ -101,7 +111,7 @@ journal_wait_queue(void)
 	/*
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
-	while (journal_queue_is_full(current_journal) && !entry.is_ready)
+	while (journal_queue_is_full() && !entry.is_ready)
 		fiber_yield();
 
 	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
diff --git a/src/box/journal.h b/src/box/journal.h
index 9c8af062a..d295dfa4b 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -149,20 +149,9 @@ struct journal {
  */
 extern struct journal *current_journal;
 
-void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready);
-
 /** Wake the journal queue up. */
-static inline void
-journal_queue_wakeup(struct journal *j, bool force_ready)
-{
-	assert(j == current_journal);
-	assert(!rlist_empty(&j->waiters));
-	if (j->queue_is_woken)
-		return;
-	j->queue_is_woken = true;
-	journal_queue_wakeup_next(&j->waiters, force_ready);
-}
+void
+journal_queue_wakeup(bool force_ready);
 
 /**
  * Check whether any of the queue size limits is reached.
@@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool force_ready)
  * before proceeding with a new asynchronous write request.
  */
 static inline bool
-journal_queue_is_full(struct journal *j)
+journal_queue_is_full(void)
 {
-	assert(j == current_journal);
+	struct journal *j = current_journal;
 	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
 	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
 }
@@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j)
  * other waiters we must go after them to preserve write order.
  */
 static inline bool
-journal_queue_has_waiters(struct journal *j)
+journal_queue_has_waiters(void)
 {
-	assert(j == current_journal);
-	return !rlist_empty(&j->waiters);
+	return !rlist_empty(&current_journal->waiters);
 }
 
 /** Yield until there's some space in the journal queue. */
 void
 journal_wait_queue(void);
 
+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(struct journal *j, int64_t size)
+{
+	assert(j == current_journal);
+	j->queue_max_size = size;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(struct journal *j, int64_t len)
+{
+	assert(j == current_journal);
+	j->queue_max_len = len;
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+	current_journal->queue_len++;
+	current_journal->queue_size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+	current_journal->queue_len--;
+	current_journal->queue_size -= entry->approx_len;
+	assert(current_journal->queue_len >= 0);
+	assert(current_journal->queue_size >= 0);
+}
+
 /**
  * Complete asynchronous write.
  */
@@ -199,12 +225,11 @@ static inline void
 journal_async_complete(struct journal_entry *entry)
 {
 	assert(entry->write_async_cb != NULL);
-	current_journal->queue_len--;
-	current_journal->queue_size -= entry->approx_len;
-	assert(current_journal->queue_len >= 0);
-	assert(current_journal->queue_size >= 0);
-	if (journal_queue_has_waiters(current_journal))
-		journal_queue_wakeup(current_journal, false);
+
+	journal_queue_on_complete(entry);
+	if (journal_queue_has_waiters() && !journal_queue_is_full())
+		journal_queue_wakeup(false);
+
 	entry->write_async_cb(entry);
 }
 
@@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry)
 static inline int
 journal_write(struct journal_entry *entry)
 {
-	if (journal_queue_has_waiters(current_journal)) {
+	if (journal_queue_has_waiters()) {
 		/*
 		 * It's a synchronous write, so it's fine to wait a bit more for
 		 * everyone else to be written. They'll wake us up back
 		 * afterwards.
 		 */
-		journal_queue_wakeup(current_journal, true);
+		journal_queue_wakeup(true);
 		journal_wait_queue();
 	}
-	current_journal->queue_size += entry->approx_len;
-	current_journal->queue_len += 1;
+
+	journal_queue_on_append(entry);
+
 	return current_journal->write(current_journal, entry);
 }
 
@@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry)
 	 * It's the job of the caller to check whether the queue is full prior
 	 * to submitting the request.
 	 */
-	current_journal->queue_size += entry->approx_len;
-	current_journal->queue_len += 1;
+	journal_queue_on_append(entry);
+
 	return current_journal->write_async(current_journal, entry);
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 9fff4220a..5bc7a0685 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold)
 void
 wal_set_queue_max_size(int64_t size)
 {
-	struct journal *base = &wal_writer_singleton.base;
-	base->queue_max_size = size;
-	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-		journal_queue_wakeup(base, false);
+	journal_queue_set_max_size(&wal_writer_singleton.base, size);
 }
 
 void
 wal_set_queue_max_len(int64_t len)
 {
-	struct journal *base = &wal_writer_singleton.base;
-	base->queue_max_len = len;
-	if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-		journal_queue_wakeup(base, false);
+	journal_queue_set_max_len(&wal_writer_singleton.base, len);
 }
 
 struct wal_gc_msg

-- 
Serge Petrenko
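[Editor's note: the queue accounting that `journal_queue_on_append`, `journal_queue_on_complete`, and `journal_queue_is_full` implement in the patch above can be exercised in isolation. The sketch below is an illustration, not Tarantool code: `struct journal` is reduced to the four queue attributes, the fiber/waiter machinery is omitted, and `journal_mock` is a stand-in for the real singleton.]

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mock journal carrying only the queue attributes from the patch. */
struct journal {
	int64_t queue_len;      /* entries currently submitted to WAL */
	int64_t queue_size;     /* bytes currently submitted to WAL */
	int64_t queue_max_len;  /* 0 means the length limit is disabled */
	int64_t queue_max_size; /* 0 means the size limit is disabled */
};

struct journal_entry {
	int64_t approx_len; /* approximate encoded size of the entry */
};

static struct journal journal_mock;
static struct journal *current_journal = &journal_mock;

/* Full when either enabled limit is reached, as in the patch. */
static inline bool
journal_queue_is_full(void)
{
	struct journal *j = current_journal;
	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
}

/* Account for a new write request. */
static inline void
journal_queue_on_append(struct journal_entry *entry)
{
	current_journal->queue_len++;
	current_journal->queue_size += entry->approx_len;
}

/* Release the accounting once the write request completes. */
static inline void
journal_queue_on_complete(struct journal_entry *entry)
{
	current_journal->queue_len--;
	current_journal->queue_size -= entry->approx_len;
	assert(current_journal->queue_len >= 0);
	assert(current_journal->queue_size >= 0);
}
```

With a length limit of 2, appending two entries makes the queue full, and completing one of them frees it again; because every append is matched by exactly one complete, both counters return to zero once the queue drains.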