[Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes

Serge Petrenko sergepetrenko at tarantool.org
Mon Mar 1 22:08:06 MSK 2021



26.02.2021 03:56, Vladislav Shpilevoy пишет:
> Hi! Thanks for the patch!
>
> See 8 comments below, and my diff in the end of the email and on
> top of the branch.

Thanks for the review & the fixes!

>
> On 24.02.2021 20:35, Serge Petrenko via Tarantool-patches wrote:
>> Since the introduction of asynchronous commit, which doesn't wait for a
>> WAL write to succeed, it's quite easy to clog WAL with huge amounts
>> write requests. For now, it's only possible from an applier, since it's
>> the only user of async commit at the moment.
>>
>> This happens when replica is syncing with master and reads new
>> transactions at a pace higher than it can write them to WAL (see docbot
>> request for detailed explanation).
>>
>> To ameliorate such behavior, we need to introduce some limit on
>> not-yet-finished WAL write requests. This is what this commit is trying
>> to do.
>> Two new counters are added to wal writer: queue_size (in bytes) and
>> queue_len (in wal messages) together with configuration settings:
>> `wal_queue_max_size` and `wal_queue_max_len`.
>> Size and length are increased on every new submitted request, and are
>> decreased once the tx receives a confirmation that a specific request
>> was written.
>> Actually, the limits are added to an abstract journal, but take effect
>> only for wal writer.
> 1. Now the limits affect any current journal, regardless of its type.
> Although they really work only for WAL, because only WAL journal
> yields AFAIK. Others just 'commit' immediately.

Correct.
I've reworded this piece.

>
>> ---
>> https://github.com/tarantool/tarantool/issues/5536
>> https://github.com/tarantool/tarantool/tree/sp/gh-5536-replica-oom
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..27ddd0f29 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		goto success;
>>   	}
>>   
>> +	/*
>> +	 * Do not spam WAL with excess write requests, let it process what's
>> +	 * piled up first.
>> +	 * This is done before opening the transaction to avoid problems with
>> +	 * yielding inside it.
>> +	 */
>> +	if (journal_queue_is_full() || journal_queue_has_waiters())
>> +		journal_wait_queue();
> 2. Perhaps simply move both checks into journal_wait_queue(). Seems like
> an internal thing for the queue's API.

Looks good.

>
>> diff --git a/src/box/journal.c b/src/box/journal.c
>> index cb320b557..4c31a3dfe 100644
>> --- a/src/box/journal.c
>> +++ b/src/box/journal.c
>> @@ -34,6 +34,16 @@
>>   
>>   struct journal *current_journal = NULL;
>>   
>> +struct journal_queue journal_queue = {
>> +	.max_size = INT64_MAX,
>> +	.size = 0,
>> +	.max_len = INT64_MAX,
>> +	.len = 0,
>> +	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
>> +	.is_awake = false,
>> +	.is_ready = false,
>> +};
> 3. Kostja might be right about most of the queue's code being a good
> candidate for an extraction to libcore. So the queue itself would
> be the semaphore + queue size and len parameters. But up to you.

I'm not sure I get it. It would be a counting semaphore if we had a
single limit — say, only entry count, or only entry size. But we have
both, so it's not a "normal" counting semaphore. And if it isn't one,
why extract it as a generic primitive?

Moreover, the limits are now 'soft'. As discussed verbally, we'll wake
the entry up and let it proceed if we see that the queue isn't full at
the time of wake-up. But it may be full again once the fiber is actually
put to execution, and we ignore this. So it's a "soft counting semaphore
with 2 resources".

>
>> +
>>   struct journal_entry *
>>   journal_entry_new(size_t n_rows, struct region *region,
>>   		  journal_write_async_f write_async_cb,
>> @@ -55,3 +65,64 @@ journal_entry_new(size_t n_rows, struct region *region,
>>   			     complete_data);
>>   	return entry;
>>   }
>> +
>> +struct journal_queue_entry {
>> +	/** The fiber waiting for queue space to free. */
>> +	struct fiber *fiber;
>> +	/** A link in all waiting fibers list. */
>> +	struct rlist in_queue;
> 4. I see fiber_cond uses fiber->state member. Can we do the same here?
> Because the queue is not much different from fiber_cond. Both are
> built on top of fiber API. Which means the 'state' might be usable in
> the queue as well.

It's a good idea, thanks! Applied with minor changes, described below.

>
>> +};
>> +
>> +/**
>> + * Wake up the first waiter in the journal queue.
>> + */
>> +static inline void
>> +journal_queue_wakeup_first(void)
>> +{
>> +	struct journal_queue_entry *e;
>> +	if (rlist_empty(&journal_queue.waiters))
>> +		goto out;
>> +	/*
>> +	 * When the queue isn't forcefully emptied, no need to wake everyone
>> +	 * else up until there's some free space.
>> +	 */
>> +	if (!journal_queue.is_ready && journal_queue_is_full())
>> +		goto out;
>> +	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
>> +			in_queue);
> 5. Why didn't rlist_first_entry() work?

