[Tarantool-patches] [PATCH v3 01/10] wal: enrich row's meta information with sync replication flags
Serge Petrenko
sergepetrenko at tarantool.org
Fri Apr 16 11:57:25 MSK 2021
16.04.2021 10:08, Serge Petrenko via Tarantool-patches пишет:
>
>
> 16.04.2021 02:18, Vladislav Shpilevoy пишет:
>> Good job on the patch!
>>
>> See 3 comments below.
>
> Hi! Thanks for the review!
>
>>> diff --git a/src/box/journal.h b/src/box/journal.h
>>> index 76c70c19f..3ce9c869e 100644
>>> --- a/src/box/journal.h
>>> +++ b/src/box/journal.h
>>> @@ -63,6 +63,7 @@ struct journal_entry {
>>> * A journal entry completion callback argument.
>>> */
>>> void *complete_data;
>>> + uint8_t opt_flags;
>> 1. I propose to call them just flags. There is no third value
>> like 'no flag'. They are either set or not, am I right? Also the
>> member is missing a comment. The most important thing to say -
>> these flags are only for the last row.
>
> Ok, fixed.
>
>>
>>> /**
>>> * Asynchronous write completion function.
>>> */
>>> @@ -97,6 +98,7 @@ journal_entry_create(struct journal_entry *entry,
>>> size_t n_rows,
>>> entry->approx_len = approx_len;
>>> entry->n_rows = n_rows;
>>> entry->res = -1;
>>> + entry->opt_flags = 0;
>> 2. You could initialize it with IPROTO_FLAG_COMMIT right here and
>> drop (*row)->is_commit = true from wal_assign_lsn. But this one up
>> to you. Maybe it is not a good idea.
>
> This would look better, indeed, but neither journal nor wal know
> about iproto constants. And I don't think it's a good idea to
> introduce such a dependency.
>
> I can add entry->flags |= IPROTO_FLAG_COMMIT to
> txn_journal_entry_new().
> I actually like how this turned out. It's none of the WAL's or the
> journal's business which row is the commit row and which isn't.
Forget about that. Let's leave (*row)->is_commit = true in wal_assign_lsn().
Otherwise I would need to set entry->flags for synchro entries and raft
entries, which aren't actually transactions, so their journal entries do
not go through txn_journal_entry_new().
========================================
diff --git a/src/box/txn.c b/src/box/txn.c
index 31f664aa0..a71ccadd0 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -676,9 +676,6 @@ txn_journal_entry_new(struct txn *txn)
req->flags |= flags_map[txn->flags & TXN_WAIT_SYNC];
req->flags |= flags_map[txn->flags & TXN_WAIT_ACK];
- /* is_commit is always set for the last tx row. */
- req->flags |= IPROTO_FLAG_COMMIT;
-
return req;
}
diff --git a/src/box/wal.c b/src/box/wal.c
index 53d896972..5b6200b81 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -997,8 +997,10 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
}
(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
/* Tx meta is stored in the last tx row. */
- if (row == end - 1)
+ if (row == end - 1) {
(*row)->flags = entry->flags;
+ (*row)->is_commit = true;
+ }
} else {
int64_t diff = (*row)->lsn - vclock_get(base,
(*row)->replica_id);
if (diff <= vclock_get(vclock_diff,
>
>>
>>> }
>>> /**
>>> diff --git a/src/box/wal.c b/src/box/wal.c
>>> index 34af0bda6..4ec8034a3 100644
>>> --- a/src/box/wal.c
>>> +++ b/src/box/wal.c
>>> @@ -962,14 +962,14 @@ out:
>>> */
>>> static void
>>> wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>>> - struct xrow_header **row,
>>> - struct xrow_header **end)
>>> + struct journal_entry *entry)
>> 3. This part could be a separate commit, otherwise it is hard to
>> see the functional changes. Up to you if you want to split.
>
> Good idea, let's do that.
>
> Incremental diff for this commit is below and the extracted commit
> regarding wal_assign_lsn() refactoring is in reply to this email.
>
>>> {
>>> int64_t tsn = 0;
>>> - struct xrow_header **start = row;
>>> - struct xrow_header **first_glob_row = row;
>>> + struct xrow_header **start = entry->rows;
>>> + struct xrow_header **end = entry->rows + entry->n_rows;
>>> + struct xrow_header **first_glob_row = entry->rows;
>>> /** Assign LSN to all local rows. */
>>> - for ( ; row < end; row++) {
>>> + for (struct xrow_header **row = start; row < end; row++) {
>>> if ((*row)->replica_id == 0) {
>>> /*
>>> * All rows representing local space data
>
> ========================================================
>
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 3ce9c869e..8f3d56a61 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -63,7 +63,8 @@ struct journal_entry {
> * A journal entry completion callback argument.
> */
> void *complete_data;
> - uint8_t opt_flags;
> + /** Flags that should be set for the last entry row. */
> + uint8_t flags;
> /**
> * Asynchronous write completion function.
> */
> @@ -98,7 +99,7 @@ journal_entry_create(struct journal_entry *entry,
> size_t n_rows,
> entry->approx_len = approx_len;
> entry->n_rows = n_rows;
> entry->res = -1;
> - entry->opt_flags = 0;
> + entry->flags = 0;
> }
>
> /**
> diff --git a/src/box/txn.c b/src/box/txn.c
> index e090d58fc..31f664aa0 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -76,7 +76,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt,
> struct request *request)
> row->lsn = 0;
> row->sync = 0;
> row->tm = 0;
> - row->opt_flags = 0;
> + row->flags = 0;
> }
> /*
> * Group ID should be set both for requests not having a
> @@ -668,13 +668,16 @@ txn_journal_entry_new(struct txn *txn)
> --req->n_rows;
> }
>
> - static const uint8_t opt_flags_map[] = {
> + static const uint8_t flags_map[] = {
> [TXN_WAIT_SYNC] = IPROTO_FLAG_WAIT_SYNC,
> [TXN_WAIT_ACK] = IPROTO_FLAG_WAIT_ACK,
> };
>
> - req->opt_flags |= opt_flags_map[txn->flags & TXN_WAIT_SYNC];
> - req->opt_flags |= opt_flags_map[txn->flags & TXN_WAIT_ACK];
> + req->flags |= flags_map[txn->flags & TXN_WAIT_SYNC];
> + req->flags |= flags_map[txn->flags & TXN_WAIT_ACK];
> +
> + /* is_commit is always set for the last tx row. */
> + req->flags |= IPROTO_FLAG_COMMIT;
>
> return req;
> }
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 4ec8034a3..53d896972 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -997,10 +997,8 @@ wal_assign_lsn(struct vclock *vclock_diff, struct
> vclock *base,
> }
> (*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
> /* Tx meta is stored in the last tx row. */
> - if (row == end - 1) {
> - (*row)->opt_flags = entry->opt_flags;
> - (*row)->is_commit = true;
> - }
> + if (row == end - 1)
> + (*row)->flags = entry->flags;
> } else {
> int64_t diff = (*row)->lsn - vclock_get(base,
> (*row)->replica_id);
> if (diff <= vclock_get(vclock_diff,
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index ba121799b..35e1d1c20 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -183,7 +183,7 @@ error:
> break;
> case IPROTO_FLAGS:
> flags = mp_decode_uint(pos);
> - header->opt_flags = flags;
> + header->flags = flags;
> break;
> default:
> /* unknown header */
> @@ -299,7 +299,7 @@ xrow_header_encode(const struct xrow_header
> *header, uint64_t sync,
> * flag to find transaction boundary (last row in the
> * transaction stream).
> */
> - uint8_t flags_to_encode = header->opt_flags & ~IPROTO_FLAG_COMMIT;
> + uint8_t flags_to_encode = header->flags & ~IPROTO_FLAG_COMMIT;
> if (header->tsn != 0) {
> if (header->tsn != header->lsn || !header->is_commit) {
> /*
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 0526e3cd9..5ea99e792 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -82,7 +82,7 @@ struct xrow_header {
> int64_t tsn;
> /** Transaction meta flags set only in the last transaction row. */
> union {
> - uint8_t opt_flags;
> + uint8_t flags;
> struct {
> /**
> * Is only encoded in the write ahead log for
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 3d7d8bee1..b6018eed9 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -302,7 +302,7 @@ test_request_str()
> * still we rely on it for convenience sake.
> */
> static void
> -test_xrow_opt_field()
> +test_xrow_fields()
> {
> plan(6);
>
> @@ -311,24 +311,24 @@ test_xrow_opt_field()
> memset(&header, 0, sizeof(header));
>
> header.is_commit = true;
> - is(header.opt_flags, IPROTO_FLAG_COMMIT, "header.is_commit ->
> COMMIT");
> + is(header.flags, IPROTO_FLAG_COMMIT, "header.is_commit -> COMMIT");
> header.is_commit = false;
>
> header.wait_sync = true;
> - is(header.opt_flags, IPROTO_FLAG_WAIT_SYNC, "header.wait_sync ->
> WAIT_SYNC");
> + is(header.flags, IPROTO_FLAG_WAIT_SYNC, "header.wait_sync ->
> WAIT_SYNC");
> header.wait_sync = false;
>
> header.wait_ack = true;
> - is(header.opt_flags, IPROTO_FLAG_WAIT_ACK, "header.wait_ack ->
> WAIT_ACK");
> + is(header.flags, IPROTO_FLAG_WAIT_ACK, "header.wait_ack ->
> WAIT_ACK");
> header.wait_ack = false;
>
> - header.opt_flags = IPROTO_FLAG_COMMIT;
> + header.flags = IPROTO_FLAG_COMMIT;
> ok(header.is_commit && !header.wait_sync && !header.wait_ack,
> "COMMIT -> header.is_commit");
>
> - header.opt_flags = IPROTO_FLAG_WAIT_SYNC;
> + header.flags = IPROTO_FLAG_WAIT_SYNC;
> ok(!header.is_commit && header.wait_sync && !header.wait_ack,
> "WAIT_SYNC -> header.wait_sync");
>
> - header.opt_flags = IPROTO_FLAG_WAIT_ACK;
> + header.flags = IPROTO_FLAG_WAIT_ACK;
> ok(!header.is_commit && !header.wait_sync && header.wait_ack,
> "WAIT_ACK -> header.wait_ack");
>
> check_plan();
> @@ -347,7 +347,7 @@ main(void)
> test_greeting();
> test_xrow_header_encode_decode();
> test_request_str();
> - test_xrow_opt_field();
> + test_xrow_fields();
>
> random_free();
> fiber_free();
>
--
Serge Petrenko
More information about the Tarantool-patches
mailing list