Tarantool development patches archive
From: Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Serge Petrenko <sergepetrenko@tarantool.org>, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
Date: Fri, 26 Feb 2021 01:56:51 +0100
Message-ID: <469ef629-699c-0180-facb-f166d6d3c073@tarantool.org>
In-Reply-To: <20210224193549.70017-1-sergepetrenko@tarantool.org>

Hi! Thanks for the patch!

See 8 comments below, and my diff at the end of the email and on
top of the branch.

On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
> Since the introduction of asynchronous commit, which doesn't wait for a
> WAL write to succeed, it's quite easy to clog WAL with huge amounts of
> write requests. For now, it's only possible from an applier, since it's
> the only user of async commit at the moment.
> 
> This happens when a replica is syncing with the master and reads new
> transactions at a pace higher than it can write them to WAL (see the
> docbot request for a detailed explanation).
> 
> To ameliorate such behavior, we need to introduce some limit on
> not-yet-finished WAL write requests. This is what this commit is trying
> to do.
> Two new counters are added to the WAL writer: queue_size (in bytes) and
> queue_len (in WAL messages), together with configuration settings:
> `wal_queue_max_size` and `wal_queue_max_len`.
> Size and length are increased on every newly submitted request, and are
> decreased once tx receives a confirmation that a specific request
> was written.
> Actually, the limits are added to an abstract journal, but take effect
> only for the WAL writer.

1. Now the limits affect the current journal regardless of its type,
although they only really work for WAL, because AFAIK only the WAL
journal yields. The others just 'commit' immediately.
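To make the counting from the commit message concrete, here is a minimal sketch in plain C. All names (struct queue_limits, queue_on_append(), queue_on_complete(), queue_is_full()) are hypothetical and are not the patch's journal API; `approx_bytes` stands in for whatever size estimate the real code attaches to an entry. Both counters grow when a request is submitted and shrink when tx learns it was written, and the queue counts as full only once a limit is exceeded, mirroring the strict `>` comparison visible in journal_queue_is_full().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical toy model of the queue accounting, not the real journal. */
struct queue_limits {
	int64_t size;     /* bytes of submitted but not yet written requests */
	int64_t max_size; /* analogue of wal_queue_max_size */
	int64_t len;      /* number of submitted but not yet written requests */
	int64_t max_len;  /* analogue of wal_queue_max_len */
};

/* Full once either limit is exceeded (strictly greater, as in the patch). */
static bool
queue_is_full(const struct queue_limits *q)
{
	return q->size > q->max_size || q->len > q->max_len;
}

/* Account for a newly submitted write request. */
static void
queue_on_append(struct queue_limits *q, int64_t approx_bytes)
{
	q->size += approx_bytes;
	q->len += 1;
}

/* Account for a request whose write confirmation reached tx. */
static void
queue_on_complete(struct queue_limits *q, int64_t approx_bytes)
{
	q->size -= approx_bytes;
	q->len -= 1;
}
```

With max_size = 100, two 60-byte requests make the queue full (120 > 100), and completing one of them frees it again.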

> ---
> https://github.com/tarantool/tarantool/issues/5536
> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 553db76fc..27ddd0f29 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		goto success;
>  	}
>  
> +	/*
> +	 * Do not spam WAL with excess write requests, let it process what's
> +	 * piled up first.
> +	 * This is done before opening the transaction to avoid problems with
> +	 * yielding inside it.
> +	 */
> +	if (journal_queue_is_full() || journal_queue_has_waiters())
> +		journal_wait_queue();

2. Perhaps just move both checks into journal_wait_queue(). They look
like an internal detail of the queue's API.

> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..4c31a3dfe 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -34,6 +34,16 @@
>  
>  struct journal *current_journal = NULL;
>  
> +struct journal_queue journal_queue = {
> +	.max_size = INT64_MAX,
> +	.size = 0,
> +	.max_len = INT64_MAX,
> +	.len = 0,
> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> +	.is_awake = false,
> +	.is_ready = false,
> +};

3. Kostja might be right that most of the queue's code is a good
candidate for extraction into libcore. Then the queue itself would be
a semaphore plus the queue size and length parameters. But it's up to you.

> +
>  struct journal_entry *
>  journal_entry_new(size_t n_rows, struct region *region,
>  		  journal_write_async_f write_async_cb,
> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>  			     complete_data);
>  	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;

4. I see that fiber_cond uses the fiber->state member. Can we do the same
here? The queue is not much different from fiber_cond: both are built on
top of the fiber API, which means the 'state' link might be usable in the
queue as well.
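Roughly what I mean, as a toy model: the waiter is linked into the queue through a member it already owns, so no separate journal_queue_entry wrapper is needed. struct toy_fiber, struct link and the helpers below are hypothetical stand-ins for struct fiber, rlist and the rlist_* helpers. In the real code this should only be safe while the fiber is blocked, since (as far as I remember fiber.c) fiber_wakeup() moves the same `state` link onto the scheduler's ready list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Minimal circular doubly linked list, similar in spirit to rlist. */
struct link {
	struct link *prev, *next;
};

static void
link_init(struct link *head)
{
	head->prev = head->next = head;
}

static bool
link_empty(const struct link *head)
{
	return head->next == head;
}

static void
link_add_tail(struct link *head, struct link *item)
{
	item->prev = head->prev;
	item->next = head;
	head->prev->next = item;
	head->prev = item;
}

static void
link_del(struct link *item)
{
	item->prev->next = item->next;
	item->next->prev = item->prev;
	item->prev = item->next = item;
}

/* Hypothetical stand-in for struct fiber: it embeds its own list link. */
struct toy_fiber {
	int id;
	struct link state;
};

/* Recover the fiber from its embedded link (container_of idiom). */
#define TOY_FIBER_OF(l) \
	((struct toy_fiber *)((char *)(l) - offsetof(struct toy_fiber, state)))

/* Enqueue the waiter itself; no wrapper entry is allocated. */
static void
wait_queue_push(struct link *waiters, struct toy_fiber *f)
{
	link_add_tail(waiters, &f->state);
}

/* Dequeue the oldest waiter (FIFO), or return NULL if there is none. */
static struct toy_fiber *
wait_queue_pop(struct link *waiters)
{
	if (link_empty(waiters))
		return NULL;
	struct link *first = waiters->next;
	link_del(first);
	return TOY_FIBER_OF(first);
}
```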

> +};
> +
> +/**
> + * Wake up the first waiter in the journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_first(void)
> +{
> +	struct journal_queue_entry *e;
> +	if (rlist_empty(&journal_queue.waiters))
> +		goto out;
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!journal_queue.is_ready && journal_queue_is_full())
> +		goto out;
> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> +			in_queue);

5. Why didn't rlist_first_entry() work?
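As far as I know, rlist_first_entry(head, type, member) is just rlist_entry(rlist_first(head), type, member), so it should be a drop-in replacement. A toy illustration with hypothetical node_* macros (not rlist.h itself) showing that both spellings yield the same container pointer:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical intrusive list node, standing in for struct rlist. */
struct node {
	struct node *prev, *next;
};

/* container_of-style helper, analogous to rlist_entry(). */
#define node_entry(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* Analogous to rlist_first(). */
#define node_first(head) ((head)->next)

/* The one-call form, analogous to rlist_first_entry(). */
#define node_first_entry(head, type, member) \
	node_entry(node_first(head), type, member)

struct waiter {
	int id;
	struct node in_queue;
};
```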

> +	fiber_wakeup(e->fiber);
> +	return;
> +out:
> +	journal_queue.is_awake = false;
> +	journal_queue.is_ready = false;
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&journal_queue.waiters));
> +	if (journal_queue.is_awake)
> +		return;
> +	journal_queue.is_awake = true;
> +	journal_queue.is_ready = force_ready;
> +	journal_queue_wakeup_first();
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +	};
> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	/*
> +	 * Will be waken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !journal_queue.is_ready)
> +		fiber_yield();

6. You check whether the queue is full here anyway. So, as I mentioned in
the comment on the applier's code, you can move all the checks in here and
do them before creating the entry and adding it to the queue, as a 'fast
path'. It would also make journal_wait_queue() easier to use.
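A single-threaded sketch of that shape, assuming a hypothetical struct toy_queue and a toy_yield() stand-in for fiber_yield() (here a 'yield' just lets one pending write finish, which a real fiber would instead wait for). The fast-path check runs before the caller registers as a waiter, so callers like the applier can call the wait function unconditionally.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical toy queue, not the real journal_queue. */
struct toy_queue {
	int64_t len;      /* pending writes */
	int64_t max_len;  /* limit on pending writes */
	int waiter_count; /* callers currently blocked in toy_queue_wait() */
	int yields;       /* how many times a caller had to block */
};

static bool
toy_queue_is_full(const struct toy_queue *q)
{
	return q->len > q->max_len;
}

/*
 * Stand-in for fiber_yield(): in this single-threaded toy, "yielding"
 * simply lets the hypothetical WAL finish one pending write.
 */
static void
toy_yield(struct toy_queue *q)
{
	q->yields++;
	if (q->len > 0)
		q->len--;
}

/* All the checks live inside the wait function. */
static void
toy_queue_wait(struct toy_queue *q)
{
	/* Fast path: there is room and nobody is queued ahead of us. */
	if (!toy_queue_is_full(q) && q->waiter_count == 0)
		return;
	q->waiter_count++;
	do {
		toy_yield(q);
	} while (toy_queue_is_full(q));
	q->waiter_count--;
}
```

Starting from len = 5 with max_len = 2, the caller blocks three times and returns once len drops to 2; with len = 1 it returns immediately without touching waiter_count.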

> +
> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> +	rlist_del(&entry.in_queue);
> +
> +	journal_queue_wakeup_first();
> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..8fec5b27e 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +154,82 @@ struct journal {
>  		     struct journal_entry *entry);
>  };
>  
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;

7. You no longer need to move the current_journal declaration.

> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.
> +	 */
> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	journal_queue_on_append(entry);

8. You should probably assert that the waiters list is empty. Otherwise,
writes could go out of order.
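To spell out the ordering argument with a hypothetical toy (not the journal API): if an async write could be appended while an earlier fiber is still parked in the queue, the later write would get an earlier slot than the parked one. Asserting the precondition at submit time catches that.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical toy queue that hands out write sequence numbers. */
struct order_queue {
	int64_t len, max_len;
	int waiter_count; /* fibers parked waiting for queue space */
	int64_t next_seq; /* sequence number for the next submitted write */
};

/* The precondition an async submit would assert on. */
static bool
order_queue_may_submit(const struct order_queue *q)
{
	return q->waiter_count == 0 && q->len <= q->max_len;
}

/* Submit a write and return its position in the write order. */
static int64_t
order_queue_submit_async(struct order_queue *q)
{
	/* Jumping ahead of a parked waiter would reorder the writes. */
	assert(order_queue_may_submit(q));
	q->len++;
	return q->next_seq++;
}
```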

> +
>  	return current_journal->write_async(current_journal, entry);
>  }
>  

I tried to make some fixes on top of your branch to get rid of the
flags and some of the branching.

Take a look at the diff below. It is also on the branch, as a
separate commit.

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 27ddd0f29..2586f6654 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	 * This is done before opening the transaction to avoid problems with
 	 * yielding inside it.
 	 */
-	if (journal_queue_is_full() || journal_queue_has_waiters())
-		journal_wait_queue();
+	journal_queue_wait();
 
 	/**
 	 * Explicitly begin the transaction so that we can
diff --git a/src/box/journal.c b/src/box/journal.c
index 4c31a3dfe..40a5f5b1a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -40,8 +40,7 @@ struct journal_queue journal_queue = {
 	.max_len = INT64_MAX,
 	.len = 0,
 	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
-	.is_awake = false,
-	.is_ready = false,
+	.waiter_count = 0,
 };
 
 struct journal_entry *
@@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region,
 	return entry;
 }
 
-struct journal_queue_entry {
-	/** The fiber waiting for queue space to free. */
-	struct fiber *fiber;
-	/** A link in all waiting fibers list. */
-	struct rlist in_queue;
-};
-
-/**
- * Wake up the first waiter in the journal queue.
- */
-static inline void
-journal_queue_wakeup_first(void)
-{
-	struct journal_queue_entry *e;
-	if (rlist_empty(&journal_queue.waiters))
-		goto out;
-	/*
-	 * When the queue isn't forcefully emptied, no need to wake everyone
-	 * else up until there's some free space.
-	 */
-	if (!journal_queue.is_ready && journal_queue_is_full())
-		goto out;
-	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
-			in_queue);
-	fiber_wakeup(e->fiber);
-	return;
-out:
-	journal_queue.is_awake = false;
-	journal_queue.is_ready = false;
-}
-
 void
-journal_queue_wakeup(bool force_ready)
+journal_queue_wakeup(void)
 {
-	assert(!rlist_empty(&journal_queue.waiters));
-	if (journal_queue.is_awake)
-		return;
-	journal_queue.is_awake = true;
-	journal_queue.is_ready = force_ready;
-	journal_queue_wakeup_first();
+	struct rlist *list = &journal_queue.waiters;
+	if (!rlist_empty(list) && !journal_queue_is_full())
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
 }
 
 void
-journal_wait_queue(void)
+journal_queue_wait(void)
 {
-	struct journal_queue_entry entry = {
-		.fiber = fiber(),
-	};
-	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
+	if (!journal_queue_is_full() && !journal_queue_has_waiters())
+		return;
+	++journal_queue.waiter_count;
+	rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
 	/*
 	 * Will be waken up by either queue emptying or a synchronous write.
 	 */
-	while (journal_queue_is_full() && !journal_queue.is_ready)
+	do {
 		fiber_yield();
+	} while (journal_queue_is_full());
+	--journal_queue.waiter_count;
+	journal_queue_wakeup();
+}
 
-	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
-	rlist_del(&entry.in_queue);
-
-	journal_queue_wakeup_first();
+void
+journal_queue_flush(void)
+{
+	if (!journal_queue_has_waiters())
+		return;
+	struct rlist *list = &journal_queue.waiters;
+	while (!rlist_empty(list))
+		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+	journal_queue_wait();
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 8fec5b27e..3b93158cf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -128,17 +128,22 @@ struct journal_queue {
 	 * Whether the queue is being woken or not. Used to avoid multiple
 	 * concurrent wake-ups.
 	 */
-	bool is_awake;
-	/**
-	 * A flag used to tell the waiting fibers they may proceed even if the
-	 * queue is full, i.e. force them to submit a write request.
-	 */
-	bool is_ready;
+	int waiter_count;
 };
 
 /** A single queue for all journal instances. */
 extern struct journal_queue journal_queue;
 
+/**
+ * Check whether anyone is waiting for the journal queue to empty. If there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+	return journal_queue.waiter_count != 0;
+}
====================

I moved this function by accident. It could be kept in
the old place.

====================
+
 /**
  * An API for an abstract journal for all transactions of this
  * instance, as well as for multiple instances in case of
@@ -166,7 +171,7 @@ extern struct journal *current_journal;
  *                    full.
  */
 void
-journal_queue_wakeup(bool force_ready);
+journal_queue_wakeup(void);
 
 /**
  * Check whether any of the queue size limits is reached.
@@ -180,27 +185,19 @@ journal_queue_is_full(void)
 	       journal_queue.len > journal_queue.max_len;
 }
 
-/**
- * Check whether anyone is waiting for the journal queue to empty. If there are
- * other waiters we must go after them to preserve write order.
- */
-static inline bool
-journal_queue_has_waiters(void)
-{
-	return !rlist_empty(&journal_queue.waiters);
-}
-
 /** Yield until there's some space in the journal queue. */
 void
-journal_wait_queue(void);
+journal_queue_wait(void);
+
+void
+journal_queue_flush(void);
 
 /** Set maximal journal queue size in bytes. */
 static inline void
 journal_queue_set_max_size(int64_t size)
 {
 	journal_queue.max_size = size;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Set maximal journal queue length, in entries. */
@@ -208,8 +205,7 @@ static inline void
 journal_queue_set_max_len(int64_t len)
 {
 	journal_queue.max_len = len;
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 }
 
 /** Increase queue size on a new write request. */
@@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry)
 	assert(entry->write_async_cb != NULL);
 
 	journal_queue_on_complete(entry);
-	if (journal_queue_has_waiters() && !journal_queue_is_full())
-		journal_queue_wakeup(false);
+	journal_queue_wakeup();
 
 	entry->write_async_cb(entry);
 }
@@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry)
 static inline int
 journal_write(struct journal_entry *entry)
 {
-	if (journal_queue_has_waiters()) {
-		/*
-		 * It's a synchronous write, so it's fine to wait a bit more for
-		 * everyone else to be written. They'll wake us up back
-		 * afterwards.
-		 */
-		journal_queue_wakeup(true);
-		journal_wait_queue();
-	}
-
+	journal_queue_flush();
 	journal_queue_on_append(entry);
 
 	return current_journal->write(current_journal, entry);
@@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry)
 	 * It's the job of the caller to check whether the queue is full prior
 	 * to submitting the request.
 	 */
-	assert(!journal_queue_is_full() || journal_queue.is_ready);
+	assert(journal_queue.waiter_count == 0);
 	journal_queue_on_append(entry);
 
 	return current_journal->write_async(current_journal, entry);
