From: Georgy Kirichenko <georgy@tarantool.org> To: tarantool-patches@freelists.org Cc: Georgy Kirichenko <georgy@tarantool.org> Subject: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Date: Tue, 22 Jan 2019 13:57:36 +0300 [thread overview] Message-ID: <7b72afb3f1a3479f1239fcbed936cc151b55f23c.1548153546.git.georgy@tarantool.org> (raw) In-Reply-To: <cover.1548153546.git.georgy@tarantool.org> Append txn_id, txn_replica_id and txn_last to xrow_header structure. txn_replica_id identifies replica where transaction was started and txn_id identifies transaction id on that replica. As transaction id a lsn of the first row in this transaction is used. txn_last set to true if it is the last row in a transaction, so we could commit transaction with last row or use additional NOP requests with txn_last = true ans valid txn_id and txn_replica_id. For replication all local changes moved to xrows array tail to form a separate transaction (like autonomous transaction) because it is not possible to replicate such transaction back to it's creator. As encoding/deconding rules assumed: 1. txn_replica_id is encoded only if it is not equal with replica id. This might have point because of replication trigger 2. txn_id and txn_last are encoded only for multi-row transaction. So if we do not have txn_id in a xstream then this means that it is a single row transaction. This rules provides compatibility with previous xlog handling. Needed for: 2798 --- src/box/iproto_constants.h | 3 ++ src/box/wal.c | 36 +++++++++++++++- src/box/xrow.c | 38 +++++++++++++++++ src/box/xrow.h | 5 ++- test/unit/xrow.cc | 3 ++ test/vinyl/errinj_stat.result | 8 ++-- test/vinyl/layout.result | 24 +++++------ test/vinyl/stat.result | 78 +++++++++++++++++------------------ 8 files changed, 138 insertions(+), 57 deletions(-) diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..d01cdf840 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -60,6 +60,9 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TXN_ID = 0x08, + IPROTO_TXN_REPLICA_ID = 0x09, + IPROTO_TXN_LAST = 0x0a, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/wal.c b/src/box/wal.c index 4c3537672..17ead08e7 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer) } static void -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, struct xrow_header **end) { + struct xrow_header **row = begin; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, vclock_follow_xrow(vclock, *row); } } + if ((*begin)->replica_id != instance_id) { + /* + * Move all local changes to the end of rows array and + * a fake local transaction (like an autonomous transaction) + * because we could not replicate the transaction back. + */ + struct xrow_header **row = end - 1; + while (row >= begin) { + if (row[0]->replica_id != instance_id) { + --row; + continue; + } + /* Local row, move it back. */ + struct xrow_header **local_row = row; + while (local_row < end - 1 && + local_row[1]->replica_id != instance_id) { + struct xrow_header *tmp = local_row[0]; + local_row[0] = local_row[1]; + local_row[1] = tmp; + } + --row; + } + while (begin < end && begin[0]->replica_id != instance_id) + ++begin; + } + /* Setup txn_id and tnx_replica_id for localy generated rows. */ + row = begin; + while (row < end) { + row[0]->txn_id = begin[0]->lsn; + row[0]->txn_replica_id = instance_id; + row[0]->txn_last = row == end - 1 ? 1 : 0; + ++row; + } } static void diff --git a/src/box/xrow.c b/src/box/xrow.c index ef3f81add..db524b3c8 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -133,12 +133,32 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TXN_ID: + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_REPLICA_ID: + header->txn_replica_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_LAST: + header->txn_last = mp_decode_uint(pos); + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (header->txn_id == 0) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->txn_id = header->lsn; + header->txn_last = true; + } + if (header->txn_replica_id == 0) + header->txn_replica_id = header->replica_id; + /* Nop requests aren't supposed to have a body. */ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != header->lsn || header->txn_last == 0) { + /* Encode txn id for multi row transaction members. */ + d = mp_encode_uint(d, IPROTO_TXN_ID); + d = mp_encode_uint(d, header->txn_id); + map_size++; + } + if (header->txn_replica_id != header->replica_id) { + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); + d = mp_encode_uint(d, header->txn_replica_id); + map_size++; + } + if (header->txn_last && !(header->txn_id == header->lsn && + header->txn_replica_id == header->replica_id)) { + /* Set last row for multi row transaction. */ + d = mp_encode_uint(d, IPROTO_TXN_LAST); + d = mp_encode_uint(d, header->txn_last); + map_size++; + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 6bab0a1fd..4acd84d56 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 60, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ @@ -69,6 +69,9 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + uint32_t txn_replica_id; + uint32_t txn_last; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 165a543cf..0b796d728 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.txn_replica_id = header.replica_id; + header.txn_last = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result index 08801dbc6..361ddf5db 100644 --- a/test/vinyl/errinj_stat.result +++ b/test/vinyl/errinj_stat.result @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements - bytes_compressed: <bytes_compressed> pages: 3 rows: 30 - bytes: 411 + bytes: 477 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements - bytes_compressed: <bytes_compressed> pages: 4 rows: 40 - bytes: 548 + bytes: 636 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -97,7 +97,7 @@ i:stat().disk.compaction.queue -- 50 statements - bytes_compressed: <bytes_compressed> pages: 5 rows: 50 - bytes: 685 + bytes: 795 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -111,7 +111,7 @@ i:stat().disk.compaction.queue -- 50 statements - bytes_compressed: <bytes_compressed> pages: 5 rows: 50 - bytes: 685 + bytes: 795 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result index 14201c5dd..a6b577dbc 100644 --- a/test/vinyl/layout.result +++ b/test/vinyl/layout.result @@ -253,8 +253,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 108 - unpacked_size: 89 + size: 118 + unpacked_size: 99 row_count: 4 min_key: ['ёёё'] - - 00000000000000000008.run @@ -281,7 +281,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" - - 00000000000000000012.index - - HEADER: type: RUNINFO @@ -298,8 +298,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 102 - unpacked_size: 83 + size: 110 + unpacked_size: 91 row_count: 3 min_key: ['ёёё'] - - 00000000000000000012.run @@ -324,7 +324,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x14\0\0\0*" + row_index: "\0\0\0\0\0\0\0\x16\0\0\0." - - 00000000000000000006.index - - HEADER: type: RUNINFO @@ -341,8 +341,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 108 - unpacked_size: 89 + size: 118 + unpacked_size: 99 row_count: 4 min_key: [null, 'ёёё'] - - 00000000000000000006.run @@ -369,7 +369,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" - - 00000000000000000010.index - - HEADER: type: RUNINFO @@ -386,8 +386,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 90 - unpacked_size: 71 + size: 98 + unpacked_size: 79 row_count: 3 min_key: [123, 'ёёё'] - - 00000000000000000010.run @@ -409,7 +409,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0\"" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0&" ... test_run:cmd("clear filter") --- diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result index 419d3e6c2..33b604b9f 100644 --- a/test/vinyl/stat.result +++ b/test/vinyl/stat.result @@ -299,7 +299,7 @@ stat_diff(istat(), st) run_count: 1 disk: last_level: - bytes: 26049 + bytes: 26113 pages: 7 bytes_compressed: <bytes_compressed> rows: 25 @@ -312,16 +312,16 @@ stat_diff(istat(), st) bytes: 26525 count: 1 output: - bytes: 26049 + bytes: 26113 pages: 7 bytes_compressed: <bytes_compressed> rows: 25 - bytes: 26049 + bytes: 26113 index_size: 294 pages: 7 bytes_compressed: <bytes_compressed> bloom_size: 70 - bytes: 26049 + bytes: 26113 put: rows: 25 bytes: 26525 @@ -344,7 +344,7 @@ stat_diff(istat(), st) --- - disk: last_level: - bytes: 26042 + bytes: 26104 pages: 6 bytes_compressed: <bytes_compressed> rows: 25 @@ -357,23 +357,23 @@ stat_diff(istat(), st) bytes: 53050 count: 1 output: - bytes: 52091 + bytes: 52217 pages: 13 bytes_compressed: <bytes_compressed> rows: 50 - bytes: 26042 + bytes: 26104 index_size: 252 pages: 6 bytes_compressed: <bytes_compressed> compaction: input: - bytes: 78140 + bytes: 78330 pages: 20 bytes_compressed: <bytes_compressed> rows: 75 count: 1 output: - bytes: 52091 + bytes: 52217 pages: 13 bytes_compressed: <bytes_compressed> rows: 50 @@ -381,7 +381,7 @@ stat_diff(istat(), st) rows: 50 bytes: 53050 rows: 25 - bytes: 26042 + bytes: 26104 ... -- point lookup from disk + cache put st = istat() @@ -405,7 +405,7 @@ stat_diff(istat(), st) disk: iterator: read: - bytes: 4167 + bytes: 4177 pages: 1 bytes_compressed: <bytes_compressed> rows: 4 @@ -655,7 +655,7 @@ stat_diff(istat(), st) disk: iterator: read: - bytes: 104300 + bytes: 104550 pages: 25 bytes_compressed: <bytes_compressed> rows: 100 @@ -1000,7 +1000,7 @@ istat() --- - rows: 306 run_avg: 1 - bytes: 317731 + bytes: 317981 upsert: squashed: 0 applied: 0 @@ -1032,7 +1032,7 @@ istat() bytes_compressed: <bytes_compressed> pages: 25 rows: 100 - bytes: 104300 + bytes: 104550 rows: 100 statement: inserts: 0 @@ -1083,7 +1083,7 @@ istat() count: 0 pages: 25 bytes_compressed: <bytes_compressed> - bytes: 104300 + bytes: 104550 txw: bytes: 0 rows: 0 @@ -1123,8 +1123,8 @@ gstat() page_index: 1050 bloom_filter: 140 disk: - data_compacted: 104300 - data: 104300 + data_compacted: 104550 + data: 104550 index: 1190 scheduler: tasks_inprogress: 0 @@ -1163,7 +1163,7 @@ box.snapshot() stat_diff(gstat(), st, 'scheduler') --- - dump_input: 104200 - dump_output: 103592 + dump_output: 104018 tasks_completed: 2 dump_count: 1 ... @@ -1180,7 +1180,7 @@ box.snapshot() stat_diff(gstat(), st, 'scheduler') --- - dump_input: 10420 - dump_output: 10371 + dump_output: 10417 tasks_completed: 2 dump_count: 1 ... @@ -1195,9 +1195,9 @@ while i1:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end ... stat_diff(gstat(), st, 'scheduler') --- -- compaction_input: 112188 +- compaction_input: 112436 tasks_completed: 1 - compaction_output: 101984 + compaction_output: 102208 ... st = gstat() --- @@ -1210,9 +1210,9 @@ while i2:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end ... stat_diff(gstat(), st, 'scheduler') --- -- compaction_input: 1775 +- compaction_input: 1999 tasks_completed: 1 - compaction_output: 1608 + compaction_output: 1810 ... s:drop() --- @@ -1324,7 +1324,7 @@ st2 = i2:stat() ... s:bsize() --- -- 52199 +- 52313 ... i1:len(), i2:len() --- @@ -1334,7 +1334,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 364 -- 920 +- 1022 ... s:bsize() == st1.disk.bytes --- @@ -1386,7 +1386,7 @@ st2 = i2:stat() ... s:bsize() --- -- 107449 +- 107563 ... i1:len(), i2:len() --- @@ -1396,7 +1396,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 49516 -- 50072 +- 50174 ... s:bsize() == st1.memory.bytes + st1.disk.bytes --- @@ -1451,7 +1451,7 @@ st2 = i2:stat() ... s:bsize() --- -- 52199 +- 52313 ... i1:len(), i2:len() --- @@ -1461,7 +1461,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 364 -- 920 +- 1022 ... s:bsize() == st1.disk.bytes --- @@ -1653,18 +1653,18 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11815 + bytes: 12019 ... i2:stat().disk.last_level --- - bytes_compressed: <bytes_compressed> pages: 1 rows: 100 - bytes: 1608 + bytes: 1810 ... box.stat.vinyl().disk.data_compacted --- -- 11815 +- 12019 ... for i = 1, 100, 10 do s:replace{i, i * 1000, digest.urandom(100)} end --- @@ -1678,18 +1678,18 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11815 + bytes: 12019 ... i2:stat().disk.last_level --- - bytes_compressed: <bytes_compressed> pages: 1 rows: 100 - bytes: 1608 + bytes: 1810 ... box.stat.vinyl().disk.data_compacted --- -- 11815 +- 12019 ... i1:compact() --- @@ -1702,11 +1702,11 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11841 + bytes: 12045 ... box.stat.vinyl().disk.data_compacted --- -- 11841 +- 12045 ... i2:compact() --- @@ -1719,11 +1719,11 @@ i2:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 1 rows: 110 - bytes: 1794 + bytes: 2016 ... box.stat.vinyl().disk.data_compacted --- -- 11841 +- 12045 ... s:drop() --- -- 2.20.1
next prev parent reply other threads:[~2019-01-22 10:55 UTC|newest] Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-01-22 10:57 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko 2019-01-22 10:57 ` Georgy Kirichenko [this message] 2019-01-28 12:58 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Vladimir Davydov 2019-01-29 10:09 ` Георгий Кириченко 2019-01-29 11:00 ` Vladimir Davydov 2019-01-31 7:34 ` Георгий Кириченко 2019-01-31 8:19 ` Vladimir Davydov 2019-01-31 14:25 ` Георгий Кириченко 2019-01-31 14:54 ` Vladimir Davydov 2019-02-01 9:31 ` Георгий Кириченко 2019-01-28 13:00 ` Vladimir Davydov 2019-01-28 13:08 ` [tarantool-patches] " Vladislav Shpilevoy 2019-02-08 16:56 ` Konstantin Osipov 2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko 2019-01-28 13:35 ` Vladimir Davydov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=7b72afb3f1a3479f1239fcbed936cc151b55f23c.1548153546.git.georgy@tarantool.org \ --to=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox