[Tarantool-patches] [PATCH v3 01/10] wal: enrich row's meta information with sync replication flags

Serge Petrenko sergepetrenko at tarantool.org
Fri Apr 16 11:57:25 MSK 2021



16.04.2021 10:08, Serge Petrenko via Tarantool-patches пишет:
>
>
> 16.04.2021 02:18, Vladislav Shpilevoy пишет:
>> Good job on the patch!
>>
>> See 3 comments below.
>
> Hi! Thanks for the review!
>
>>> diff --git a/src/box/journal.h b/src/box/journal.h
>>> index 76c70c19f..3ce9c869e 100644
>>> --- a/src/box/journal.h
>>> +++ b/src/box/journal.h
>>> @@ -63,6 +63,7 @@ struct journal_entry {
>>>        * A journal entry completion callback argument.
>>>        */
>>>       void *complete_data;
>>> +    uint8_t opt_flags;
>> 1. I propose to call them just flags. There is no third value
>> like 'no flag'. They are either set or not, am I right? Also the
>> member is missing a comment. The most important thing to say -
>> these flags are only for the last row.
>
> Ok, fixed.
>
>>
>>>       /**
>>>        * Asynchronous write completion function.
>>>        */
>>> @@ -97,6 +98,7 @@ journal_entry_create(struct journal_entry *entry, 
>>> size_t n_rows,
>>>       entry->approx_len    = approx_len;
>>>       entry->n_rows        = n_rows;
>>>       entry->res        = -1;
>>> +    entry->opt_flags    = 0;
>> 2. You could initialize it with IPROTO_FLAG_COMMIT right here and
>> drop (*row)->is_commit = true from wal_assign_lsn. But this one is
>> up to you. Maybe it is not a good idea.
>
> This would look better, indeed, but neither journal nor wal know
> about iproto constants. And I don't think it's a good idea to
> introduce such a dependency.
>
> I can add entry->flags |= IPROTO_FLAG_COMMIT to
> txn_journal_entry_new().
> I actually like how this turned out. It's none of WAL's or journal's
> business which row is commit and which isn't.

Forget about that. Let's leave (*row)->is_commit = true in wal_assign_lsn().

Otherwise I would need to set entry->flags for synchro entries and raft
entries, which aren't actually transactions, so their journal entries do
not go through txn_journal_entry_new().

========================================

diff --git a/src/box/txn.c b/src/box/txn.c
index 31f664aa0..a71ccadd0 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -676,9 +676,6 @@ txn_journal_entry_new(struct txn *txn)
         req->flags |= flags_map[txn->flags & TXN_WAIT_SYNC];
         req->flags |= flags_map[txn->flags & TXN_WAIT_ACK];

-       /* is_commit is always set for the last tx row. */
-       req->flags |= IPROTO_FLAG_COMMIT;
-
         return req;
  }

diff --git a/src/box/wal.c b/src/box/wal.c
index 53d896972..5b6200b81 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -997,8 +997,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct 
vclock *base,
                         }
                         (*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
                         /* Tx meta is stored in the last tx row. */
-                       if (row == end - 1)
+                       if (row == end - 1) {
                                 (*row)->flags = entry->flags;
+                               (*row)->is_commit = true;
+                       }
                 } else {
                         int64_t diff = (*row)->lsn - vclock_get(base, 
(*row)->replica_id);
                         if (diff <= vclock_get(vclock_diff,
>
>>
>>>   }
>>>     /**
>>> diff --git a/src/box/wal.c b/src/box/wal.c
>>> index 34af0bda6..4ec8034a3 100644
>>> --- a/src/box/wal.c
>>> +++ b/src/box/wal.c
>>> @@ -962,14 +962,14 @@ out:
>>>    */
>>>   static void
>>>   wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>>> -           struct xrow_header **row,
>>> -           struct xrow_header **end)
>>> +           struct journal_entry *entry)
>> 3. This part could be a separate commit, otherwise it is hard to
>> see the functional changes. Up to you if you want to split.
>
> Good idea, let's do that.
>
> Incremental diff for this commit is below and the extracted commit
> regarding wal_assign_lsn() refactoring is in reply to this email.
>
>>>   {
>>>       int64_t tsn = 0;
>>> -    struct xrow_header **start = row;
>>> -    struct xrow_header **first_glob_row = row;
>>> +    struct xrow_header **start = entry->rows;
>>> +    struct xrow_header **end = entry->rows + entry->n_rows;
>>> +    struct xrow_header **first_glob_row = entry->rows;
>>>       /** Assign LSN to all local rows. */
>>> -    for ( ; row < end; row++) {
>>> +    for (struct xrow_header **row = start; row < end; row++) {
>>>           if ((*row)->replica_id == 0) {
>>>               /*
>>>                * All rows representing local space data
>
> ========================================================
>
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 3ce9c869e..8f3d56a61 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -63,7 +63,8 @@ struct journal_entry {
>       * A journal entry completion callback argument.
>       */
>      void *complete_data;
> -    uint8_t opt_flags;
> +    /** Flags that should be set for the last entry row. */
> +    uint8_t flags;
>      /**
>       * Asynchronous write completion function.
>       */
> @@ -98,7 +99,7 @@ journal_entry_create(struct journal_entry *entry, 
> size_t n_rows,
>      entry->approx_len    = approx_len;
>      entry->n_rows        = n_rows;
>      entry->res        = -1;
> -    entry->opt_flags    = 0;
> +    entry->flags        = 0;
>  }
>
>  /**
> diff --git a/src/box/txn.c b/src/box/txn.c
> index e090d58fc..31f664aa0 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -76,7 +76,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, 
> struct request *request)
>          row->lsn = 0;
>          row->sync = 0;
>          row->tm = 0;
> -        row->opt_flags = 0;
> +        row->flags = 0;
>      }
>      /*
>       * Group ID should be set both for requests not having a
> @@ -668,13 +668,16 @@ txn_journal_entry_new(struct txn *txn)
>          --req->n_rows;
>      }
>
> -    static const uint8_t opt_flags_map[] = {
> +    static const uint8_t flags_map[] = {
>          [TXN_WAIT_SYNC] = IPROTO_FLAG_WAIT_SYNC,
>          [TXN_WAIT_ACK] = IPROTO_FLAG_WAIT_ACK,
>      };
>
> -    req->opt_flags |= opt_flags_map[txn->flags & TXN_WAIT_SYNC];
> -    req->opt_flags |= opt_flags_map[txn->flags & TXN_WAIT_ACK];
> +    req->flags |= flags_map[txn->flags & TXN_WAIT_SYNC];
> +    req->flags |= flags_map[txn->flags & TXN_WAIT_ACK];
> +
> +    /* is_commit is always set for the last tx row. */
> +    req->flags |= IPROTO_FLAG_COMMIT;
>
>      return req;
>  }
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 4ec8034a3..53d896972 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -997,10 +997,8 @@ wal_assign_lsn(struct vclock *vclock_diff, struct 
> vclock *base,
>              }
>              (*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
>              /* Tx meta is stored in the last tx row. */
> -            if (row == end - 1) {
> -                (*row)->opt_flags = entry->opt_flags;
> -                (*row)->is_commit = true;
> -            }
> +            if (row == end - 1)
> +                (*row)->flags = entry->flags;
>          } else {
>              int64_t diff = (*row)->lsn - vclock_get(base, 
> (*row)->replica_id);
>              if (diff <= vclock_get(vclock_diff,
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index ba121799b..35e1d1c20 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -183,7 +183,7 @@ error:
>              break;
>          case IPROTO_FLAGS:
>              flags = mp_decode_uint(pos);
> -            header->opt_flags = flags;
> +            header->flags = flags;
>              break;
>          default:
>              /* unknown header */
> @@ -299,7 +299,7 @@ xrow_header_encode(const struct xrow_header 
> *header, uint64_t sync,
>       *   flag to find transaction boundary (last row in the
>       *   transaction stream).
>       */
> -    uint8_t flags_to_encode = header->opt_flags & ~IPROTO_FLAG_COMMIT;
> +    uint8_t flags_to_encode = header->flags & ~IPROTO_FLAG_COMMIT;
>      if (header->tsn != 0) {
>          if (header->tsn != header->lsn || !header->is_commit) {
>              /*
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 0526e3cd9..5ea99e792 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -82,7 +82,7 @@ struct xrow_header {
>      int64_t tsn;
>      /** Transaction meta flags set only in the last transaction row. */
>      union {
> -        uint8_t opt_flags;
> +        uint8_t flags;
>          struct {
>              /**
>               * Is only encoded in the write ahead log for
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 3d7d8bee1..b6018eed9 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -302,7 +302,7 @@ test_request_str()
>   * still we rely on it for convenience sake.
>   */
>  static void
> -test_xrow_opt_field()
> +test_xrow_fields()
>  {
>      plan(6);
>
> @@ -311,24 +311,24 @@ test_xrow_opt_field()
>      memset(&header, 0, sizeof(header));
>
>      header.is_commit = true;
> -    is(header.opt_flags, IPROTO_FLAG_COMMIT, "header.is_commit -> 
> COMMIT");
> +    is(header.flags, IPROTO_FLAG_COMMIT, "header.is_commit -> COMMIT");
>      header.is_commit = false;
>
>      header.wait_sync = true;
> -    is(header.opt_flags, IPROTO_FLAG_WAIT_SYNC, "header.wait_sync -> 
> WAIT_SYNC");
> +    is(header.flags, IPROTO_FLAG_WAIT_SYNC, "header.wait_sync -> 
> WAIT_SYNC");
>      header.wait_sync = false;
>
>      header.wait_ack = true;
> -    is(header.opt_flags, IPROTO_FLAG_WAIT_ACK, "header.wait_ack -> 
> WAIT_ACK");
> +    is(header.flags, IPROTO_FLAG_WAIT_ACK, "header.wait_ack -> 
> WAIT_ACK");
>      header.wait_ack = false;
>
> -    header.opt_flags = IPROTO_FLAG_COMMIT;
> +    header.flags = IPROTO_FLAG_COMMIT;
>      ok(header.is_commit && !header.wait_sync && !header.wait_ack, 
> "COMMIT -> header.is_commit");
>
> -    header.opt_flags = IPROTO_FLAG_WAIT_SYNC;
> +    header.flags = IPROTO_FLAG_WAIT_SYNC;
>      ok(!header.is_commit && header.wait_sync && !header.wait_ack, 
> "WAIT_SYNC -> header.wait_sync");
>
> -    header.opt_flags = IPROTO_FLAG_WAIT_ACK;
> +    header.flags = IPROTO_FLAG_WAIT_ACK;
>      ok(!header.is_commit && !header.wait_sync && header.wait_ack, 
> "WAIT_ACK -> header.wait_ack");
>
>      check_plan();
> @@ -347,7 +347,7 @@ main(void)
>      test_greeting();
>      test_xrow_header_encode_decode();
>      test_request_str();
> -    test_xrow_opt_field();
> +    test_xrow_fields();
>
>      random_free();
>      fiber_free();
>

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list