From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Subject: [Tarantool-patches] [PATCH v4 02/12] xrow: enrich row's meta information with sync replication flags Date: Fri, 16 Apr 2021 19:25:33 +0300 [thread overview] Message-ID: <501dee051380aa635ecf8cd5a0f844741e121f0c.1618590211.git.sergepetrenko@tarantool.org> (raw) In-Reply-To: <cover.1618590211.git.sergepetrenko@tarantool.org> Introduce two new flags to xrow_header: `wait_ack` and `wait_sync`. These flags are set, in addition to `is_commit`, for rows belonging to synchronous transactions. The new flags make it possible to determine whether rows belong to a synchronous transaction without parsing them all and checking whether any of them touches a synchronous space. This will be used in the applier once it is taught to filter synchronous transactions based on whether they come from a raft leader or not. P.S. These flags will also be useful once we allow turning any transaction synchronous. Once this is done, the flags in the row header will be the only source of information on whether the transaction is synchronous or not. Prerequisite #5445 @TarantoolBot document Title: new values for IPROTO_FLAGS field IPROTO_FLAGS bitfield is enriched with two new constants: IPROTO_FLAG_WAIT_SYNC = 0x02 IPROTO_FLAG_WAIT_ACK = 0x04 IPROTO_FLAG_WAIT_SYNC is set for the last row of a transaction which cannot be committed immediately: either because it is synchronous or because it waits for other synchronous transactions to complete. IPROTO_FLAG_WAIT_ACK is set for the last row of a synchronous transaction. 
--- src/box/iproto_constants.h | 5 ++ src/box/journal.h | 3 + src/box/txn.c | 9 +++ src/box/wal.c | 6 +- src/box/xrow.c | 13 ++-- src/box/xrow.h | 30 ++++++--- test/unit/xrow.cc | 104 +++++++++++++++++++++++------ test/unit/xrow.result | 133 ++++++++++++++++++++++++++++++++++--- 8 files changed, 256 insertions(+), 47 deletions(-) diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index b07a73b20..e9d1ef5d6 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -49,9 +49,14 @@ enum { XLOG_FIXHEADER_SIZE = 19 }; +/** IPROTO_FLAGS bitfield constants. */ enum { /** Set for the last xrow in a transaction. */ IPROTO_FLAG_COMMIT = 0x01, + /** Set for the last row of a tx residing in limbo. */ + IPROTO_FLAG_WAIT_SYNC = 0x02, + /** Set for the last row of a synchronous tx. */ + IPROTO_FLAG_WAIT_ACK = 0x04, }; enum iproto_key { diff --git a/src/box/journal.h b/src/box/journal.h index 76c70c19f..8f3d56a61 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -63,6 +63,8 @@ struct journal_entry { * A journal entry completion callback argument. */ void *complete_data; + /** Flags that should be set for the last entry row. */ + uint8_t flags; /** * Asynchronous write completion function. 
*/ @@ -97,6 +99,7 @@ journal_entry_create(struct journal_entry *entry, size_t n_rows, entry->approx_len = approx_len; entry->n_rows = n_rows; entry->res = -1; + entry->flags = 0; } /** diff --git a/src/box/txn.c b/src/box/txn.c index c56725cea..a71ccadd0 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -76,6 +76,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request) row->lsn = 0; row->sync = 0; row->tm = 0; + row->flags = 0; } /* * Group ID should be set both for requests not having a @@ -667,6 +668,14 @@ txn_journal_entry_new(struct txn *txn) --req->n_rows; } + static const uint8_t flags_map[] = { + [TXN_WAIT_SYNC] = IPROTO_FLAG_WAIT_SYNC, + [TXN_WAIT_ACK] = IPROTO_FLAG_WAIT_ACK, + }; + + req->flags |= flags_map[txn->flags & TXN_WAIT_SYNC]; + req->flags |= flags_map[txn->flags & TXN_WAIT_ACK]; + return req; } diff --git a/src/box/wal.c b/src/box/wal.c index 95ee8e200..5b6200b81 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -996,7 +996,11 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, first_glob_row = row; } (*row)->tsn = tsn == 0 ? (*start)->lsn : tsn; - (*row)->is_commit = row == end - 1; + /* Tx meta is stored in the last tx row. */ + if (row == end - 1) { + (*row)->flags = entry->flags; + (*row)->is_commit = true; + } } else { int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id); if (diff <= vclock_get(vclock_diff, diff --git a/src/box/xrow.c b/src/box/xrow.c index 7368eccff..35e1d1c20 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -183,7 +183,7 @@ error: break; case IPROTO_FLAGS: flags = mp_decode_uint(pos); - header->is_commit = flags & IPROTO_FLAG_COMMIT; + header->flags = flags; break; default: /* unknown header */ @@ -299,6 +299,7 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, * flag to find transaction boundary (last row in the * transaction stream). 
*/ + uint8_t flags_to_encode = header->flags & ~IPROTO_FLAG_COMMIT; if (header->tsn != 0) { if (header->tsn != header->lsn || !header->is_commit) { /* @@ -314,12 +315,14 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, map_size++; } if (header->is_commit && header->tsn != header->lsn) { - /* Setup last row for multi row transaction. */ - d = mp_encode_uint(d, IPROTO_FLAGS); - d = mp_encode_uint(d, IPROTO_FLAG_COMMIT); - map_size++; + flags_to_encode |= IPROTO_FLAG_COMMIT; } } + if (flags_to_encode != 0) { + d = mp_encode_uint(d, IPROTO_FLAGS); + d = mp_encode_uint(d, flags_to_encode); + map_size++; + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 69337a226..5ea99e792 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -80,14 +80,28 @@ struct xrow_header { * transaction. */ int64_t tsn; - /** - * True for the last row in a multi-statement transaction, - * or single-statement transaction. Is only encoded in the - * write ahead log for multi-statement transactions. - * Single-statement transactions do not encode - * tsn and is_commit flag to save space. - */ - bool is_commit; + /** Transaction meta flags set only in the last transaction row. */ + union { + uint8_t flags; + struct { + /** + * Is only encoded in the write ahead log for + * multi-statement transactions. Single-statement + * transactions do not encode tsn and is_commit flag to + * save space. + */ + bool is_commit : 1; + /** + * True for any transaction that would enter the limbo + * (not necessarily a synchronous one). + */ + bool wait_sync : 1; + /** + * True for a synchronous transaction. 
+ */ + bool wait_ack : 1; + }; + }; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 9fd154719..b6018eed9 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -204,7 +204,9 @@ test_greeting() void test_xrow_header_encode_decode() { - plan(10); + /* Test all possible 3-bit combinations. */ + const int bit_comb_count = 1 << 3; + plan(1 + bit_comb_count); struct xrow_header header; char buffer[2048]; char *pos = mp_encode_uint(buffer, 300); @@ -217,27 +219,47 @@ test_xrow_header_encode_decode() header.tm = 123.456; header.bodycnt = 0; header.tsn = header.lsn; - header.is_commit = true; uint64_t sync = 100500; - struct iovec vec[1]; - is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); - int fixheader_len = 200; - pos = (char *)vec[0].iov_base + fixheader_len; - is(mp_decode_map((const char **)&pos), 5, "header map size"); - - struct xrow_header decoded_header; - const char *begin = (const char *)vec[0].iov_base; - begin += fixheader_len; - const char *end = (const char *)vec[0].iov_base; - end += vec[0].iov_len; - is(xrow_header_decode(&decoded_header, &begin, end, true), 0, - "header decode"); - is(header.type, decoded_header.type, "decoded type"); - is(header.replica_id, decoded_header.replica_id, "decoded replica_id"); - is(header.lsn, decoded_header.lsn, "decoded lsn"); - is(header.tm, decoded_header.tm, "decoded tm"); - is(decoded_header.sync, sync, "decoded sync"); - is(decoded_header.bodycnt, 0, "decoded bodycnt"); + for (int opt_idx = 0; opt_idx < bit_comb_count; opt_idx++) { + plan(12); + header.is_commit = opt_idx & 0x01; + header.wait_sync = opt_idx >> 1 & 0x01; + header.wait_ack = opt_idx >> 2 & 0x01; + struct iovec vec[1]; + is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); + int fixheader_len = 200; + pos = (char *)vec[0].iov_base + fixheader_len; + uint32_t exp_map_size = 5; + /* + * header.is_commit flag isn't encoded, since this row looks + * like a single-statement transaction. 
+ */ + if (header.wait_sync || header.wait_ack) + exp_map_size += 1; + /* tsn is encoded explicitly in this case. */ + if (!header.is_commit) + exp_map_size += 1; + uint32_t size = mp_decode_map((const char **)&pos); + is(size, exp_map_size, "header map size"); + + struct xrow_header decoded_header; + const char *begin = (const char *)vec[0].iov_base; + begin += fixheader_len; + const char *end = (const char *)vec[0].iov_base; + end += vec[0].iov_len; + is(xrow_header_decode(&decoded_header, &begin, end, true), 0, + "header decode"); + is(header.is_commit, decoded_header.is_commit, "decoded is_commit"); + is(header.wait_sync, decoded_header.wait_sync, "decoded wait_sync"); + is(header.wait_ack, decoded_header.wait_ack, "decoded wait_ack"); + is(header.type, decoded_header.type, "decoded type"); + is(header.replica_id, decoded_header.replica_id, "decoded replica_id"); + is(header.lsn, decoded_header.lsn, "decoded lsn"); + is(header.tm, decoded_header.tm, "decoded tm"); + is(decoded_header.sync, sync, "decoded sync"); + is(decoded_header.bodycnt, 0, "decoded bodycnt"); + check_plan(); + } check_plan(); } @@ -275,12 +297,49 @@ test_request_str() check_plan(); } +/** + * The compiler doesn't have to preserve bitfields order, + * still we rely on it for convenience sake. 
+ */ +static void +test_xrow_fields() +{ + plan(6); + + struct xrow_header header; + + memset(&header, 0, sizeof(header)); + + header.is_commit = true; + is(header.flags, IPROTO_FLAG_COMMIT, "header.is_commit -> COMMIT"); + header.is_commit = false; + + header.wait_sync = true; + is(header.flags, IPROTO_FLAG_WAIT_SYNC, "header.wait_sync -> WAIT_SYNC"); + header.wait_sync = false; + + header.wait_ack = true; + is(header.flags, IPROTO_FLAG_WAIT_ACK, "header.wait_ack -> WAIT_ACK"); + header.wait_ack = false; + + header.flags = IPROTO_FLAG_COMMIT; + ok(header.is_commit && !header.wait_sync && !header.wait_ack, "COMMIT -> header.is_commit"); + + header.flags = IPROTO_FLAG_WAIT_SYNC; + ok(!header.is_commit && header.wait_sync && !header.wait_ack, "WAIT_SYNC -> header.wait_sync"); + + header.flags = IPROTO_FLAG_WAIT_ACK; + ok(!header.is_commit && !header.wait_sync && header.wait_ack, "WAIT_ACK -> header.wait_ack"); + + check_plan(); +} + int main(void) { memory_init(); fiber_init(fiber_c_invoke); - plan(3); + plan(4); random_init(); @@ -288,6 +347,7 @@ main(void) test_greeting(); test_xrow_header_encode_decode(); test_request_str(); + test_xrow_fields(); random_free(); fiber_free(); diff --git a/test/unit/xrow.result b/test/unit/xrow.result index 5ee92ad7b..3b705d5ba 100644 --- a/test/unit/xrow.result +++ b/test/unit/xrow.result @@ -1,4 +1,4 @@ -1..3 +1..4 1..40 ok 1 - round trip ok 2 - roundtrip.version_id @@ -41,18 +41,129 @@ ok 39 - invalid 10 ok 40 - invalid 11 ok 1 - subtests - 1..10 + 1..9 ok 1 - bad msgpack end - ok 2 - encode - ok 3 - header map size - ok 4 - header decode - ok 5 - decoded type - ok 6 - decoded replica_id - ok 7 - decoded lsn - ok 8 - decoded tm - ok 9 - decoded sync - ok 10 - decoded bodycnt + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - 
decoded sync + ok 12 - decoded bodycnt + ok 2 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 3 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 4 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 5 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 6 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 7 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 8 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 
4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 9 - subtests ok 2 - subtests 1..1 ok 1 - request_str ok 3 - subtests + 1..6 + ok 1 - header.is_commit -> COMMIT + ok 2 - header.wait_sync -> WAIT_SYNC + ok 3 - header.wait_ack -> WAIT_ACK + ok 4 - COMMIT -> header.is_commit + ok 5 - WAIT_SYNC -> header.wait_sync + ok 6 - WAIT_ACK -> header.wait_ack +ok 4 - subtests -- 2.24.3 (Apple Git-128)
next prev parent reply other threads:[~2021-04-16 16:26 UTC|newest] Thread overview: 57+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-04-16 16:25 [Tarantool-patches] [PATCH v4 00/12] raft: introduce manual elections and fix a bug with re-applying rolled back transactions Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 01/12] wal: make wal_assign_lsn accept journal entry Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` Serge Petrenko via Tarantool-patches [this message] 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 03/12] xrow: introduce a PROMOTE entry Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 04/12] box: actualise iproto_key_type array Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 05/12] box: make clear_synchro_queue() write a PROMOTE entry instead of CONFIRM + ROLLBACK Serge Petrenko via Tarantool-patches 2021-04-16 22:12 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-18 8:24 ` Serge Petrenko via Tarantool-patches 2021-04-20 22:30 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-21 5:58 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 06/12] box: write PROMOTE even for empty limbo Serge Petrenko via Tarantool-patches 2021-04-19 13:39 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 07/12] raft: filter rows based on known peer terms Serge Petrenko via Tarantool-patches 2021-04-16 22:21 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-18 8:49 ` Serge Petrenko via Tarantool-patches 2021-04-18 15:44 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-19 9:31 ` Serge Petrenko via Tarantool-patches 2021-04-18 16:27 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-19 9:30 ` Serge Petrenko via Tarantool-patches 2021-04-20 20:29 ` Serge Petrenko via Tarantool-patches 2021-04-20 20:31 ` Serge Petrenko via Tarantool-patches 2021-04-20 20:55 
` Serge Petrenko via Tarantool-patches 2021-04-20 22:30 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-21 5:58 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 08/12] election: introduce a new election mode: "manual" Serge Petrenko via Tarantool-patches 2021-04-19 22:34 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-20 9:25 ` Serge Petrenko via Tarantool-patches 2021-04-20 17:37 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 09/12] raft: introduce raft_start/stop_candidate Serge Petrenko via Tarantool-patches 2021-04-16 22:23 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-18 8:59 ` Serge Petrenko via Tarantool-patches 2021-04-19 22:35 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-20 9:28 ` Serge Petrenko via Tarantool-patches 2021-04-19 12:52 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 10/12] election: support manual elections in clear_synchro_queue() Serge Petrenko via Tarantool-patches 2021-04-16 22:24 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-18 9:26 ` Serge Petrenko via Tarantool-patches 2021-04-18 16:07 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-19 9:32 ` Serge Petrenko via Tarantool-patches 2021-04-19 12:47 ` Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 11/12] box: remove parameter from clear_synchro_queue Serge Petrenko via Tarantool-patches 2021-04-16 16:25 ` [Tarantool-patches] [PATCH v4 12/12] box.ctl: rename clear_synchro_queue to promote Serge Petrenko via Tarantool-patches 2021-04-19 22:35 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-20 10:22 ` Serge Petrenko via Tarantool-patches 2021-04-18 12:00 ` [Tarantool-patches] [PATCH v4 13/12] replication: send accumulated Raft messages after relay start Serge Petrenko via Tarantool-patches 2021-04-18 16:03 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-19 12:11 ` Serge Petrenko via 
Tarantool-patches 2021-04-19 22:36 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-20 10:38 ` Serge Petrenko via Tarantool-patches 2021-04-20 22:31 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-21 5:59 ` Serge Petrenko via Tarantool-patches 2021-04-19 22:37 ` [Tarantool-patches] [PATCH v4 00/12] raft: introduce manual elections and fix a bug with re-applying rolled back transactions Vladislav Shpilevoy via Tarantool-patches 2021-04-20 17:38 ` [Tarantool-patches] [PATCH v4 14/12] txn: make NOPs fully asynchronous Serge Petrenko via Tarantool-patches 2021-04-20 22:31 ` Vladislav Shpilevoy via Tarantool-patches 2021-04-21 5:59 ` Serge Petrenko via Tarantool-patches 2021-04-20 22:30 ` [Tarantool-patches] [PATCH v4 00/12] raft: introduce manual elections and fix a bug with re-applying rolled back transactions Vladislav Shpilevoy via Tarantool-patches 2021-04-21 6:01 ` Serge Petrenko via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=501dee051380aa635ecf8cd5a0f844741e121f0c.1618590211.git.sergepetrenko@tarantool.org \ --to=tarantool-patches@dev.tarantool.org \ --cc=gorcunov@gmail.com \ --cc=sergepetrenko@tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v4 02/12] xrow: enrich row'\''s meta information with sync replication flags' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox