Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
Date: Tue, 16 Feb 2021 15:47:09 +0300	[thread overview]
Message-ID: <4050015c-bf24-eac3-a890-216efab8c635@tarantool.org> (raw)
In-Reply-To: <YCpYZ/K8V8ePZvSD@grain>



15.02.2021 14:17, Cyrill Gorcunov пишет:
> On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..06aaa0a79 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full(current_journal))
>> +		journal_wait_queue();
>> +
> Serge, just a few comments, feel free to ignore.

Hi! Thanks for the review!

>
> Maybe it would be better to pass current_journal to journal_wait_queue,
> otherwise it looks somehow inconsistent, no?
>
> 	if (journal_queue_is_full(current_journal))
> 		journal_wait_queue(current_journal);

I tried to remove parameters from almost all methods, as dicsussed in chat.

>
> Actually I would name journal queue engine as plain journalq, this would
> look like
>
> 	if (journalq_is_full(current_journal))
> 		journalq_wait(current_journal);
>
> But it is very humble pov, lets stick with long `journal_queue`.
> ...

To be honest, I like `journal_queue_` prefix more.

>> +/** Wake the journal queue up. */
>> +static inline void
>> +journal_queue_wakeup(struct journal *j, bool force_ready)
>> +{
>> +	assert(j == current_journal);
> Seems this assert is not needed. The overall idea of passing
> journal as an argument is quite the reverse, ie to work with
> any journal. This is not a blocker, could be cleaned up on top
> or simply ignored.

Same as above, discussed verbally.

>
>> +	assert(!rlist_empty(&j->waiters));
>> +	if (j->queue_is_woken)
>> +		return;
>> +	j->queue_is_woken = true;
>> +	journal_queue_wakeup_next(&j->waiters, force_ready);
>> +}
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(struct journal *j)
>> +{
>> +	assert(j == current_journal);
> same, no need for assert()
>
>> +	return !rlist_empty(&j->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>>   /**
>>    * Complete asynchronous write.
>>    */
>> @@ -131,15 +199,15 @@ static inline void
>>   journal_async_complete(struct journal_entry *entry)
>>   {
>>   	assert(entry->write_async_cb != NULL);
>> +	current_journal->queue_len--;
>> +	current_journal->queue_size -= entry->approx_len;
> Myabe worth to make queue ops closed into some helper? Because
> length and size can't be updated without a tangle. IOW, something
> like
>
> static inline void
> journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len--;
> 	j->queue_size -= entry->approx_len;
> }
>
> static inline void
> journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
> {
> 	j->queue_len++;
> 	j->queue_size += entry->approx_len;
> }
>
> Again, this is my pov, **free to ignore**. attr here stands for
> attributes because queue_len and queue_size are not the queue
> itself but attributes which controls when we need to wait
> data to be flushed.

Ok, sure. Introduced
`journal_queue_on_append(struct journal_entry *entry)`
and
`journal_queue_on_complete(struct journal_entry *entry)`

>
>> +	assert(current_journal->queue_len >= 0);
>> +	assert(current_journal->queue_size >= 0);
>> +	if (journal_queue_has_waiters(current_journal))
>> +		journal_queue_wakeup(current_journal, false);
>>   	entry->write_async_cb(entry);
>>   }

Here's an incremental diff. It's all pure refactoring with no functional 
changes.
I've intdouced `journal_queue_on_append` and `journal_queue_on_complete` 
for increasing and
decreasing queue length and size, and tried to remove `struct journal` 
parameter from almost
every new method, except `journal_queue_set_max_size` and 
`journal_queue_set_max_len`

====================================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 06aaa0a79..7c2452d2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct 
stailq *rows)
       * This is done before opening the transaction to avoid problems with
       * yielding inside it.
       */
-    if (journal_queue_is_full(current_journal))
+    if (journal_queue_is_full())
          journal_wait_queue();

      /**
diff --git a/src/box/journal.c b/src/box/journal.c
index 19a184580..49441e596 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ struct journal_queue_entry {
  /**
   * Wake up the next waiter in journal queue.
   */
-void
+static inline void
  journal_queue_wakeup_next(struct rlist *link, bool force_ready)
  {
      /* Empty queue or last entry in queue. */
@@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool 
force_ready)
       * When the queue isn't forcefully emptied, no need to wake everyone
       * else up until there's some free space.
       */
-    if (journal_queue_is_full(current_journal) && !force_ready) {
+    if (!force_ready && journal_queue_is_full()) {
          current_journal->queue_is_woken = false;
          return;
      }
      struct journal_queue_entry *e = rlist_entry(rlist_next(link), 
typeof(*e),
                              in_queue);
-    e->is_ready |= force_ready;
+    e->is_ready = force_ready;
      fiber_wakeup(e->fiber);
  }

+void
+journal_queue_wakeup(bool force_ready)
+{
+    assert(!rlist_empty(&current_journal->waiters));
+    if (current_journal->queue_is_woken)
+        return;
+    current_journal->queue_is_woken = true;
+    journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+}
+
  void
  journal_wait_queue(void)
  {
@@ -101,7 +111,7 @@ journal_wait_queue(void)
      /*
       * Will be waken up by either queue emptying or a synchronous write.
       */
-    while (journal_queue_is_full(current_journal) && !entry.is_ready)
+    while (journal_queue_is_full() && !entry.is_ready)
          fiber_yield();

      journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
diff --git a/src/box/journal.h b/src/box/journal.h
index 9c8af062a..d295dfa4b 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -149,20 +149,9 @@ struct journal {
   */
  extern struct journal *current_journal;

-void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready);
-
  /** Wake the journal queue up. */
-static inline void
-journal_queue_wakeup(struct journal *j, bool force_ready)
-{
-    assert(j == current_journal);
-    assert(!rlist_empty(&j->waiters));
-    if (j->queue_is_woken)
-        return;
-    j->queue_is_woken = true;
-    journal_queue_wakeup_next(&j->waiters, force_ready);
-}
+void
+journal_queue_wakeup(bool force_ready);

  /**
   * Check whether any of the queue size limits is reached.
@@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool 
force_ready)
   * before proceeding with a new asynchronous write request.
   */
  static inline bool
-journal_queue_is_full(struct journal *j)
+journal_queue_is_full(void)
  {
-    assert(j == current_journal);
+    struct journal *j = current_journal;
      return (j->queue_max_size != 0 && j->queue_size >= 
j->queue_max_size) ||
             (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
  }
@@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j)
   * other waiters we must go after them to preserve write order.
   */
  static inline bool
-journal_queue_has_waiters(struct journal *j)
+journal_queue_has_waiters(void)
  {
-    assert(j == current_journal);
-    return !rlist_empty(&j->waiters);
+    return !rlist_empty(&current_journal->waiters);
  }

  /** Yield until there's some space in the journal queue. */
  void
  journal_wait_queue(void);

+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(struct journal *j, int64_t size)
+{
+    assert(j == current_journal);
+    j->queue_max_size = size;
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(struct journal *j, int64_t len)
+{
+    assert(j == current_journal);
+    j->queue_max_len = len;
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+    current_journal->queue_len++;
+    current_journal->queue_size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+    current_journal->queue_len--;
+    current_journal->queue_size -= entry->approx_len;
+    assert(current_journal->queue_len >= 0);
+    assert(current_journal->queue_size >= 0);
+}
+
  /**
   * Complete asynchronous write.
   */
@@ -199,12 +225,11 @@ static inline void
  journal_async_complete(struct journal_entry *entry)
  {
      assert(entry->write_async_cb != NULL);
-    current_journal->queue_len--;
-    current_journal->queue_size -= entry->approx_len;
-    assert(current_journal->queue_len >= 0);
-    assert(current_journal->queue_size >= 0);
-    if (journal_queue_has_waiters(current_journal))
-        journal_queue_wakeup(current_journal, false);
+
+    journal_queue_on_complete(entry);
+    if (journal_queue_has_waiters() && !journal_queue_is_full())
+        journal_queue_wakeup(false);
+
      entry->write_async_cb(entry);
  }

@@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry)
  static inline int
  journal_write(struct journal_entry *entry)
  {
-    if (journal_queue_has_waiters(current_journal)) {
+    if (journal_queue_has_waiters()) {
          /*
           * It's a synchronous write, so it's fine to wait a bit more for
           * everyone else to be written. They'll wake us up back
           * afterwards.
           */
-        journal_queue_wakeup(current_journal, true);
+        journal_queue_wakeup(true);
          journal_wait_queue();
      }
-    current_journal->queue_size += entry->approx_len;
-    current_journal->queue_len += 1;
+
+    journal_queue_on_append(entry);
+
      return current_journal->write(current_journal, entry);
  }

@@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry)
       * It's the job of the caller to check whether the queue is full prior
       * to submitting the request.
       */
-    current_journal->queue_size += entry->approx_len;
-    current_journal->queue_len += 1;
+    journal_queue_on_append(entry);
+
      return current_journal->write_async(current_journal, entry);
  }

diff --git a/src/box/wal.c b/src/box/wal.c
index 9fff4220a..5bc7a0685 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold)
  void
  wal_set_queue_max_size(int64_t size)
  {
-    struct journal *base = &wal_writer_singleton.base;
-    base->queue_max_size = size;
-    if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-        journal_queue_wakeup(base, false);
+    journal_queue_set_max_size(&wal_writer_singleton.base, size);
  }

  void
  wal_set_queue_max_len(int64_t len)
  {
-    struct journal *base = &wal_writer_singleton.base;
-    base->queue_max_len = len;
-    if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
-        journal_queue_wakeup(base, false);
+    journal_queue_set_max_len(&wal_writer_singleton.base, len);
  }

  struct wal_gc_msg

-- 
Serge Petrenko


  reply	other threads:[~2021-02-16 12:47 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-02-11 12:17 Serge Petrenko via Tarantool-patches
2021-02-15 11:17 ` Cyrill Gorcunov via Tarantool-patches
2021-02-16 12:47   ` Serge Petrenko via Tarantool-patches [this message]
2021-02-16 12:49     ` Cyrill Gorcunov via Tarantool-patches
2021-02-17 20:46 ` Vladislav Shpilevoy via Tarantool-patches
2021-02-18 20:06   ` Serge Petrenko via Tarantool-patches
2021-02-23 22:19     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-24 19:32       ` Serge Petrenko via Tarantool-patches
2021-02-26  0:58         ` Vladislav Shpilevoy via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4050015c-bf24-eac3-a890-216efab8c635@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=sergepetrenko@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox