* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-22 10:57 Georgy Kirichenko

From: Georgy Kirichenko @ 2019-01-22 10:57 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consists of two
commits:
* the first one forms transaction boundaries in an xstream;
* the second one forms transactions in applier buffers and then applies
  them with correct begin/commit boundaries.

Note: this patchset is based on g.kirichenko/gh-980-disable-lsn-gaps

Note: distributed transactions are not supported, so the journal forms a
separate transaction for all effects of local triggers.

Changes in v2:
  - Rebased against latest 2.1
  - Fixed local transaction extraction

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc            | 207 ++++++++++++++++++++++++----------
 src/box/iproto_constants.h    |   3 +
 src/box/wal.c                 |  36 +++++-
 src/box/xrow.c                |  38 +++++++
 src/box/xrow.h                |   5 +-
 test/unit/xrow.cc             |   3 +
 test/vinyl/errinj_stat.result |   8 +-
 test/vinyl/layout.result      |  24 ++--
 test/vinyl/stat.result        |  78 ++++++-------
 9 files changed, 286 insertions(+), 116 deletions(-)

-- 
2.20.1
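A minimal sketch of how an applier could cut a buffer of decoded rows into transactions using the new header fields: a transaction is the run of rows sharing (txn_replica_id, txn_id) that ends at the row with txn_last set. This is an illustration under assumptions, not the code of patch 2/2: `struct row` is a simplified stand-in for struct xrow_header, and `txn_batch_len()` is a hypothetical helper.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for struct xrow_header (decoded fields only). */
struct row {
	int64_t lsn;
	uint32_t txn_replica_id;
	int64_t txn_id;
	bool txn_last;
};

/*
 * Hypothetical helper: length of the transaction that starts at
 * rows[0]. Returns 0 if no row with txn_last has arrived yet (the
 * caller should keep buffering), or -1 if a row belonging to another
 * transaction shows up before the transaction is closed.
 */
static long
txn_batch_len(const struct row *rows, size_t count)
{
	assert(count == 0 || rows != NULL);
	for (size_t i = 0; i < count; i++) {
		/* All rows of one transaction share (txn_replica_id, txn_id). */
		if (rows[i].txn_replica_id != rows[0].txn_replica_id ||
		    rows[i].txn_id != rows[0].txn_id)
			return -1;
		/* The row marked txn_last closes the transaction. */
		if (rows[i].txn_last)
			return (long)(i + 1);
	}
	return 0;
}
```

A caller would apply rows[0..n) as one transaction when the result n is positive and keep buffering when it is 0. Because the decoder in patch 1/2 fills in txn_id = lsn and txn_last = true for rows that carry neither key, a single-row transaction yields n = 1.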
* [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
@ 2019-01-22 10:57 Georgy Kirichenko

From: Georgy Kirichenko @ 2019-01-22 10:57 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
txn_replica_id identifies the replica where the transaction was started,
and txn_id identifies the transaction on that replica. The LSN of the
first row of the transaction is used as the transaction id.
txn_last is set to true on the last row of a transaction, so we can
either commit the transaction with its last row or use an additional NOP
request with txn_last = true and a valid txn_id and txn_replica_id.
For replication, all local changes are moved to the tail of the xrows
array to form a separate transaction (like an autonomous transaction),
because such a transaction cannot be replicated back to its creator.

The following encoding/decoding rules are assumed:
1. txn_replica_id is encoded only if it is not equal to replica_id.
   This may matter because of replication triggers.
2. txn_id and txn_last are encoded only for multi-row transactions.
   So if a row in an xstream has no txn_id, it is a single-row
   transaction.
These rules preserve compatibility with the previous xlog handling.
Needed for: 2798 --- src/box/iproto_constants.h | 3 ++ src/box/wal.c | 36 +++++++++++++++- src/box/xrow.c | 38 +++++++++++++++++ src/box/xrow.h | 5 ++- test/unit/xrow.cc | 3 ++ test/vinyl/errinj_stat.result | 8 ++-- test/vinyl/layout.result | 24 +++++------ test/vinyl/stat.result | 78 +++++++++++++++++------------------ 8 files changed, 138 insertions(+), 57 deletions(-) diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..d01cdf840 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -60,6 +60,9 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TXN_ID = 0x08, + IPROTO_TXN_REPLICA_ID = 0x09, + IPROTO_TXN_LAST = 0x0a, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/wal.c b/src/box/wal.c index 4c3537672..17ead08e7 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer) } static void -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, struct xrow_header **end) { + struct xrow_header **row = begin; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, vclock_follow_xrow(vclock, *row); } } + if ((*begin)->replica_id != instance_id) { + /* + * Move all local changes to the end of rows array and + * a fake local transaction (like an autonomous transaction) + * because we could not replicate the transaction back. + */ + struct xrow_header **row = end - 1; + while (row >= begin) { + if (row[0]->replica_id != instance_id) { + --row; + continue; + } + /* Local row, move it back. 
*/ + struct xrow_header **local_row = row; + while (local_row < end - 1 && + local_row[1]->replica_id != instance_id) { + struct xrow_header *tmp = local_row[0]; + local_row[0] = local_row[1]; + local_row[1] = tmp; + } + --row; + } + while (begin < end && begin[0]->replica_id != instance_id) + ++begin; + } + /* Setup txn_id and tnx_replica_id for localy generated rows. */ + row = begin; + while (row < end) { + row[0]->txn_id = begin[0]->lsn; + row[0]->txn_replica_id = instance_id; + row[0]->txn_last = row == end - 1 ? 1 : 0; + ++row; + } } static void diff --git a/src/box/xrow.c b/src/box/xrow.c index ef3f81add..db524b3c8 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -133,12 +133,32 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TXN_ID: + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_REPLICA_ID: + header->txn_replica_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_LAST: + header->txn_last = mp_decode_uint(pos); + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (header->txn_id == 0) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->txn_id = header->lsn; + header->txn_last = true; + } + if (header->txn_replica_id == 0) + header->txn_replica_id = header->replica_id; + /* Nop requests aren't supposed to have a body. */ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != header->lsn || header->txn_last == 0) { + /* Encode txn id for multi row transaction members. 
*/ + d = mp_encode_uint(d, IPROTO_TXN_ID); + d = mp_encode_uint(d, header->txn_id); + map_size++; + } + if (header->txn_replica_id != header->replica_id) { + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); + d = mp_encode_uint(d, header->txn_replica_id); + map_size++; + } + if (header->txn_last && !(header->txn_id == header->lsn && + header->txn_replica_id == header->replica_id)) { + /* Set last row for multi row transaction. */ + d = mp_encode_uint(d, IPROTO_TXN_LAST); + d = mp_encode_uint(d, header->txn_last); + map_size++; + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 6bab0a1fd..4acd84d56 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 60, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). 
*/ @@ -69,6 +69,9 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + uint32_t txn_replica_id; + uint32_t txn_last; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 165a543cf..0b796d728 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.txn_replica_id = header.replica_id; + header.txn_last = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result index 08801dbc6..361ddf5db 100644 --- a/test/vinyl/errinj_stat.result +++ b/test/vinyl/errinj_stat.result @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements - bytes_compressed: <bytes_compressed> pages: 3 rows: 30 - bytes: 411 + bytes: 477 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements - bytes_compressed: <bytes_compressed> pages: 4 rows: 40 - bytes: 548 + bytes: 636 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -97,7 +97,7 @@ i:stat().disk.compaction.queue -- 50 statements - bytes_compressed: <bytes_compressed> pages: 5 rows: 50 - bytes: 685 + bytes: 795 ... i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- @@ -111,7 +111,7 @@ i:stat().disk.compaction.queue -- 50 statements - bytes_compressed: <bytes_compressed> pages: 5 rows: 50 - bytes: 685 + bytes: 795 ... 
i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue --- diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result index 14201c5dd..a6b577dbc 100644 --- a/test/vinyl/layout.result +++ b/test/vinyl/layout.result @@ -253,8 +253,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 108 - unpacked_size: 89 + size: 118 + unpacked_size: 99 row_count: 4 min_key: ['ёёё'] - - 00000000000000000008.run @@ -281,7 +281,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" - - 00000000000000000012.index - - HEADER: type: RUNINFO @@ -298,8 +298,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 102 - unpacked_size: 83 + size: 110 + unpacked_size: 91 row_count: 3 min_key: ['ёёё'] - - 00000000000000000012.run @@ -324,7 +324,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x14\0\0\0*" + row_index: "\0\0\0\0\0\0\0\x16\0\0\0." - - 00000000000000000006.index - - HEADER: type: RUNINFO @@ -341,8 +341,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 108 - unpacked_size: 89 + size: 118 + unpacked_size: 99 row_count: 4 min_key: [null, 'ёёё'] - - 00000000000000000006.run @@ -369,7 +369,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" - - 00000000000000000010.index - - HEADER: type: RUNINFO @@ -386,8 +386,8 @@ result BODY: row_index_offset: <offset> offset: <offset> - size: 90 - unpacked_size: 71 + size: 98 + unpacked_size: 79 row_count: 3 min_key: [123, 'ёёё'] - - 00000000000000000010.run @@ -409,7 +409,7 @@ result - HEADER: type: ROWINDEX BODY: - row_index: "\0\0\0\0\0\0\0\x10\0\0\0\"" + row_index: "\0\0\0\0\0\0\0\x12\0\0\0&" ... 
test_run:cmd("clear filter") --- diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result index 419d3e6c2..33b604b9f 100644 --- a/test/vinyl/stat.result +++ b/test/vinyl/stat.result @@ -299,7 +299,7 @@ stat_diff(istat(), st) run_count: 1 disk: last_level: - bytes: 26049 + bytes: 26113 pages: 7 bytes_compressed: <bytes_compressed> rows: 25 @@ -312,16 +312,16 @@ stat_diff(istat(), st) bytes: 26525 count: 1 output: - bytes: 26049 + bytes: 26113 pages: 7 bytes_compressed: <bytes_compressed> rows: 25 - bytes: 26049 + bytes: 26113 index_size: 294 pages: 7 bytes_compressed: <bytes_compressed> bloom_size: 70 - bytes: 26049 + bytes: 26113 put: rows: 25 bytes: 26525 @@ -344,7 +344,7 @@ stat_diff(istat(), st) --- - disk: last_level: - bytes: 26042 + bytes: 26104 pages: 6 bytes_compressed: <bytes_compressed> rows: 25 @@ -357,23 +357,23 @@ stat_diff(istat(), st) bytes: 53050 count: 1 output: - bytes: 52091 + bytes: 52217 pages: 13 bytes_compressed: <bytes_compressed> rows: 50 - bytes: 26042 + bytes: 26104 index_size: 252 pages: 6 bytes_compressed: <bytes_compressed> compaction: input: - bytes: 78140 + bytes: 78330 pages: 20 bytes_compressed: <bytes_compressed> rows: 75 count: 1 output: - bytes: 52091 + bytes: 52217 pages: 13 bytes_compressed: <bytes_compressed> rows: 50 @@ -381,7 +381,7 @@ stat_diff(istat(), st) rows: 50 bytes: 53050 rows: 25 - bytes: 26042 + bytes: 26104 ... 
-- point lookup from disk + cache put st = istat() @@ -405,7 +405,7 @@ stat_diff(istat(), st) disk: iterator: read: - bytes: 4167 + bytes: 4177 pages: 1 bytes_compressed: <bytes_compressed> rows: 4 @@ -655,7 +655,7 @@ stat_diff(istat(), st) disk: iterator: read: - bytes: 104300 + bytes: 104550 pages: 25 bytes_compressed: <bytes_compressed> rows: 100 @@ -1000,7 +1000,7 @@ istat() --- - rows: 306 run_avg: 1 - bytes: 317731 + bytes: 317981 upsert: squashed: 0 applied: 0 @@ -1032,7 +1032,7 @@ istat() bytes_compressed: <bytes_compressed> pages: 25 rows: 100 - bytes: 104300 + bytes: 104550 rows: 100 statement: inserts: 0 @@ -1083,7 +1083,7 @@ istat() count: 0 pages: 25 bytes_compressed: <bytes_compressed> - bytes: 104300 + bytes: 104550 txw: bytes: 0 rows: 0 @@ -1123,8 +1123,8 @@ gstat() page_index: 1050 bloom_filter: 140 disk: - data_compacted: 104300 - data: 104300 + data_compacted: 104550 + data: 104550 index: 1190 scheduler: tasks_inprogress: 0 @@ -1163,7 +1163,7 @@ box.snapshot() stat_diff(gstat(), st, 'scheduler') --- - dump_input: 104200 - dump_output: 103592 + dump_output: 104018 tasks_completed: 2 dump_count: 1 ... @@ -1180,7 +1180,7 @@ box.snapshot() stat_diff(gstat(), st, 'scheduler') --- - dump_input: 10420 - dump_output: 10371 + dump_output: 10417 tasks_completed: 2 dump_count: 1 ... @@ -1195,9 +1195,9 @@ while i1:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end ... stat_diff(gstat(), st, 'scheduler') --- -- compaction_input: 112188 +- compaction_input: 112436 tasks_completed: 1 - compaction_output: 101984 + compaction_output: 102208 ... st = gstat() --- @@ -1210,9 +1210,9 @@ while i2:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end ... stat_diff(gstat(), st, 'scheduler') --- -- compaction_input: 1775 +- compaction_input: 1999 tasks_completed: 1 - compaction_output: 1608 + compaction_output: 1810 ... s:drop() --- @@ -1324,7 +1324,7 @@ st2 = i2:stat() ... s:bsize() --- -- 52199 +- 52313 ... 
i1:len(), i2:len() --- @@ -1334,7 +1334,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 364 -- 920 +- 1022 ... s:bsize() == st1.disk.bytes --- @@ -1386,7 +1386,7 @@ st2 = i2:stat() ... s:bsize() --- -- 107449 +- 107563 ... i1:len(), i2:len() --- @@ -1396,7 +1396,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 49516 -- 50072 +- 50174 ... s:bsize() == st1.memory.bytes + st1.disk.bytes --- @@ -1451,7 +1451,7 @@ st2 = i2:stat() ... s:bsize() --- -- 52199 +- 52313 ... i1:len(), i2:len() --- @@ -1461,7 +1461,7 @@ i1:len(), i2:len() i1:bsize(), i2:bsize() --- - 364 -- 920 +- 1022 ... s:bsize() == st1.disk.bytes --- @@ -1653,18 +1653,18 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11815 + bytes: 12019 ... i2:stat().disk.last_level --- - bytes_compressed: <bytes_compressed> pages: 1 rows: 100 - bytes: 1608 + bytes: 1810 ... box.stat.vinyl().disk.data_compacted --- -- 11815 +- 12019 ... for i = 1, 100, 10 do s:replace{i, i * 1000, digest.urandom(100)} end --- @@ -1678,18 +1678,18 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11815 + bytes: 12019 ... i2:stat().disk.last_level --- - bytes_compressed: <bytes_compressed> pages: 1 rows: 100 - bytes: 1608 + bytes: 1810 ... box.stat.vinyl().disk.data_compacted --- -- 11815 +- 12019 ... i1:compact() --- @@ -1702,11 +1702,11 @@ i1:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 2 rows: 100 - bytes: 11841 + bytes: 12045 ... box.stat.vinyl().disk.data_compacted --- -- 11841 +- 12045 ... i2:compact() --- @@ -1719,11 +1719,11 @@ i2:stat().disk.last_level - bytes_compressed: <bytes_compressed> pages: 1 rows: 110 - bytes: 1794 + bytes: 2016 ... box.stat.vinyl().disk.data_compacted --- -- 11841 +- 12045 ... s:drop() --- -- 2.20.1 ^ permalink raw reply [flat|nested] 16+ messages in thread
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
@ 2019-01-28 12:58 Vladimir Davydov

From: Vladimir Davydov @ 2019-01-28 12:58 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true ans valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to it's creator.
>
> As encoding/deconding rules assumed:
> 1. txn_replica_id is encoded only if it is not equal with replica
>    id. This might have point because of replication trigger
> 2. txn_id and txn_last are encoded only for multi-row transaction.
>    So if we do not have txn_id in a xstream then this means that it
>    is a single row transaction.
> This rules provides compatibility with previous xlog handling.
>
> Needed for: 2798
> ---
>  src/box/iproto_constants.h    |  3 ++

Looks like you forgot to update iproto_constants.c.
> src/box/wal.c | 36 +++++++++++++++- > src/box/xrow.c | 38 +++++++++++++++++ > src/box/xrow.h | 5 ++- > test/unit/xrow.cc | 3 ++ > test/vinyl/errinj_stat.result | 8 ++-- > test/vinyl/layout.result | 24 +++++------ > test/vinyl/stat.result | 78 +++++++++++++++++------------------ > 8 files changed, 138 insertions(+), 57 deletions(-) > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 728514297..d01cdf840 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -60,6 +60,9 @@ enum iproto_key { > IPROTO_SCHEMA_VERSION = 0x05, > IPROTO_SERVER_VERSION = 0x06, > IPROTO_GROUP_ID = 0x07, > + IPROTO_TXN_ID = 0x08, > + IPROTO_TXN_REPLICA_ID = 0x09, Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would expect TXN_ID to be enough - we could use REPLICA_ID to make sure that transaction identifiers from different instances don't overlap, because there couldn't be a "multi-instance" transaction, could there? > + IPROTO_TXN_LAST = 0x0a, I think we should instead introduce BEGIN and COMMIT commands, because: - We might need to attach some extra information to each transaction, e.g. mark transactions that were committed in parallel on the master so that they can be committed in parallel on a replica. Attaching such information to each row would be excessive. - We will need BEGIN and COMMIT for IPROTO transactions. It would be nice if we could share the code with them. - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output easier to read. > /* Leave a gap for other keys in the header. 
*/ > IPROTO_SPACE_ID = 0x10, > IPROTO_INDEX_ID = 0x11, > diff --git a/src/box/wal.c b/src/box/wal.c > index 4c3537672..17ead08e7 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer) > } > > static void > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > struct xrow_header **end) > { > + struct xrow_header **row = begin; > /** Assign LSN to all local rows. */ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > vclock_follow_xrow(vclock, *row); > } > } > + if ((*begin)->replica_id != instance_id) { > + /* > + * Move all local changes to the end of rows array and > + * a fake local transaction (like an autonomous transaction) > + * because we could not replicate the transaction back. > + */ > + struct xrow_header **row = end - 1; > + while (row >= begin) { > + if (row[0]->replica_id != instance_id) { > + --row; > + continue; > + } > + /* Local row, move it back. */ > + struct xrow_header **local_row = row; > + while (local_row < end - 1 && > + local_row[1]->replica_id != instance_id) { > + struct xrow_header *tmp = local_row[0]; > + local_row[0] = local_row[1]; > + local_row[1] = tmp; There's SWAP for this. > + } > + --row; > + } > + while (begin < end && begin[0]->replica_id != instance_id) > + ++begin; > + } I don't understand why we need to move rows generated locally (by an on_replace trigger I surmise) to the end of a transaction. We have TXN_ID attached to each row so we could leave the transactions interleaved, couldn't we? > + /* Setup txn_id and tnx_replica_id for localy generated rows. */ > + row = begin; > + while (row < end) { > + row[0]->txn_id = begin[0]->lsn; > + row[0]->txn_replica_id = instance_id; > + row[0]->txn_last = row == end - 1 ? 
1 : 0; > + ++row; > + } I think we better use txn->id for TXN_ID rather than LSN. Why do you think LSN should be used? I don't see any rationale for that anywhere in the comments. Also, setting TXN_ID looks like a job that should be done by txn_add_redo... > } > > static void > diff --git a/src/box/xrow.c b/src/box/xrow.c > index ef3f81add..db524b3c8 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -133,12 +133,32 @@ error: > case IPROTO_SCHEMA_VERSION: > header->schema_version = mp_decode_uint(pos); > break; > + case IPROTO_TXN_ID: > + header->txn_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_REPLICA_ID: > + header->txn_replica_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_LAST: > + header->txn_last = mp_decode_uint(pos); Should be bool? > + break; > default: > /* unknown header */ > mp_next(pos); > } > } > assert(*pos <= end); > + if (header->txn_id == 0) { > + /* > + * Transaction id is not set so it is a single statement > + * transaction. > + */ > + header->txn_id = header->lsn; > + header->txn_last = true; > + } > + if (header->txn_replica_id == 0) > + header->txn_replica_id = header->replica_id; > + > /* Nop requests aren't supposed to have a body. */ > if (*pos < end && header->type != IPROTO_NOP) { > const char *body = *pos; > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, > d = mp_encode_double(d, header->tm); > map_size++; > } > + if (header->txn_id != header->lsn || header->txn_last == 0) { > + /* Encode txn id for multi row transaction members. 
*/ > + d = mp_encode_uint(d, IPROTO_TXN_ID); > + d = mp_encode_uint(d, header->txn_id); > + map_size++; > + } > + if (header->txn_replica_id != header->replica_id) { > + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); > + d = mp_encode_uint(d, header->txn_replica_id); > + map_size++; > + } > + if (header->txn_last && !(header->txn_id == header->lsn && > + header->txn_replica_id == header->replica_id)) { > + /* Set last row for multi row transaction. */ > + d = mp_encode_uint(d, IPROTO_TXN_LAST); > + d = mp_encode_uint(d, header->txn_last); > + map_size++; > + } > assert(d <= data + XROW_HEADER_LEN_MAX); > mp_encode_map(data, map_size); > out->iov_len = d - (char *) out->iov_base; > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 6bab0a1fd..4acd84d56 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -47,7 +47,7 @@ enum { > XROW_HEADER_IOVMAX = 1, > XROW_BODY_IOVMAX = 2, > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > - XROW_HEADER_LEN_MAX = 40, > + XROW_HEADER_LEN_MAX = 60, > XROW_BODY_LEN_MAX = 128, > IPROTO_HEADER_LEN = 28, > /** 7 = sizeof(iproto_body_bin). 
*/ > @@ -69,6 +69,9 @@ struct xrow_header { > uint64_t sync; > int64_t lsn; /* LSN must be signed for correct comparison */ > double tm; > + int64_t txn_id; > + uint32_t txn_replica_id; > + uint32_t txn_last; > > int bodycnt; > uint32_t schema_version; > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > index 165a543cf..0b796d728 100644 > --- a/test/unit/xrow.cc > +++ b/test/unit/xrow.cc > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() > header.lsn = 400; > header.tm = 123.456; > header.bodycnt = 0; > + header.txn_id = header.lsn; > + header.txn_replica_id = header.replica_id; > + header.txn_last = true; > uint64_t sync = 100500; > struct iovec vec[1]; > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result > index 08801dbc6..361ddf5db 100644 > --- a/test/vinyl/errinj_stat.result > +++ b/test/vinyl/errinj_stat.result > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements > - bytes_compressed: <bytes_compressed> > pages: 3 > rows: 30 > - bytes: 411 > + bytes: 477 > ... > i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue > --- > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements > - bytes_compressed: <bytes_compressed> > pages: 4 > rows: 40 > - bytes: 548 > + bytes: 636 I wouldn't expect vinyl stats to be changed by this patch. Why did it happen? ^ permalink raw reply [flat|nested] 16+ messages in thread
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
@ 2019-01-29 10:09 Георгий Кириченко

From: Георгий Кириченко @ 2019-01-29 10:09 UTC
To: Vladimir Davydov; +Cc: tarantool-patches

On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> > txn_replica_id identifies replica where transaction was started and
> > txn_id identifies transaction id on that replica. As transaction id
> > a lsn of the first row in this transaction is used.
> > txn_last set to true if it is the last row in a transaction, so we
> > could commit transaction with last row or use additional NOP requests
> > with txn_last = true ans valid txn_id and txn_replica_id.
> > For replication all local changes moved to xrows array tail to form
> > a separate transaction (like autonomous transaction) because it is not
> > possible to replicate such transaction back to it's creator.
> >
> > As encoding/deconding rules assumed:
> > 1. txn_replica_id is encoded only if it is not equal with replica
> >    id. This might have point because of replication trigger
> > 2. txn_id and txn_last are encoded only for multi-row transaction.
> >    So if we do not have txn_id in a xstream then this means that it
> >    is a single row transaction.
> > This rules provides compatibility with previous xlog handling.
> >
> > Needed for: 2798
> > ---
> >  src/box/iproto_constants.h    |  3 ++
>
> Looks like you forgot to update iproto_constants.c.
>
> >  src/box/wal.c                 | 36 +++++++++++++++-
> >  src/box/xrow.c                | 38 +++++++++++++++++
> >  src/box/xrow.h                |  5 ++-
> >  test/unit/xrow.cc             |  3 ++
> >  test/vinyl/errinj_stat.result |  8 ++--
> >  test/vinyl/layout.result      | 24 +++++------
> >  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
> >  8 files changed, 138 insertions(+), 57 deletions(-)
> >
> > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > index 728514297..d01cdf840 100644
> > --- a/src/box/iproto_constants.h
> > +++ b/src/box/iproto_constants.h
> > @@ -60,6 +60,9 @@ enum iproto_key {
> >  	IPROTO_SCHEMA_VERSION = 0x05,
> >  	IPROTO_SERVER_VERSION = 0x06,
> >  	IPROTO_GROUP_ID = 0x07,
> > +	IPROTO_TXN_ID = 0x08,
> > +	IPROTO_TXN_REPLICA_ID = 0x09,
>
> Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> transaction identifiers from different instances don't overlap, because
> there couldn't be a "multi-instance" transaction, could there?
>
> > +	IPROTO_TXN_LAST = 0x0a,
>
> I think we should instead introduce BEGIN and COMMIT commands, because:

I do not like any auto-commit logic in an xlog file at all. Your
suggestion breaks backward compatibility, because older logs do not
contain any BEGIN/COMMIT rows. Also, separate BEGIN and COMMIT messages
increase the transaction size. It is worth noting that an IPROTO_NOP
with a new txn_id or with txn_last set emulates BEGIN or COMMIT.

>
> - We might need to attach some extra information to each transaction,
>   e.g. mark transactions that were committed in parallel on the master
>   so that they can be committed in parallel on a replica. Attaching
>   such information to each row would be excessive.

The patch is not about this.

>
> - We will need BEGIN and COMMIT for IPROTO transactions. It would be
>   nice if we could share the code with them.

The biggest issue is that we cannot know the transaction identifier in
the case of IPROTO. IPROTO is a single-stream protocol, but the WAL may
not be, since it multiplexes many transactions into one output, so it
may be a bad idea to use a universal format for both IPROTO and WAL.

>
> - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
>   easier to read.
>
> >  	/* Leave a gap for other keys in the header. */
> >  	IPROTO_SPACE_ID = 0x10,
> >  	IPROTO_INDEX_ID = 0x11,
> >
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 4c3537672..17ead08e7 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> >  }
> >
> >  static void
> > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> >  	       struct xrow_header **end)
> >  {
> > +	struct xrow_header **row = begin;
> >  	/** Assign LSN to all local rows. */
> >  	for ( ; row < end; row++) {
> >  		if ((*row)->replica_id == 0) {
> > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> >  			vclock_follow_xrow(vclock, *row);
> >  		}
> >  	}
> > +	if ((*begin)->replica_id != instance_id) {
> > +		/*
> > +		 * Move all local changes to the end of rows array and
> > +		 * a fake local transaction (like an autonomous transaction)
> > +		 * because we could not replicate the transaction back.
> > +		 */
> > +		struct xrow_header **row = end - 1;
> > +		while (row >= begin) {
> > +			if (row[0]->replica_id != instance_id) {
> > +				--row;
> > +				continue;
> > +			}
> > +			/* Local row, move it back. */
> > +			struct xrow_header **local_row = row;
> > +			while (local_row < end - 1 &&
> > +			       local_row[1]->replica_id != instance_id) {
> > +				struct xrow_header *tmp = local_row[0];
> > +				local_row[0] = local_row[1];
> > +				local_row[1] = tmp;
>
> There's SWAP for this.
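A minimal sketch of the same "move local rows to the tail" step, written with a swap helper as the review suggests. `SWAP_PTR`, `struct row`, `move_local_rows_to_tail()` and `tag_local_txn()` are hypothetical stand-ins; the real SWAP macro and struct xrow_header may differ. Note that, as far as I can tell, the inner loop in the posted wal_assign_lsn() never advances local_row, so each local row moves by at most one slot; the sketch advances the cursor after every swap.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for a SWAP helper; swaps two struct row pointers. */
#define SWAP_PTR(a, b) do {				\
	struct row *swap_tmp_ = (a);			\
	(a) = (b);					\
	(b) = swap_tmp_;				\
} while (0)

/* Simplified stand-in for struct xrow_header. */
struct row {
	uint32_t replica_id;
	int64_t lsn;
	int64_t txn_id;
	uint32_t txn_replica_id;
	uint32_t txn_last;
};

/*
 * Move every row with replica_id == instance_id to the tail of
 * rows[0..count), keeping the relative order of both groups.
 * Returns the index of the first local row (count if there is none).
 */
static size_t
move_local_rows_to_tail(struct row **rows, size_t count, uint32_t instance_id)
{
	/* Walk backwards so the already-moved tail stays in order. */
	for (size_t i = count; i-- > 0; ) {
		if (rows[i]->replica_id != instance_id)
			continue;
		/* Bubble the local row right, past remote rows only. */
		for (size_t j = i; j + 1 < count &&
		     rows[j + 1]->replica_id != instance_id; j++)
			SWAP_PTR(rows[j], rows[j + 1]);
	}
	size_t first_local = 0;
	while (first_local < count &&
	       rows[first_local]->replica_id != instance_id)
		first_local++;
	return first_local;
}

/* Tag rows[begin..count) as one local transaction, as the patch does. */
static void
tag_local_txn(struct row **rows, size_t begin, size_t count,
	      uint32_t instance_id)
{
	assert(begin <= count);
	for (size_t i = begin; i < count; i++) {
		rows[i]->txn_id = rows[begin]->lsn;
		rows[i]->txn_replica_id = instance_id;
		rows[i]->txn_last = (i == count - 1);
	}
}
```

Walking backwards means each local row only has to jump over the remote rows to its right, and since only adjacent local/remote pairs are swapped, the relative order inside each group is preserved (a stable partition).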
>
> > +			}
> > +			--row;
> > +		}
> > +		while (begin < end && begin[0]->replica_id != instance_id)
> > +			++begin;
> > +	}
>
> I don't understand why we need to move rows generated locally (by
> an on_replace trigger I surmise) to the end of a transaction. We
> have TXN_ID attached to each row so we could leave the transactions
> interleaved, couldn't we?

You are right, but in that case the applier would have to track many
transactions simultaneously. It also complicates recovery. I hope this
will be fixed when the parallel applier is implemented.

>
> > +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> > +	row = begin;
> > +	while (row < end) {
> > +		row[0]->txn_id = begin[0]->lsn;
> > +		row[0]->txn_replica_id = instance_id;
> > +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> > +		++row;
> > +	}
>
> I think we better use txn->id for TXN_ID rather than LSN.
> Why do you think LSN should be used? I don't see any rationale
> for that anywhere in the comments. Also, setting TXN_ID looks
> like a job that should be done by txn_add_redo...
>
> >  }
> >
> >  static void
> > diff --git a/src/box/xrow.c b/src/box/xrow.c
> > index ef3f81add..db524b3c8 100644
> > --- a/src/box/xrow.c
> > +++ b/src/box/xrow.c
> > @@ -133,12 +133,32 @@ error:
> >  		case IPROTO_SCHEMA_VERSION:
> >  			header->schema_version = mp_decode_uint(pos);
> >  			break;
> > +		case IPROTO_TXN_ID:
> > +			header->txn_id = mp_decode_uint(pos);
> > +			break;
> > +		case IPROTO_TXN_REPLICA_ID:
> > +			header->txn_replica_id = mp_decode_uint(pos);
> > +			break;
> > +		case IPROTO_TXN_LAST:
> > +			header->txn_last = mp_decode_uint(pos);
>
> Should be bool?
>
> > +			break;
> >  		default:
> >  			/* unknown header */
> >  			mp_next(pos);
> >  		}
> >  	}
> >  	assert(*pos <= end);
> > +	if (header->txn_id == 0) {
> > +		/*
> > +		 * Transaction id is not set so it is a single statement
> > +		 * transaction.
> > + */ > > + header->txn_id = header->lsn; > > + header->txn_last = true; > > + } > > + if (header->txn_replica_id == 0) > > + header->txn_replica_id = header->replica_id; > > + > > > > /* Nop requests aren't supposed to have a body. */ > > if (*pos < end && header->type != IPROTO_NOP) { > > > > const char *body = *pos; > > > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, > > uint64_t sync,> > > d = mp_encode_double(d, header->tm); > > map_size++; > > > > } > > > > + if (header->txn_id != header->lsn || header->txn_last == 0) { > > + /* Encode txn id for multi row transaction members. */ > > + d = mp_encode_uint(d, IPROTO_TXN_ID); > > + d = mp_encode_uint(d, header->txn_id); > > + map_size++; > > + } > > + if (header->txn_replica_id != header->replica_id) { > > + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); > > + d = mp_encode_uint(d, header->txn_replica_id); > > + map_size++; > > + } > > + if (header->txn_last && !(header->txn_id == header->lsn && > > + header->txn_replica_id == header->replica_id)) { > > + /* Set last row for multi row transaction. */ > > + d = mp_encode_uint(d, IPROTO_TXN_LAST); > > + d = mp_encode_uint(d, header->txn_last); > > + map_size++; > > + } > > > > assert(d <= data + XROW_HEADER_LEN_MAX); > > mp_encode_map(data, map_size); > > out->iov_len = d - (char *) out->iov_base; > > > > diff --git a/src/box/xrow.h b/src/box/xrow.h > > index 6bab0a1fd..4acd84d56 100644 > > --- a/src/box/xrow.h > > +++ b/src/box/xrow.h > > @@ -47,7 +47,7 @@ enum { > > > > XROW_HEADER_IOVMAX = 1, > > XROW_BODY_IOVMAX = 2, > > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > > > > - XROW_HEADER_LEN_MAX = 40, > > + XROW_HEADER_LEN_MAX = 60, > > > > XROW_BODY_LEN_MAX = 128, > > IPROTO_HEADER_LEN = 28, > > /** 7 = sizeof(iproto_body_bin). 
*/ > > > > @@ -69,6 +69,9 @@ struct xrow_header { > > > > uint64_t sync; > > int64_t lsn; /* LSN must be signed for correct comparison */ > > double tm; > > > > + int64_t txn_id; > > + uint32_t txn_replica_id; > > + uint32_t txn_last; > > > > int bodycnt; > > uint32_t schema_version; > > > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > > index 165a543cf..0b796d728 100644 > > --- a/test/unit/xrow.cc > > +++ b/test/unit/xrow.cc > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() > > > > header.lsn = 400; > > header.tm = 123.456; > > header.bodycnt = 0; > > > > + header.txn_id = header.lsn; > > + header.txn_replica_id = header.replica_id; > > + header.txn_last = true; > > > > uint64_t sync = 100500; > > struct iovec vec[1]; > > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); > > > > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result > > index 08801dbc6..361ddf5db 100644 > > --- a/test/vinyl/errinj_stat.result > > +++ b/test/vinyl/errinj_stat.result > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements > > > > - bytes_compressed: <bytes_compressed> > > > > pages: 3 > > rows: 30 > > > > - bytes: 411 > > + bytes: 477 > > > > ... > > i:stat().disk.compaction.queue.bytes == > > box.stat.vinyl().scheduler.compaction_queue --- > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements > > > > - bytes_compressed: <bytes_compressed> > > > > pages: 4 > > rows: 40 > > > > - bytes: 548 > > + bytes: 636 > > I wouldn't expect vinyl stats to be changed by this patch. > Why did it happen? Because if rows are written in one entry, the WAL creates a transaction for them.
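A small model makes the encoding rules from the commit message easier to check. The sketch below is not Tarantool code. struct hdr, txn_keys_to_encode() and round_trip() are hypothetical names, and txn_last is modelled as bool, as suggested in the review. As in the quoted xrow_header_decode() hunk, 0 stands for "key absent" after decoding. txn_keys_to_encode() copies the three conditions of the quoted xrow_header_encode() hunk. round_trip() keeps only the emitted keys and then applies the decoder defaults.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the header fields touched by the patch. */
struct hdr {
	int64_t lsn;
	uint32_t replica_id;
	int64_t txn_id;          /* 0 after decode means "key absent" */
	uint32_t txn_replica_id; /* 0 after decode means "key absent" */
	bool txn_last;
};

/* Optional header keys emitted by the encoder, as a bit mask. */
enum {
	HAS_TXN_ID = 1 << 0,
	HAS_TXN_REPLICA_ID = 1 << 1,
	HAS_TXN_LAST = 1 << 2,
};

/* Mirrors the three conditions of the quoted xrow_header_encode() hunk. */
static unsigned
txn_keys_to_encode(const struct hdr *h)
{
	unsigned keys = 0;
	if (h->txn_id != h->lsn || !h->txn_last)
		keys |= HAS_TXN_ID;
	if (h->txn_replica_id != h->replica_id)
		keys |= HAS_TXN_REPLICA_ID;
	if (h->txn_last && !(h->txn_id == h->lsn &&
			     h->txn_replica_id == h->replica_id))
		keys |= HAS_TXN_LAST;
	return keys;
}

/*
 * Simulates encode followed by decode: only the emitted keys survive,
 * then the defaults from the quoted xrow_header_decode() hunk apply.
 */
static struct hdr
round_trip(const struct hdr *in)
{
	/* Assumption of this model: both ids are set before encoding. */
	assert(in->txn_id != 0 && in->txn_replica_id != 0);
	unsigned keys = txn_keys_to_encode(in);
	struct hdr out = { .lsn = in->lsn, .replica_id = in->replica_id };
	if (keys & HAS_TXN_ID)
		out.txn_id = in->txn_id;
	if (keys & HAS_TXN_REPLICA_ID)
		out.txn_replica_id = in->txn_replica_id;
	if (keys & HAS_TXN_LAST)
		out.txn_last = in->txn_last;
	if (out.txn_id == 0) {
		/* No TXN_ID key: a single-statement transaction. */
		out.txn_id = out.lsn;
		out.txn_last = true;
	}
	if (out.txn_replica_id == 0)
		out.txn_replica_id = out.replica_id;
	return out;
}
```

Under these assumptions, a single-row transaction carries no extra header keys, so it looks like a row from an existing xlog. The first row of a multi-row transaction (txn_id == lsn, txn_last false) must still emit TXN_ID. Without it, the decoder would take that row for an autocommitted single-row transaction.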
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries 2019-01-29 10:09 ` Георгий Кириченко @ 2019-01-29 11:00 ` Vladimir Davydov 2019-01-31 7:34 ` Георгий Кириченко 0 siblings, 1 reply; 16+ messages in thread From: Vladimir Davydov @ 2019-01-29 11:00 UTC (permalink / raw) To: Георгий Кириченко Cc: tarantool-patches On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote: > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote: > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote: > > > Append txn_id, txn_replica_id and txn_last to xrow_header structure. > > > txn_replica_id identifies replica where transaction was started and > > > txn_id identifies transaction id on that replica. As transaction id > > > a lsn of the first row in this transaction is used. > > > txn_last set to true if it is the last row in a transaction, so we > > > could commit transaction with last row or use additional NOP requests > > > with txn_last = true ans valid txn_id and txn_replica_id. > > > For replication all local changes moved to xrows array tail to form > > > a separate transaction (like autonomous transaction) because it is not > > > possible to replicate such transaction back to it's creator. > > > > > > As encoding/deconding rules assumed: > > > 1. txn_replica_id is encoded only if it is not equal with replica > > > > > > id. This might have point because of replication trigger > > > > > > 2. txn_id and txn_last are encoded only for multi-row transaction. > > > > > > So if we do not have txn_id in a xstream then this means that it > > > is a single row transaction. > > > > > > This rules provides compatibility with previous xlog handling. > > > > > > Needed for: 2798 > > > --- > > > > > > src/box/iproto_constants.h | 3 ++ > > > > Looks like you forgot to update iproto_constants.c. 
> > > > > src/box/wal.c | 36 +++++++++++++++- > > > src/box/xrow.c | 38 +++++++++++++++++ > > > src/box/xrow.h | 5 ++- > > > test/unit/xrow.cc | 3 ++ > > > test/vinyl/errinj_stat.result | 8 ++-- > > > test/vinyl/layout.result | 24 +++++------ > > > test/vinyl/stat.result | 78 +++++++++++++++++------------------ > > > 8 files changed, 138 insertions(+), 57 deletions(-) > > > > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > > > index 728514297..d01cdf840 100644 > > > --- a/src/box/iproto_constants.h > > > +++ b/src/box/iproto_constants.h > > > @@ -60,6 +60,9 @@ enum iproto_key { > > > > > > IPROTO_SCHEMA_VERSION = 0x05, > > > IPROTO_SERVER_VERSION = 0x06, > > > IPROTO_GROUP_ID = 0x07, > > > > > > + IPROTO_TXN_ID = 0x08, > > > + IPROTO_TXN_REPLICA_ID = 0x09, > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that > > transaction identifiers from different instances don't overlap, because > > there couldn't be a "multi-instance" transaction, could there? > > > > > + IPROTO_TXN_LAST = 0x0a, > > > > I think we should instead introduce BEGIN and COMMIT commands, because: > I completely do not like any auto-commit logic in a xlog file. You suggestion > breaks backward compatibility because previous logs do not have any BEGIN/ It wouldn't break backward compatibility. It might break forward compatibility, which is fine by me (we do it all the time). > COMMIT. Also separate BEGIN and COMMIT messages increase transaction size. I doubt that after compression you'll see a difference. > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates > BEGIN or COMMIT. Yeah, but that would look weird. > > > > > - We might need to attach some extra information to each transaction, > > e.g. mark transactions that were committed in parallel on the master > > so that they can be committed in parallel on a replica. 
Attaching > > such information to each row would be excessive. > The patch is not about this. But we have to think about that in advance, don't we? > > > > - We will need BEGIN and COMMIT for IPROTO transactions. It would be > > nice if we could share the code with them. > The biggest issue we could not know transaction identifier in case of IPROTO. > Iproto is single stream proto, but wal might be not as it is multiplexing a > lot of transactions in a one output, so it might be bad be in paradigm of > universally format for both IPROTO and WAL. OK. I think we need to discuss the options with Kostja. > > > > - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output > > easier to read. > > > > > /* Leave a gap for other keys in the header. */ > > > IPROTO_SPACE_ID = 0x10, > > > IPROTO_INDEX_ID = 0x11, > > > > > > diff --git a/src/box/wal.c b/src/box/wal.c > > > index 4c3537672..17ead08e7 100644 > > > --- a/src/box/wal.c > > > +++ b/src/box/wal.c > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer) > > > > > > } > > > > > > static void > > > > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > > > > > > struct xrow_header **end) > > > > > > { > > > > > > + struct xrow_header **row = begin; > > > > > > /** Assign LSN to all local rows. */ > > > for ( ; row < end; row++) { > > > > > > if ((*row)->replica_id == 0) { > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct > > > xrow_header **row,> > > > vclock_follow_xrow(vclock, *row); > > > > > > } > > > > > > } > > > > > > + if ((*begin)->replica_id != instance_id) { > > > + /* > > > + * Move all local changes to the end of rows array and > > > + * a fake local transaction (like an autonomous transaction) > > > + * because we could not replicate the transaction back. 
> > > + */ > > > + struct xrow_header **row = end - 1; > > > + while (row >= begin) { > > > + if (row[0]->replica_id != instance_id) { > > > + --row; > > > + continue; > > > + } > > > + /* Local row, move it back. */ > > > + struct xrow_header **local_row = row; > > > + while (local_row < end - 1 && > > > + local_row[1]->replica_id != instance_id) { > > > + struct xrow_header *tmp = local_row[0]; > > > + local_row[0] = local_row[1]; > > > + local_row[1] = tmp; > > > > There's SWAP for this. > > > > > + } > > > + --row; > > > + } > > > + while (begin < end && begin[0]->replica_id != instance_id) > > > + ++begin; > > > + } > > > > I don't understand why we need to move rows generated locally (by > > an on_replace trigger I surmise) to the end of a transaction. We > > have TXN_ID attached to each row so we could leave the transactions > > interleaved, couldn't we? > You are right, but in that case applier should track a lot of transaction > simultaneously. Also it complicates recovery too. I hope it will be fixed while > parallel applier implementing. Maybe we should implement the parallel applier in the scope of this issue, then? Anyway, without it, sync replication won't scale with the number of parallel transactions. > > > > > + /* Setup txn_id and tnx_replica_id for localy generated rows. */ > > > + row = begin; > > > + while (row < end) { > > > + row[0]->txn_id = begin[0]->lsn; > > > + row[0]->txn_replica_id = instance_id; > > > + row[0]->txn_last = row == end - 1 ? 1 : 0; > > > + ++row; > > > + } > > > > I think we better use txn->id for TXN_ID rather than LSN. > > Why do you think LSN should be used? I don't see any rationale > > for that anywhere in the comments. Also, setting TXN_ID looks > > like a job that should be done by txn_add_redo...
> > > > > } > > > > > > static void > > > > > > diff --git a/src/box/xrow.c b/src/box/xrow.c > > > index ef3f81add..db524b3c8 100644 > > > --- a/src/box/xrow.c > > > +++ b/src/box/xrow.c > > > > > > @@ -133,12 +133,32 @@ error: > > > case IPROTO_SCHEMA_VERSION: > > > header->schema_version = mp_decode_uint(pos); > > > break; > > > > > > + case IPROTO_TXN_ID: > > > + header->txn_id = mp_decode_uint(pos); > > > + break; > > > + case IPROTO_TXN_REPLICA_ID: > > > + header->txn_replica_id = mp_decode_uint(pos); > > > + break; > > > + case IPROTO_TXN_LAST: > > > + header->txn_last = mp_decode_uint(pos); > > > > Should be bool? > > > > > + break; > > > > > > default: > > > /* unknown header */ > > > mp_next(pos); > > > > > > } > > > > > > } > > > assert(*pos <= end); > > > > > > + if (header->txn_id == 0) { > > > + /* > > > + * Transaction id is not set so it is a single statement > > > + * transaction. > > > + */ > > > + header->txn_id = header->lsn; > > > + header->txn_last = true; > > > + } > > > + if (header->txn_replica_id == 0) > > > + header->txn_replica_id = header->replica_id; > > > + > > > > > > /* Nop requests aren't supposed to have a body. */ > > > if (*pos < end && header->type != IPROTO_NOP) { > > > > > > const char *body = *pos; > > > > > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, > > > uint64_t sync,> > > > d = mp_encode_double(d, header->tm); > > > map_size++; > > > > > > } > > > > > > + if (header->txn_id != header->lsn || header->txn_last == 0) { > > > + /* Encode txn id for multi row transaction members. 
*/ > > > + d = mp_encode_uint(d, IPROTO_TXN_ID); > > > + d = mp_encode_uint(d, header->txn_id); > > > + map_size++; > > > + } > > > + if (header->txn_replica_id != header->replica_id) { > > > + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); > > > + d = mp_encode_uint(d, header->txn_replica_id); > > > + map_size++; > > > + } > > > + if (header->txn_last && !(header->txn_id == header->lsn && > > > + header->txn_replica_id == header->replica_id)) > { > > > + /* Set last row for multi row transaction. */ > > > + d = mp_encode_uint(d, IPROTO_TXN_LAST); > > > + d = mp_encode_uint(d, header->txn_last); > > > + map_size++; > > > + } > > > > > > assert(d <= data + XROW_HEADER_LEN_MAX); > > > mp_encode_map(data, map_size); > > > out->iov_len = d - (char *) out->iov_base; > > > > > > diff --git a/src/box/xrow.h b/src/box/xrow.h > > > index 6bab0a1fd..4acd84d56 100644 > > > --- a/src/box/xrow.h > > > +++ b/src/box/xrow.h > > > @@ -47,7 +47,7 @@ enum { > > > > > > XROW_HEADER_IOVMAX = 1, > > > XROW_BODY_IOVMAX = 2, > > > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > > > > > > - XROW_HEADER_LEN_MAX = 40, > > > + XROW_HEADER_LEN_MAX = 60, > > > > > > XROW_BODY_LEN_MAX = 128, > > > IPROTO_HEADER_LEN = 28, > > > /** 7 = sizeof(iproto_body_bin). 
*/ > > > > > > @@ -69,6 +69,9 @@ struct xrow_header { > > > > > > uint64_t sync; > > > int64_t lsn; /* LSN must be signed for correct comparison */ > > > double tm; > > > > > > + int64_t txn_id; > > > + uint32_t txn_replica_id; > > > + uint32_t txn_last; > > > > > > int bodycnt; > > > uint32_t schema_version; > > > > > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > > > index 165a543cf..0b796d728 100644 > > > --- a/test/unit/xrow.cc > > > +++ b/test/unit/xrow.cc > > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() > > > > > > header.lsn = 400; > > > header.tm = 123.456; > > > header.bodycnt = 0; > > > > > > + header.txn_id = header.lsn; > > > + header.txn_replica_id = header.replica_id; > > > + header.txn_last = true; > > > > > > uint64_t sync = 100500; > > > struct iovec vec[1]; > > > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); > > > > > > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result > > > index 08801dbc6..361ddf5db 100644 > > > --- a/test/vinyl/errinj_stat.result > > > +++ b/test/vinyl/errinj_stat.result > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > pages: 3 > > > rows: 30 > > > > > > - bytes: 411 > > > + bytes: 477 > > > > > > ... > > > i:stat().disk.compaction.queue.bytes == > > > box.stat.vinyl().scheduler.compaction_queue --- > > > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > pages: 4 > > > rows: 40 > > > > > > - bytes: 548 > > > + bytes: 636 > > > > I wouldn't expect vinyl stats to be changed by this patch. > > Why did it happen? > Because if rows were written in an one entry, then wal creates transaction. But those are vinyl files (run, index). They shouldn't be affected by this, should they?
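The reordering in the quoted wal_assign_lsn() hunk is a stable partition. Rows that came from other replicas stay in front, in their original order, and locally generated rows move to the tail, also in order. A minimal sketch follows. It assumes a hypothetical struct row with only replica_id and lsn, and a local SWAP_PTR macro that stands in for the SWAP helper the review refers to (its exact definition in the Tarantool tree is not reproduced here).

One difference is worth checking. As quoted, the inner while loop never advances local_row, so after the first swap local_row[1] is the local row itself and the loop stops. Each local row therefore seems to move by at most one slot. The sketch advances its cursor after every swap.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical row: just enough to tell local rows from foreign ones. */
struct row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Local stand-in for a pointer-swap helper such as SWAP. */
#define SWAP_PTR(a, b) do {	\
	struct row *tmp_ = (a);	\
	(a) = (b);		\
	(b) = tmp_;		\
} while (0)

/*
 * Stable partition of [begin, end): rows with replica_id != local_id
 * stay in front, rows with replica_id == local_id move to the tail,
 * and both groups keep their relative order. Returns a pointer to the
 * first local row, or end if there are none.
 */
static struct row **
move_local_rows_to_tail(struct row **begin, struct row **end,
			uint32_t local_id)
{
	assert(begin <= end);
	/* [tail, end) holds the local rows placed so far. */
	struct row **tail = end;
	for (struct row **r = end; r > begin; ) {
		--r;
		if (r[0]->replica_id != local_id)
			continue;
		/* Bubble this local row right until it touches the tail. */
		struct row **cur = r;
		while (cur + 1 < tail) {
			SWAP_PTR(cur[0], cur[1]);
			++cur;
		}
		tail = cur;
	}
	return tail;
}
```

Bubbling with adjacent swaps costs O(n * k) for n rows and k local rows. That is probably acceptable for the short row array of a single journal entry, and it needs no scratch buffer. The input {L, N, N} (one local row ahead of two foreign rows) is a useful test case: the sketch yields {N, N, L}, whereas the quoted loop appears to stop at {N, L, N}.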
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries 2019-01-29 11:00 ` Vladimir Davydov @ 2019-01-31 7:34 ` Георгий Кириченко 2019-01-31 8:19 ` Vladimir Davydov 0 siblings, 1 reply; 16+ messages in thread From: Георгий Кириченко @ 2019-01-31 7:34 UTC (permalink / raw) To: Vladimir Davydov; +Cc: tarantool-patches On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote: > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote: > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote: > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote: > > > > Append txn_id, txn_replica_id and txn_last to xrow_header structure. > > > > txn_replica_id identifies replica where transaction was started and > > > > txn_id identifies transaction id on that replica. As transaction id > > > > a lsn of the first row in this transaction is used. > > > > txn_last set to true if it is the last row in a transaction, so we > > > > could commit transaction with last row or use additional NOP requests > > > > with txn_last = true ans valid txn_id and txn_replica_id. > > > > For replication all local changes moved to xrows array tail to form > > > > a separate transaction (like autonomous transaction) because it is not > > > > possible to replicate such transaction back to it's creator. > > > > > > > > As encoding/deconding rules assumed: > > > > 1. txn_replica_id is encoded only if it is not equal with replica > > > > > > > > id. This might have point because of replication trigger > > > > > > > > 2. txn_id and txn_last are encoded only for multi-row transaction. > > > > > > > > So if we do not have txn_id in a xstream then this means that it > > > > is a single row transaction. > > > > > > > > This rules provides compatibility with previous xlog handling.
> > > > > > > > Needed for: 2798 > > > > --- > > > > > > > > src/box/iproto_constants.h | 3 ++ > > > > > > Looks like you forgot to update iproto_constants.c. > > > > > > > src/box/wal.c | 36 +++++++++++++++- > > > > src/box/xrow.c | 38 +++++++++++++++++ > > > > src/box/xrow.h | 5 ++- > > > > test/unit/xrow.cc | 3 ++ > > > > test/vinyl/errinj_stat.result | 8 ++-- > > > > test/vinyl/layout.result | 24 +++++------ > > > > test/vinyl/stat.result | 78 > > > > +++++++++++++++++------------------ > > > > 8 files changed, 138 insertions(+), 57 deletions(-) > > > > > > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > > > > index 728514297..d01cdf840 100644 > > > > --- a/src/box/iproto_constants.h > > > > +++ b/src/box/iproto_constants.h > > > > @@ -60,6 +60,9 @@ enum iproto_key { > > > > > > > > IPROTO_SCHEMA_VERSION = 0x05, > > > > IPROTO_SERVER_VERSION = 0x06, > > > > IPROTO_GROUP_ID = 0x07, > > > > > > > > + IPROTO_TXN_ID = 0x08, > > > > + IPROTO_TXN_REPLICA_ID = 0x09, > > > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would > > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that > > > transaction identifiers from different instances don't overlap, because > > > there couldn't be a "multi-instance" transaction, could there? txn_replica_id might differ from replica_id when a transaction is finished on another node, e.g. after recovery. > > > > > > > + IPROTO_TXN_LAST = 0x0a, > > > > > > I think we should instead introduce BEGIN and COMMIT commands, because: > > I completely do not like any auto-commit logic in a xlog file. You > > suggestion breaks backward compatibility because previous logs do not > > have any BEGIN/ > It wouldn't break backward compatibility. It might break forward > compatibility, which is fine by me (we do it all the time).
The suggested xrow encoding/decoding rules mean that any xrow without txn_id, txn_replica_id and txn_last is processed as a single-statement transaction, as before. If we required explicit begin/commit, previous logs would turn into an invalid stream without autocommit semantics. But I think txn_last should be renamed to txn_commit. Also, an explicit begin operation is redundant, because a new txn_id/txn_replica_id pair already marks the beginning of a transaction. > > > COMMIT. Also separate BEGIN and COMMIT messages increase transaction size. > > I doubt that after compression you'll see a difference. The replication stream does not have any compression. > > > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates > > BEGIN or COMMIT. > > Yeah, but that would look weird. > > > > - We might need to attach some extra information to each transaction, > > > > > > e.g. mark transactions that were committed in parallel on the master > > > so that they can be committed in parallel on a replica. Attaching > > > such information to each row would be excessive. > > > > The patch is not about this. > > But we have to think about that in advance, don't we? We do not have a clear view of how it should be done. > > > > - We will need BEGIN and COMMIT for IPROTO transactions. It would be > > > > > > nice if we could share the code with them. > > > > The biggest issue we could not know transaction identifier in case of > > IPROTO. Iproto is single stream proto, but wal might be not as it is > > multiplexing a lot of transactions in a one output, so it might be bad be > > in paradigm of universally format for both IPROTO and WAL. > > OK. I think we need to discuss the options with Kostja. > > > > - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output > > > > > > easier to read. > > > > > > > /* Leave a gap for other keys in the header.
*/ > > > > IPROTO_SPACE_ID = 0x10, > > > > IPROTO_INDEX_ID = 0x11, > > > > > > > > diff --git a/src/box/wal.c b/src/box/wal.c > > > > index 4c3537672..17ead08e7 100644 > > > > --- a/src/box/wal.c > > > > +++ b/src/box/wal.c > > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer > > > > *writer) > > > > > > > > } > > > > > > > > static void > > > > > > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > > > > > > > > struct xrow_header **end) > > > > > > > > { > > > > > > > > + struct xrow_header **row = begin; > > > > > > > > /** Assign LSN to all local rows. */ > > > > for ( ; row < end; row++) { > > > > > > > > if ((*row)->replica_id == 0) { > > > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct > > > > xrow_header **row,> > > > > > > > > vclock_follow_xrow(vclock, *row); > > > > > > > > } > > > > > > > > } > > > > > > > > + if ((*begin)->replica_id != instance_id) { > > > > + /* > > > > + * Move all local changes to the end of rows array and > > > > + * a fake local transaction (like an autonomous transaction) > > > > + * because we could not replicate the transaction back. > > > > + */ > > > > + struct xrow_header **row = end - 1; > > > > + while (row >= begin) { > > > > + if (row[0]->replica_id != instance_id) { > > > > + --row; > > > > + continue; > > > > + } > > > > + /* Local row, move it back. */ > > > > + struct xrow_header **local_row = row; > > > > + while (local_row < end - 1 && > > > > + local_row[1]->replica_id != instance_id) { > > > > + struct xrow_header *tmp = local_row[0]; > > > > + local_row[0] = local_row[1]; > > > > + local_row[1] = tmp; > > > > > > There's SWAP for this. 
> > > > > > > + } > > > > + --row; > > > > + } > > > > + while (begin < end && begin[0]->replica_id != instance_id) > > > > + ++begin; > > > > + } > > > > > > I don't understand why we need to move rows generated locally (by > > > an on_replace trigger I surmise) to the end of a transaction. We > > > have TXN_ID attached to each row so we could leave the transactions > > > interleaved, couldn't we? > > > > You are right, but in that case applier should track a lot of transaction > > simultaneously. Also it complicates recovery too. I hope it will be fixed > > while parallel applier implementing. > > May be, we should implement parallel applier in the scope of this issue > then? Anyway, without it, sync replication won't scale with the number > of parallel transactions. I do not think so. The parallel applier depends on this feature, but it is a completely different issue. Also, the parallel applier is agnostic to the xlog format. > > > > > + /* Setup txn_id and tnx_replica_id for localy generated rows. */ > > > > + row = begin; > > > > + while (row < end) { > > > > + row[0]->txn_id = begin[0]->lsn; > > > > + row[0]->txn_replica_id = instance_id; > > > > + row[0]->txn_last = row == end - 1 ? 1 : 0; > > > > + ++row; > > > > + } > > > > > > I think we better use txn->id for TXN_ID rather than LSN. > > > Why do you think LSN should be used? I don't see any rationale > > > for that anywhere in the comments. Also, setting TXN_ID looks > > > like a job that should be done by txn_add_redo...
> > > > > > > } > > > > > > > > static void > > > > > > > > diff --git a/src/box/xrow.c b/src/box/xrow.c > > > > index ef3f81add..db524b3c8 100644 > > > > --- a/src/box/xrow.c > > > > +++ b/src/box/xrow.c > > > > > > > > @@ -133,12 +133,32 @@ error: > > > > case IPROTO_SCHEMA_VERSION: > > > > header->schema_version = mp_decode_uint(pos); > > > > break; > > > > > > > > + case IPROTO_TXN_ID: > > > > + header->txn_id = mp_decode_uint(pos); > > > > + break; > > > > + case IPROTO_TXN_REPLICA_ID: > > > > + header->txn_replica_id = mp_decode_uint(pos); > > > > + break; > > > > + case IPROTO_TXN_LAST: > > > > + header->txn_last = mp_decode_uint(pos); > > > > > > Should be bool? > > > > > > > + break; > > > > > > > > default: > > > > /* unknown header */ > > > > mp_next(pos); > > > > > > > > } > > > > > > > > } > > > > assert(*pos <= end); > > > > > > > > + if (header->txn_id == 0) { > > > > + /* > > > > + * Transaction id is not set so it is a single statement > > > > + * transaction. > > > > + */ > > > > + header->txn_id = header->lsn; > > > > + header->txn_last = true; > > > > + } > > > > + if (header->txn_replica_id == 0) > > > > + header->txn_replica_id = header->replica_id; > > > > + > > > > > > > > /* Nop requests aren't supposed to have a body. */ > > > > if (*pos < end && header->type != IPROTO_NOP) { > > > > > > > > const char *body = *pos; > > > > > > > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header > > > > *header, > > > > uint64_t sync,> > > > > > > > > d = mp_encode_double(d, header->tm); > > > > map_size++; > > > > > > > > } > > > > > > > > + if (header->txn_id != header->lsn || header->txn_last == 0) { > > > > + /* Encode txn id for multi row transaction members. 
*/ > > > > + d = mp_encode_uint(d, IPROTO_TXN_ID); > > > > + d = mp_encode_uint(d, header->txn_id); > > > > + map_size++; > > > > + } > > > > + if (header->txn_replica_id != header->replica_id) { > > > > + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); > > > > + d = mp_encode_uint(d, header->txn_replica_id); > > > > + map_size++; > > > > + } > > > > + if (header->txn_last && !(header->txn_id == header->lsn && > > > > + header->txn_replica_id == header->replica_id)) > > > > { > > > > > > + /* Set last row for multi row transaction. */ > > > > + d = mp_encode_uint(d, IPROTO_TXN_LAST); > > > > + d = mp_encode_uint(d, header->txn_last); > > > > + map_size++; > > > > + } > > > > > > > > assert(d <= data + XROW_HEADER_LEN_MAX); > > > > mp_encode_map(data, map_size); > > > > out->iov_len = d - (char *) out->iov_base; > > > > > > > > diff --git a/src/box/xrow.h b/src/box/xrow.h > > > > index 6bab0a1fd..4acd84d56 100644 > > > > --- a/src/box/xrow.h > > > > +++ b/src/box/xrow.h > > > > @@ -47,7 +47,7 @@ enum { > > > > > > > > XROW_HEADER_IOVMAX = 1, > > > > XROW_BODY_IOVMAX = 2, > > > > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > > > > > > > > - XROW_HEADER_LEN_MAX = 40, > > > > + XROW_HEADER_LEN_MAX = 60, > > > > > > > > XROW_BODY_LEN_MAX = 128, > > > > IPROTO_HEADER_LEN = 28, > > > > /** 7 = sizeof(iproto_body_bin). 
*/ > > > > > > > > @@ -69,6 +69,9 @@ struct xrow_header { > > > > > > > > uint64_t sync; > > > > int64_t lsn; /* LSN must be signed for correct comparison */ > > > > double tm; > > > > > > > > + int64_t txn_id; > > > > + uint32_t txn_replica_id; > > > > + uint32_t txn_last; > > > > > > > > int bodycnt; > > > > uint32_t schema_version; > > > > > > > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > > > > index 165a543cf..0b796d728 100644 > > > > --- a/test/unit/xrow.cc > > > > +++ b/test/unit/xrow.cc > > > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() > > > > > > > > header.lsn = 400; > > > > header.tm = 123.456; > > > > header.bodycnt = 0; > > > > > > > > + header.txn_id = header.lsn; > > > > + header.txn_replica_id = header.replica_id; > > > > + header.txn_last = true; > > > > > > > > uint64_t sync = 100500; > > > > struct iovec vec[1]; > > > > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); > > > > > > > > diff --git a/test/vinyl/errinj_stat.result > > > > b/test/vinyl/errinj_stat.result > > > > index 08801dbc6..361ddf5db 100644 > > > > --- a/test/vinyl/errinj_stat.result > > > > +++ b/test/vinyl/errinj_stat.result > > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements > > > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > > > pages: 3 > > > > rows: 30 > > > > > > > > - bytes: 411 > > > > + bytes: 477 > > > > > > > > ... > > > > i:stat().disk.compaction.queue.bytes == > > > > box.stat.vinyl().scheduler.compaction_queue --- > > > > > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements > > > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > > > pages: 4 > > > > rows: 40 > > > > > > > > - bytes: 548 > > > > + bytes: 636 > > > > > > I wouldn't expect vinyl stats to be changed by this patch. > > > Why did it happen? > > > > Because if rows were written in an one entry, then wal creates > > transaction. > > But those are vinyl files (run, index). 
They shouldn't be affected by > this, should they? Vinyl uses xlog_write_entry, which forms a transaction. I do not think that introducing a separate variant of xlog_write_entry that writes with or without a transaction is a good idea.
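The stream semantics argued for above can be sketched as a splitter over decoded rows. Under those semantics, a new txn_id/txn_replica_id pair implicitly begins a transaction, txn_last commits it, and a row without the new keys is a single-statement transaction. This is a hypothetical illustration, not the applier from patch 2/2. struct txn_row and split_transactions() are made-up names, and rows are assumed to have already passed through the decoder defaults. Interleaved transactions are rejected, which matches the current design: local trigger rows are moved to the tail rather than interleaved.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical decoded row: only the transaction metadata matters here. */
struct txn_row {
	int64_t txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
};

/*
 * Split rows[0..n) into transactions. On success, ends[i] is the index
 * one past the last row of the i-th transaction, and the number of
 * transactions is returned. Returns -1 if transactions interleave, if
 * the stream ends inside a transaction, or if more than max_txns
 * transactions are found.
 */
static int
split_transactions(const struct txn_row *rows, size_t n,
		   size_t *ends, size_t max_txns)
{
	bool in_txn = false;
	int64_t cur_id = 0;
	uint32_t cur_replica_id = 0;
	size_t count = 0;

	for (size_t i = 0; i < n; i++) {
		const struct txn_row *row = &rows[i];
		if (!in_txn) {
			/* A new (txn_replica_id, txn_id) pair: implicit BEGIN. */
			in_txn = true;
			cur_id = row->txn_id;
			cur_replica_id = row->txn_replica_id;
		} else if (row->txn_id != cur_id ||
			   row->txn_replica_id != cur_replica_id) {
			/* Another transaction started before this one ended. */
			return -1;
		}
		if (row->txn_last) {
			/* txn_last acts as COMMIT. */
			if (count == max_txns)
				return -1;
			ends[count++] = i + 1;
			in_txn = false;
		}
	}
	assert(count <= max_txns);
	return in_txn ? -1 : (int)count;
}
```

Rejecting interleaving keeps the reader's state down to a single open transaction. Allowing interleaving would mean tracking several open transactions keyed by the (txn_replica_id, txn_id) pair. That is the extra applier and recovery complexity mentioned earlier in the thread.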
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries 2019-01-31 7:34 ` Георгий Кириченко @ 2019-01-31 8:19 ` Vladimir Davydov 2019-01-31 14:25 ` Георгий Кириченко 0 siblings, 1 reply; 16+ messages in thread From: Vladimir Davydov @ 2019-01-31 8:19 UTC (permalink / raw) To: Георгий Кириченко Cc: tarantool-patches On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote: > On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote: > > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote: > > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote: > > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote: > > > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > > > > > index 728514297..d01cdf840 100644 > > > > > --- a/src/box/iproto_constants.h > > > > > +++ b/src/box/iproto_constants.h > > > > > @@ -60,6 +60,9 @@ enum iproto_key { > > > > > > > > > > IPROTO_SCHEMA_VERSION = 0x05, > > > > > IPROTO_SERVER_VERSION = 0x06, > > > > > IPROTO_GROUP_ID = 0x07, > > > > > > > > > > + IPROTO_TXN_ID = 0x08, > > > > > + IPROTO_TXN_REPLICA_ID = 0x09, > > > > > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would > > > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that > > > > transaction identifiers from different instances don't overlap, because > > > > there couldn't be a "multi-instance" transaction, could there? > txn_replica_id might be differ from replica_id in case when transaction would > be finished on other node, e.g. after recovery. I'm kinda out of scope here. I think you need to write an RFC or something explaining all the design decisions you made when you introduced transaction boundaries. Or we should get together with Kostja and discuss it. Or both. Neither the commit message nor the comments shed light on the big picture.
> > > > > > > > > + IPROTO_TXN_LAST = 0x0a, > > > > > > > > I think we should instead introduce BEGIN and COMMIT commands, because: > > > I completely do not like any auto-commit logic in a xlog file. You > > > suggestion breaks backward compatibility because previous logs do not > > > have any BEGIN/ > > It wouldn't break backward compatibility. It might break forward > > compatibility, which is fine by me (we do it all the time). > Suggested xrow encoding/decoding rules means that any xrow without txn_id, > txn_replica_id, txn_last should be processed as a single statement transaction > as it was before. > If we would require explicit begin/commit then previous logs turns into an > invalid stream without autocommit semantic. Yeah, we would have to add autocommit. > But I think, that txn_last should be renamed into txn_commit. > Also explicit begin operation is redundant because a new one pair txn_id/ > txn_replica_id already means begin of an transaction. > > > > > COMMIT. Also separate BEGIN and COMMIT messages increase transaction size. > > > > I doubt that after compression you'll see a difference. > replication stream does not have any compression. OK. > > > > > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates > > > BEGIN or COMMIT. > > > > Yeah, but that would look weird. > > > > > > - We might need to attach some extra information to each transaction, > > > > > > > > e.g. mark transactions that were committed in parallel on the master > > > > so that they can be committed in parallel on a replica. Attaching > > > > such information to each row would be excessive. > > > > > > The patch is not about this. > > > > But we have to think about that in advance, don't we? > We do not have clear view how it should be done. Which means we should think it through IMO. The two issues are closely related. > > > > > > - We will need BEGIN and COMMIT for IPROTO transactions. It would be > > > > > > > > nice if we could share the code with them. 
> > > > > > The biggest issue we could not know transaction identifier in case of > > > IPROTO. Iproto is single stream proto, but wal might be not as it is > > > multiplexing a lot of transactions in a one output, so it might be bad be > > > in paradigm of universally format for both IPROTO and WAL. > > > > OK. I think we need to discuss the options with Kostja. > > > > > > - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output > > > > > > > > easier to read. > > > > > > > > > /* Leave a gap for other keys in the header. */ > > > > > IPROTO_SPACE_ID = 0x10, > > > > > IPROTO_INDEX_ID = 0x11, > > > > > > > > > > diff --git a/src/box/wal.c b/src/box/wal.c > > > > > index 4c3537672..17ead08e7 100644 > > > > > --- a/src/box/wal.c > > > > > +++ b/src/box/wal.c > > > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer > > > > > *writer) > > > > > > > > > > } > > > > > > > > > > static void > > > > > > > > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > > > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > > > > > > > > > > struct xrow_header **end) > > > > > > > > > > { > > > > > > > > > > + struct xrow_header **row = begin; > > > > > > > > > > /** Assign LSN to all local rows. */ > > > > > for ( ; row < end; row++) { > > > > > > > > > > if ((*row)->replica_id == 0) { > > > > > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct > > > > > xrow_header **row,> > > > > > > > > > > vclock_follow_xrow(vclock, *row); > > > > > > > > > > } > > > > > > > > > > } > > > > > > > > > > + if ((*begin)->replica_id != instance_id) { > > > > > + /* > > > > > + * Move all local changes to the end of rows array and > > > > > + * a fake local transaction (like an autonomous transaction) > > > > > + * because we could not replicate the transaction back. 
> > > > > + */ > > > > > + struct xrow_header **row = end - 1; > > > > > + while (row >= begin) { > > > > > + if (row[0]->replica_id != instance_id) { > > > > > + --row; > > > > > + continue; > > > > > + } > > > > > + /* Local row, move it back. */ > > > > > + struct xrow_header **local_row = row; > > > > > + while (local_row < end - 1 && > > > > > + local_row[1]->replica_id != instance_id) { > > > > > + struct xrow_header *tmp = local_row[0]; > > > > > + local_row[0] = local_row[1]; > > > > > + local_row[1] = tmp; > > > > > > > > There's SWAP for this. > > > > > > > > > + } > > > > > + --row; > > > > > + } > > > > > + while (begin < end && begin[0]->replica_id != instance_id) > > > > > + ++begin; > > > > > + } > > > > > > > > I don't understand why we need to move rows generated locally (by > > > > an on_replace trigger I surmise) to the end of a transaction. We > > > > have TXN_ID attached to each row so we could leave the transactions > > > > interleaved, couldn't we? > > > > > > You are right, but in that case applier should track a lot of transaction > > > simultaneously. Also it complicates recovery too. I hope it will be fixed > > > while parallel applier implementing. > > > > May be, we should implement parallel applier in the scope of this issue > > then? Anyway, without it, sync replication won't scale with the number > > of parallel transactions. > I do not think so. Parallel applier depends on that feature, but it is > completely different issue. But as I said, sync replication won't scale without it. > Also parallel applier is ignorant on a xlog format. Depends on how you implement it. For example, MySQL marks all transactions that were committed in parallel on the master so that they can be committed in parallel on replicas. 
> > > > > diff --git a/test/vinyl/errinj_stat.result > > > > > b/test/vinyl/errinj_stat.result > > > > > index 08801dbc6..361ddf5db 100644 > > > > > --- a/test/vinyl/errinj_stat.result > > > > > +++ b/test/vinyl/errinj_stat.result > > > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements > > > > > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > > > > > pages: 3 > > > > > rows: 30 > > > > > > > > > > - bytes: 411 > > > > > + bytes: 477 > > > > > > > > > > ... > > > > > i:stat().disk.compaction.queue.bytes == > > > > > box.stat.vinyl().scheduler.compaction_queue --- > > > > > > > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements > > > > > > > > > > - bytes_compressed: <bytes_compressed> > > > > > > > > > > pages: 4 > > > > > rows: 40 > > > > > > > > > > - bytes: 548 > > > > > + bytes: 636 > > > > > > > > I wouldn't expect vinyl stats to be changed by this patch. > > > > Why did it happen? > > > > > > Because if rows were written in an one entry, then wal creates > > > transaction. > > > > But those are vinyl files (run, index). They shouldn't be affected by > > this, should they? > vinyl uses xlog_write_entry that forms a transaction. No, not for run/index files, it does not. Take a look at vy_run_dump_stmt. There's no point in writing any extra transaction information to run/index files. > I do not think that introducing a new version of xlog_write_entry > with/without transaction is a good idea. ^ permalink raw reply [flat|nested] 16+ messages in thread
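The reordering in the wal.c hunk quoted above is a stable partition. Non-local rows keep their order at the head of the array, and local rows keep theirs at the tail. Below is a minimal sketch of it, written with a SWAP()-style macro as the review suggests. It makes a few assumptions. `struct xrow_header` and `SWAP()` are simplified stand-ins, not Tarantool's definitions. The sketch also uses indices where the hunk uses pointers, so the backward walk never forms a pointer before the start of the array. Note that, as quoted, the hunk's inner loop never advances `local_row`, so each local row would move at most one slot. The sketch increments the index.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for a SWAP() helper; uses the GCC/Clang __typeof__ extension. */
#define SWAP(a, b) do {				\
	__typeof__(a) swap_tmp_ = (a);		\
	(a) = (b);				\
	(b) = swap_tmp_;			\
} while (0)

/* Simplified stand-in for struct xrow_header: only what is used here. */
struct xrow_header {
	uint32_t replica_id;
	int64_t lsn;
};

/*
 * Stable partition of rows[0..count): rows whose replica_id equals
 * instance_id (local rows) are moved to the tail, and both groups keep
 * their relative order. Returns the index of the first local row, or
 * count if there is none.
 */
static size_t
move_local_rows_to_tail(struct xrow_header **rows, size_t count,
			uint32_t instance_id)
{
	/* Walk backwards: local rows already moved act as a barrier. */
	for (size_t i = count; i-- > 0; ) {
		if (rows[i]->replica_id != instance_id)
			continue;
		/* Bubble this local row right past all non-local rows. */
		for (size_t j = i; j + 1 < count &&
		     rows[j + 1]->replica_id != instance_id; ++j)
			SWAP(rows[j], rows[j + 1]);
	}
	size_t first_local = 0;
	while (first_local < count &&
	       rows[first_local]->replica_id != instance_id)
		++first_local;
	return first_local;
}
```

The `++j` in the inner loop lets a local row travel past several non-local rows. Without it, the row stops after a single swap.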
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
From: Георгий Кириченко @ 2019-01-31 14:25 UTC
To: Vladimir Davydov
Cc: tarantool-patches

On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote:
> On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote:
[...]
> > The suggested xrow encoding/decoding rules mean that any xrow without
> > txn_id, txn_replica_id and txn_last is processed as a single-statement
> > transaction, as it was before.
> > If we required explicit begin/commit, previous logs would turn into an
> > invalid stream without autocommit semantics.
>
> Yeah, we would have to add autocommit.

I disagree with autocommit. Here are my key points:
1. We could not interleave autocommit transactions with others, or we would
have to have a special mark for such a transaction, but then it would not
be autocommit after all.
2. The applier would not be able to check transaction boundaries.
3. There is no difference between an autocommit transaction and a
single-row transaction.
4. Autocommit is about interface behavior, not about the log format.

Also, let me explain the xrow encoding rules:
If a transaction has only one row, we do not encode txn_id and txn_last,
so a single-row transaction (or autocommit, if you wish) does not change
its representation. While decoding, if txn_id is not set, we set
txn_id = lsn and txn_last = true. In that case each row from a previous
xlog acts as a single-row transaction.
For a multi-row transaction, the only penalty is the transaction
identifiers in each row plus one commit flag in the last one.

[...]
> > > > > - We might need to attach some extra information to each transaction,
> > > > >   e.g. mark transactions that were committed in parallel on the master
> > > > >   so that they can be committed in parallel on a replica. Attaching
> > > > >   such information to each row would be excessive.
> > > >
> > > > The patch is not about this.
> > >
> > > But we have to think about that in advance, don't we?
> >
> > We do not have a clear view of how it should be done.
>
> Which means we should think it through IMO. The two issues are closely
> related.

The main issue is that the proposed approach (tracking which transactions
were batched) is not applicable, because vinyl might yield in a completely
unpredictable manner. Also, I have a fully parallel solution, and it does
not require any information except transaction boundaries.

[...]
> > Also, the parallel applier is ignorant of the xlog format.
>
> Depends on how you implement it. For example, MySQL marks all
> transactions that were committed in parallel on the master so that
> they can be committed in parallel on replicas.

In the case of vinyl, this is not true.

[...]
> > vinyl uses xlog_write_entry, which forms a transaction.
>
> No, not for run/index files, it does not. Take a look at
> vy_run_dump_stmt. There's no point in writing any extra
> transaction information to run/index files.

Thanks, I will check it.
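The encoding rules described above can be sketched as a pair of helpers. One decides which optional keys go on the wire. The other fills in defaults after decoding, so that rows from older xlogs read as single-row transactions. Everything here is hypothetical. `struct row_hdr`, `struct wire_keys`, `encode_keys()` and `decode_defaults()` are illustrative names, not the patch's xrow.c code. The sketch only models which keys are present, not their MsgPack encoding.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified header with only the fields discussed here. */
struct row_hdr {
	uint32_t replica_id;
	int64_t lsn;
	int64_t txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
};

/* Which optional keys are present on the wire for one row. */
struct wire_keys {
	bool has_txn_id;
	bool has_txn_replica_id;
	bool has_txn_last;
};

/* Encoder side: decide which optional keys to emit for a row. */
static struct wire_keys
encode_keys(const struct row_hdr *row, bool multi_row)
{
	struct wire_keys k;
	/* Rule 1: TXN_REPLICA_ID only when it differs from REPLICA_ID. */
	k.has_txn_replica_id = row->txn_replica_id != row->replica_id;
	/* Rule 2: TXN_ID and TXN_LAST only for multi-row transactions. */
	k.has_txn_id = multi_row;
	k.has_txn_last = multi_row && row->txn_last;
	return k;
}

/* Decoder side: fill in whatever the encoder left out. */
static void
decode_defaults(struct row_hdr *row, const struct wire_keys *k)
{
	if (!k->has_txn_replica_id)
		row->txn_replica_id = row->replica_id;
	if (k->has_txn_id) {
		row->txn_last = k->has_txn_last;
	} else {
		/* No TXN_ID: a single-row transaction, as in old xlogs. */
		row->txn_id = row->lsn;
		row->txn_last = true;
	}
}
```

Rows from an old xlog never carry TXN_ID, so the decoder always takes the single-row branch for them. That is the backward-compatibility property the thread relies on.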
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
From: Vladimir Davydov @ 2019-01-31 14:54 UTC
To: Георгий Кириченко
Cc: tarantool-patches

On Thu, Jan 31, 2019 at 05:25:24PM +0300, Георгий Кириченко wrote:
> On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote:
[...]
> > Yeah, we would have to add autocommit.
>
> I disagree with autocommit. Here are my key points:
> 1. We could not interleave autocommit transactions with others, or we would
> have to have a special mark for such a transaction, but then it would not
> be autocommit after all.

Do we need to support interleaving transactions in xlog? I'm not sure.
Maybe we do for sync replication.

Anyway, we could add BEGIN/COMMIT *in addition* to the TXN_ID stored in
each row. They would serve as a header and a footer. This would give us a
placeholder to store transaction-wide information if we ever need it.
Besides, it would make the stream more robust: if we lose BEGIN, we will
detect it, while if we lose the first transaction row in your case, we
won't. Yes, explicit BEGIN/COMMIT will add a few bytes to each
transaction, but IMO it's not critical.

Let's discuss it f2f when we can, because we seem to repeat ourselves
without moving forward.

[...]
> > Depends on how you implement it. For example, MySQL marks all
> > transactions that were committed in parallel on the master so that
> > they can be committed in parallel on replicas.
>
> In the case of vinyl, this is not true.

In the case of vinyl, we can mark transactions that overlapped in time on
the master. In fact, that's what MySQL actually does. I guess I misled
you by saying "committed in parallel". I meant "executed in parallel".
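The patch produces a non-interleaved layout. Rows of one transaction are contiguous, because wal_assign_lsn moves local rows to the tail, and the last row carries txn_last. Under that layout, an applier could cut transactions out of its buffer roughly as follows. This is a hypothetical sketch. `struct txn_row` and `next_txn_size()` are illustrative names, not code from the patch. The sketch assumes the txn fields were already defaulted on decode, as described earlier in the thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical decoded row: only the transaction fields. */
struct txn_row {
	uint32_t txn_replica_id;
	int64_t txn_id;
	bool txn_last;
};

/*
 * Return the number of rows forming the transaction that starts at
 * rows[0]; 0 if the buffer ends before the row carrying txn_last
 * arrives (the applier should wait for more data); -1 if a row of
 * another transaction shows up first, e.g. because the stream is
 * interleaved or the row carrying txn_last was lost.
 */
static long
next_txn_size(const struct txn_row *rows, size_t count)
{
	for (size_t i = 0; i < count; i++) {
		if (rows[i].txn_replica_id != rows[0].txn_replica_id ||
		    rows[i].txn_id != rows[0].txn_id)
			return -1;
		if (rows[i].txn_last)
			return (long)(i + 1);
	}
	return 0;
}
```

As pointed out above, this check cannot notice a lost *first* row of a transaction. The remaining rows still look like a consistent group.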
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries 2019-01-31 14:54 ` Vladimir Davydov @ 2019-02-01 9:31 ` Георгий Кириченко 0 siblings, 0 replies; 16+ messages in thread From: Георгий Кириченко @ 2019-02-01 9:31 UTC (permalink / raw) To: Vladimir Davydov; +Cc: tarantool-patches [-- Attachment #1: Type: text/plain, Size: 8757 bytes --] On Thursday, January 31, 2019 5:54:54 PM MSK Vladimir Davydov wrote: > On Thu, Jan 31, 2019 at 05:25:24PM +0300, Георгий Кириченко wrote: > > On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote: > > > On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote: > > > > On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote: > > > > > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote: > > > > > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote: > > > > > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote: > > > > > > > > diff --git a/src/box/iproto_constants.h > > > > > > > > b/src/box/iproto_constants.h > > > > > > > > index 728514297..d01cdf840 100644 > > > > > > > > --- a/src/box/iproto_constants.h > > > > > > > > +++ b/src/box/iproto_constants.h > > > > > > > > @@ -60,6 +60,9 @@ enum iproto_key { > > > > > > > > > > > > > > > > IPROTO_SCHEMA_VERSION = 0x05, > > > > > > > > IPROTO_SERVER_VERSION = 0x06, > > > > > > > > IPROTO_GROUP_ID = 0x07, > > > > > > > > > > > > > > > > + IPROTO_TXN_ID = 0x08, > > > > > > > > + IPROTO_TXN_REPLICA_ID = 0x09, > > > > > > > > > > > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I > > > > > > > would > > > > > > > expect TXN_ID to be enough - we could use REPLICA_ID to make > > > > > > > sure > > > > > > > that > > > > > > > transaction identifiers from different instances don't overlap, > > > > > > > because > > > > > > > there couldn't be a "multi-instance" transaction, could there? 
> > > > > > > > txn_replica_id might be differ from replica_id in case when > > > > transaction > > > > would be finished on other node, e.g. after recovery. > > > > > > I'm kinda out of scope here. I think you need to write an RFC or > > > something explaining all the design decisions you made when you > > > introduced transaction boundaries. Or we should get together with > > > Kostja and discuss it. Or both. Neither the commit message nor > > > comments shed the light on the big picture. > > > > > > > > > > > + IPROTO_TXN_LAST = 0x0a, > > > > > > > > > > > > > > I think we should instead introduce BEGIN and COMMIT commands, because: > > > > > > I completely do not like any auto-commit logic in a xlog file. You > > > > > > suggestion breaks backward compatibility because previous logs do > > > > > > not > > > > > > have any BEGIN/ > > > > > > > > > > It wouldn't break backward compatibility. It might break forward > > > > > compatibility, which is fine by me (we do it all the time). > > > > > > > > Suggested xrow encoding/decoding rules means that any xrow without > > > > txn_id, > > > > txn_replica_id, txn_last should be processed as a single statement > > > > transaction as it was before. > > > > If we would require explicit begin/commit then previous logs turns > > > > into an > > > > invalid stream without autocommit semantic. > > > > > > Yeah, we would have to add autocommit. > > > > I am disagreed with autocommit. There are my key points: > > 1. We could not interleave autocommit transactions with others, or we > > would > > have to have special mark for this transaction but this is no autocommit > > after all. > > Do we need to support interleaving transactions in xlog? I'm not sure. > May be, for sync replication we do. > > Anyway, we could add BEGIN/COMMIT *in addition* to TXN_ID stored in each > row. They would serve as a header and footer. This would give us a place > holder to store transaction-wide information if we ever need it. 
> Besides, it would make the stream more robust: if we lose BEGIN, we
> will detect it, while if we lose the first transaction row in your case
> we won't. Yeah, explicit BEGIN/COMMIT will add a few bytes to each
> transaction, but IMO it's not critical.
>
> Let's discuss it f2f when we can, because we seem to repeat ourselves
> without moving forward.
>
> > 2. Applier would not be able to check transaction boundaries.
> > 3. There is no difference between an autocommit transaction and a
> > single-row transaction.
> > 4. Autocommit is about interface behavior but not about log format.
> >
> > Also let me explain xrow encoding rules:
> > If we have only one row in transaction, then we do not encode txn_id and
> > txn_last. So single-row transaction (or autocommit if you wish) does not
> > change its representation. And then while decoding we set txn_id = lsn
> > and txn_last if txn_id was not set. In that case each row from previous
> > xlog acts as one single-row transaction.
> > For multi-row transaction we have only penalty in transaction identifiers
> > for each row and one commit flag for the last.
> >
> > > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > > > > > >  			vclock_follow_xrow(vclock, *row);
> > > > > > > >  		}
> > > > > > > >  	}
> > > > > > > > +	if ((*begin)->replica_id != instance_id) {
> > > > > > > > +		/*
> > > > > > > > +		 * Move all local changes to the end of rows array and
> > > > > > > > +		 * a fake local transaction (like an autonomous transaction)
> > > > > > > > +		 * because we could not replicate the transaction back.
> > > > > > > > +		 */
> > > > > > > > +		struct xrow_header **row = end - 1;
> > > > > > > > +		while (row >= begin) {
> > > > > > > > +			if (row[0]->replica_id != instance_id) {
> > > > > > > > +				--row;
> > > > > > > > +				continue;
> > > > > > > > +			}
> > > > > > > > +			/* Local row, move it back. */
> > > > > > > > +			struct xrow_header **local_row = row;
> > > > > > > > +			while (local_row < end - 1 &&
> > > > > > > > +			       local_row[1]->replica_id != instance_id) {
> > > > > > > > +				struct xrow_header *tmp = local_row[0];
> > > > > > > > +				local_row[0] = local_row[1];
> > > > > > > > +				local_row[1] = tmp;
> > > > > > >
> > > > > > > There's SWAP for this.
> > > > > > >
> > > > > > > > +			}
> > > > > > > > +			--row;
> > > > > > > > +		}
> > > > > > > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > > > > > > +			++begin;
> > > > > > > > +	}
> > > > > > >
> > > > > > > I don't understand why we need to move rows generated locally (by
> > > > > > > an on_replace trigger I surmise) to the end of a transaction. We
> > > > > > > have TXN_ID attached to each row so we could leave the transactions
> > > > > > > interleaved, couldn't we?
> > > > > >
> > > > > > You are right, but in that case the applier would have to track a lot
> > > > > > of transactions simultaneously. It also complicates recovery. I hope it
> > > > > > will be fixed while implementing the parallel applier.
> > > > >
> > > > > Maybe we should implement the parallel applier in the scope of this
> > > > > issue then? Anyway, without it, sync replication won't scale with the
> > > > > number of parallel transactions.
> > > >
> > > > I do not think so. Parallel applier depends on that feature, but it is
> > > > a completely different issue.
> > >
> > > But as I said, sync replication won't scale without it.
> > >
> > > > Also parallel applier is ignorant of the xlog format.
> > >
> > > Depends on how you implement it. For example, MySQL marks all
> > > transactions that were committed in parallel on the master so that
> > > they can be committed in parallel on replicas.
> >
> > In case of vinyl it is not true.
>
> In case of vinyl we can mark transactions that overlapped in time on the
> master. In fact, that's what MySQL actually does. I guess I misled
> you by saying "committed in parallel". I meant "executed in parallel".

The only thing we need is to send all transactions to commit in the right
order and to restart a transaction in case of a conflict. For example, the
master executed transactions t1, t2 and t3. The replica starts t1, t2 and t3
in the received order. But if t3 finishes earlier than t2, we should hold
t3's commit until t2 is sent to the WAL. After t3 resumes there are two
possibilities: t3 conflicts or it does not. If it does not, we send it to
the WAL; in case of a conflict we just restart it, since we have all the
info to do that. We do not need any information about transaction batches
on the master, because all the work is done by the local transaction
manager. Compared with MySQL, we could also process transactions in
parallel even if they were executed sequentially on the master.
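Georgy's ordering scheme can be illustrated with a small single-threaded model. This is a hypothetical sketch, not Tarantool code: `struct rtxn`, `struct sequencer` and the rule that each transaction reads and writes exactly one key are assumptions made for illustration.

The model works as follows:
- Transactions may finish executing in any order.
- Each transaction is sent to the WAL only after all of its predecessors, in received order.
- Before committing, a transaction is checked for conflicts: if a transaction committed after it started wrote its key, it is restarted. The restart is modelled here as an instant re-execution.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_KEYS 16
#define MAX_TXNS 64

/* Hypothetical model of a replicated transaction touching one key. */
struct rtxn {
	int key;       /* key read and written by the transaction */
	int snapshot;  /* WAL position seen when execution started */
	bool finished; /* execution done, waiting for its turn to commit */
};

struct sequencer {
	struct rtxn *txns;          /* transactions in the order received */
	size_t count;
	size_t next;                /* next transaction allowed to commit */
	int wal_pos;                /* transactions sent to the WAL so far */
	int last_write[MAX_KEYS];   /* WAL position of the last write per key */
	size_t wal_order[MAX_TXNS]; /* indexes in the order they hit the WAL */
	int restarts;
};

static void
sequencer_init(struct sequencer *s, struct rtxn *txns, size_t count)
{
	assert(count <= MAX_TXNS);
	s->txns = txns;
	s->count = count;
	s->next = 0;
	s->wal_pos = 0;
	s->restarts = 0;
	for (int k = 0; k < MAX_KEYS; k++)
		s->last_write[k] = 0;
}

/* Commit one transaction whose predecessors are already in the WAL. */
static void
sequencer_commit(struct sequencer *s, size_t i)
{
	struct rtxn *t = &s->txns[i];
	assert(t->key >= 0 && t->key < MAX_KEYS);
	if (s->last_write[t->key] > t->snapshot) {
		/* Conflict: re-execute against the current state. */
		t->snapshot = s->wal_pos;
		s->restarts++;
	}
	s->wal_order[s->wal_pos] = i;
	s->wal_pos++;
	s->last_write[t->key] = s->wal_pos;
}

/* Called when transaction i finishes executing, in any order. */
static void
sequencer_on_finish(struct sequencer *s, size_t i)
{
	s->txns[i].finished = true;
	/* Hold the commit until every earlier transaction is committed. */
	while (s->next < s->count && s->txns[s->next].finished) {
		sequencer_commit(s, s->next);
		s->next++;
	}
}
```

In the test scenario, t1 and t3 write the same key and t3 finishes before t2. The sequencer holds t3 until t2 commits, then restarts t3 because t1 wrote its key after t3 started. The WAL order still matches the order in which the master sent the transactions.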
* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
From: Vladimir Davydov @ 2019-01-28 13:00 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true and valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to its creator.
>
> As encoding/decoding rules assumed:
> 1. txn_replica_id is encoded only if it is not equal with replica
> id. This might have point because of replication trigger
> 2. txn_id and txn_last are encoded only for multi-row transaction.
> So if we do not have txn_id in a xstream then this means that it
> is a single row transaction.
> These rules provide compatibility with previous xlog handling.
>
> Needed for: 2798
> ---
>  src/box/iproto_constants.h    |  3 ++
>  src/box/wal.c                 | 36 +++++++++++++++-
>  src/box/xrow.c                | 38 +++++++++++++++++
>  src/box/xrow.h                |  5 ++-
>  test/unit/xrow.cc             |  3 ++
>  test/vinyl/errinj_stat.result |  8 ++--
>  test/vinyl/layout.result      | 24 +++++------
>  test/vinyl/stat.result        | 78 +++++++++++++++++------------------

Also, forgot to say that this patch lacks a test case. We should
probably check that transaction boundaries are written using the
xlog reader module.
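Vladimir asks for a test that checks the written boundaries. The encoding rules quoted above imply decoding defaults that such a test could rely on:
- txn_replica_id is omitted when it equals replica_id.
- txn_id and txn_last are omitted for single-row transactions.

The sketch below is a hypothetical model, not the xrow.c API. `struct hdr`, its `has_*` presence flags and both functions are illustrative names. The sketch assumes txn_last is encoded only on the last row of a multi-row transaction. It also mirrors the applier patch's expectation that a transaction's first row has txn_id == lsn.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical decoded header; has_* flags say which keys were present. */
struct hdr {
	uint32_t replica_id;
	int64_t lsn;
	bool has_txn_id, has_txn_replica_id, has_txn_last;
	int64_t txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
};

/* Fill in the fields that the encoding rules allow to be omitted. */
static void
hdr_apply_defaults(struct hdr *h)
{
	if (!h->has_txn_replica_id)
		h->txn_replica_id = h->replica_id;   /* rule 1 */
	if (!h->has_txn_id) {
		/* Rule 2: no txn_id means a single-row transaction. */
		h->txn_id = h->lsn;
		h->txn_last = true;
	} else if (!h->has_txn_last) {
		h->txn_last = false;                 /* not the last row */
	}
}

/* Return true if rows[0..n) form whole, non-interleaved transactions. */
static bool
rows_have_valid_boundaries(const struct hdr *rows, size_t n)
{
	bool in_txn = false;
	int64_t txn_id = 0;
	uint32_t txn_replica_id = 0;
	for (size_t i = 0; i < n; i++) {
		const struct hdr *h = &rows[i];
		if (!in_txn) {
			/* The first row of a transaction carries its id. */
			if (h->txn_id != h->lsn ||
			    h->txn_replica_id != h->replica_id)
				return false;
			txn_id = h->txn_id;
			txn_replica_id = h->txn_replica_id;
			in_txn = true;
		} else if (h->txn_id != txn_id ||
			   h->txn_replica_id != txn_replica_id) {
			return false;
		}
		if (h->txn_last)
			in_txn = false;
	}
	return !in_txn; /* the stream must not end mid-transaction */
}
```

A real test would feed rows read back through the xlog reader into a check like `rows_have_valid_boundaries()`.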
* Re: [tarantool-patches] Re: [PATCH v2 1/2] Journal transaction boundaries
From: Vladislav Shpilevoy @ 2019-01-28 13:08 UTC
To: tarantool-patches, Vladimir Davydov, Georgy Kirichenko

On 28/01/2019 16:00, Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
>> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
>> txn_replica_id identifies replica where transaction was started and
>> txn_id identifies transaction id on that replica. As transaction id
>> a lsn of the first row in this transaction is used.
>> txn_last set to true if it is the last row in a transaction, so we
>> could commit transaction with last row or use additional NOP requests
>> with txn_last = true and valid txn_id and txn_replica_id.
>> For replication all local changes moved to xrows array tail to form
>> a separate transaction (like autonomous transaction) because it is not
>> possible to replicate such transaction back to its creator.
>>
>> As encoding/decoding rules assumed:
>> 1. txn_replica_id is encoded only if it is not equal with replica
>> id. This might have point because of replication trigger
>> 2. txn_id and txn_last are encoded only for multi-row transaction.
>> So if we do not have txn_id in a xstream then this means that it
>> is a single row transaction.
>> These rules provide compatibility with previous xlog handling.
>>
>> Needed for: 2798
>> ---
>>  src/box/iproto_constants.h    |  3 ++
>>  src/box/wal.c                 | 36 +++++++++++++++-
>>  src/box/xrow.c                | 38 +++++++++++++++++
>>  src/box/xrow.h                |  5 ++-
>>  test/unit/xrow.cc             |  3 ++
>>  test/vinyl/errinj_stat.result |  8 ++--
>>  test/vinyl/layout.result      | 24 +++++------
>>  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
>
> Also, forgot to say that this patch lacks a test case. We should
> probably check that transaction boundaries are written using the
> xlog reader module.
>

I have a test in my old branch for this issue. You can find it either in
the branch list or on the issue's GitHub page, because I referred to it
in the commit.
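The commit message says that rows generated locally are moved to the tail of the rows array so they form a separate transaction. Vladimir points out that a SWAP macro exists for the swap. As the inner loop is quoted earlier in the thread, `local_row` is never advanced, so each local row appears to move by at most one position; that may be worth double-checking.

Below is a minimal sketch of a stable partition that moves every local row to the tail while preserving the relative order on both sides. `struct row` and `ROW_SWAP` are hypothetical stand-ins for `struct xrow_header` and Tarantool's SWAP.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct xrow_header: only the fields we need. */
struct row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Local swap helper; Tarantool's own SWAP macro would serve the same role. */
#define ROW_SWAP(a, b) do {		\
	struct row *tmp_ = (a);		\
	(a) = (b);			\
	(b) = tmp_;			\
} while (0)

/*
 * Stable partition: remote rows keep their order at the head of the
 * array, rows generated locally (replica_id == instance_id) keep their
 * order at the tail. Returns the index of the first local row.
 */
static size_t
move_local_rows_to_tail(struct row **rows, size_t n, uint32_t instance_id)
{
	size_t dst = 0; /* rows[0..dst) are remote, rows[dst..i) are local */
	for (size_t i = 0; i < n; i++) {
		if (rows[i]->replica_id == instance_id)
			continue;
		/* Bubble the remote row left past the local rows before it. */
		for (size_t j = i; j > dst; j--)
			ROW_SWAP(rows[j - 1], rows[j]);
		dst++;
	}
	return dst;
}
```

This is O(n^2) in the worst case, which is likely acceptable for the short row arrays of a single journal entry. The returned index plays the role of the `begin` pointer that the quoted code advances to the first local row.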
* [tarantool-patches] Re: [PATCH v2 1/2] Journal transaction boundaries
From: Konstantin Osipov @ 2019-02-08 16:56 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/01/22 15:45]:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true and valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to its creator.

I think we should not need txn_replica_id at all. Let's discuss.

And I also thought we decided to drop txn_last? Having both txn_last
and txn_id seems redundant. We could set txn_id to the last LSN of
this txn; that would make txn_last unnecessary too, while still making
transaction boundaries easy to track.

What about adding something like "write concern" to the xrow header at
the same time, so that we can select the sync property individually for
each transaction?

>
> As encoding/decoding rules assumed:
> 1. txn_replica_id is encoded only if it is not equal with replica
> id. This might have point because of replication trigger
> 2. txn_id and txn_last are encoded only for multi-row transaction.
> So if we do not have txn_id in a xstream then this means that it
> is a single row transaction.
> These rules provide compatibility with previous xlog handling.
>

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
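Konstantin suggests setting txn_id to the last LSN of the transaction instead of the first. That makes txn_last redundant: a row ends its transaction exactly when its lsn equals its txn_id. Old-format rows, whose txn_id defaults to their own lsn, are then single-row transactions automatically.

A minimal sketch of that rule follows. `struct row` and the helper names are hypothetical. The sketch assumes all LSNs of a transaction are known before its first row is encoded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical row carrying only what the boundary rule needs. */
struct row {
	int64_t lsn;
	int64_t txn_id;
};

/* Writer side: tag every row of one transaction with its last LSN. */
static void
assign_txn_id_as_last_lsn(struct row *rows, size_t n)
{
	assert(n > 0);
	int64_t last = rows[n - 1].lsn;
	for (size_t i = 0; i < n; i++)
		rows[i].txn_id = last;
}

/* Reader side: the last row is the one whose lsn equals txn_id. */
static bool
row_ends_txn(const struct row *r)
{
	return r->lsn == r->txn_id;
}

/* Count complete transactions; -1 if interleaved or truncated. */
static long
count_txns(const struct row *rows, size_t n)
{
	long txns = 0;
	bool open = false;
	int64_t cur = 0;
	for (size_t i = 0; i < n; i++) {
		if (open && rows[i].txn_id != cur)
			return -1; /* a different transaction started mid-way */
		cur = rows[i].txn_id;
		open = !row_ends_txn(&rows[i]);
		if (!open)
			txns++;
	}
	return open ? -1 : txns;
}
```

One trade-off is that a reader cannot learn a transaction's id from its first row's LSN alone. It has to read txn_id from the row, which every row of a multi-row transaction carries anyway.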
* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

Applier fetches incoming rows to form a transaction and then applies it.
Implementation assumes that transactions could not mix in a
replication stream. Also distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 148 insertions(+), 59 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index adbe88679..0e3832ad8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+		    txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
-			if (row.replica_id == instance_id &&
+			       first_row->replica_id) < first_row->lsn) {
+			if (first_row->replica_id == instance_id &&
 			    vclock_get(&replicaset.vclock, instance_id) <
-			    row.lsn) {
+			    first_row->lsn) {
 				/* Local row returned back. */
 				goto done;
 			}
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
 done:
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1
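The `applier_read_tx()` loop in the patch above does three things:
- It reads rows until one arrives with `txn_last` set.
- It rejects interleaved transactions.
- It copies each body out of the input buffer, because that buffer space is reused once rows are decoded.

The same control flow can be modelled without the network or Tarantool's ibuf/obuf. In this hypothetical sketch:
- `struct source` stands in for the socket.
- `malloc`/`realloc` replace the slab-backed buffers.
- The `READ_*` codes replace `diag_set()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical wire row; body points into a reusable input buffer. */
struct wrow {
	uint32_t replica_id;
	int64_t lsn, txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
	const char *body;
	size_t body_len;
};

struct source { const struct wrow *rows; size_t n, pos; };

/* Rows of one transaction; bodies are owned copies. */
struct txn_buf { struct wrow *rows; size_t count, cap; };

enum { READ_OK = 0, READ_EOF = -1, READ_INTERLEAVED = -2, READ_NOMEM = -3 };

static void
txn_buf_reset(struct txn_buf *b)
{
	for (size_t i = 0; i < b->count; i++)
		free((void *)b->rows[i].body);
	b->count = 0;
}

/* Read rows until txn_last; on any error drop the partial transaction. */
static int
read_tx(struct source *src, struct txn_buf *out)
{
	int rc;
	txn_buf_reset(out);
	for (;;) {
		if (src->pos == src->n) {
			rc = READ_EOF; /* stream ended mid-transaction */
			break;
		}
		const struct wrow *in = &src->rows[src->pos++];
		if (out->count == 0) {
			/* The first row must identify the transaction. */
			if (in->txn_id != in->lsn ||
			    in->txn_replica_id != in->replica_id) {
				rc = READ_INTERLEAVED;
				break;
			}
		} else if (in->txn_id != out->rows[0].txn_id ||
			   in->txn_replica_id != out->rows[0].txn_replica_id) {
			rc = READ_INTERLEAVED;
			break;
		}
		if (out->count == out->cap) {
			size_t cap = out->cap ? out->cap * 2 : 8;
			struct wrow *grown = realloc(out->rows,
						     cap * sizeof(struct wrow));
			if (grown == NULL) {
				rc = READ_NOMEM;
				break;
			}
			out->rows = grown;
			out->cap = cap;
		}
		/* Copy the body: the input buffer is reused for later reads. */
		char *body = NULL;
		if (in->body_len > 0) {
			body = malloc(in->body_len);
			if (body == NULL) {
				rc = READ_NOMEM;
				break;
			}
			memcpy(body, in->body, in->body_len);
		}
		struct wrow *row = &out->rows[out->count++];
		*row = *in;
		row->body = body;
		if (row->txn_last)
			return READ_OK;
	}
	txn_buf_reset(out);
	return rc;
}
```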
* Re: [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
From: Vladimir Davydov @ 2019-01-28 13:35 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:37PM +0300, Georgy Kirichenko wrote:
> Applier fetches incoming rows to form a transaction and then applies it.
> Implementation assumes that transactions could not mix in a
> replication stream. Also distributed transactions are not supported yet.
>
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
>  1 file changed, 148 insertions(+), 59 deletions(-)

Without a test, this patch is inadmissible. Vlad mentioned that he has
some tests left from his old implementation. Please salvage those.

>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index adbe88679..0e3832ad8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +381,102 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +	uint32_t txn_replica_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));
> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			txn_id = row->lsn;
> +			txn_replica_id = row->replica_id;
> +		}
> +		if (txn_id != row->txn_id ||
> +		    txn_replica_id != row->txn_replica_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}

Accumulating rows feels like the iproto realm. I don't think that it's a
good idea to implement a dirty ad-hoc solution for this. Instead we
should move applier to iproto IMO. This would probably allow us to reuse
the code for interactive iproto transactions: the two issues look very
similar to me, and I think we should use the same protocol and code to
get them both working.

> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->txn_last == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);
> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;
> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);
>  
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row); /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>  
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->txn_replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set
> -		 * of changes may arrive via two
> -		 * concurrently running appliers. Thanks
> -		 * to vclock_follow() above, the first row
> -		 * in the set will be skipped - but the
> -		 * remaining may execute out of order,
> -		 * when the following xstream_write()
> -		 * yields on WAL. Hence we need a latch to
> -		 * strictly order all changes which belong
> -		 * to the same server id.
> -		 */
>  		latch_lock(latch);
> +		/* First row identifies a transaction. */
> +		assert(first_row->lsn == first_row->txn_id);
> +		assert(first_row->replica_id == first_row->txn_replica_id);
>  		if (vclock_get(&replicaset.applier.vclock,
> -			       row.replica_id) < row.lsn) {
> -			if (row.replica_id == instance_id &&
> +			       first_row->replica_id) < first_row->lsn) {
> +			if (first_row->replica_id == instance_id &&
>  			    vclock_get(&replicaset.vclock, instance_id) <
> -			    row.lsn) {
> +			    first_row->lsn) {
>  				/* Local row returned back. */
>  				goto done;
>  			}
>  			/* Preserve old lsn value. */
>  			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> -						     row.replica_id);
> -			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			struct error *e = diag_last_error(diag_get());
> -			if (res != 0 && e->type == &type_ClientError &&
> -			    box_error_code(e) == ER_TUPLE_FOUND &&
> -			    replication_skip_conflict) {
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				diag_clear(diag_get());
> -				row.type = IPROTO_NOP;
> -				row.bodycnt = 0;
> -				res = xstream_write(applier->subscribe_stream,
> -						    &row);
> +						     first_row->replica_id);
> +
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

So we have xstream_write to hide box internals, but we still use
txn_begin/commit. This looks ugly. We should encapsulate those somehow
as well, I guess.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				vclock_follow_xrow(&replicaset.applier.vclock, row);
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
> +					diag_clear(diag_get());
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
> +				}
> +				++row;
>  			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
>  			if (res != 0) {
>  				/* Rollback lsn to have a chance for a retry. */
>  				vclock_set(&replicaset.applier.vclock,
> -					   row.replica_id, old_lsn);
> +					   first_row->replica_id, old_lsn);
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
>  				latch_unlock(latch);
>  				diag_raise();
>  			}
>  		}
>  done:
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  		/*
>  		 * Stay 'orphan' until appliers catch up with
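Vladimir suggests hiding txn_begin/txn_commit behind an interface, the way xstream_write hides box internals. One hypothetical shape for that is an ops table passed to the apply loop. The sketch below uses it to model the loop from the patch:
- Follow the vclock row by row.
- Turn a duplicate-key conflict into a NOP when skipping conflicts is enabled.
- Restore the old LSN on failure so the transaction can be retried.

`struct apply_ops`, the `ERR_*` codes and the mock are illustrative assumptions. Unlike the quoted diff, the sketch also rolls back the open transaction explicitly when a row fails. Whether the real code needs an explicit `txn_rollback()` on that path is worth checking.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { ERR_NONE = 0, ERR_TUPLE_FOUND = 1, ERR_OTHER = 2 };

struct arow { int64_t lsn; bool is_nop; };

/* Hypothetical storage interface hiding begin/commit from the applier. */
struct apply_ops {
	void (*begin)(void *ctx);
	int (*write)(void *ctx, const struct arow *row); /* ERR_* code */
	int (*commit)(void *ctx); /* assumed to clean up on failure */
	void (*rollback)(void *ctx);
	void *ctx;
};

static int
apply_tx(struct apply_ops *ops, struct arow *rows, size_t n,
	 int64_t *vclock_lsn, bool skip_conflict)
{
	int64_t old_lsn = *vclock_lsn;
	bool multi = n > 1; /* single-row transactions need no begin/commit */
	if (multi)
		ops->begin(ops->ctx);
	int rc = ERR_NONE;
	for (size_t i = 0; i < n && rc == ERR_NONE; i++) {
		*vclock_lsn = rows[i].lsn; /* follow the row */
		rc = ops->write(ops->ctx, &rows[i]);
		if (rc == ERR_TUPLE_FOUND && skip_conflict) {
			/* Keep the LSN but turn the row into a NOP. */
			rows[i].is_nop = true;
			rc = ops->write(ops->ctx, &rows[i]);
		}
	}
	if (rc != ERR_NONE) {
		if (multi)
			ops->rollback(ops->ctx);
	} else if (multi) {
		rc = ops->commit(ops->ctx);
	}
	if (rc != ERR_NONE)
		*vclock_lsn = old_lsn; /* leave a chance for a retry */
	return rc;
}

/* Test double used in the example: fails on configured LSNs. */
struct mock {
	int begins, commits, rollbacks, writes, nops;
	int64_t dup_lsn, bad_lsn;
};

static void mock_begin(void *c) { ((struct mock *)c)->begins++; }
static int mock_commit(void *c) { ((struct mock *)c)->commits++; return ERR_NONE; }
static void mock_rollback(void *c) { ((struct mock *)c)->rollbacks++; }

static int
mock_write(void *c, const struct arow *row)
{
	struct mock *m = c;
	if (row->is_nop) {
		m->nops++;
		return ERR_NONE;
	}
	if (row->lsn == m->dup_lsn)
		return ERR_TUPLE_FOUND;
	if (row->lsn == m->bad_lsn)
		return ERR_OTHER;
	m->writes++;
	return ERR_NONE;
}
```

With the ops table in place, the applier no longer calls txn functions directly, and a test can drive it with a double like `struct mock`.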
* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consists of two
commits:
* the first one forms transaction boundaries in an xstream
* the second one forms transactions in applier buffers and then applies
  them with correct begin/commit boundaries.

Note: distributed transactions are not supported, so the journal forms a
separate transaction for all local trigger effects.

Changes in v2:
  - Fixed local transaction extraction

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc         | 202 ++++++++++++++++++++++++++-----------
 src/box/iproto_constants.h |   3 +
 src/box/wal.c              |  36 ++++++-
 src/box/xrow.c             |  38 +++++++
 src/box/xrow.h             |   5 +-
 test/unit/xrow.cc          |   3 +
 test/vinyl/errinj.result   |   8 +-
 test/vinyl/info.result     |  38 +++----
 test/vinyl/layout.result   |  24 ++---
 9 files changed, 263 insertions(+), 94 deletions(-)

-- 
2.20.1
* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier 2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko @ 2019-01-06 13:05 ` Georgy Kirichenko 0 siblings, 0 replies; 16+ messages in thread From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Applier fetch incoming rows to form a transaction and then apply it. Implementation assumes that transaction could not mix in a replication stream. Also distributed transaction are not supported yet. Closes: #2798 Needed for: #980 --- src/box/applier.cc | 202 ++++++++++++++++++++++++++++++++------------- 1 file changed, 145 insertions(+), 57 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 6c0eb45d5..7e208aaa2 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -48,6 +48,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "txn.h" STRS(applier_state, applier_STATE); @@ -380,6 +381,102 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } +/** + * Read one transaction from network. + * Transaction rows are placed into row_buf as an array, row's bodies are + * placed into obuf because it is not allowed to relocate row's bodies. + * Also we could not use applier input buffer because rpos adjusted after xrow + * decoding and corresponding space going to reuse. + * + * Note: current implementation grants that transaction could not be mixed, so + * we read each transaction from first xrow until xrow with txn_last = true. 
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+		    txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,80 +576,67 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
+			       first_row->replica_id) < first_row->lsn) {
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1
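The grouping and apply logic in applier_read_tx() and the new loop in applier_subscribe() can be reduced to the standalone sketch below. It is not the patch's code. The names struct row, read_tx(), apply_tx(), demo_write(), demo_fail_lsn, MAX_TX_ROWS and VCLOCK_SIZE are all hypothetical. The real applier reads struct xrow_header objects from the socket, copies the row bodies into an obuf, and wraps multi-row batches in txn_begin()/txn_commit(). This sketch omits all three.

The sketch shows the two rules the patch relies on:

- Every row in a batch must carry the first row's lsn as txn_id and the first row's replica_id as txn_replica_id. Otherwise the batch is rejected as interleaving.
- A failed batch restores the old vclock value so it can be retried.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical, simplified stand-in for struct xrow_header: only
 * the fields that the transaction-boundary logic looks at.
 */
struct row {
	uint32_t replica_id;
	int64_t lsn;
	int64_t txn_id;          /* lsn of the transaction's first row */
	uint32_t txn_replica_id; /* replica that started the transaction */
	bool txn_last;           /* set on the last row of a transaction */
};

/* Hypothetical limits; the real code uses a growable ibuf and VCLOCK_MAX. */
enum { MAX_TX_ROWS = 16, VCLOCK_SIZE = 32 };

/*
 * Collect rows starting at stream[*pos] until a row with txn_last
 * set. Returns the number of rows in the transaction, or -1 if the
 * stream ends early, the batch would overflow, a replica id is out
 * of range, or a row belongs to another transaction (interleaving).
 */
static int
read_tx(const struct row *stream, size_t len, size_t *pos,
	struct row *batch)
{
	int n = 0;
	int64_t txn_id = 0;
	uint32_t txn_replica_id = 0;
	const struct row *r;
	do {
		if (*pos >= len || n >= MAX_TX_ROWS)
			return -1;
		r = &stream[(*pos)++];
		if (r->replica_id == 0 || r->replica_id >= VCLOCK_SIZE)
			return -1;
		if (n == 0) {
			/* The first row identifies the transaction. */
			txn_id = r->lsn;
			txn_replica_id = r->replica_id;
		}
		if (r->txn_id != txn_id ||
		    r->txn_replica_id != txn_replica_id)
			return -1;
		batch[n++] = *r;
	} while (!r->txn_last);
	return n;
}

/* Hypothetical writer standing in for xstream_write(). */
static int64_t demo_fail_lsn = -1;

static int
demo_write(const struct row *row)
{
	/* Simulate a failed write for one chosen lsn. */
	return row->lsn == demo_fail_lsn ? -1 : 0;
}

/*
 * Apply a collected transaction. Skip it if the vclock already
 * covers its first row. Otherwise advance the vclock row by row,
 * and restore the old value on failure so the batch can be retried.
 */
static int
apply_tx(int64_t *vclock, const struct row *batch, int n,
	 int (*write_row)(const struct row *))
{
	assert(n > 0);
	const struct row *first = &batch[0];
	if (vclock[first->replica_id] >= first->lsn)
		return 0; /* already applied, e.g. via another applier */
	int64_t old_lsn = vclock[first->replica_id];
	for (int i = 0; i < n; i++) {
		vclock[batch[i].replica_id] = batch[i].lsn;
		if (write_row(&batch[i]) != 0) {
			vclock[first->replica_id] = old_lsn;
			return -1;
		}
	}
	return 0;
}
```

A caller would call read_tx() in a loop and pass each batch to apply_tx(). In the patch, a -1 from the grouping step corresponds to the ER_UNSUPPORTED "interleaving transactions" or ER_UNKNOWN_REPLICA errors.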
Thread overview: 16+ messages
2019-01-22 10:57 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
2019-01-28 12:58 ` Vladimir Davydov
2019-01-29 10:09 ` Георгий Кириченко
2019-01-29 11:00 ` Vladimir Davydov
2019-01-31 7:34 ` Георгий Кириченко
2019-01-31 8:19 ` Vladimir Davydov
2019-01-31 14:25 ` Георгий Кириченко
2019-01-31 14:54 ` Vladimir Davydov
2019-02-01 9:31 ` Георгий Кириченко
2019-01-28 13:00 ` Vladimir Davydov
2019-01-28 13:08 ` Vladislav Shpilevoy
2019-02-08 16:56 ` Konstantin Osipov
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-28 13:35 ` Vladimir Davydov
-- strict thread matches above, loose matches on Subject: below --
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko