[Tarantool-patches] [PATCH v2] wal: introduce limits on simultaneous writes

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 17 23:46:05 MSK 2021


Hi! Thanks for the patch!

Now it looks cool indeed.

Another raw idea, which I don't insist on and am not even sure is
good, but it just came to my mind: how about making a separate
object called 'journal_queue'? Or 'journal_ctl'? It would be global
and not a part of any particular journal. It couldn't be swapped
for another queue/ctl, and the journal API would use it.

Then we wouldn't need to worry whether we configured the correct
journal: current_journal can change at runtime now, but this ctl
thing can't.

Another option - call this thing 'journal', and rename the old
'journal' to 'journal_storage' or 'journal_api' or 'journal_vtab'
or something like this.

Another option - ignore this, since it does not matter much. I
mention it just in case you want to try fitting the solution into
one of these ideas.
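
To make the first idea a bit more concrete, here is a rough sketch.
The struct layout, the field names and journal_set() are my
assumptions for illustration, not something taken from the patch.
Only the "0 means no limit" semantics follows the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical: limits and counters live in one global object. */
struct journal_queue {
	/** Max total size of entries in flight, bytes. 0 - no limit. */
	int64_t max_size;
	/** Current total size of entries in flight, bytes. */
	int64_t size;
	/** Max number of entries in flight. 0 - no limit. */
	int64_t max_len;
	/** Current number of entries in flight. */
	int64_t len;
};

/* Hypothetical journal backend, the part swappable at runtime. */
struct journal {
	int (*write_async)(struct journal *journal, int64_t entry_size);
};

/* The only queue instance. It is never replaced. */
static struct journal_queue journal_queue = {0, 0, 0, 0};
static struct journal *current_journal = NULL;

static inline bool
journal_queue_is_full(void)
{
	const struct journal_queue *q = &journal_queue;
	return (q->max_size != 0 && q->size >= q->max_size) ||
	       (q->max_len != 0 && q->len >= q->max_len);
}

/* Switch the backend. The queue state is deliberately untouched. */
static inline void
journal_set(struct journal *new_journal)
{
	assert(new_journal != NULL);
	current_journal = new_journal;
}
```

With such a layout the limits configured once stay in effect after
current_journal is switched, so there is nothing to reconfigure.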

See 8 comments below.

> diff --git a/src/box/journal.c b/src/box/journal.c
> index cb320b557..49441e596 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -55,3 +55,66 @@ journal_entry_new(size_t n_rows, struct region *region,
>  			     complete_data);
>  	return entry;
>  }
> +
> +struct journal_queue_entry {
> +	/** The fiber waiting for queue space to free. */
> +	struct fiber *fiber;
> +	/** Whether the fiber should be waken up regardless of queue size. */
> +	bool is_ready;
> +	/** A link in all waiting fibers list. */
> +	struct rlist in_queue;
> +};
> +
> +/**
> + * Wake up the next waiter in journal queue.
> + */
> +static inline void
> +journal_queue_wakeup_next(struct rlist *link, bool force_ready)

1. The flag value is known at compile time at every call site. Is it
possible to split the function into force/normal versions? The same
goes for journal_queue_wakeup(), from which this runtime uncertainty
arises.

Also it is worth adding a comment on why force mode is needed at all.
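
Something along these lines is what I mean by the split. This is
only a sketch on a toy singly-linked queue: 'struct waiter',
'struct wait_queue' and all the function names are made up, and the
is_woken flag stands in for a real fiber_wakeup() call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a waiting fiber. */
struct waiter {
	bool is_ready;
	bool is_woken;
	struct waiter *next;
};

/* Hypothetical stand-in for the queue state. */
struct wait_queue {
	struct waiter *first;
	bool is_full;
};

/* Shared tail: mark the first waiter and "wake" it. */
static inline void
wait_queue_wakeup_first(struct wait_queue *q, bool is_ready)
{
	assert(q != NULL);
	struct waiter *w = q->first;
	if (w == NULL)
		return;
	w->is_ready = is_ready;
	w->is_woken = true; /* fiber_wakeup() in the real code. */
}

/** Normal mode: wake the next waiter only if there is free space. */
static inline void
wait_queue_wakeup(struct wait_queue *q)
{
	if (q->is_full)
		return;
	wait_queue_wakeup_first(q, false);
}

/** Forced mode: wake the next waiter even if the queue is full. */
static inline void
wait_queue_wakeup_forced(struct wait_queue *q)
{
	wait_queue_wakeup_first(q, true);
}
```

The shared helper still takes a bool, but both callers pass a
constant, so after inlining there should be no runtime branch on it.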

> +{
> +	/* Empty queue or last entry in queue. */
> +	if (link == rlist_last(&current_journal->waiters)) {

2. I am not sure I understand what is happening here. Why is this
function called with a pointer to the list head itself in one place,
and with a pointer to a list element in another?

> +		current_journal->queue_is_woken = false;
> +		return;
> +	}
> +	/*
> +	 * When the queue isn't forcefully emptied, no need to wake everyone
> +	 * else up until there's some free space.
> +	 */
> +	if (!force_ready && journal_queue_is_full()) {
> +		current_journal->queue_is_woken = false;

3. Maybe woken -> awake?

4. Why do you need the flag? Can you just remove the awake entries
from the queue right away? Then it wouldn't even be possible to make
a double wakeup. See comment 5.

> +		return;
> +	}
> +	struct journal_queue_entry *e = rlist_entry(rlist_next(link), typeof(*e),
> +						    in_queue);
> +	e->is_ready = force_ready;
> +	fiber_wakeup(e->fiber);
> +}
> +
> +void
> +journal_queue_wakeup(bool force_ready)
> +{
> +	assert(!rlist_empty(&current_journal->waiters));
> +	if (current_journal->queue_is_woken)
> +		return;
> +	current_journal->queue_is_woken = true;
> +	journal_queue_wakeup_next(&current_journal->waiters, force_ready);
> +}
> +
> +void
> +journal_wait_queue(void)
> +{
> +	struct journal_queue_entry entry = {
> +		.fiber = fiber(),
> +		.is_ready = false,
> +	};
> +	rlist_add_tail_entry(&current_journal->waiters, &entry, in_queue);
> +	/*
> +	 * Will be waken up by either queue emptying or a synchronous write.
> +	 */
> +	while (journal_queue_is_full() && !entry.is_ready)
> +		fiber_yield();
> +
> +	journal_queue_wakeup_next(&entry.in_queue, entry.is_ready);
> +	assert(&entry.in_queue == rlist_first(&current_journal->waiters));
> +	rlist_del(&entry.in_queue);

5. Can rlist_del be done along with fiber_wakeup()? Then maybe you
wouldn't need queue_is_woken.
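
A sketch of what comments 4 and 5 suggest, again on a toy
singly-linked queue instead of rlist. All names are hypothetical,
and wakeup_count only counts what would be fiber_wakeup() calls.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a waiting fiber. */
struct waiter {
	/** How many times this waiter was "woken up". */
	int wakeup_count;
	struct waiter *next;
};

struct wait_queue {
	struct waiter *first;
	struct waiter *last;
};

static inline void
wait_queue_add_tail(struct wait_queue *q, struct waiter *w)
{
	assert(w != NULL);
	w->next = NULL;
	if (q->last != NULL)
		q->last->next = w;
	else
		q->first = w;
	q->last = w;
}

/**
 * Pop the first waiter and wake it in one step. A second call can
 * only reach the next waiter, never the same one again.
 * Returns false if there was nobody to wake.
 */
static inline bool
wait_queue_wakeup_first(struct wait_queue *q)
{
	struct waiter *w = q->first;
	if (w == NULL)
		return false;
	q->first = w->next;
	if (q->first == NULL)
		q->last = NULL;
	w->next = NULL;
	w->wakeup_count++; /* fiber_wakeup() in the real code. */
	return true;
}
```

One thing the sketch does not cover: a waiter which wakes up and
still sees a full queue would probably have to put itself back at
the head of the list to preserve the write order.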

> +}
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5d8d5a726..d295dfa4b 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -124,6 +143,81 @@ struct journal {
>  		     struct journal_entry *entry);
>  };
>  
> +/**
> + * Depending on the step of recovery and instance configuration
> + * points at a concrete implementation of the journal.
> + */
> +extern struct journal *current_journal;
> +
> +/** Wake the journal queue up. */
> +void
> +journal_queue_wakeup(bool force_ready);
> +
> +/**
> + * Check whether any of the queue size limits is reached.
> + * If the queue is full, we must wait for some of the entries to be written
> + * before proceeding with a new asynchronous write request.
> + */
> +static inline bool
> +journal_queue_is_full(void)
> +{
> +	struct journal *j = current_journal;
> +	return (j->queue_max_size != 0 && j->queue_size >= j->queue_max_size) ||
> +	       (j->queue_max_len != 0 && j->queue_len >= j->queue_max_len);

6. Seems like a lot of checks. Option 1: set queue_max_size = INT64_MAX
when the user passes 0. Then there is no need to check for != 0. The same
goes for queue_max_len.

Option 2, which may be stupid (and is meant to be combined with option 1):
store an 'is_full' flag and update it whenever queue_size or queue_len
changes and turns out to have reached its limit. But I am not sure it
reduces the number of branches. Didn't check.
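
Option 1 could look roughly like this. 'struct queue_limits' and the
function names are hypothetical; the fields mirror the queue_* ones
from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical holder of the queue limits and counters. */
struct queue_limits {
	int64_t max_size;
	int64_t size;
	int64_t max_len;
	int64_t len;
};

/** 0 from the user means "no limit" and is stored as INT64_MAX. */
static inline void
queue_limits_set_max_size(struct queue_limits *l, int64_t size)
{
	assert(size >= 0);
	l->max_size = size == 0 ? INT64_MAX : size;
}

/** The same normalization for the length limit. */
static inline void
queue_limits_set_max_len(struct queue_limits *l, int64_t len)
{
	assert(len >= 0);
	l->max_len = len == 0 ? INT64_MAX : len;
}

/** Two comparisons instead of four. */
static inline bool
queue_limits_is_full(const struct queue_limits *l)
{
	return l->size >= l->max_size || l->len >= l->max_len;
}
```

This relies on the counters never reaching INT64_MAX in practice,
and the limits must go through the setters at least once, because a
zero-initialized struct would look full right away.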

> +}
> +
> +/**
> + * Check whether anyone is waiting for the journal queue to empty. If there are
> + * other waiters we must go after them to preserve write order.
> + */
> +static inline bool
> +journal_queue_has_waiters(void)
> +{
> +	return !rlist_empty(&current_journal->waiters);
> +}
> +
> +/** Yield until there's some space in the journal queue. */
> +void
> +journal_wait_queue(void);
> +
> +/** Set maximal journal queue size in bytes. */
> +static inline void
> +journal_queue_set_max_size(struct journal *j, int64_t size)

7. Why do we have a journal parameter here, but not in the other
functions? The same goes for journal_queue_set_max_len.

> +{
> +	assert(j == current_journal);
> +	j->queue_max_size = size;
> +	if (journal_queue_has_waiters() && !journal_queue_is_full())
> +		journal_queue_wakeup(false);
> +}
> @@ -159,6 +264,12 @@ journal_write(struct journal_entry *entry)
>  static inline int
>  journal_write_async(struct journal_entry *entry)
>  {
> +	/*
> +	 * It's the job of the caller to check whether the queue is full prior
> +	 * to submitting the request.

8. Maybe add an assert though.

> +	 */
> +	journal_queue_on_append(entry);
> +
>  	return current_journal->write_async(current_journal, entry);
>  }


