[tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries

Vladimir Davydov vdavydov.dev at gmail.com
Fri Feb 15 16:15:41 MSK 2019


On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote:
> Append txn_id and txn_commit to xrow_header structure, txn_id identifies
> transaction id on replica where transaction was started. As transaction id
> a lsn of the first row in the transaction is used. txn_commit is set to true
> if it is the last row in a transaction, so we could commit transaction by the
> last row or by additional NOP requests with txn_commit set as well as
> start transaction with NOP and corresponding txn_id. In case of replication,
> all local changes are moved to the journal entry tail to form a separate
> transaction (like an autonomous transaction) to be able to replicate changes back.
> 
> The following encoding/decoding rules are assumed:
>  * txn_id and txn_commit are encoded only for multi-row transactions.
>    So if we do not have txn_id while row decoding then this means that it
>    is a single row transaction.
>  * TXN_ID field is differential encoded as lsn - txn_id value
>  * txn_commit packed into TXN_FLAGS field
> 
> These rules provide compatibility with previous xlog format as well
> as good compaction level.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.c |  4 ++--
>  src/box/iproto_constants.h |  7 +++++++
>  src/box/txn.c              | 21 +++++++++++++++++----
>  src/box/txn.h              |  2 ++
>  src/box/wal.c              | 28 ++++++++++++++++++++++++----
>  src/box/xrow.c             | 36 ++++++++++++++++++++++++++++++++++++
>  src/box/xrow.h             |  4 +++-
>  test/unit/xrow.cc          |  2 ++
>  8 files changed, 93 insertions(+), 11 deletions(-)
> 
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index 7fd295775..4d2e21752 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
>  		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
>  		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
>  		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
> +		/* 0x08 */	MP_UINT,   /* IPROTO_TXN_ID */
> +		/* 0x09 */	MP_UINT,   /* IPROTO_TXN_FLAGS */

I don't quite like the name, because what we encode is not a txn id, but
the statement number in a txn. How about renaming it to IPROTO_TXN_STMT_NO,
IPROTO_TXN_STMT_ID, or IPROTO_TXN_STMT_SEQ?

xrow_header::txn_id is OK though.

>  	/* }}} */
>  
>  	/* {{{ unused */
> -		/* 0x08 */	MP_UINT,
> -		/* 0x09 */	MP_UINT,
>  		/* 0x0a */	MP_UINT,
>  		/* 0x0b */	MP_UINT,
>  		/* 0x0c */	MP_UINT,

You forgot to patch iproto_key_strs.

Let's please add a test that ensures that those new headers are written
to xlogs. You could use xlog reader for that.

> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..fd80e3111 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -49,6 +49,11 @@ enum {
>  	XLOG_FIXHEADER_SIZE = 19
>  };
>  

/* IPROTO_TXN_FLAGS bits. */

> +enum {
> +	/** Set for the last xrow in a transaction. */
> +	TXN_FLAG_COMMIT = 0x01,

IPROTO_TXN_COMMIT?

> +};
> +
>  enum iproto_key {
>  	IPROTO_REQUEST_TYPE = 0x00,
>  	IPROTO_SYNC = 0x01,
> @@ -60,6 +65,8 @@ enum iproto_key {
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
>  	IPROTO_GROUP_ID = 0x07,
> +	IPROTO_TXN_ID = 0x08,
> +	IPROTO_TXN_FLAGS = 0x09,
>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7f4e85b47..de0152706 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"
>  
>  double too_long_threshold;
>  
> @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
>  	txn->engine = NULL;
>  	txn->engine_tx = NULL;
>  	txn->psql_txn = NULL;
> +	txn->remote_row_count = 0;

Nit: let's rename it to n_remote_rows to match n_rows and keep the two
member initializers together.

>  	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
>  	fiber_set_txn(fiber(), txn);
>  	return txn;
> @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>  	 * stmt->space can be NULL for IRPOTO_NOP.
>  	 */
>  	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
> +		if (request->header &&
> +		    request->header->replica_id != instance_id &&
> +		    request->header->replica_id != 0)
> +			++txn->remote_row_count;

Nit: if we moved this after txn_add_redo(), then we wouldn't have to
check if request->header is set.

>  		if (txn_add_redo(stmt, request) != 0)
>  			goto fail;
>  		++txn->n_rows;
> @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
>  		return -1;
>  
>  	struct txn_stmt *stmt;
> -	struct xrow_header **row = req->rows;
> +	struct xrow_header **remote_row = req->rows;
> +	struct xrow_header **local_row = req->rows + txn->remote_row_count;
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->row == NULL)
> -			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> +			/* A read (e.g. select) request */
> +			continue;

Nit: pointless change, please remove.

> +		if (stmt->row->replica_id == instance_id ||
> +		    stmt->row->replica_id == 0)
> +			*local_row++ = stmt->row;
> +		else
> +			*remote_row++ = stmt->row;

This piece of code looks nice, but it definitely needs a comment: what
we do, why we do that...

Anyway, we should add a test for this change. Maybe it's even worth
submitting this change in a separate patch.

>  		req->approx_len += xrow_approx_len(stmt->row);
>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(remote_row == req->rows + txn->remote_row_count);
> +	assert(local_row == req->rows + req->n_rows);
>  
>  	ev_tstamp start = ev_monotonic_now(loop());
>  	int64_t res = journal_write(req);
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..143f21715 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -180,6 +180,8 @@ struct txn {
>  	 /** Commit and rollback triggers */
>  	struct rlist on_commit, on_rollback;
>  	struct sql_txn *psql_txn;
> +	/** Count of remote rows. */
> +	uint32_t remote_row_count;

Nit: please use int rather than uint32_t and move the definition after
n_rows, because those are closely related.

>  };
>  
>  /* Pointer to the current transaction (if any) */
> diff --git a/src/box/wal.c b/src/box/wal.c
> index cdcaabc00..0ea3be68d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
>  }
>  
> -static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +static int
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
>  	       struct xrow_header **end)
>  {
> +	struct xrow_header **row = begin;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
>  			vclock_follow_xrow(vclock, *row);
>  		}
>  	}
> +	while (begin < end && begin[0]->replica_id != instance_id)
> +		++begin;
> +	/* Setup txn_id and tnx_replica_id for locally generated rows. */
> +	row = begin;
> +	while (row < end) {
> +		if (row[0]->replica_id != instance_id) {
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Interleaved transactions");
> +			return -1;

Do we really need to bother about it here, in WAL? IMO a check in
applier would be enough.

> +		}
> +		row[0]->txn_id = begin[0]->lsn;
> +		row[0]->txn_commit = row == end - 1 ? 1 : 0;
> +		++row;

Why can't we do this while we are iterating over rows just a few lines
above, assigning LSNs?

> +	}
> +	return 0;
>  }
>  
>  static void
> @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> -		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
> +		if (wal_assign_lsn(&vclock, entry->rows,
> +				   entry->rows + entry->n_rows) < 0)
> +			goto done;
>  		entry->res = vclock_sum(&vclock);
>  		rc = xlog_write_entry(l, entry);
>  		if (rc < 0)
> @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  			   struct journal_entry *entry)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
> -	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> +	if (wal_assign_lsn(&writer->vclock, entry->rows,
> +			   entry->rows + entry->n_rows) != 0)
> +		return -1;
>  	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	return vclock_sum(&writer->vclock);
>  }
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index fec8873d0..29fa75de4 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -102,6 +102,8 @@ error:
>  
>  	if (mp_typeof(**pos) != MP_MAP)
>  		goto error;
> +	bool txn_is_set = false;
> +	uint32_t txn_flags = 0;
>  
>  	uint32_t size = mp_decode_map(pos);
>  	for (uint32_t i = 0; i < size; i++) {
> @@ -133,12 +135,32 @@ error:
>  		case IPROTO_SCHEMA_VERSION:
>  			header->schema_version = mp_decode_uint(pos);
>  			break;
> +		case IPROTO_TXN_ID:
> +			txn_is_set = true;
> +			header->txn_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_FLAGS:
> +			txn_flags = mp_decode_uint(pos);
> +			header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
> +			if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
> +				/* Unknow flags. */
> +				goto error;

We silently ignore unknown headers. I think we can silently ignore
unknown flags as well.

> +			break;
>  		default:
>  			/* unknown header */
>  			mp_next(pos);
>  		}
>  	}
>  	assert(*pos <= end);
> +	if (!txn_is_set) {
> +		/*
> +		 * Transaction id is not set so it is a single statement
> +		 * transaction.
> +		 */
> +		header->txn_commit = true;
> +	}
> +	header->txn_id = header->lsn + header->txn_id;
> +
>  	/* Nop requests aren't supposed to have a body. */
>  	if (*pos < end && header->type != IPROTO_NOP) {
>  		const char *body = *pos;
> @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
>  		d = mp_encode_double(d, header->tm);
>  		map_size++;
>  	}
> +	if (header->txn_id != 0) {
> +		if (header->txn_id != header->lsn || header->txn_commit == 0) {

Nit: txn_commit is a bool so

	s/header->txn_commit == 0/!header->txn_commit/

> +			/* Encode txn id for multi row transaction members. */
> +			d = mp_encode_uint(d, IPROTO_TXN_ID);
> +			d = mp_encode_uint(d, header->lsn - header->txn_id);
> +			map_size++;
> +		}
> +		if (header->txn_commit && header->txn_id != header->lsn) {
> +			/* Setup last row for multi row transaction. */
> +			d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
> +			d = mp_encode_uint(d, TXN_FLAG_COMMIT);
> +			map_size++;
> +		}
> +	}
>  	assert(d <= data + XROW_HEADER_LEN_MAX);
>  	mp_encode_map(data, map_size);
>  	out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 719add4f0..bc4c4a2d7 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
>  	XROW_HEADER_IOVMAX = 1,
>  	XROW_BODY_IOVMAX = 2,
>  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> -	XROW_HEADER_LEN_MAX = 40,
> +	XROW_HEADER_LEN_MAX = 52,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
>  	/** 7 = sizeof(iproto_body_bin). */
> @@ -63,6 +63,8 @@ struct xrow_header {
>  	uint64_t sync;
>  	int64_t lsn; /* LSN must be signed for correct comparison */
>  	double tm;
> +	int64_t txn_id;
> +	bool txn_commit;

Please add a comment explaining how these are mapped to IPROTO headers.

>  
>  	int bodycnt;
>  	uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 022d1f998..bc99285de 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
>  	header.lsn = 400;
>  	header.tm = 123.456;
>  	header.bodycnt = 0;
> +	header.txn_id = header.lsn;
> +	header.txn_commit = true;
>  	uint64_t sync = 100500;
>  	struct iovec vec[1];
>  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");



More information about the Tarantool-patches mailing list