To: v.shpilevoy@tarantool.org, gorcunov@gmail.com
Date: Mon, 12 Apr 2021 22:40:14 +0300
Message-Id: <0d29b025b7213b5c9596715d849b4b371e1fef32.1618256019.git.sergepetrenko@tarantool.org>
Subject: [Tarantool-patches] [PATCH v2 1/9] wal: enrich row's meta information with sync replication flags
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org

Introduce two new flags to xrow_header: `wait_ack` and `wait_sync`.
Like `is_commit`, they are stored in the last row of a transaction and
describe the transaction as a whole: `wait_ack` marks a synchronous
transaction, `wait_sync` marks any transaction that has to wait in the
limbo.

The new flags make it possible to tell whether a transaction is
synchronous without parsing all of its rows and checking whether any of
them touches a synchronous space. This will be used in the applier once
it is taught to filter synchronous transactions based on whether they
come from a Raft leader.

P.S. These flags will also be useful once we allow turning any
transaction synchronous. After that, the flags in the row header will
be the only source of information on whether a transaction is
synchronous.

Prerequisite #5445

@TarantoolBot document
Title: new values for IPROTO_FLAGS field

The IPROTO_FLAGS bitfield is enriched with two new constants:
IPROTO_FLAG_WAIT_SYNC = 0x02
IPROTO_FLAG_WAIT_ACK = 0x04

IPROTO_FLAG_WAIT_SYNC is set for the last row of a transaction that
resides in the limbo: either a synchronous transaction or one that
cannot be applied yet because other synchronous transactions are in
flight.
IPROTO_FLAG_WAIT_ACK is set for the last row of a synchronous
transaction.
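
A minimal client-side sketch of how these bits could be read and
written. The `row_flags` struct and the `row_flags_decode()` /
`row_flags_encode()` helpers are hypothetical names used only for
illustration. The encoding rule mirrors `xrow_header_encode()` from
this patch: IPROTO_FLAG_COMMIT is written only for multi-statement
transactions, IPROTO_FLAG_WAIT_SYNC and IPROTO_FLAG_WAIT_ACK are
written whenever they are set, and the IPROTO_FLAGS key is omitted
when no bit remains.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* IPROTO_FLAGS bit values, as defined in src/box/iproto_constants.h. */
enum {
	IPROTO_FLAG_COMMIT = 0x01,
	IPROTO_FLAG_WAIT_SYNC = 0x02,
	IPROTO_FLAG_WAIT_ACK = 0x04,
};

/* Hypothetical decoded view of the IPROTO_FLAGS value of one row. */
struct row_flags {
	bool is_commit;
	bool wait_sync;
	bool wait_ack;
};

/* Split a raw IPROTO_FLAGS value into separate booleans. */
static struct row_flags
row_flags_decode(uint64_t flags)
{
	struct row_flags f;
	f.is_commit = (flags & IPROTO_FLAG_COMMIT) != 0;
	f.wait_sync = (flags & IPROTO_FLAG_WAIT_SYNC) != 0;
	f.wait_ack = (flags & IPROTO_FLAG_WAIT_ACK) != 0;
	return f;
}

/*
 * Build the IPROTO_FLAGS value for the last row of a transaction.
 * The COMMIT bit is dropped for single-statement transactions, since
 * such a row already looks like a single-statement transaction; the
 * sync bits are always kept. A result of 0 means the IPROTO_FLAGS key
 * is not encoded at all.
 */
static uint64_t
row_flags_encode(struct row_flags f, bool is_multi_statement)
{
	uint64_t flags = 0;
	if (f.wait_sync)
		flags |= IPROTO_FLAG_WAIT_SYNC;
	if (f.wait_ack)
		flags |= IPROTO_FLAG_WAIT_ACK;
	if (f.is_commit && is_multi_statement)
		flags |= IPROTO_FLAG_COMMIT;
	return flags;
}
```

With `is_multi_statement = true` the round trip through both helpers
is lossless; with `false` the commit bit is deliberately not written,
matching the single-statement case exercised by the unit test below.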
---
 src/box/iproto_constants.h |   5 ++
 src/box/journal.h          |   2 +
 src/box/txn.c              |   5 ++
 src/box/wal.c              |  26 ++++----
 src/box/xrow.c             |  13 ++--
 src/box/xrow.h             |  30 ++++++---
 test/unit/xrow.cc          |  64 ++++++++++++-------
 test/unit/xrow.result      | 123 ++++++++++++++++++++++++++++++++++---
 8 files changed, 213 insertions(+), 55 deletions(-)

diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index d3738c705..f7f46088f 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,9 +49,14 @@ enum {
 	XLOG_FIXHEADER_SIZE = 19
 };
 
+/** IPROTO_FLAGS bitfield constants. */
 enum {
 	/** Set for the last xrow in a transaction. */
 	IPROTO_FLAG_COMMIT = 0x01,
+	/** Set for the last row of a tx residing in limbo. */
+	IPROTO_FLAG_WAIT_SYNC = 0x02,
+	/** Set for the last row of a synchronous tx. */
+	IPROTO_FLAG_WAIT_ACK = 0x04,
 };
 
 enum iproto_key {
diff --git a/src/box/journal.h b/src/box/journal.h
index 76c70c19f..9ab8af2c1 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -63,6 +63,7 @@ struct journal_entry {
 	 * A journal entry completion callback argument.
 	 */
 	void *complete_data;
+	uint8_t opt_flags;
 	/**
 	 * Asynchronous write completion function.
 	 */
@@ -97,6 +98,7 @@ journal_entry_create(struct journal_entry *entry, size_t n_rows,
 	entry->approx_len = approx_len;
 	entry->n_rows = n_rows;
 	entry->res = -1;
+	entry->opt_flags = 0;
 }
 
 /**
diff --git a/src/box/txn.c b/src/box/txn.c
index 40061ff09..d65315f58 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -76,6 +76,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 		row->lsn = 0;
 		row->sync = 0;
 		row->tm = 0;
+		row->opt_flags = 0;
 	}
 	/*
 	 * Group ID should be set both for requests not having a
@@ -681,6 +682,10 @@ txn_journal_entry_new(struct txn *txn)
 		--req->n_rows;
 	}
 
+	req->opt_flags |=
+		(txn_has_flag(txn, TXN_WAIT_SYNC) ? IPROTO_FLAG_WAIT_SYNC : 0) |
+		(txn_has_flag(txn, TXN_WAIT_ACK) ? IPROTO_FLAG_WAIT_ACK : 0);
+
 	return req;
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 34af0bda6..00fcb21b4 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -962,14 +962,14 @@ out:
  */
 static void
 wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
-	       struct xrow_header **row,
-	       struct xrow_header **end)
+	       struct journal_entry *entry)
 {
 	int64_t tsn = 0;
-	struct xrow_header **start = row;
-	struct xrow_header **first_glob_row = row;
+	struct xrow_header **start = entry->rows;
+	struct xrow_header **end = entry->rows + entry->n_rows;
+	struct xrow_header **first_glob_row = start;
 	/** Assign LSN to all local rows. */
-	for ( ; row < end; row++) {
+	for (struct xrow_header **row = start; row < end; row++) {
 		if ((*row)->replica_id == 0) {
 			/*
 			 * All rows representing local space data
@@ -996,7 +996,13 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 				first_glob_row = row;
 			}
 			(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
-			(*row)->is_commit = row == end - 1;
+			if (row < end - 1)
+				continue;
+			/* Tx meta is stored in the last tx row. */
+			if (row == end - 1) {
+				(*row)->opt_flags = entry->opt_flags;
+				(*row)->is_commit = true;
+			}
 		} else {
 			int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
 			if (diff <= vclock_get(vclock_diff,
@@ -1020,7 +1026,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	 * the first global row. tsn was yet unknown when those
 	 * rows were processed.
*/ - for (row = start; row < first_glob_row; row++) + for (struct xrow_header **row = start; row < first_glob_row; row++) (*row)->tsn = tsn; } @@ -1098,8 +1104,7 @@ wal_write_to_disk(struct cmsg *msg) struct journal_entry *entry; struct stailq_entry *last_committed = NULL; stailq_foreach_entry(entry, &wal_msg->commit, fifo) { - wal_assign_lsn(&vclock_diff, &writer->vclock, - entry->rows, entry->rows + entry->n_rows); + wal_assign_lsn(&vclock_diff, &writer->vclock, entry); entry->res = vclock_sum(&vclock_diff) + vclock_sum(&writer->vclock); rc = xlog_write_entry(l, entry); @@ -1319,8 +1324,7 @@ wal_write_none_async(struct journal *journal, struct vclock vclock_diff; vclock_create(&vclock_diff); - wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows, - entry->rows + entry->n_rows); + wal_assign_lsn(&vclock_diff, &writer->vclock, entry); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); diff --git a/src/box/xrow.c b/src/box/xrow.c index bc06738ad..cc8e43ed4 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -183,7 +183,7 @@ error: break; case IPROTO_FLAGS: flags = mp_decode_uint(pos); - header->is_commit = flags & IPROTO_FLAG_COMMIT; + header->opt_flags = flags; break; default: /* unknown header */ @@ -299,6 +299,7 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, * flag to find transaction boundary (last row in the * transaction stream). */ + uint8_t flags_to_encode = header->opt_flags & ~IPROTO_FLAG_COMMIT; if (header->tsn != 0) { if (header->tsn != header->lsn || !header->is_commit) { /* @@ -314,12 +315,14 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, map_size++; } if (header->is_commit && header->tsn != header->lsn) { - /* Setup last row for multi row transaction. 
*/ - d = mp_encode_uint(d, IPROTO_FLAGS); - d = mp_encode_uint(d, IPROTO_FLAG_COMMIT); - map_size++; + flags_to_encode |= IPROTO_FLAG_COMMIT; } } + if (flags_to_encode != 0) { + d = mp_encode_uint(d, IPROTO_FLAGS); + d = mp_encode_uint(d, flags_to_encode); + map_size++; + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index fde8f9474..2a18733c0 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -80,14 +80,28 @@ struct xrow_header { * transaction. */ int64_t tsn; - /** - * True for the last row in a multi-statement transaction, - * or single-statement transaction. Is only encoded in the - * write ahead log for multi-statement transactions. - * Single-statement transactions do not encode - * tsn and is_commit flag to save space. - */ - bool is_commit; + /** Transaction meta flags set only in the last transaction row. */ + union { + uint8_t opt_flags; + struct { + /** + * Is only encoded in the write ahead log for + * multi-statement transactions. Single-statement + * transactions do not encode tsn and is_commit flag to + * save space. + */ + bool is_commit : 1; + /** + * True for a synchronous transaction. + */ + bool wait_sync : 1; + /** + * True for any transaction that would enter the limbo + * (not necessarily a synchronous one). + */ + bool wait_ack : 1; + }; + }; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 9fd154719..ea1ee1767 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -204,7 +204,9 @@ test_greeting() void test_xrow_header_encode_decode() { - plan(10); + /* Test all possible 3-bit combinations. 
*/ + const int bit_comb_count = 1 << 3; + plan(1 + bit_comb_count); struct xrow_header header; char buffer[2048]; char *pos = mp_encode_uint(buffer, 300); @@ -217,27 +219,47 @@ test_xrow_header_encode_decode() header.tm = 123.456; header.bodycnt = 0; header.tsn = header.lsn; - header.is_commit = true; uint64_t sync = 100500; - struct iovec vec[1]; - is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); - int fixheader_len = 200; - pos = (char *)vec[0].iov_base + fixheader_len; - is(mp_decode_map((const char **)&pos), 5, "header map size"); - - struct xrow_header decoded_header; - const char *begin = (const char *)vec[0].iov_base; - begin += fixheader_len; - const char *end = (const char *)vec[0].iov_base; - end += vec[0].iov_len; - is(xrow_header_decode(&decoded_header, &begin, end, true), 0, - "header decode"); - is(header.type, decoded_header.type, "decoded type"); - is(header.replica_id, decoded_header.replica_id, "decoded replica_id"); - is(header.lsn, decoded_header.lsn, "decoded lsn"); - is(header.tm, decoded_header.tm, "decoded tm"); - is(decoded_header.sync, sync, "decoded sync"); - is(decoded_header.bodycnt, 0, "decoded bodycnt"); + for (int opt_idx = 0; opt_idx < bit_comb_count; opt_idx++) { + plan(12); + header.is_commit = opt_idx & 0x01; + header.wait_sync = opt_idx >> 1 & 0x01; + header.wait_ack = opt_idx >> 2 & 0x01; + struct iovec vec[1]; + is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); + int fixheader_len = 200; + pos = (char *)vec[0].iov_base + fixheader_len; + uint32_t exp_map_size = 5; + /* + * header.is_commit flag isn't encoded, since this row looks + * like a single-statement transaction. + */ + if (header.wait_sync || header.wait_ack) + exp_map_size += 1; + /* tsn is encoded explicitly in this case. 
*/ + if (!header.is_commit) + exp_map_size += 1; + uint32_t size = mp_decode_map((const char **)&pos); + is(size, exp_map_size, "header map size"); + + struct xrow_header decoded_header; + const char *begin = (const char *)vec[0].iov_base; + begin += fixheader_len; + const char *end = (const char *)vec[0].iov_base; + end += vec[0].iov_len; + is(xrow_header_decode(&decoded_header, &begin, end, true), 0, + "header decode"); + is(header.is_commit, decoded_header.is_commit, "decoded is_commit"); + is(header.wait_sync, decoded_header.wait_sync, "decoded wait_sync"); + is(header.wait_ack, decoded_header.wait_ack, "decoded wait_ack"); + is(header.type, decoded_header.type, "decoded type"); + is(header.replica_id, decoded_header.replica_id, "decoded replica_id"); + is(header.lsn, decoded_header.lsn, "decoded lsn"); + is(header.tm, decoded_header.tm, "decoded tm"); + is(decoded_header.sync, sync, "decoded sync"); + is(decoded_header.bodycnt, 0, "decoded bodycnt"); + check_plan(); + } check_plan(); } diff --git a/test/unit/xrow.result b/test/unit/xrow.result index 5ee92ad7b..e06ba5261 100644 --- a/test/unit/xrow.result +++ b/test/unit/xrow.result @@ -41,17 +41,120 @@ ok 39 - invalid 10 ok 40 - invalid 11 ok 1 - subtests - 1..10 + 1..9 ok 1 - bad msgpack end - ok 2 - encode - ok 3 - header map size - ok 4 - header decode - ok 5 - decoded type - ok 6 - decoded replica_id - ok 7 - decoded lsn - ok 8 - decoded tm - ok 9 - decoded sync - ok 10 - decoded bodycnt + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 2 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - 
decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 3 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 4 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 5 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 6 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 7 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 8 - subtests + 1..12 + ok 1 - encode + ok 2 - header map size + ok 3 - header decode + ok 4 - decoded is_commit + ok 5 - decoded wait_sync + ok 6 - decoded wait_ack + ok 7 - decoded type + ok 8 - decoded replica_id + ok 9 - decoded lsn + ok 10 - decoded tm + ok 11 - decoded sync + ok 12 - decoded bodycnt + ok 9 - subtests ok 2 - subtests 1..1 ok 1 - 
request_str -- 2.24.3 (Apple Git-128)