[Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
Serge Petrenko
sergepetrenko at tarantool.org
Tue Feb 16 15:47:09 MSK 2021
15.02.2021 14:17, Cyrill Gorcunov пишет:
> On Thu, Feb 11, 2021 at 03:17:50PM +0300, Serge Petrenko wrote:
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 553db76fc..06aaa0a79 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -967,6 +967,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>> goto success;
>> }
>>
>> + /*
>> + * Do not spam WAL with excess write requests, let it process what's
>> + * piled up first.
>> + * This is done before opening the transaction to avoid problems with
>> + * yielding inside it.
>> + */
>> + if (journal_queue_is_full(current_journal))
>> + journal_wait_queue();
>> +
> Serge, just a few comments, feel free to ignore.
Hi! Thanks for the review!
>
> Maybe it would be better to pass current_journal to journal_wait_queue,
> otherwise it looks somehow inconsistent, no?
>
> if (journal_queue_is_full(current_journal))
> journal_wait_queue(current_journal);
I tried to remove parameters from almost all methods, as discussed in chat.
>
> Actually I would name journal queue engine as plain journalq, this would
> look like
>
> if (journalq_is_full(current_journal))
> journalq_wait(current_journal);
>
> But it is very humble pov, lets stick with long `journal_queue`.
> ...
To be honest, I like `journal_queue_` prefix more.
>> +/** Wake the journal queue up. */
>> +static inline void
>> +journal_queue_wakeup(struct journal *j, bool force_ready)
>> +{
>> + assert(j == current_journal);
> Seems this assert is not needed. The overall idea of passing
> journal as an argument is quite the reverse, ie to work with
> any journal. This is not a blocker, could be cleaned up on top
> or simply ignored.
Same as above, discussed verbally.
>
>> + assert(!rlist_empty(&j->waiters));
>> + if (j->queue_is_woken)
>> + return;
>> + j->queue_is_woken = true;
>> + journal_queue_wakeup_next(&j->waiters, force_ready);
>> +}
>> +
>> +/**
>> + * Check whether any of the queue size limits is reached.
>> + * If the queue is full, we must wait for some of the entries to be written
>> + * before proceeding with a new asynchronous write request.
>> + */
>> +static inline bool
>> +journal_queue_is_full(struct journal *j)
>> +{
>> + assert(j == current_journal);
> same, no need for assert()
>
>> + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
>> + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
>> +}
>> +
>> +/**
>> + * Check whether anyone is waiting for the journal queue to empty. If there are
>> + * other waiters we must go after them to preserve write order.
>> + */
>> +static inline bool
>> +journal_queue_has_waiters(struct journal *j)
>> +{
>> + assert(j == current_journal);
> same, no need for assert()
>
>> + return !rlist_empty(&j->waiters);
>> +}
>> +
>> +/** Yield until there's some space in the journal queue. */
>> +void
>> +journal_wait_queue(void);
>> +
>> /**
>> * Complete asynchronous write.
>> */
>> @@ -131,15 +199,15 @@ static inline void
>> journal_async_complete(struct journal_entry *entry)
>> {
>> assert(entry->write_async_cb != NULL);
>> + current_journal->queue_len--;
>> + current_journal->queue_size -= entry->approx_len;
> Maybe worth to make queue ops closed into some helper? Because
> length and size can't be updated without a tangle. IOW, something
> like
>
> static inline void
> journal_queue_attr_dec(struct journal *j, struct journal_entry *entry)
> {
> j->queue_len--;
> j->queue_size -= entry->approx_len;
> }
>
> static inline void
> journal_queue_attr_inc(struct journal *j, struct journal_entry *entry)
> {
> j->queue_len++;
> j->queue_size += entry->approx_len;
> }
>
> Again, this is my pov, **free to ignore**. attr here stands for
> attributes because queue_len and queue_size are not the queue
> itself but attributes which controls when we need to wait
> data to be flushed.
Ok, sure. Introduced
`journal_queue_on_append(struct journal_entry *entry)`
and
`journal_queue_on_complete(struct journal_entry *entry)`
>
>> + assert(current_journal->queue_len >= 0);
>> + assert(current_journal->queue_size >= 0);
>> + if (journal_queue_has_waiters(current_journal))
>> + journal_queue_wakeup(current_journal, false);
>> entry->write_async_cb(entry);
>> }
Here's an incremental diff. It's all pure refactoring with no functional
changes.
I've introduced `journal_queue_on_append` and `journal_queue_on_complete`
for increasing and
decreasing queue length and size, and tried to remove `struct journal`
parameter from almost
every new method, except `journal_queue_set_max_size` and
`journal_queue_set_max_len`
====================================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 06aaa0a79..7c2452d2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -973,7 +973,7 @@ applier_apply_tx(struct applier *applier, struct
stailq *rows)
* This is done before opening the transaction to avoid problems with
* yielding inside it.
*/
- if (journal_queue_is_full(current_journal))
+ if (journal_queue_is_full())
journal_wait_queue();
/**
diff --git a/src/box/journal.c b/src/box/journal.c
index 19a184580..49441e596 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ struct journal_queue_entry {
/**
* Wake up the next waiter in journal queue.
*/
-void
+static inline void
journal_queue_wakeup_next(struct rlist *link, bool force_ready)
{
/* Empty queue or last entry in queue. */
@@ -80,16 +80,26 @@ journal_queue_wakeup_next(struct rlist *link, bool
force_ready)
* When the queue isn't forcefully emptied, no need to wake everyone
* else up until there's some free space.
*/
- if (journal_queue_is_full(current_journal) && !force_ready) {
+ if (!force_ready && journal_queue_is_full()) {
current_journal->queue_is_woken = false;
return;
}
struct journal_queue_entry *e = rlist_entry(rlist_next(link),
typeof(*e),
in_queue);
- e->is_ready |= force_ready;
+ e->is_ready = force_ready;
fiber_wakeup(e->fiber);
}
+void
+journal_queue_wakeup(bool force_ready)
+{
+ assert(!rlist_empty(&current_journal->waiters));
+ if (current_journal->queue_is_woken)
+ return;
+ current_journal->queue_is_woken = true;
+ journal_queue_wakeup_next(&current_journal->waiters, force_ready);
+}
+
void
journal_wait_queue(void)
{
@@ -101,7 +111,7 @@ journal_wait_queue(void)
/*
* Will be waken up by either queue emptying or a synchronous write.
*/
- while (journal_queue_is_full(current_journal) && !entry.is_ready)
+ while (journal_queue_is_full() && !entry.is_ready)
fiber_yield();
journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
diff --git a/src/box/journal.h b/src/box/journal.h
index 9c8af062a..d295dfa4b 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -149,20 +149,9 @@ struct journal {
*/
extern struct journal *current_journal;
-void
-journal_queue_wakeup_next(struct rlist *link, bool force_ready);
-
/** Wake the journal queue up. */
-static inline void
-journal_queue_wakeup(struct journal *j, bool force_ready)
-{
- assert(j == current_journal);
- assert(!rlist_empty(&j->waiters));
- if (j->queue_is_woken)
- return;
- j->queue_is_woken = true;
- journal_queue_wakeup_next(&j->waiters, force_ready);
-}
+void
+journal_queue_wakeup(bool force_ready);
/**
* Check whether any of the queue size limits is reached.
@@ -170,9 +159,9 @@ journal_queue_wakeup(struct journal *j, bool
force_ready)
* before proceeding with a new asynchronous write request.
*/
static inline bool
-journal_queue_is_full(struct journal *j)
+journal_queue_is_full(void)
{
- assert(j == current_journal);
+ struct journal *j = current_journal;
return (j->queue_max_size != 0 && j->queue_size >=
j->queue_max_size) ||
(j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
}
@@ -182,16 +171,53 @@ journal_queue_is_full(struct journal *j)
* other waiters we must go after them to preserve write order.
*/
static inline bool
-journal_queue_has_waiters(struct journal *j)
+journal_queue_has_waiters(void)
{
- assert(j == current_journal);
- return !rlist_empty(&j->waiters);
+ return !rlist_empty(&current_journal->waiters);
}
/** Yield until there's some space in the journal queue. */
void
journal_wait_queue(void);
+/** Set maximal journal queue size in bytes. */
+static inline void
+journal_queue_set_max_size(struct journal *j, int64_t size)
+{
+ assert(j == current_journal);
+ j->queue_max_size = size;
+ if (journal_queue_has_waiters() && !journal_queue_is_full())
+ journal_queue_wakeup(false);
+}
+
+/** Set maximal journal queue length, in entries. */
+static inline void
+journal_queue_set_max_len(struct journal *j, int64_t len)
+{
+ assert(j == current_journal);
+ j->queue_max_len = len;
+ if (journal_queue_has_waiters() && !journal_queue_is_full())
+ journal_queue_wakeup(false);
+}
+
+/** Increase queue size on a new write request. */
+static inline void
+journal_queue_on_append(struct journal_entry *entry)
+{
+ current_journal->queue_len++;
+ current_journal->queue_size += entry->approx_len;
+}
+
+/** Decrease queue size once write request is complete. */
+static inline void
+journal_queue_on_complete(struct journal_entry *entry)
+{
+ current_journal->queue_len--;
+ current_journal->queue_size -= entry->approx_len;
+ assert(current_journal->queue_len >= 0);
+ assert(current_journal->queue_size >= 0);
+}
+
/**
* Complete asynchronous write.
*/
@@ -199,12 +225,11 @@ static inline void
journal_async_complete(struct journal_entry *entry)
{
assert(entry->write_async_cb != NULL);
- current_journal->queue_len--;
- current_journal->queue_size -= entry->approx_len;
- assert(current_journal->queue_len >= 0);
- assert(current_journal->queue_size >= 0);
- if (journal_queue_has_waiters(current_journal))
- journal_queue_wakeup(current_journal, false);
+
+ journal_queue_on_complete(entry);
+ if (journal_queue_has_waiters() && !journal_queue_is_full())
+ journal_queue_wakeup(false);
+
entry->write_async_cb(entry);
}
@@ -216,17 +241,18 @@ journal_async_complete(struct journal_entry *entry)
static inline int
journal_write(struct journal_entry *entry)
{
- if (journal_queue_has_waiters(current_journal)) {
+ if (journal_queue_has_waiters()) {
/*
* It's a synchronous write, so it's fine to wait a bit more for
* everyone else to be written. They'll wake us up back
* afterwards.
*/
- journal_queue_wakeup(current_journal, true);
+ journal_queue_wakeup(true);
journal_wait_queue();
}
- current_journal->queue_size += entry->approx_len;
- current_journal->queue_len += 1;
+
+ journal_queue_on_append(entry);
+
return current_journal->write(current_journal, entry);
}
@@ -242,8 +268,8 @@ journal_write_async(struct journal_entry *entry)
* It's the job of the caller to check whether the queue is full prior
* to submitting the request.
*/
- current_journal->queue_size += entry->approx_len;
- current_journal->queue_len += 1;
+ journal_queue_on_append(entry);
+
return current_journal->write_async(current_journal, entry);
}
diff --git a/src/box/wal.c b/src/box/wal.c
index 9fff4220a..5bc7a0685 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -768,19 +768,13 @@ wal_set_checkpoint_threshold(int64_t threshold)
void
wal_set_queue_max_size(int64_t size)
{
- struct journal *base = &wal_writer_singleton.base;
- base->queue_max_size = size;
- if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
- journal_queue_wakeup(base, false);
+ journal_queue_set_max_size(&wal_writer_singleton.base, size);
}
void
wal_set_queue_max_len(int64_t len)
{
- struct journal *base = &wal_writer_singleton.base;
- base->queue_max_len = len;
- if (journal_queue_has_waiters(base) && !journal_queue_is_full(base))
- journal_queue_wakeup(base, false);
+ journal_queue_set_max_len(&wal_writer_singleton.base, len);
}
struct wal_gc_msg
--
Serge Petrenko
More information about the Tarantool-patches
mailing list