[Tarantool-patches] [PATCH v3] wal: introduce limits on simultaneous writes
    Serge Petrenko 
    sergepetrenko at tarantool.org
       
    Tue Mar  2 20:51:55 MSK 2021
    
    
  
02.03.2021 01:05, Vladislav Shpilevoy wrote:
> Hi! Thanks for the fixes!
>
>>>> @@ -159,6 +276,13 @@ journal_write(struct journal_entry *entry)
>>>>    static inline int
>>>>    journal_write_async(struct journal_entry *entry)
>>>>    {
>>>> +    /*
>>>> +     * It's the job of the caller to check whether the queue is full prior
>>>> +     * to submitting the request.
>>>> +     */
>>>> +    assert(!journal_queue_is_full() || journal_queue.is_ready);
>>>> +    journal_queue_on_append(entry);
>>> 8. Probably must assert that waiters list is empty. Otherwise you
>>> could go out of order.
>> It's not empty by the time the first entry gets to 'journal_write_async'.
>> Everyone else is waken up, but not yet removed from the queue.
>>
>> Looks like we cannot determine whether a write is called after waiting in queue
>> or not.
> It bothers me a lot, the rest looks good. We don't have any protection against
> a reorder, even not an assertion. How about this? (I didn't test):
Thanks for thinking this through!
Yes, indeed, this is a problem. Unfortunately, your solution will allow
reordering.
We needed the waiter count to prevent anyone from sliding in front when
the queue has already been woken up but the fibers are not yet scheduled.
We also discussed that a reorder may happen if the applier tries to apply
a vinyl tx, which may yield after it has waited in the queue.
In this case, if some tx is committed (synchronously) and added to the queue
while the vinyl tx sleeps, the vinyl tx will bypass that synchronous tx
in the queue.
So, there shouldn't be any yields between journal_queue_wait() and
journal_write(_async)().
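To make the scenario above concrete, here is a toy model in plain C. It has no fibers and is not Tarantool code: toy_journal, toy_park, toy_submit, toy_drain and the two scenario functions are made-up names. It only replays the two interleavings by hand. tx 1 stands for the vinyl tx and tx 2 for the tx that reaches the write step while tx 1 sleeps.

```c
#include <assert.h>

/* Toy model only: every name here is hypothetical. */
enum { TOY_CAP = 8 };

struct toy_journal {
	int waiters[TOY_CAP];	/* FIFO of tx ids parked in the queue */
	int waiter_count;
	int written[TOY_CAP];	/* tx ids in the order they reach the WAL */
	int written_count;
};

/* Hand a tx to the WAL without looking at the queue. */
static void
toy_submit(struct toy_journal *j, int tx_id)
{
	assert(j->written_count < TOY_CAP);
	j->written[j->written_count++] = tx_id;
}

/* The queue is full: park the tx at the tail of the waiters list. */
static void
toy_park(struct toy_journal *j, int tx_id)
{
	assert(j->waiter_count < TOY_CAP);
	j->waiters[j->waiter_count++] = tx_id;
}

/* The queue has room again: release the waiters in FIFO order. */
static void
toy_drain(struct toy_journal *j)
{
	for (int i = 0; i < j->waiter_count; i++)
		toy_submit(j, j->waiters[i]);
	j->waiter_count = 0;
}

/* tx 1 passed the queue check earlier and then yielded in prepare. */
static void
toy_wait_before_prepare(struct toy_journal *j)
{
	toy_park(j, 2);		/* tx 2 reaches the write step while tx 1 sleeps */
	toy_submit(j, 1);	/* tx 1 wakes up and writes without re-checking */
	toy_drain(j);
}

/* tx 1 checks the queue only right before writing. */
static void
toy_wait_before_write(struct toy_journal *j)
{
	toy_park(j, 2);
	toy_park(j, 1);		/* tx 1 queues up behind tx 2 */
	toy_drain(j);
}
```

In the first scenario tx 1 reaches the WAL ahead of tx 2 even though tx 2 was already waiting. In the second one the FIFO order of the waiters is preserved.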
We cannot yield in memtx transactions, so the only solution is to put
journal_queue_wait() after txn_prepare() inside txn_commit_async().
Take a look at this patch (on top of the branch):
I decided to move journal_queue_wait() to journal_write_async() to be
consistent with journal_write(), which also checks the queue.
Now reordering is not a problem since waiting in queue is done right before
the write request is submitted. No yields in between.
Also, I renamed journal_write_async() to journal_write_try_async() and
txn_commit_async() to txn_commit_try_async() to indicate that they may
yield occasionally now.
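A rough sketch of the resulting shape, as a self-contained toy rather than the real journal code. Everything prefixed with toy_ is hypothetical, the limit of 2 is arbitrary, and the "yield" is replaced by a loop which pretends that WAL completes the oldest pending write. The point is only that the wait and the append sit next to each other in one function.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical limit for the toy, not a proposed default. */
enum { TOY_QUEUE_MAX_LEN = 2 };

struct toy_queue {
	int len;	/* entries submitted but not yet written */
	int max_seen;	/* high-water mark of len */
	int yields;	/* how many times a writer had to wait */
};

static bool
toy_queue_is_full(const struct toy_queue *q)
{
	return q->len >= TOY_QUEUE_MAX_LEN;
}

/* Stand-in for WAL finishing the oldest pending write. */
static void
toy_wal_complete_one(struct toy_queue *q)
{
	assert(q->len > 0);
	q->len--;
}

/* Stand-in for a fiber yield: "sleep" until WAL frees a slot. */
static void
toy_queue_wait(struct toy_queue *q)
{
	while (toy_queue_is_full(q)) {
		q->yields++;
		toy_wal_complete_one(q);
	}
}

/* Wait and append back to back, so nobody can slide in between. */
static int
toy_write_try_async(struct toy_queue *q)
{
	toy_queue_wait(q);	/* may "yield", hence the try_ prefix */
	q->len++;		/* nothing runs between wait and append */
	if (q->len > q->max_seen)
		q->max_seen = q->len;
	return 0;
}
```

With this layout the queue length never exceeds the limit, and a caller only waits when the queue is actually full.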
What do you think of this approach?
P.S. there's another thing I wanted to discuss: should we set defaults for
wal_queue_max_size(len) to some finite value? If yes, then what value should
it be? wal_queue_max_size = default memtx_memory (256 MB)?
I don't know which value to choose for wal_queue_max_len at all.
===================================
Subject: [PATCH] [tosquash] journal_write and txn_commit: async -> try_async
Make journal_write_async() wait for journal queue. Rename it to
journal_write_try_async() to show that it might yield now (rarely).
Same for txn_commit_async(): rename it to txn_commit_try_async() since
it uses journal_write_try_async().
---
  src/box/applier.cc  | 12 ++----------
  src/box/box.cc      |  4 ++--
  src/box/journal.h   |  7 ++-----
  src/box/txn.c       |  4 ++--
  src/box/txn.h       |  3 ++-
  src/box/txn_limbo.h |  2 +-
  6 files changed, 11 insertions(+), 21 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index dff43795c..5a88a013e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -868,7 +868,7 @@ apply_synchro_row(struct xrow_header *row)
      if (entry == NULL)
          goto err;
-    if (journal_write_async(&entry->journal_entry) != 0) {
+    if (journal_write_try_async(&entry->journal_entry) != 0) {
          diag_set(ClientError, ER_WAL_IO);
          goto err;
      }
@@ -967,14 +967,6 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
          goto success;
      }
-    /*
-     * Do not spam WAL with excess write requests, let it process what's
-     * piled up first.
-     * This is done before opening the transaction to avoid problems with
-     * yielding inside it.
-     */
-    journal_queue_wait();
-
      /**
       * Explicitly begin the transaction so that we can
       * control fiber->gc life cycle and, in case of apply
@@ -1048,7 +1040,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
      trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
      txn_on_wal_write(txn, on_wal_write);
-    if (txn_commit_async(txn) < 0)
+    if (txn_commit_try_async(txn) < 0)
          goto fail;
  success:
diff --git a/src/box/box.cc b/src/box/box.cc
index 9a3b092d0..a8aa2663f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -246,12 +246,12 @@ box_process_rw(struct request *request, struct space *space,
           * synchronous tx it meets until confirm timeout
           * is reached and the tx is rolled back, yielding
           * an error.
-         * Moreover, txn_commit_async() doesn't hurt at
+         * Moreover, txn_commit_try_async() doesn't hurt at
           * all during local recovery, since journal_write
           * is faked at this stage and returns immediately.
           */
          if (is_local_recovery) {
-            res = txn_commit_async(txn);
+            res = txn_commit_try_async(txn);
          } else {
              res = txn_commit(txn);
          }
diff --git a/src/box/journal.h b/src/box/journal.h
index 5f0e0accd..3a945fa53 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -253,12 +253,9 @@ journal_write(struct journal_entry *entry)
   * @return 0 if write was queued to a backend or -1 in case of an error.
   */
  static inline int
-journal_write_async(struct journal_entry *entry)
+journal_write_try_async(struct journal_entry *entry)
  {
-    /*
-     * It's the job of the caller to check whether the queue is full prior
-     * to submitting the request.
-     */
+    journal_queue_wait();
      journal_queue_on_append(entry);
      return current_journal->write_async(current_journal, entry);
diff --git a/src/box/txn.c b/src/box/txn.c
index 71c89ce5f..40061ff09 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -789,7 +789,7 @@ txn_limbo_on_rollback(struct trigger *trig, void *event)
  }
  int
-txn_commit_async(struct txn *txn)
+txn_commit_try_async(struct txn *txn)
  {
      struct journal_entry *req;
@@ -859,7 +859,7 @@ txn_commit_async(struct txn *txn)
      }
      fiber_set_txn(fiber(), NULL);
-    if (journal_write_async(req) != 0) {
+    if (journal_write_try_async(req) != 0) {
          fiber_set_txn(fiber(), txn);
          diag_set(ClientError, ER_WAL_IO);
          diag_log();
diff --git a/src/box/txn.h b/src/box/txn.h
index 29fe6d5ce..a45518064 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -473,12 +473,13 @@ txn_rollback(struct txn *txn);
   * journal write completion. Note, the journal write may still fail.
   * To track transaction status, one is supposed to use on_commit and
   * on_rollback triggers.
+ * Note, this may yield occasionally, once journal queue gets full.
   *
   * On failure -1 is returned and the transaction is rolled back and
   * freed.
   */
  int
-txn_commit_async(struct txn *txn);
+txn_commit_try_async(struct txn *txn);
  /**
   * Most txns don't have triggers, and txn objects
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index c28b5666d..af0addf8d 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -229,7 +229,7 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
   * remote transactions. The function exists to be used in a
   * context, where a transaction is not known whether it is local
   * or not. For example, when a transaction is committed not bound
- * to any fiber (txn_commit_async()), it can be created by applier
+ * to any fiber (txn_commit_try_async()), it can be created by applier
   * (then it is remote) or by recovery (then it is local). Besides,
   * recovery can commit remote transactions as well, when works on
   * a replica - it will recover data received from master.
-- 
2.24.3 (Apple Git-128)
===================================
>
> ====================
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -69,8 +69,11 @@ void
>   journal_queue_wakeup(void)
>   {
>   	struct rlist *list = &journal_queue.waiters;
> -	if (!rlist_empty(list) && !journal_queue_is_full())
> +	if (!rlist_empty(list) && !journal_queue_is_full()) {
>   		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +		--journal_queue.waiter_count;
> +		assert(journal_queue.waiter_count >= 0);
> +	}
>   }
>   
>   void
> @@ -84,7 +87,6 @@ journal_queue_wait(void)
>   	 * Will be waken up by either queue emptying or a synchronous write.
>   	 */
>   	fiber_yield();
> -	--journal_queue.waiter_count;
>   	journal_queue_wakeup();
>   }
>   
> @@ -96,5 +98,6 @@ journal_queue_flush(void)
>   	struct rlist *list = &journal_queue.waiters;
>   	while (!rlist_empty(list))
>   		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
> +	journal_queue.waiter_count = 0;
>   	journal_queue_wait();
>   }
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 5f0e0accd..ea56043e2 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -260,6 +260,7 @@ journal_write_async(struct journal_entry *entry)
>   	 * to submitting the request.
>   	 */
>   	journal_queue_on_append(entry);
> +	assert(journal_queue.waiter_count == 0);
>   
>   	return current_journal->write_async(current_journal, entry);
>   }
-- 
Serge Petrenko
    
    