It sure would work. Just a misprint, thanks for pointing this out!

>
>> +	fiber_wakeup(e->fiber);
>> +	return;
>> +out:
>> +	journal_queue.is_awake = false;
>> +	journal_queue.is_ready = false;
>> +}
>> +
>> +void
>> +journal_queue_wakeup(bool force_ready)
>> +{
>> +	assert(!rlist_empty(&journal_queue.waiters));
>> +	if (journal_queue.is_awake)
>> +		return;
>> +	journal_queue.is_awake = true;
>> +	journal_queue.is_ready = force_ready;
>> +	journal_queue_wakeup_first();
>> +}
>> +
>> +void
>> +journal_wait_queue(void)
>> +{
>> +	struct journal_queue_entry entry = {
>> +		.fiber = fiber(),
>> +	};
>> +	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
>> +	/*
>> +	 * Will be waken up by either queue emptying or a synchronous write.
>> +	 */
>> +	while (journal_queue_is_full() && !journal_queue.is_ready)
>> +		fiber_yield();
> 6. You check for full anyway. So as I mentioned in the comment in
> applier's code, you can move all the checks into there, and do
> them before creating entry and adding it to the queue. As a
> 'fast path'. And it would make journal_wait_queue() easier to use.

Ok

>
>> +
>> +	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
>> +	rlist_del(&entry.in_queue);
>> +
>> +	journal_queue_wakeup_first();
>> +}
>> diff --git a/src/box/journal.h b/src/box/journal.h
>> index 5d8d5a726..8fec5b27e 100644
>> --- a/src/box/journal.h
>> +++ b/src/box/journal.h
>> @@ -124,6 +154,82 @@ struct journal {
>>   		     struct journal_entry *entry);
>>   };
>>   
>> +/**
>> + * Depending on the step of recovery and instance configuration
>> + * points at a concrete implementation of the journal.
>> + */
>> +extern struct journal *current_journal;
> 7. Now you don't need to move current_journal declaration.

Moved that back.

>
>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>   static inline int
>>   journal_write_async(struct journal_entry *entry)
>>   {
>> +	/*
>> +	 * It's the job of the caller to check whether the queue is full prior
>> +	 * to submitting the request.
>> +	 */
>> +	assert(!journal_queue_is_full() || journal_queue.is_ready);
>> +	journal_queue_on_append(entry);
> 8. Probably must assert that waiters list is empty. Otherwise you
> could go out of order.

It's not empty by the time the first entry gets to 'journal_write_async'.
Everyone else is woken up, but not yet removed from the queue.

Looks like we cannot determine whether a write is called after waiting
in the queue or not.

>
>> +
>>   	return current_journal->write_async(current_journal, entry);
>>   }
>>   
> I tried to do some fixes on top of your branch in order to delete
> the flags and some branching.
>
> Take a look at the diff below and on top of the branch in a separate
> commit.

Thanks! Looks good, with my changes on top. I squashed everything into one
commit and updated the branch.

============================================

diff --git a/src/box/journal.c b/src/box/journal.c
index 40a5f5b1a..92a773684 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -83,9 +83,7 @@ journal_queue_wait(void)
      /*
       * Will be waken up by either queue emptying or a synchronous write.
       */
-    do {
-        fiber_yield();
-    } while (journal_queue_is_full());
+    fiber_yield();
      --journal_queue.waiter_count;
      journal_queue_wakeup();
  }
diff --git a/src/box/journal.h b/src/box/journal.h
index 3b93158cf..b5d587e3a 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -124,26 +124,13 @@ struct journal_queue {
       * entered the queue.
       */
      struct rlist waiters;
-    /**
-     * Whether the queue is being woken or not. Used to avoid multiple
-     * concurrent wake-ups.
-     */
-    bool waiter_count;
+    /** How many waiters there are in a queue. */
+    int waiter_count;
  };

  /** A single queue for all journal instances. */
  extern struct journal_queue journal_queue;

-/**
- * Check whether anyone is waiting for the journal queue to empty. If 
there are
- * other waiters we must go after them to preserve write order.
- */
-static inline bool
-journal_queue_has_waiters(void)
-{
-    return journal_queue.waiter_count != 0;
-}
-

