This is the wrong commit; please review the second version. On Sunday, January 6, 2019 12:26:05 AM MSK Georgy Kirichenko wrote: > Append txn_id, txn_replica_id and txn_last to the xrow_header structure. > txn_replica_id identifies the replica where the transaction was started, and > txn_id identifies the transaction on that replica. The LSN of the first > row of the transaction is used as the transaction id. > txn_last is set to true for the last row of a transaction, so we can > commit a transaction with its last row or use an additional NOP request with > txn_last = true and a valid txn_id and txn_replica_id. > For replication, all local changes are moved to the tail of the xrows array to form a > separate transaction (like an autonomous transaction), because it is not > possible to replicate such a transaction back to its creator. > > The following encoding/decoding rules are assumed: > 1. txn_replica_id is encoded only if it differs from the replica id. This > can matter because of replication triggers. > 2. txn_id and txn_last are encoded only for multi-row transactions. So if > there is no txn_id in an xstream, the row belongs to a single-row > transaction. > These rules provide compatibility with previous xlog handling. 
> > Needed for: 2798 > --- > src/box/iproto_constants.h | 3 +++ > src/box/wal.c | 33 ++++++++++++++++++++++++++++++++- > src/box/xrow.c | 38 ++++++++++++++++++++++++++++++++++++++ > src/box/xrow.h | 5 ++++- > test/unit/xrow.cc | 3 +++ > test/vinyl/errinj.result | 8 ++++---- > test/vinyl/info.result | 38 +++++++++++++++++++------------------- > test/vinyl/layout.result | 24 ++++++++++++------------ > 8 files changed, 115 insertions(+), 37 deletions(-) > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 728514297..d01cdf840 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -60,6 +60,9 @@ enum iproto_key { > IPROTO_SCHEMA_VERSION = 0x05, > IPROTO_SERVER_VERSION = 0x06, > IPROTO_GROUP_ID = 0x07, > + IPROTO_TXN_ID = 0x08, > + IPROTO_TXN_REPLICA_ID = 0x09, > + IPROTO_TXN_LAST = 0x0a, > /* Leave a gap for other keys in the header. */ > IPROTO_SPACE_ID = 0x10, > IPROTO_INDEX_ID = 0x11, > diff --git a/src/box/wal.c b/src/box/wal.c > index 4c3537672..584d951c0 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer) > } > > static void > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > struct xrow_header **end) > { > + struct xrow_header **row = begin; > /** Assign LSN to all local rows. */ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > @@ -917,6 +918,36 @@ wal_assign_lsn(struct vclock *vclock, struct > xrow_header **row, vclock_follow_xrow(vclock, *row); > } > } > + if ((*begin)->replica_id != instance_id) { > + /* > + * Move all local changes to the end of rows array and > + * a fake local transaction (like an autonomous transaction) > + * because we could not replicate the transaction back. 
> + */ > + row = begin; > + while (row < end - 1) { > + if (row[0]->replica_id == instance_id && > + row[1]->replica_id != instance_id) { > + struct xrow_header *tmp = row[0]; > + row[0] = row[1]; > + row[1] = tmp; > + } > + ++row; > + } > + /* Search begin of local rows tail. */ > + row = end; > + while (row > begin && row[-1]->replica_id == instance_id) > + --row; > + begin = row; > + } > + /* Setup txn_id and tnx_replica_id for localy generated rows. */ > + row = begin; > + while (row < end) { > + row[0]->txn_id = begin[0]->lsn; > + row[0]->txn_replica_id = instance_id; > + row[0]->txn_last = row == end - 1 ? 1 : 0; > + ++row; > + } > } > > static void > diff --git a/src/box/xrow.c b/src/box/xrow.c > index ef3f81add..db524b3c8 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -133,12 +133,32 @@ error: > case IPROTO_SCHEMA_VERSION: > header->schema_version = mp_decode_uint(pos); > break; > + case IPROTO_TXN_ID: > + header->txn_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_REPLICA_ID: > + header->txn_replica_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_LAST: > + header->txn_last = mp_decode_uint(pos); > + break; > default: > /* unknown header */ > mp_next(pos); > } > } > assert(*pos <= end); > + if (header->txn_id == 0) { > + /* > + * Transaction id is not set so it is a single statement > + * transaction. > + */ > + header->txn_id = header->lsn; > + header->txn_last = true; > + } > + if (header->txn_replica_id == 0) > + header->txn_replica_id = header->replica_id; > + > /* Nop requests aren't supposed to have a body. */ > if (*pos < end && header->type != IPROTO_NOP) { > const char *body = *pos; > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, > uint64_t sync, d = mp_encode_double(d, header->tm); > map_size++; > } > + if (header->txn_id != header->lsn || header->txn_last == 0) { > + /* Encode txn id for multi row transaction members. 
*/ > + d = mp_encode_uint(d, IPROTO_TXN_ID); > + d = mp_encode_uint(d, header->txn_id); > + map_size++; > + } > + if (header->txn_replica_id != header->replica_id) { > + d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID); > + d = mp_encode_uint(d, header->txn_replica_id); > + map_size++; > + } > + if (header->txn_last && !(header->txn_id == header->lsn && > + header->txn_replica_id == header->replica_id)) { > + /* Set last row for multi row transaction. */ > + d = mp_encode_uint(d, IPROTO_TXN_LAST); > + d = mp_encode_uint(d, header->txn_last); > + map_size++; > + } > assert(d <= data + XROW_HEADER_LEN_MAX); > mp_encode_map(data, map_size); > out->iov_len = d - (char *) out->iov_base; > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 6bab0a1fd..4acd84d56 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -47,7 +47,7 @@ enum { > XROW_HEADER_IOVMAX = 1, > XROW_BODY_IOVMAX = 2, > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > - XROW_HEADER_LEN_MAX = 40, > + XROW_HEADER_LEN_MAX = 60, > XROW_BODY_LEN_MAX = 128, > IPROTO_HEADER_LEN = 28, > /** 7 = sizeof(iproto_body_bin). 
*/ > @@ -69,6 +69,9 @@ struct xrow_header { > uint64_t sync; > int64_t lsn; /* LSN must be signed for correct comparison */ > double tm; > + int64_t txn_id; > + uint32_t txn_replica_id; > + uint32_t txn_last; > > int bodycnt; > uint32_t schema_version; > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > index 165a543cf..0b796d728 100644 > --- a/test/unit/xrow.cc > +++ b/test/unit/xrow.cc > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode() > header.lsn = 400; > header.tm = 123.456; > header.bodycnt = 0; > + header.txn_id = header.lsn; > + header.txn_replica_id = header.replica_id; > + header.txn_last = true; > uint64_t sync = 100500; > struct iovec vec[1]; > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); > diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result > index 23ab845b3..7ea5df777 100644 > --- a/test/vinyl/errinj.result > +++ b/test/vinyl/errinj.result > @@ -2164,7 +2164,7 @@ i:stat().disk.compact.queue -- 30 statements > - bytes_compressed: > pages: 3 > rows: 30 > - bytes: 471 > + bytes: 537 > ... > i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue > --- > @@ -2178,7 +2178,7 @@ i:stat().disk.compact.queue -- 40 statements > - bytes_compressed: > pages: 4 > rows: 40 > - bytes: 628 > + bytes: 716 > ... > i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue > --- > @@ -2192,7 +2192,7 @@ i:stat().disk.compact.queue -- 50 statements > - bytes_compressed: > pages: 5 > rows: 50 > - bytes: 785 > + bytes: 895 > ... > i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue > --- > @@ -2206,7 +2206,7 @@ i:stat().disk.compact.queue -- 50 statements > - bytes_compressed: > pages: 5 > rows: 50 > - bytes: 785 > + bytes: 895 > ... 
> i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue > --- > diff --git a/test/vinyl/info.result b/test/vinyl/info.result > index 922728abe..4876ddc68 100644 > --- a/test/vinyl/info.result > +++ b/test/vinyl/info.result > @@ -285,19 +285,19 @@ stat_diff(istat(), st) > bytes: 26525 > count: 1 > out: > - bytes: 26049 > + bytes: 26113 > pages: 7 > bytes_compressed: > rows: 25 > index_size: 294 > rows: 25 > - bytes: 26049 > + bytes: 26113 > bytes_compressed: > bloom_size: 70 > statement: > replaces: 25 > pages: 7 > - bytes: 26049 > + bytes: 26113 > put: > rows: 25 > bytes: 26525 > @@ -325,26 +325,26 @@ stat_diff(istat(), st) > bytes: 53050 > count: 1 > out: > - bytes: 52091 > + bytes: 52217 > pages: 13 > bytes_compressed: > rows: 50 > index_size: 252 > rows: 25 > - bytes: 26042 > + bytes: 26104 > bytes_compressed: > pages: 6 > statement: > replaces: 25 > compact: > in: > - bytes: 78140 > + bytes: 78330 > pages: 20 > bytes_compressed: > rows: 75 > count: 1 > out: > - bytes: 52091 > + bytes: 52217 > pages: 13 > bytes_compressed: > rows: 50 > @@ -352,7 +352,7 @@ stat_diff(istat(), st) > rows: 50 > bytes: 53050 > rows: 25 > - bytes: 26042 > + bytes: 26104 > ... 
> -- point lookup from disk + cache put > st = istat() > @@ -376,7 +376,7 @@ stat_diff(istat(), st) > disk: > iterator: > read: > - bytes: 4167 > + bytes: 4177 > pages: 1 > bytes_compressed: > rows: 4 > @@ -626,7 +626,7 @@ stat_diff(istat(), st) > disk: > iterator: > read: > - bytes: 104300 > + bytes: 104550 > pages: 25 > bytes_compressed: > rows: 100 > @@ -971,7 +971,7 @@ istat() > --- > - rows: 306 > run_avg: 1 > - bytes: 317731 > + bytes: 317981 > upsert: > squashed: 0 > applied: 0 > @@ -1049,7 +1049,7 @@ istat() > bloom_size: 140 > pages: 25 > bytes_compressed: > - bytes: 104300 > + bytes: 104550 > txw: > bytes: 0 > rows: 0 > @@ -1082,7 +1082,7 @@ gstat() > in: 0 > queue: 0 > out: 0 > - data: 104300 > + data: 104550 > index: 1190 > memory: > tuple_cache: 14313 > @@ -1209,7 +1209,7 @@ st2 = i2:stat() > ... > s:bsize() > --- > -- 52199 > +- 52313 > ... > i1:len(), i2:len() > --- > @@ -1219,7 +1219,7 @@ i1:len(), i2:len() > i1:bsize(), i2:bsize() > --- > - 364 > -- 920 > +- 1022 > ... > s:bsize() == st1.disk.bytes > --- > @@ -1271,7 +1271,7 @@ st2 = i2:stat() > ... > s:bsize() > --- > -- 107449 > +- 107563 > ... > i1:len(), i2:len() > --- > @@ -1281,7 +1281,7 @@ i1:len(), i2:len() > i1:bsize(), i2:bsize() > --- > - 49516 > -- 50072 > +- 50174 > ... > s:bsize() == st1.memory.bytes + st1.disk.bytes > --- > @@ -1336,7 +1336,7 @@ st2 = i2:stat() > ... > s:bsize() > --- > -- 52199 > +- 52313 > ... > i1:len(), i2:len() > --- > @@ -1346,7 +1346,7 @@ i1:len(), i2:len() > i1:bsize(), i2:bsize() > --- > - 364 > -- 920 > +- 1022 > ... 
> s:bsize() == st1.disk.bytes > --- > diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result > index 14201c5dd..a6b577dbc 100644 > --- a/test/vinyl/layout.result > +++ b/test/vinyl/layout.result > @@ -253,8 +253,8 @@ result > BODY: > row_index_offset: > offset: > - size: 108 > - unpacked_size: 89 > + size: 118 > + unpacked_size: 99 > row_count: 4 > min_key: ['ёёё'] > - - 00000000000000000008.run > @@ -281,7 +281,7 @@ result > - HEADER: > type: ROWINDEX > BODY: > - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" > + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" > - - 00000000000000000012.index > - - HEADER: > type: RUNINFO > @@ -298,8 +298,8 @@ result > BODY: > row_index_offset: > offset: > - size: 102 > - unpacked_size: 83 > + size: 110 > + unpacked_size: 91 > row_count: 3 > min_key: ['ёёё'] > - - 00000000000000000012.run > @@ -324,7 +324,7 @@ result > - HEADER: > type: ROWINDEX > BODY: > - row_index: "\0\0\0\0\0\0\0\x14\0\0\0*" > + row_index: "\0\0\0\0\0\0\0\x16\0\0\0." > - - 00000000000000000006.index > - - HEADER: > type: RUNINFO > @@ -341,8 +341,8 @@ result > BODY: > row_index_offset: > offset: > - size: 108 > - unpacked_size: 89 > + size: 118 > + unpacked_size: 99 > row_count: 4 > min_key: [null, 'ёёё'] > - - 00000000000000000006.run > @@ -369,7 +369,7 @@ result > - HEADER: > type: ROWINDEX > BODY: > - row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00" > + row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06" > - - 00000000000000000010.index > - - HEADER: > type: RUNINFO > @@ -386,8 +386,8 @@ result > BODY: > row_index_offset: > offset: > - size: 90 > - unpacked_size: 71 > + size: 98 > + unpacked_size: 79 > row_count: 3 > min_key: [123, 'ёёё'] > - - 00000000000000000010.run > @@ -409,7 +409,7 @@ result > - HEADER: > type: ROWINDEX > BODY: > - row_index: "\0\0\0\0\0\0\0\x10\0\0\0\"" > + row_index: "\0\0\0\0\0\0\0\x12\0\0\0&" > ... > test_run:cmd("clear filter") > ---