[tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries

Vladimir Davydov vdavydov.dev at gmail.com
Mon Jan 28 15:58:59 MSK 2019


On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true ans valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to it's creator.
> 
> As encoding/deconding rules assumed:
>  1. txn_replica_id is encoded only if it is not equal with replica
>     id. This might have point because of replication trigger
>  2. txn_id and txn_last are encoded only for multi-row transaction.
>     So if we do not have txn_id in a xstream then this means that it
>     is a single row transaction.
> This rules provides compatibility with previous xlog handling.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.h    |  3 ++

Looks like you forgot to update iproto_constants.c.

>  src/box/wal.c                 | 36 +++++++++++++++-
>  src/box/xrow.c                | 38 +++++++++++++++++
>  src/box/xrow.h                |  5 ++-
>  test/unit/xrow.cc             |  3 ++
>  test/vinyl/errinj_stat.result |  8 ++--
>  test/vinyl/layout.result      | 24 +++++------
>  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
>  8 files changed, 138 insertions(+), 57 deletions(-)
> 
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..d01cdf840 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -60,6 +60,9 @@ enum iproto_key {
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
>  	IPROTO_GROUP_ID = 0x07,
> +	IPROTO_TXN_ID = 0x08,
> +	IPROTO_TXN_REPLICA_ID = 0x09,

Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
transaction identifiers from different instances don't overlap, because
there couldn't be a "multi-instance" transaction, could there?

> +	IPROTO_TXN_LAST = 0x0a,

I think we should instead introduce BEGIN and COMMIT commands, because:

 - We might need to attach some extra information to each transaction,
   e.g. mark transactions that were committed in parallel on the master
   so that they can be committed in parallel on a replica. Attaching
   such information to each row would be excessive.

 - We will need BEGIN and COMMIT for IPROTO transactions. It would be
   nice if we could share the code with them.

 - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
   easier to read.

>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 4c3537672..17ead08e7 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  }
>  
>  static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
>  	       struct xrow_header **end)
>  {
> +	struct xrow_header **row = begin;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
>  			vclock_follow_xrow(vclock, *row);
>  		}
>  	}
> +	if ((*begin)->replica_id != instance_id) {
> +		/*
> +		 * Move all local changes to the end of rows array and
> +		 * a fake local transaction (like an autonomous transaction)
> +		 * because we could not replicate the transaction back.
> +		 */
> +		struct xrow_header **row = end - 1;
> +		while (row >= begin) {
> +			if (row[0]->replica_id != instance_id) {
> +				--row;
> +				continue;
> +			}
> +			/* Local row, move it back. */
> +			struct xrow_header **local_row = row;
> +			while (local_row < end - 1 &&
> +			       local_row[1]->replica_id != instance_id) {
> +				struct xrow_header *tmp = local_row[0];
> +				local_row[0] = local_row[1];
> +				local_row[1] = tmp;

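There's the SWAP() macro for this. Also note that local_row is never
advanced inside this loop. After the first swap, local_row[1] is the
local row itself, so the loop exits and each local row moves only one
slot toward the end rather than all the way to the tail. A ++local_row
seems to be missing.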

> +			}
> +			--row;
> +		}
> +		while (begin < end && begin[0]->replica_id != instance_id)
> +			++begin;
> +	}

I don't understand why we need to move rows generated locally (by
an on_replace trigger, I surmise) to the end of a transaction. We
have TXN_ID attached to each row, so we could leave the transactions
interleaved, couldn't we?

> +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> +	row = begin;
> +	while (row < end) {
> +		row[0]->txn_id = begin[0]->lsn;
> +		row[0]->txn_replica_id = instance_id;
> +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> +		++row;
> +	}

I think we'd better use txn->id for TXN_ID rather than the LSN.
Why do you think the LSN should be used? I don't see any rationale
for that anywhere in the comments. Also, setting TXN_ID looks
like a job that should be done by txn_add_redo().

>  }
>  
>  static void
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index ef3f81add..db524b3c8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -133,12 +133,32 @@ error:
>  		case IPROTO_SCHEMA_VERSION:
>  			header->schema_version = mp_decode_uint(pos);
>  			break;
> +		case IPROTO_TXN_ID:
> +			header->txn_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_REPLICA_ID:
> +			header->txn_replica_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_LAST:
> +			header->txn_last = mp_decode_uint(pos);

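Shouldn't this be a bool (written with mp_encode_bool() and read with
mp_decode_bool()) rather than a uint32_t decoded with mp_decode_uint()?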

> +			break;
>  		default:
>  			/* unknown header */
>  			mp_next(pos);
>  		}
>  	}
>  	assert(*pos <= end);
> +	if (header->txn_id == 0) {
> +		/*
> +		 * Transaction id is not set so it is a single statement
> +		 * transaction.
> +		 */
> +		header->txn_id = header->lsn;
> +		header->txn_last = true;
> +	}
> +	if (header->txn_replica_id == 0)
> +		header->txn_replica_id = header->replica_id;
> +
>  	/* Nop requests aren't supposed to have a body. */
>  	if (*pos < end && header->type != IPROTO_NOP) {
>  		const char *body = *pos;
> @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
>  		d = mp_encode_double(d, header->tm);
>  		map_size++;
>  	}
> +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> +		/* Encode txn id for multi row transaction members. */
> +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> +		d = mp_encode_uint(d, header->txn_id);
> +		map_size++;
> +	}
> +	if (header->txn_replica_id != header->replica_id) {
> +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> +		d = mp_encode_uint(d, header->txn_replica_id);
> +		map_size++;
> +	}
> +	if (header->txn_last && !(header->txn_id == header->lsn &&
> +				  header->txn_replica_id == header->replica_id)) {
> +		/* Set last row for multi row transaction. */
> +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> +		d = mp_encode_uint(d, header->txn_last);
> +		map_size++;
> +	}
>  	assert(d <= data + XROW_HEADER_LEN_MAX);
>  	mp_encode_map(data, map_size);
>  	out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 6bab0a1fd..4acd84d56 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
>  	XROW_HEADER_IOVMAX = 1,
>  	XROW_BODY_IOVMAX = 2,
>  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> -	XROW_HEADER_LEN_MAX = 40,
> +	XROW_HEADER_LEN_MAX = 60,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
>  	/** 7 = sizeof(iproto_body_bin). */
> @@ -69,6 +69,9 @@ struct xrow_header {
>  	uint64_t sync;
>  	int64_t lsn; /* LSN must be signed for correct comparison */
>  	double tm;
> +	int64_t txn_id;
> +	uint32_t txn_replica_id;
> +	uint32_t txn_last;
>  
>  	int bodycnt;
>  	uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 165a543cf..0b796d728 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
>  	header.lsn = 400;
>  	header.tm = 123.456;
>  	header.bodycnt = 0;
> +	header.txn_id = header.lsn;
> +	header.txn_replica_id = header.replica_id;
> +	header.txn_last = true;
>  	uint64_t sync = 100500;
>  	struct iovec vec[1];
>  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
> index 08801dbc6..361ddf5db 100644
> --- a/test/vinyl/errinj_stat.result
> +++ b/test/vinyl/errinj_stat.result
> @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 3
>    rows: 30
> -  bytes: 411
> +  bytes: 477
>  ...
>  i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
>  ---
> @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 4
>    rows: 40
> -  bytes: 548
> +  bytes: 636

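I wouldn't expect vinyl stats to be changed by this patch.
Why did it happen? My guess is that xrow_header_encode() now writes
IPROTO_TXN_ID for every row whose txn_id was left at 0 (since 0 != lsn).
That would include rows written to vinyl run files, which never set
txn_id. If so, the encoder probably shouldn't write TXN_ID when it is
unset.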



More information about the Tarantool-patches mailing list