[tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries

Georgy Kirichenko georgy at tarantool.org
Thu Feb 21 18:29:16 MSK 2019


Append txn_id and is_commit to the xrow_header structure. txn_id identifies
the transaction on the replica where the transaction was started; the lsn of
the first row in the transaction is used as the transaction id. is_commit is
set to true for the last row in a transaction.

The following encoding/decoding rules are assumed:
 * txn_id encoded using transaction sequence number iproto field
   as IPROTO_TSN = lsn - txn_id,
 * is_commit packed into IPROTO_FLAGS field with a bit mask,
 * txn_id and is_commit are encoded only for multi-row transactions.
   So if a row has no txn_id after decoding, it is a single-row
   transaction.

These rules keep compatibility with the previous xlog format and keep
the encoding compact.

Needed for: 2798
---
 src/box/iproto_constants.c     |   8 +--
 src/box/iproto_constants.h     |   7 +++
 src/box/lua/xlog.c             |  10 +++
 src/box/wal.c                  |   5 ++
 src/box/xrow.c                 |  41 +++++++++++++
 src/box/xrow.h                 |   4 +-
 test/unit/xrow.cc              |   2 +
 test/xlog/transaction.result   | 108 +++++++++++++++++++++++++++++++++
 test/xlog/transaction.test.lua |  46 ++++++++++++++
 9 files changed, 226 insertions(+), 5 deletions(-)
 create mode 100644 test/xlog/transaction.result
 create mode 100644 test/xlog/transaction.test.lua

diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..09ded1ecb 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
 		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
 		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
+		/* 0x08 */	MP_UINT,   /* IPROTO_TSN */
+		/* 0x09 */	MP_UINT,   /* IPROTO_FLAGS */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x08 */	MP_UINT,
-		/* 0x09 */	MP_UINT,
 		/* 0x0a */	MP_UINT,
 		/* 0x0b */	MP_UINT,
 		/* 0x0c */	MP_UINT,
@@ -136,8 +136,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"schema version",   /* 0x05 */
 	"server version",   /* 0x06 */
 	"group id",         /* 0x07 */
-	NULL,               /* 0x08 */
-	NULL,               /* 0x09 */
+	"tsn",              /* 0x08 */
+	"flags",            /* 0x09 */
 	NULL,               /* 0x0a */
 	NULL,               /* 0x0b */
 	NULL,               /* 0x0c */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..126d73352 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
 	XLOG_FIXHEADER_SIZE = 19
 };
 
+enum {
+	/** Set for the last xrow in a transaction. */
+	IPROTO_FLAG_COMMIT = 0x01,
+};
+
 enum iproto_key {
 	IPROTO_REQUEST_TYPE = 0x00,
 	IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TSN = 0x08,
+	IPROTO_FLAGS = 0x09,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c
index 3c7cab38c..bd30187ae 100644
--- a/src/box/lua/xlog.c
+++ b/src/box/lua/xlog.c
@@ -221,6 +221,16 @@ lbox_xlog_parser_iterate(struct lua_State *L)
 		lua_pushnumber(L, row.tm);
 		lua_settable(L, -3); /* timestamp */
 	}
+	if (row.txn_id != row.lsn || !row.is_commit) {
+		lua_pushstring(L, "txn_id");
+		lua_pushnumber(L, row.txn_id);
+		lua_settable(L, -3); /* txn_id */
+	}
+	if (row.is_commit && row.txn_id != row.lsn) {
+		lua_pushstring(L, "commit");
+		lua_pushboolean(L, true);
+		lua_settable(L, -3); /* txn_commit flag */
+	}
 
 	lua_settable(L, -3); /* HEADER */
 
diff --git a/src/box/wal.c b/src/box/wal.c
index b2652bb17..78112da77 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -895,12 +895,17 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	       struct xrow_header **row,
 	       struct xrow_header **end)
 {
+	int64_t txn_id = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
 			(*row)->replica_id = instance_id;
+			/* Use the lsn of the first local row as txn_id. */
+			txn_id = txn_id == 0? (*row)->lsn: txn_id;
+			(*row)->txn_id = txn_id;
+			(*row)->is_commit = row == end - 1;
 		} else {
 			vclock_follow(vclock_diff, (*row)->replica_id,
 				      (*row)->lsn - vclock_get(base,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..8496a694f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
 
 	if (mp_typeof(**pos) != MP_MAP)
 		goto error;
+	bool has_tsn = false;
+	uint32_t flags = 0;
 
 	uint32_t size = mp_decode_map(pos);
 	for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,30 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TSN:
+			has_tsn = true;
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_FLAGS:
+			flags = mp_decode_uint(pos);
+			header->is_commit = flags & IPROTO_FLAG_COMMIT;
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (!has_tsn) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->is_commit = true;
+	}
+	/* Restore transaction id from lsn and transaction serial number. */
+	header->txn_id = header->lsn - header->txn_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +243,27 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != 0) {
+		if (header->txn_id != header->lsn || header->is_commit == 0) {
+			/*
+			 * Encode a transaction identifier for multi row
+			 * transaction members.
+			 */
+			d = mp_encode_uint(d, IPROTO_TSN);
+			/*
+			 * Differential encoding: write a transaction serial
+			 * number (it is equal to lsn - transaction id) instead.
+			 */
+			d = mp_encode_uint(d, header->lsn - header->txn_id);
+			map_size++;
+		}
+		if (header->is_commit && header->txn_id != header->lsn) {
+			/* Setup last row for multi row transaction. */
+			d = mp_encode_uint(d, IPROTO_FLAGS);
+			d = mp_encode_uint(d, IPROTO_FLAG_COMMIT);
+			map_size++;
+		}
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2fce83bbc..ccd57f8b2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 52,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	bool is_commit;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..e0e7f1279 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.is_commit = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/xlog/transaction.result b/test/xlog/transaction.result
new file mode 100644
index 000000000..d2d9053ae
--- /dev/null
+++ b/test/xlog/transaction.result
@@ -0,0 +1,108 @@
+fio = require('fio')
+---
+...
+xlog = require('xlog').pairs
+---
+...
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function read_xlog(file)
+    local val = {}
+    for k, v in xlog(file) do
+        table.insert(val, setmetatable(v, { __serialize = "map"}))
+    end
+    return val
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- gh-2798 check for journal transaction encoding
+_ = box.schema.space.create('test'):create_index('pk')
+---
+...
+-- generate a new xlog
+box.snapshot()
+---
+- ok
+...
+lsn = box.info.lsn
+---
+...
+-- autocommit transaction
+box.space.test:replace({1})
+---
+- [1]
+...
+-- one row transaction
+box.begin() box.space.test:replace({2}) box.commit()
+---
+...
+-- two row transaction
+box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit()
+---
+...
+-- four row transaction
+box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit()
+---
+...
+-- open a new xlog
+box.snapshot()
+---
+- ok
+...
+-- read a previous one
+lsn_str = tostring(lsn)
+---
+...
+data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog'))
+---
+...
+-- check nothing changed for single row transactions
+data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil
+---
+- true
+...
+data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil
+---
+- true
+...
+-- check two row transaction
+data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil
+---
+- true
+...
+data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true
+---
+- true
+...
+-- check four row transaction
+data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil
+---
+- true
+...
+data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil
+---
+- true
+...
+data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil
+---
+- true
+...
+data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true
+---
+- true
+...
+box.space.test:drop()
+---
+...
diff --git a/test/xlog/transaction.test.lua b/test/xlog/transaction.test.lua
new file mode 100644
index 000000000..062635098
--- /dev/null
+++ b/test/xlog/transaction.test.lua
@@ -0,0 +1,46 @@
+fio = require('fio')
+xlog = require('xlog').pairs
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("setopt delimiter ';'")
+function read_xlog(file)
+    local val = {}
+    for k, v in xlog(file) do
+        table.insert(val, setmetatable(v, { __serialize = "map"}))
+    end
+    return val
+end;
+test_run:cmd("setopt delimiter ''");
+
+
+-- gh-2798 check for journal transaction encoding
+_ = box.schema.space.create('test'):create_index('pk')
+-- generate a new xlog
+box.snapshot()
+lsn = box.info.lsn
+-- autocommit transaction
+box.space.test:replace({1})
+-- one row transaction
+box.begin() box.space.test:replace({2}) box.commit()
+-- two row transaction
+box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit()
+-- four row transaction
+box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit()
+-- open a new xlog
+box.snapshot()
+-- read a previous one
+lsn_str = tostring(lsn)
+data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog'))
+-- check nothing changed for single row transactions
+data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil
+data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil
+-- check two row transaction
+data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil
+data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true
+-- check four row transaction
+data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil
+data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil
+data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil
+data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true
+box.space.test:drop()
-- 
2.20.1





More information about the Tarantool-patches mailing list