[Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Feb 17 23:46:05 MSK 2021
Hi! Thanks for the patch!
Now looks cool indeed.
Another raw idea, which I don't insist on and am not even sure is
good, but it just came to my mind: how about making a separate
object called 'journal_queue'? Or 'journal_ctl'? It would be global
and not a part of any one journal. It can't be swapped for another
queue/ctl, and is used by the journal API.
Then we wouldn't need to worry about whether we configured the
correct journal, because current_journal can now change at runtime,
but this ctl thing can't.
Another option - call this thing 'journal', and rename the old
'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab'
or something like this.
Another option - ignore this, since it does not matter much. But
just in case you would want to try to fit the solution into one
of these ideas.
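To illustrate the first option, here is a rough sketch. It is not taken from the patch: the struct layout, the reduced 'struct journal', the entry_size argument and dummy_write_async() are all made up for the example. The point is only that the limits and counters sit in one global object, so they survive a change of current_journal.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical: queue limits and counters live in one global object. */
struct journal_queue {
	int64_t max_size;
	int64_t size;
	int64_t max_len;
	int64_t len;
};

/* Simplified stand-in for struct journal: only the write vtab is left. */
struct journal {
	int (*write_async)(struct journal *j, int64_t entry_size);
};

/* The single queue object. Unlike current_journal it is never replaced. */
static struct journal_queue journal_queue = {
	.max_size = INT64_MAX,
	.max_len = INT64_MAX,
};

static struct journal *current_journal;

static inline bool
journal_queue_is_full(void)
{
	return journal_queue.size >= journal_queue.max_size ||
	       journal_queue.len >= journal_queue.max_len;
}

/* Account for one more entry of the given size in the queue. */
static inline void
journal_queue_on_append(int64_t entry_size)
{
	journal_queue.size += entry_size;
	journal_queue.len++;
}

/* Placeholder write callback, only here to make the sketch complete. */
static int
dummy_write_async(struct journal *j, int64_t entry_size)
{
	(void)j;
	(void)entry_size;
	return 0;
}
```

With this layout the setters would not need a journal argument at all, and there is nothing to re-configure when the journal implementation is switched during recovery.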
See 8 comments below.
> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..49441e596 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
> complete_data);
> return entry;
> }
> +
> +struct journal_queue_entry {
> + /** The fiber waiting for queue space to free. */
> + struct fiber *fiber;
> + /** Whether the fiber should be waken up regardless of queue size. */
> + bool is_ready;
> + /** A link in all waiting fibers list. */
> + struct rlist in_queue;
> +};
> +
> +/**
> + * Wake up the next waiter in journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)
1. The flag is known at compilation time in all the places where it
is used. Is it possible to split the function into force/normal
versions? The same goes for journal_queue_wakeup(), from which this
runtime uncertainty arises. It is also worth adding a comment on why
force mode is even needed.
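Roughly what I mean by the split, as a sketch on simplified stand-in structures. Nothing below is the real Tarantool code: 'struct waiter', 'struct queue' and the 'woken' flag (which stands in for fiber_wakeup()) are assumptions made for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-in for a waiting fiber. */
struct waiter {
	/* Set only by a forced wakeup. */
	bool is_ready;
	/* Stands in for fiber_wakeup() having been called. */
	bool woken;
	struct waiter *next;
};

/* Simplified stand-in for the journal queue state. */
struct queue {
	struct waiter *first;
	int len;
	int max_len;
};

static bool
queue_is_full(const struct queue *q)
{
	return q->len >= q->max_len;
}

static void
waiter_wakeup(struct waiter *w, bool is_ready)
{
	w->is_ready = is_ready;
	w->woken = true;
}

/* Normal mode: wake the first waiter only if there is free space. */
static void
queue_wakeup(struct queue *q)
{
	if (q->first == NULL || queue_is_full(q))
		return;
	waiter_wakeup(q->first, false);
}

/*
 * Force mode: wake the first waiter even if the queue is still full.
 * Presumably needed for a synchronous write, judging by the comment
 * in journal_wait_queue(); the real reason belongs in this comment.
 */
static void
queue_wakeup_force(struct queue *q)
{
	if (q->first == NULL)
		return;
	waiter_wakeup(q->first, true);
}
```

Each caller then picks the variant by name, and the fullness check disappears from the forced path entirely.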
> +{
> + /* Empty queue or last entry in queue. */
> + if (link == rlist_last(&current_journal->waiters)) {
2. I am not sure I understand what is happening here. Why is this
function called with a pointer to the list itself in one place,
and with a pointer to one element in another?
> + current_journal->queue_is_woken = false;
> + return;
> + }
> + /*
> + * When the queue isn't forcefully emptied, no need to wake everyone
> + * else up until there's some free space.
> + */
> + if (!force_ready && journal_queue_is_full()) {
> + current_journal->queue_is_woken = false;
3. Maybe woken -> awake?
4. Why do you need the flag? Can you just remove the awake entries
from the queue right away? Then it wouldn't even be possible to make
a double wakeup. See comment 5.
> + return;
> + }
> + struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
> + in_queue);
> + e->is_ready = force_ready;
> + fiber_wakeup(e->fiber);
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> + assert(!rlist_empty(&current_journal->waiters));
> + if (current_journal->queue_is_woken)
> + return;
> + current_journal->queue_is_woken = true;
> + journal_queue_wakeup_next(&current_journal->waiters, force_ready);
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> + struct journal_queue_entry entry = {
> + .fiber = fiber(),
> + .is_ready = false,
> + };
> + rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
> + /*
> + * Will be waken up by either queue emptying or a synchronous write.
> + */
> + while (journal_queue_is_full() && !entry.is_ready)
> + fiber_yield();
> +
> + journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
> + assert(&entry.in_queue == rlist_first(&current_journal->waiters));
> + rlist_del(&entry.in_queue);
5. Can rlist_del be done along with fiber_wakeup()? Then maybe you
wouldn't need queue_is_woken.
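A sketch of the idea behind comments 4 and 5, on a hand-rolled singly linked list instead of rlist. All names are invented for the example, and 'woken' stands in for fiber_wakeup(). The waiter is unlinked in the same step that wakes it, so a second wakeup call can only reach the next waiter and never the same one twice.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-in for a waiting fiber. */
struct waiter {
	/* Stands in for fiber_wakeup() having been called. */
	bool woken;
	struct waiter *next;
};

/* FIFO of waiters; both pointers are NULL when it is empty. */
struct wait_queue {
	struct waiter *first;
	struct waiter *last;
};

static void
wait_queue_add_tail(struct wait_queue *q, struct waiter *w)
{
	w->next = NULL;
	w->woken = false;
	if (q->last != NULL)
		q->last->next = w;
	else
		q->first = w;
	q->last = w;
}

/*
 * Unlink the first waiter and wake it in one step.
 * Returns the woken waiter, or NULL if nobody is waiting.
 */
static struct waiter *
wait_queue_wakeup_first(struct wait_queue *q)
{
	struct waiter *w = q->first;
	if (w == NULL)
		return NULL;
	q->first = w->next;
	if (q->first == NULL)
		q->last = NULL;
	w->next = NULL;
	w->woken = true;
	return w;
}
```

The sketch does not cover what happens if the woken fiber finds the queue full again after it resumes. It would then have to go back to the head of the list to keep the write order, and that part needs care.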
> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..d295dfa4b 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +143,81 @@ struct journal {
> struct journal_entry *entry);
> };
>
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;
> +
> +/** Wake the journal queue up. */
> +void
> +journal_queue_wakeup(bool force_ready);
> +
> +/**
> + * Check whether any of the queue size limits is reached.
> + * If the queue is full, we must wait for some of the entries to be written
> + * before proceeding with a new asynchronous write request.
> + */
> +static inline bool
> +journal_queue_is_full(void)
> +{
> + struct journal *j = current_journal;
> + return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
> + (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);
6. Seems like a lot of checks. Option 1: make queue_max_size = INT64_MAX
when the user passes 0. Then there is no need to check for != 0. The same
goes for queue_max_len.
Option 2, which may be stupid (but combined with option 1): store a flag
'is_full' and update it whenever queue_size or queue_len is updated and
turns out to exceed its limit. But I am not sure it reduces the number of
branches. I didn't check.
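Option 1 could look roughly like this. 'struct queue_limits' and the function names are stand-ins made up for the example, not the fields of struct journal.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for the queue fields of struct journal. */
struct queue_limits {
	int64_t max_size;
	int64_t size;
	int64_t max_len;
	int64_t len;
};

/* 0 from the user means "no limit"; store it as INT64_MAX. */
static inline void
queue_set_max_size(struct queue_limits *q, int64_t size)
{
	q->max_size = size != 0 ? size : INT64_MAX;
}

static inline void
queue_set_max_len(struct queue_limits *q, int64_t len)
{
	q->max_len = len != 0 ? len : INT64_MAX;
}

/* Two comparisons instead of four, no special case for 0. */
static inline bool
queue_is_full(const struct queue_limits *q)
{
	return q->size >= q->max_size || q->len >= q->max_len;
}
```

The zero check moves into the setters, which run only on reconfiguration, while the hot-path check keeps two comparisons.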
> +}
> +
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> + return !rlist_empty(&current_journal->waiters);
> +}
> +
> +/** Yield until there's some space in the journal queue. */
> +void
> +journal_wait_queue(void);
> +
> +/** Set maximal journal queue size in bytes. */
> +static inline void
> +journal_queue_set_max_size(struct journal *j, int64_t size)
7. Why do we have a journal parameter here, but not in the other
functions? The same goes for journal_queue_set_max_len.
> +{
> + assert(j == current_journal);
> + j->queue_max_size = size;
> + if (journal_queue_has_waiters() && !journal_queue_is_full())
> + journal_queue_wakeup(false);
> +}
> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
> static inline int
> journal_write_async(struct journal_entry *entry)
> {
> + /*
> + * It's the job of the caller to check whether the queue is full prior
> + * to submitting the request.
8. Maybe add an assert though.
> + */
> + journal_queue_on_append(entry);
> +
> return current_journal->write_async(current_journal, entry);
> }