[tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries

Georgy Kirichenko georgy at tarantool.org
Tue Jan 22 13:57:36 MSK 2019


Append txn_id, txn_replica_id and txn_last to xrow_header structure.
txn_replica_id identifies replica where transaction was started and
txn_id identifies transaction id on that replica. As transaction id
a lsn of the first row in this transaction is used.
txn_last set to true if it is the last row in a transaction, so we
could commit transaction with last row or use additional NOP requests
with txn_last = true ans valid txn_id and txn_replica_id.
For replication all local changes moved to xrows array tail to form
a separate transaction (like autonomous transaction) because it is not
possible to replicate such transaction back to it's creator.

As encoding/deconding rules assumed:
 1. txn_replica_id is encoded only if it is not equal with replica
    id. This might have point because of replication trigger
 2. txn_id and txn_last are encoded only for multi-row transaction.
    So if we do not have txn_id in a xstream then this means that it
    is a single row transaction.
This rules provides compatibility with previous xlog handling.

Needed for: 2798
---
 src/box/iproto_constants.h    |  3 ++
 src/box/wal.c                 | 36 +++++++++++++++-
 src/box/xrow.c                | 38 +++++++++++++++++
 src/box/xrow.h                |  5 ++-
 test/unit/xrow.cc             |  3 ++
 test/vinyl/errinj_stat.result |  8 ++--
 test/vinyl/layout.result      | 24 +++++------
 test/vinyl/stat.result        | 78 +++++++++++++++++------------------
 8 files changed, 138 insertions(+), 57 deletions(-)

diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..d01cdf840 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -60,6 +60,9 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TXN_ID = 0x08,
+	IPROTO_TXN_REPLICA_ID = 0x09,
+	IPROTO_TXN_LAST = 0x0a,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/wal.c b/src/box/wal.c
index 4c3537672..17ead08e7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 }
 
 static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	       struct xrow_header **end)
 {
+	struct xrow_header **row = begin;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 			vclock_follow_xrow(vclock, *row);
 		}
 	}
+	if ((*begin)->replica_id != instance_id) {
+		/*
+		 * Move all local changes to the end of rows array and
+		 * a fake local transaction (like an autonomous transaction)
+		 * because we could not replicate the transaction back.
+		 */
+		struct xrow_header **row = end - 1;
+		while (row >= begin) {
+			if (row[0]->replica_id != instance_id) {
+				--row;
+				continue;
+			}
+			/* Local row, move it back. */
+			struct xrow_header **local_row = row;
+			while (local_row < end - 1 &&
+			       local_row[1]->replica_id != instance_id) {
+				struct xrow_header *tmp = local_row[0];
+				local_row[0] = local_row[1];
+				local_row[1] = tmp;
+			}
+			--row;
+		}
+		while (begin < end && begin[0]->replica_id != instance_id)
+			++begin;
+	}
+	/* Setup txn_id and tnx_replica_id for localy generated rows. */
+	row = begin;
+	while (row < end) {
+		row[0]->txn_id = begin[0]->lsn;
+		row[0]->txn_replica_id = instance_id;
+		row[0]->txn_last = row == end - 1 ? 1 : 0;
+		++row;
+	}
 }
 
 static void
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ef3f81add..db524b3c8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -133,12 +133,32 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TXN_ID:
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_REPLICA_ID:
+			header->txn_replica_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_LAST:
+			header->txn_last = mp_decode_uint(pos);
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (header->txn_id == 0) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->txn_id = header->lsn;
+		header->txn_last = true;
+	}
+	if (header->txn_replica_id == 0)
+		header->txn_replica_id = header->replica_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != header->lsn || header->txn_last == 0) {
+		/* Encode txn id for multi row transaction members. */
+		d = mp_encode_uint(d, IPROTO_TXN_ID);
+		d = mp_encode_uint(d, header->txn_id);
+		map_size++;
+	}
+	if (header->txn_replica_id != header->replica_id) {
+		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
+		d = mp_encode_uint(d, header->txn_replica_id);
+		map_size++;
+	}
+	if (header->txn_last && !(header->txn_id == header->lsn &&
+				  header->txn_replica_id == header->replica_id)) {
+		/* Set last row for multi row transaction. */
+		d = mp_encode_uint(d, IPROTO_TXN_LAST);
+		d = mp_encode_uint(d, header->txn_last);
+		map_size++;
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 6bab0a1fd..4acd84d56 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 60,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -69,6 +69,9 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	uint32_t txn_replica_id;
+	uint32_t txn_last;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 165a543cf..0b796d728 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.txn_replica_id = header.replica_id;
+	header.txn_last = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
index 08801dbc6..361ddf5db 100644
--- a/test/vinyl/errinj_stat.result
+++ b/test/vinyl/errinj_stat.result
@@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
 - bytes_compressed: <bytes_compressed>
   pages: 3
   rows: 30
-  bytes: 411
+  bytes: 477
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
 - bytes_compressed: <bytes_compressed>
   pages: 4
   rows: 40
-  bytes: 548
+  bytes: 636
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -97,7 +97,7 @@ i:stat().disk.compaction.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 685
+  bytes: 795
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -111,7 +111,7 @@ i:stat().disk.compaction.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 685
+  bytes: 795
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 14201c5dd..a6b577dbc 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,8 +253,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: ['ёёё']
   - - 00000000000000000008.run
@@ -281,7 +281,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000012.index
     - - HEADER:
           type: RUNINFO
@@ -298,8 +298,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 102
-          unpacked_size: 83
+          size: 110
+          unpacked_size: 91
           row_count: 3
           min_key: ['ёёё']
   - - 00000000000000000012.run
@@ -324,7 +324,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
+          row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
@@ -341,8 +341,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: [null, 'ёёё']
   - - 00000000000000000006.run
@@ -369,7 +369,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000010.index
     - - HEADER:
           type: RUNINFO
@@ -386,8 +386,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 90
-          unpacked_size: 71
+          size: 98
+          unpacked_size: 79
           row_count: 3
           min_key: [123, 'ёёё']
   - - 00000000000000000010.run
@@ -409,7 +409,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
 ...
 test_run:cmd("clear filter")
 ---
diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result
index 419d3e6c2..33b604b9f 100644
--- a/test/vinyl/stat.result
+++ b/test/vinyl/stat.result
@@ -299,7 +299,7 @@ stat_diff(istat(), st)
   run_count: 1
   disk:
     last_level:
-      bytes: 26049
+      bytes: 26113
       pages: 7
       bytes_compressed: <bytes_compressed>
       rows: 25
@@ -312,16 +312,16 @@ stat_diff(istat(), st)
         bytes: 26525
       count: 1
       output:
-        bytes: 26049
+        bytes: 26113
         pages: 7
         bytes_compressed: <bytes_compressed>
         rows: 25
-    bytes: 26049
+    bytes: 26113
     index_size: 294
     pages: 7
     bytes_compressed: <bytes_compressed>
     bloom_size: 70
-  bytes: 26049
+  bytes: 26113
   put:
     rows: 25
     bytes: 26525
@@ -344,7 +344,7 @@ stat_diff(istat(), st)
 ---
 - disk:
     last_level:
-      bytes: 26042
+      bytes: 26104
       pages: 6
       bytes_compressed: <bytes_compressed>
       rows: 25
@@ -357,23 +357,23 @@ stat_diff(istat(), st)
         bytes: 53050
       count: 1
       output:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
-    bytes: 26042
+    bytes: 26104
     index_size: 252
     pages: 6
     bytes_compressed: <bytes_compressed>
     compaction:
       input:
-        bytes: 78140
+        bytes: 78330
         pages: 20
         bytes_compressed: <bytes_compressed>
         rows: 75
       count: 1
       output:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
@@ -381,7 +381,7 @@ stat_diff(istat(), st)
     rows: 50
     bytes: 53050
   rows: 25
-  bytes: 26042
+  bytes: 26104
 ...
 -- point lookup from disk + cache put
 st = istat()
@@ -405,7 +405,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 4167
+        bytes: 4177
         pages: 1
         bytes_compressed: <bytes_compressed>
         rows: 4
@@ -655,7 +655,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 104300
+        bytes: 104550
         pages: 25
         bytes_compressed: <bytes_compressed>
         rows: 100
@@ -1000,7 +1000,7 @@ istat()
 ---
 - rows: 306
   run_avg: 1
-  bytes: 317731
+  bytes: 317981
   upsert:
     squashed: 0
     applied: 0
@@ -1032,7 +1032,7 @@ istat()
       bytes_compressed: <bytes_compressed>
       pages: 25
       rows: 100
-      bytes: 104300
+      bytes: 104550
     rows: 100
     statement:
       inserts: 0
@@ -1083,7 +1083,7 @@ istat()
       count: 0
     pages: 25
     bytes_compressed: <bytes_compressed>
-    bytes: 104300
+    bytes: 104550
   txw:
     bytes: 0
     rows: 0
@@ -1123,8 +1123,8 @@ gstat()
     page_index: 1050
     bloom_filter: 140
   disk:
-    data_compacted: 104300
-    data: 104300
+    data_compacted: 104550
+    data: 104550
     index: 1190
   scheduler:
     tasks_inprogress: 0
@@ -1163,7 +1163,7 @@ box.snapshot()
 stat_diff(gstat(), st, 'scheduler')
 ---
 - dump_input: 104200
-  dump_output: 103592
+  dump_output: 104018
   tasks_completed: 2
   dump_count: 1
 ...
@@ -1180,7 +1180,7 @@ box.snapshot()
 stat_diff(gstat(), st, 'scheduler')
 ---
 - dump_input: 10420
-  dump_output: 10371
+  dump_output: 10417
   tasks_completed: 2
   dump_count: 1
 ...
@@ -1195,9 +1195,9 @@ while i1:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
 ...
 stat_diff(gstat(), st, 'scheduler')
 ---
-- compaction_input: 112188
+- compaction_input: 112436
   tasks_completed: 1
-  compaction_output: 101984
+  compaction_output: 102208
 ...
 st = gstat()
 ---
@@ -1210,9 +1210,9 @@ while i2:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
 ...
 stat_diff(gstat(), st, 'scheduler')
 ---
-- compaction_input: 1775
+- compaction_input: 1999
   tasks_completed: 1
-  compaction_output: 1608
+  compaction_output: 1810
 ...
 s:drop()
 ---
@@ -1324,7 +1324,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1334,7 +1334,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1386,7 +1386,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 107449
+- 107563
 ...
 i1:len(), i2:len()
 ---
@@ -1396,7 +1396,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 49516
-- 50072
+- 50174
 ...
 s:bsize() == st1.memory.bytes + st1.disk.bytes
 ---
@@ -1451,7 +1451,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1461,7 +1461,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1653,18 +1653,18 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11815
+  bytes: 12019
 ...
 i2:stat().disk.last_level
 ---
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 100
-  bytes: 1608
+  bytes: 1810
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11815
+- 12019
 ...
 for i = 1, 100, 10 do s:replace{i, i * 1000, digest.urandom(100)} end
 ---
@@ -1678,18 +1678,18 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11815
+  bytes: 12019
 ...
 i2:stat().disk.last_level
 ---
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 100
-  bytes: 1608
+  bytes: 1810
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11815
+- 12019
 ...
 i1:compact()
 ---
@@ -1702,11 +1702,11 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11841
+  bytes: 12045
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11841
+- 12045
 ...
 i2:compact()
 ---
@@ -1719,11 +1719,11 @@ i2:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 110
-  bytes: 1794
+  bytes: 2016
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11841
+- 12045
 ...
 s:drop()
 ---
-- 
2.20.1





More information about the Tarantool-patches mailing list