============================================
Returned this function to its original place.
============================================

  /**
   * An API for an abstract journal for all transactions of this
   * instance, as well as for multiple instances in case of
@@ -173,6 +160,16 @@ extern struct journal *current_journal;
  void
  journal_queue_wakeup(void);

+/**
+ * Check whether anyone is waiting for the journal queue to empty. If 
there are
+ * other waiters we must go after them to preserve write order.
+ */
+static inline bool
+journal_queue_has_waiters(void)
+{
+    return journal_queue.waiter_count != 0;
+}
+
  /**
   * Check whether any of the queue size limits is reached.
   * If the queue is full, we must wait for some of the entries to be 
written
@@ -235,7 +232,6 @@ journal_async_complete(struct journal_entry *entry)
      assert(entry->write_async_cb != NULL);

      journal_queue_on_complete(entry);
-    journal_queue_wakeup();

============================================
Let's wake the queue up once the whole batch is processed.
This way we avoid excess checks and avoid waking up every waiter in the
queue when the queue is not full (each wakeup wakes the first entry up
and removes it from the list, so a ton of wake-ups would wake up every
entry there is).
============================================

      entry->write_async_cb(entry);
  }
@@ -266,7 +262,6 @@ journal_write_async(struct journal_entry *entry)
       * It's the job of the caller to check whether the queue is full prior
       * to submitting the request.
       */
-    assert(journal_queue.waiter_count == 0);
      journal_queue_on_append(entry);

============================================
As I mentioned above, we cannot assert that the queue has no waiters here.
============================================

      return current_journal->write_async(current_journal, entry);
diff --git a/src/box/wal.c b/src/box/wal.c
index 328ab092d..7829ccc95 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -274,6 +274,8 @@ tx_schedule_queue(struct stailq *queue)
      struct journal_entry *req, *tmp;
      stailq_foreach_entry_safe(req, tmp, queue, fifo)
          journal_async_complete(req);
+
+    journal_queue_wakeup();
  }

  /**

============================================
>
> ====================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 27ddd0f29..2586f6654 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -973,8 +973,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   	 * This is done before opening the transaction to avoid problems with
>   	 * yielding inside it.
>   	 */
> -	if (journal_queue_is_full() || journal_queue_has_waiters())
> -		journal_wait_queue();
> +	journal_queue_wait();
>   
>   	/**
>   	 * Explicitly begin the transaction so that we can
> diff --git a/src/box/journal.c b/src/box/journal.c
> index 4c31a3dfe..40a5f5b1a 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -40,8 +40,7 @@ struct journal_queue journal_queue = {
>   	.max_len = INT64_MAX,
>   	.len = 0,
>   	.waiters = RLIST_HEAD_INITIALIZER(journal_queue.waiters),
> -	.is_awake = false,
> -	.is_ready = false,
> +	.waiter_count = 0,
>   };
>   
>   struct journal_entry *
> @@ -66,63 +65,38 @@ journal_entry_new(size_t n_rows, struct region *region,
>   	return entry;
>   }
>   
> -struct journal_queue_entry {
> -	/** The fiber waiting for queue space to free. */
> -	struct fiber *fiber;
> -	/** A link in all waiting fibers list. */
> -	struct rlist in_queue;
> -};
> -
> -/**
> - * Wake up the first waiter in the journal queue.
> - */
> -static inline void
> -journal_queue_wakeup_first(void)
> -{
> -	struct journal_queue_entry *e;
> -	if (rlist_empty(&journal_queue.waiters))
> -		goto out;
> -	/*
> -	 * When the queue isn't forcefully emptied, no need to wake everyone
> -	 * else up until there's some free space.
> -	 */
> -	if (!journal_queue.is_ready && journal_queue_is_full())
> -		goto out;
> -	e = rlist_entry(rlist_first(&journal_queue.waiters), typeof(*e),
> -			in_queue);
> -	fiber_wakeup(e->fiber);
> -	return;
> -out:
> -	journal_queue.is_awake = false;
> -	journal_queue.is_ready = false;
> -}
> -
>   void
> -journal_queue_wakeup(bool force_ready)
> +journal_queue_wakeup(void)
>   {
> -	assert(!rlist_empty(&journal_queue.waiters));
> -	if (journal_queue.is_awake)
> -		return;
> -	journal_queue.is_awake = true;
> -	journal_queue.is_ready = force_ready;
> -	journal_queue_wakeup_first();
> +	struct rlist *list = &journal_queue.waiters;
> +	if (!rlist_empty(list) && !journal_queue_is_full())
> +		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
>   }
>   
>   void
> -journal_wait_queue(void)
> +journal_queue_wait(void)
>   {
> -	struct journal_queue_entry entry = {
> -		.fiber = fiber(),
> -	};
> -	rlist_add_tail_entry(&journal_queue.waiters, &entry, in_queue);
> +	if (!journal_queue_is_full() && !journal_queue_has_waiters())
> +		return;
> +	++journal_queue.waiter_count;
> +	rlist_add_tail_entry(&journal_queue.waiters, fiber(), state);
>   	/*
>   	 * Will be waken up by either queue emptying or a synchronous write.
>   	 */
> -	while (journal_queue_is_full() && !journal_queue.is_ready)
> +	do {
>   		fiber_yield();
> +	} while (journal_queue_is_full());
> +	--journal_queue.waiter_count;
> +	journal_queue_wakeup();
> +}
>   
> -	assert(&entry.in_queue == rlist_first(&journal_queue.waiters));
> -	rlist_del(&entry.in_queue);
> -
> -	journal_queue_wakeup_first();
> +void
> +journal_queue_flush(void)
> +{
> +	if (!journal_queue_has_waiters())
> +		return;
> +	struct rlist *list = &journal_queue.waiters;
> +	while (!rlist_empty(list))
> +		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +	journal_queue_wait();
>   }
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8fec5b27e..3b93158cf 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -128,17 +128,22 @@ struct journal_queue {
>   	 * Whether the queue is being woken or not. Used to avoid multiple
>   	 * concurrent wake-ups.
>   	 */
> -	bool is_awake;
> -	/**
> -	 * A flag used to tell the waiting fibers they may proceed even if the
> -	 * queue is full, i.e. force them to submit a write request.
> -	 */
> -	bool is_ready;
> +	bool waiter_count;
>   };
>   
>   /** A single queue for all journal instances. */
>   extern struct journal_queue journal_queue;
>   
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> +	return journal_queue.waiter_count != 0;
> +}
> ====================
>
> I moved this function not on purpose. Could be kept in
> the old place.
>
> ====================
> +
>   /**
>    * An API for an abstract journal for all transactions of this
>    * instance, as well as for multiple instances in case of
> @@ -166,7 +171,7 @@ extern struct journal *current_journal;
>    *                    full.
>    */
>   void
> -journal_queue_wakeup(bool force_ready);
> +journal_queue_wakeup(void);
>   
>   /**
>    * Check whether any of the queue size limits is reached.
> @@ -180,27 +185,19 @@ journal_queue_is_full(void)
>   	       journal_queue.len > journal_queue.max_len;
>   }
>   
> -/**
> - * Check whether anyone is waiting for the journal queue to empty. If there are
> - * other waiters we must go after them to preserve write order.
> - */
> -static inline bool
> -journal_queue_has_waiters(void)
> -{
> -	return !rlist_empty(&journal_queue.waiters);
> -}
> -
>   /** Yield until there's some space in the journal queue. */
>   void
> -journal_wait_queue(void);
> +journal_queue_wait(void);
> +
> +void
> +journal_queue_flush(void);
>   
>   /** Set maximal journal queue size in bytes. */
>   static inline void
>   journal_queue_set_max_size(int64_t size)
>   {
>   	journal_queue.max_size = size;
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   }
>   
>   /** Set maximal journal queue length, in entries. */
> @@ -208,8 +205,7 @@ static inline void
>   journal_queue_set_max_len(int64_t len)
>   {
>   	journal_queue.max_len = len;
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   }
>   
>   /** Increase queue size on a new write request. */
> @@ -239,8 +235,7 @@ journal_async_complete(struct journal_entry *entry)
>   	assert(entry->write_async_cb != NULL);
>   
>   	journal_queue_on_complete(entry);
> -	if (journal_queue_has_waiters() && !journal_queue_is_full())
> -		journal_queue_wakeup(false);
> +	journal_queue_wakeup();
>   
>   	entry->write_async_cb(entry);
>   }
> @@ -253,16 +248,7 @@ journal_async_complete(struct journal_entry *entry)
>   static inline int
>   journal_write(struct journal_entry *entry)
>   {
> -	if (journal_queue_has_waiters()) {
> -		/*
> -		 * It's a synchronous write, so it's fine to wait a bit more for
> -		 * everyone else to be written. They'll wake us up back
> -		 * afterwards.
> -		 */
> -		journal_queue_wakeup(true);
> -		journal_wait_queue();
> -	}
> -
> +	journal_queue_flush();
>   	journal_queue_on_append(entry);
>   
>   	return current_journal->write(current_journal, entry);
> @@ -280,7 +266,7 @@ journal_write_async(struct journal_entry *entry)
>   	 * It's the job of the caller to check whether the queue is full prior
>   	 * to submitting the request.
>   	 */
> -	assert(!journal_queue_is_full() || journal_queue.is_ready);
> +	assert(journal_queue.waiter_count == 0);
>   	journal_queue_on_append(entry);
>   
>   	return current_journal->write_async(current_journal, entry);

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list