[tarantool-patches] Re: [PATCH 1/2] Journal transaction boundaries

Georgy Kirichenko georgy at tarantool.org
Sun Jan 6 16:07:55 MSK 2019


It is a wrong commit, please review the second version.

On Sunday, January 6, 2019 12:26:05 AM MSK Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
> txn_replica_id identifies the replica where the transaction was started and
> txn_id identifies the transaction id on that replica. The lsn of the first
> row in the transaction is used as the transaction id.
> txn_last is set to true if it is the last row in a transaction, so we could
> commit a transaction with the last row or use additional NOP requests with
> txn_last = true and valid txn_id and txn_replica_id.
> For replication, all local changes are moved to the tail of the xrows array
> to form a separate transaction (like an autonomous transaction) because it is
> not possible to replicate such a transaction back to its creator.
> 
> The following encoding/decoding rules are assumed:
>  1. txn_replica_id is encoded only if it is not equal to replica id. This
>  might have a point because of replication triggers.
>  2. txn_id and txn_last are encoded only for multi-row transactions. So if we
> do not have txn_id in an xstream then this means that it is a single-row
> transaction.
> These rules provide compatibility with previous xlog handling.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.h |  3 +++
>  src/box/wal.c              | 33 ++++++++++++++++++++++++++++++++-
>  src/box/xrow.c             | 38 ++++++++++++++++++++++++++++++++++++++
>  src/box/xrow.h             |  5 ++++-
>  test/unit/xrow.cc          |  3 +++
>  test/vinyl/errinj.result   |  8 ++++----
>  test/vinyl/info.result     | 38 +++++++++++++++++++-------------------
>  test/vinyl/layout.result   | 24 ++++++++++++------------
>  8 files changed, 115 insertions(+), 37 deletions(-)
> 
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..d01cdf840 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -60,6 +60,9 @@ enum iproto_key {
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
>  	IPROTO_GROUP_ID = 0x07,
> +	IPROTO_TXN_ID = 0x08,
> +	IPROTO_TXN_REPLICA_ID = 0x09,
> +	IPROTO_TXN_LAST = 0x0a,
>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 4c3537672..584d951c0 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  }
> 
>  static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
>  	       struct xrow_header **end)
>  {
> +	struct xrow_header **row = begin;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> @@ -917,6 +918,36 @@ wal_assign_lsn(struct vclock *vclock, struct
> xrow_header **row, vclock_follow_xrow(vclock, *row);
>  		}
>  	}
> +	if ((*begin)->replica_id != instance_id) {
> +		/*
> +		 * Move all local changes to the end of rows array and
> +		 * a fake local transaction (like an autonomous 
transaction)
> +		 * because we could not replicate the transaction back.
> +		 */
> +		row = begin;
> +		while (row < end - 1) {
> +			if (row[0]->replica_id == instance_id &&
> +			    row[1]->replica_id != instance_id) {
> +				struct xrow_header *tmp = row[0];
> +				row[0] = row[1];
> +				row[1] = tmp;
> +			}
> +			++row;
> +		}
> +		/* Search begin of local rows tail. */
> +		row = end;
> +		while (row > begin && row[-1]->replica_id == 
instance_id)
> +			--row;
> +		begin = row;
> +	}
> +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> +	row = begin;
> +	while (row < end) {
> +		row[0]->txn_id = begin[0]->lsn;
> +		row[0]->txn_replica_id = instance_id;
> +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> +		++row;
> +	}
>  }
> 
>  static void
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index ef3f81add..db524b3c8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -133,12 +133,32 @@ error:
>  		case IPROTO_SCHEMA_VERSION:
>  			header->schema_version = 
mp_decode_uint(pos);
>  			break;
> +		case IPROTO_TXN_ID:
> +			header->txn_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_REPLICA_ID:
> +			header->txn_replica_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_LAST:
> +			header->txn_last = mp_decode_uint(pos);
> +			break;
>  		default:
>  			/* unknown header */
>  			mp_next(pos);
>  		}
>  	}
>  	assert(*pos <= end);
> +	if (header->txn_id == 0) {
> +		/*
> +		 * Transaction id is not set so it is a single statement
> +		 * transaction.
> +		 */
> +		header->txn_id = header->lsn;
> +		header->txn_last = true;
> +	}
> +	if (header->txn_replica_id == 0)
> +		header->txn_replica_id = header->replica_id;
> +
>  	/* Nop requests aren't supposed to have a body. */
>  	if (*pos < end && header->type != IPROTO_NOP) {
>  		const char *body = *pos;
> @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header,
> uint64_t sync, d = mp_encode_double(d, header->tm);
>  		map_size++;
>  	}
> +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> +		/* Encode txn id for multi row transaction members. */
> +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> +		d = mp_encode_uint(d, header->txn_id);
> +		map_size++;
> +	}
> +	if (header->txn_replica_id != header->replica_id) {
> +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> +		d = mp_encode_uint(d, header->txn_replica_id);
> +		map_size++;
> +	}
> +	if (header->txn_last && !(header->txn_id == header->lsn &&
> +				  header->txn_replica_id == 
header->replica_id)) {
> +		/* Set last row for multi row transaction. */
> +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> +		d = mp_encode_uint(d, header->txn_last);
> +		map_size++;
> +	}
>  	assert(d <= data + XROW_HEADER_LEN_MAX);
>  	mp_encode_map(data, map_size);
>  	out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 6bab0a1fd..4acd84d56 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
>  	XROW_HEADER_IOVMAX = 1,
>  	XROW_BODY_IOVMAX = 2,
>  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> -	XROW_HEADER_LEN_MAX = 40,
> +	XROW_HEADER_LEN_MAX = 60,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
>  	/** 7 = sizeof(iproto_body_bin). */
> @@ -69,6 +69,9 @@ struct xrow_header {
>  	uint64_t sync;
>  	int64_t lsn; /* LSN must be signed for correct comparison */
>  	double tm;
> +	int64_t txn_id;
> +	uint32_t txn_replica_id;
> +	uint32_t txn_last;
> 
>  	int bodycnt;
>  	uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 165a543cf..0b796d728 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
>  	header.lsn = 400;
>  	header.tm = 123.456;
>  	header.bodycnt = 0;
> +	header.txn_id = header.lsn;
> +	header.txn_replica_id = header.replica_id;
> +	header.txn_last = true;
>  	uint64_t sync = 100500;
>  	struct iovec vec[1];
>  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
> index 23ab845b3..7ea5df777 100644
> --- a/test/vinyl/errinj.result
> +++ b/test/vinyl/errinj.result
> @@ -2164,7 +2164,7 @@ i:stat().disk.compact.queue -- 30 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 3
>    rows: 30
> -  bytes: 471
> +  bytes: 537
>  ...
>  i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
>  ---
> @@ -2178,7 +2178,7 @@ i:stat().disk.compact.queue -- 40 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 4
>    rows: 40
> -  bytes: 628
> +  bytes: 716
>  ...
>  i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
>  ---
> @@ -2192,7 +2192,7 @@ i:stat().disk.compact.queue -- 50 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 5
>    rows: 50
> -  bytes: 785
> +  bytes: 895
>  ...
>  i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
>  ---
> @@ -2206,7 +2206,7 @@ i:stat().disk.compact.queue -- 50 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 5
>    rows: 50
> -  bytes: 785
> +  bytes: 895
>  ...
>  i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
>  ---
> diff --git a/test/vinyl/info.result b/test/vinyl/info.result
> index 922728abe..4876ddc68 100644
> --- a/test/vinyl/info.result
> +++ b/test/vinyl/info.result
> @@ -285,19 +285,19 @@ stat_diff(istat(), st)
>          bytes: 26525
>        count: 1
>        out:
> -        bytes: 26049
> +        bytes: 26113
>          pages: 7
>          bytes_compressed: <bytes_compressed>
>          rows: 25
>      index_size: 294
>      rows: 25
> -    bytes: 26049
> +    bytes: 26113
>      bytes_compressed: <bytes_compressed>
>      bloom_size: 70
>      statement:
>        replaces: 25
>      pages: 7
> -  bytes: 26049
> +  bytes: 26113
>    put:
>      rows: 25
>      bytes: 26525
> @@ -325,26 +325,26 @@ stat_diff(istat(), st)
>          bytes: 53050
>        count: 1
>        out:
> -        bytes: 52091
> +        bytes: 52217
>          pages: 13
>          bytes_compressed: <bytes_compressed>
>          rows: 50
>      index_size: 252
>      rows: 25
> -    bytes: 26042
> +    bytes: 26104
>      bytes_compressed: <bytes_compressed>
>      pages: 6
>      statement:
>        replaces: 25
>      compact:
>        in:
> -        bytes: 78140
> +        bytes: 78330
>          pages: 20
>          bytes_compressed: <bytes_compressed>
>          rows: 75
>        count: 1
>        out:
> -        bytes: 52091
> +        bytes: 52217
>          pages: 13
>          bytes_compressed: <bytes_compressed>
>          rows: 50
> @@ -352,7 +352,7 @@ stat_diff(istat(), st)
>      rows: 50
>      bytes: 53050
>    rows: 25
> -  bytes: 26042
> +  bytes: 26104
>  ...
>  -- point lookup from disk + cache put
>  st = istat()
> @@ -376,7 +376,7 @@ stat_diff(istat(), st)
>    disk:
>      iterator:
>        read:
> -        bytes: 4167
> +        bytes: 4177
>          pages: 1
>          bytes_compressed: <bytes_compressed>
>          rows: 4
> @@ -626,7 +626,7 @@ stat_diff(istat(), st)
>    disk:
>      iterator:
>        read:
> -        bytes: 104300
> +        bytes: 104550
>          pages: 25
>          bytes_compressed: <bytes_compressed>
>          rows: 100
> @@ -971,7 +971,7 @@ istat()
>  ---
>  - rows: 306
>    run_avg: 1
> -  bytes: 317731
> +  bytes: 317981
>    upsert:
>      squashed: 0
>      applied: 0
> @@ -1049,7 +1049,7 @@ istat()
>      bloom_size: 140
>      pages: 25
>      bytes_compressed: <bytes_compressed>
> -    bytes: 104300
> +    bytes: 104550
>    txw:
>      bytes: 0
>      rows: 0
> @@ -1082,7 +1082,7 @@ gstat()
>        in: 0
>        queue: 0
>        out: 0
> -    data: 104300
> +    data: 104550
>      index: 1190
>    memory:
>      tuple_cache: 14313
> @@ -1209,7 +1209,7 @@ st2 = i2:stat()
>  ...
>  s:bsize()
>  ---
> -- 52199
> +- 52313
>  ...
>  i1:len(), i2:len()
>  ---
> @@ -1219,7 +1219,7 @@ i1:len(), i2:len()
>  i1:bsize(), i2:bsize()
>  ---
>  - 364
> -- 920
> +- 1022
>  ...
>  s:bsize() == st1.disk.bytes
>  ---
> @@ -1271,7 +1271,7 @@ st2 = i2:stat()
>  ...
>  s:bsize()
>  ---
> -- 107449
> +- 107563
>  ...
>  i1:len(), i2:len()
>  ---
> @@ -1281,7 +1281,7 @@ i1:len(), i2:len()
>  i1:bsize(), i2:bsize()
>  ---
>  - 49516
> -- 50072
> +- 50174
>  ...
>  s:bsize() == st1.memory.bytes + st1.disk.bytes
>  ---
> @@ -1336,7 +1336,7 @@ st2 = i2:stat()
>  ...
>  s:bsize()
>  ---
> -- 52199
> +- 52313
>  ...
>  i1:len(), i2:len()
>  ---
> @@ -1346,7 +1346,7 @@ i1:len(), i2:len()
>  i1:bsize(), i2:bsize()
>  ---
>  - 364
> -- 920
> +- 1022
>  ...
>  s:bsize() == st1.disk.bytes
>  ---
> diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
> index 14201c5dd..a6b577dbc 100644
> --- a/test/vinyl/layout.result
> +++ b/test/vinyl/layout.result
> @@ -253,8 +253,8 @@ result
>          BODY:
>            row_index_offset: <offset>
>            offset: <offset>
> -          size: 108
> -          unpacked_size: 89
> +          size: 118
> +          unpacked_size: 99
>            row_count: 4
>            min_key: ['ёёё']
>    - - 00000000000000000008.run
> @@ -281,7 +281,7 @@ result
>        - HEADER:
>            type: ROWINDEX
>          BODY:
> -          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
> +          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
>    - - 00000000000000000012.index
>      - - HEADER:
>            type: RUNINFO
> @@ -298,8 +298,8 @@ result
>          BODY:
>            row_index_offset: <offset>
>            offset: <offset>
> -          size: 102
> -          unpacked_size: 83
> +          size: 110
> +          unpacked_size: 91
>            row_count: 3
>            min_key: ['ёёё']
>    - - 00000000000000000012.run
> @@ -324,7 +324,7 @@ result
>        - HEADER:
>            type: ROWINDEX
>          BODY:
> -          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
> +          row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
>    - - 00000000000000000006.index
>      - - HEADER:
>            type: RUNINFO
> @@ -341,8 +341,8 @@ result
>          BODY:
>            row_index_offset: <offset>
>            offset: <offset>
> -          size: 108
> -          unpacked_size: 89
> +          size: 118
> +          unpacked_size: 99
>            row_count: 4
>            min_key: [null, 'ёёё']
>    - - 00000000000000000006.run
> @@ -369,7 +369,7 @@ result
>        - HEADER:
>            type: ROWINDEX
>          BODY:
> -          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
> +          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
>    - - 00000000000000000010.index
>      - - HEADER:
>            type: RUNINFO
> @@ -386,8 +386,8 @@ result
>          BODY:
>            row_index_offset: <offset>
>            offset: <offset>
> -          size: 90
> -          unpacked_size: 71
> +          size: 98
> +          unpacked_size: 79
>            row_count: 3
>            min_key: [123, 'ёёё']
>    - - 00000000000000000010.run
> @@ -409,7 +409,7 @@ result
>        - HEADER:
>            type: ROWINDEX
>          BODY:
> -          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
> +          row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
>  ...
>  test_run:cmd("clear filter")
>  ---

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190106/5079573f/attachment.sig>


More information about the Tarantool-patches mailing list