[tarantool-patches] [PATCH 1/2] Journal transaction boundaries

Georgy Kirichenko georgy at tarantool.org
Sun Jan 6 00:26:05 MSK 2019


Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
txn_replica_id identifies the replica where the transaction was started and
txn_id identifies the transaction on that replica. The lsn of the first row
in the transaction is used as the transaction id.
txn_last is set to true if the row is the last one in a transaction, so we
can commit a transaction with its last row or use an additional NOP request
with txn_last = true and valid txn_id and txn_replica_id.
For replication, all local changes are moved to the tail of the xrows array
to form a separate transaction (like an autonomous transaction) because it is
not possible to replicate such a transaction back to its creator.

The following encoding/decoding rules are assumed:
 1. txn_replica_id is encoded only if it is not equal to replica_id. The two
 may differ because of replication triggers.
 2. txn_id and txn_last are encoded only for multi-row transactions. So if
 there is no txn_id in the xstream, the row is a single-row
 transaction.
These rules provide compatibility with previous xlog handling.

Needed for: 2798
---
 src/box/iproto_constants.h |  3 +++
 src/box/wal.c              | 33 ++++++++++++++++++++++++++++++++-
 src/box/xrow.c             | 38 ++++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             |  5 ++++-
 test/unit/xrow.cc          |  3 +++
 test/vinyl/errinj.result   |  8 ++++----
 test/vinyl/info.result     | 38 +++++++++++++++++++-------------------
 test/vinyl/layout.result   | 24 ++++++++++++------------
 8 files changed, 115 insertions(+), 37 deletions(-)

diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..d01cdf840 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -60,6 +60,9 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TXN_ID = 0x08,
+	IPROTO_TXN_REPLICA_ID = 0x09,
+	IPROTO_TXN_LAST = 0x0a,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/wal.c b/src/box/wal.c
index 4c3537672..584d951c0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 }
 
 static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	       struct xrow_header **end)
 {
+	struct xrow_header **row = begin;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -917,6 +918,36 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 			vclock_follow_xrow(vclock, *row);
 		}
 	}
+	if ((*begin)->replica_id != instance_id) {
+		/*
+		 * Move all local changes to the end of the rows array to form
+		 * a fake local transaction (like an autonomous transaction)
+		 * because we cannot replicate the transaction back.
+		 */
+		row = begin;
+		while (row < end - 1) {
+			if (row[0]->replica_id == instance_id &&
+			    row[1]->replica_id != instance_id) {
+				struct xrow_header *tmp = row[0];
+				row[0] = row[1];
+				row[1] = tmp;
+			}
+			++row;
+		}
+		/* Search begin of local rows tail. */
+		row = end;
+		while (row > begin && row[-1]->replica_id == instance_id)
+			--row;
+		begin = row;
+	}
+	/* Setup txn_id and txn_replica_id for locally generated rows. */
+	row = begin;
+	while (row < end) {
+		row[0]->txn_id = begin[0]->lsn;
+		row[0]->txn_replica_id = instance_id;
+		row[0]->txn_last = row == end - 1 ? 1 : 0;
+		++row;
+	}
 }
 
 static void
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ef3f81add..db524b3c8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -133,12 +133,32 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TXN_ID:
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_REPLICA_ID:
+			header->txn_replica_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_LAST:
+			header->txn_last = mp_decode_uint(pos);
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (header->txn_id == 0) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->txn_id = header->lsn;
+		header->txn_last = true;
+	}
+	if (header->txn_replica_id == 0)
+		header->txn_replica_id = header->replica_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != header->lsn || header->txn_last == 0) {
+		/* Encode txn id for multi row transaction members. */
+		d = mp_encode_uint(d, IPROTO_TXN_ID);
+		d = mp_encode_uint(d, header->txn_id);
+		map_size++;
+	}
+	if (header->txn_replica_id != header->replica_id) {
+		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
+		d = mp_encode_uint(d, header->txn_replica_id);
+		map_size++;
+	}
+	if (header->txn_last && !(header->txn_id == header->lsn &&
+				  header->txn_replica_id == header->replica_id)) {
+		/* Set last row for multi row transaction. */
+		d = mp_encode_uint(d, IPROTO_TXN_LAST);
+		d = mp_encode_uint(d, header->txn_last);
+		map_size++;
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 6bab0a1fd..4acd84d56 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 60,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -69,6 +69,9 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	uint32_t txn_replica_id;
+	uint32_t txn_last;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 165a543cf..0b796d728 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.txn_replica_id = header.replica_id;
+	header.txn_last = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 23ab845b3..7ea5df777 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -2164,7 +2164,7 @@ i:stat().disk.compact.queue -- 30 statements
 - bytes_compressed: <bytes_compressed>
   pages: 3
   rows: 30
-  bytes: 471
+  bytes: 537
 ...
 i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
 ---
@@ -2178,7 +2178,7 @@ i:stat().disk.compact.queue -- 40 statements
 - bytes_compressed: <bytes_compressed>
   pages: 4
   rows: 40
-  bytes: 628
+  bytes: 716
 ...
 i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
 ---
@@ -2192,7 +2192,7 @@ i:stat().disk.compact.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 785
+  bytes: 895
 ...
 i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
 ---
@@ -2206,7 +2206,7 @@ i:stat().disk.compact.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 785
+  bytes: 895
 ...
 i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
 ---
diff --git a/test/vinyl/info.result b/test/vinyl/info.result
index 922728abe..4876ddc68 100644
--- a/test/vinyl/info.result
+++ b/test/vinyl/info.result
@@ -285,19 +285,19 @@ stat_diff(istat(), st)
         bytes: 26525
       count: 1
       out:
-        bytes: 26049
+        bytes: 26113
         pages: 7
         bytes_compressed: <bytes_compressed>
         rows: 25
     index_size: 294
     rows: 25
-    bytes: 26049
+    bytes: 26113
     bytes_compressed: <bytes_compressed>
     bloom_size: 70
     statement:
       replaces: 25
     pages: 7
-  bytes: 26049
+  bytes: 26113
   put:
     rows: 25
     bytes: 26525
@@ -325,26 +325,26 @@ stat_diff(istat(), st)
         bytes: 53050
       count: 1
       out:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
     index_size: 252
     rows: 25
-    bytes: 26042
+    bytes: 26104
     bytes_compressed: <bytes_compressed>
     pages: 6
     statement:
       replaces: 25
     compact:
       in:
-        bytes: 78140
+        bytes: 78330
         pages: 20
         bytes_compressed: <bytes_compressed>
         rows: 75
       count: 1
       out:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
@@ -352,7 +352,7 @@ stat_diff(istat(), st)
     rows: 50
     bytes: 53050
   rows: 25
-  bytes: 26042
+  bytes: 26104
 ...
 -- point lookup from disk + cache put
 st = istat()
@@ -376,7 +376,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 4167
+        bytes: 4177
         pages: 1
         bytes_compressed: <bytes_compressed>
         rows: 4
@@ -626,7 +626,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 104300
+        bytes: 104550
         pages: 25
         bytes_compressed: <bytes_compressed>
         rows: 100
@@ -971,7 +971,7 @@ istat()
 ---
 - rows: 306
   run_avg: 1
-  bytes: 317731
+  bytes: 317981
   upsert:
     squashed: 0
     applied: 0
@@ -1049,7 +1049,7 @@ istat()
     bloom_size: 140
     pages: 25
     bytes_compressed: <bytes_compressed>
-    bytes: 104300
+    bytes: 104550
   txw:
     bytes: 0
     rows: 0
@@ -1082,7 +1082,7 @@ gstat()
       in: 0
       queue: 0
       out: 0
-    data: 104300
+    data: 104550
     index: 1190
   memory:
     tuple_cache: 14313
@@ -1209,7 +1209,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1219,7 +1219,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1271,7 +1271,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 107449
+- 107563
 ...
 i1:len(), i2:len()
 ---
@@ -1281,7 +1281,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 49516
-- 50072
+- 50174
 ...
 s:bsize() == st1.memory.bytes + st1.disk.bytes
 ---
@@ -1336,7 +1336,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1346,7 +1346,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 14201c5dd..a6b577dbc 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,8 +253,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: ['ёёё']
   - - 00000000000000000008.run
@@ -281,7 +281,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000012.index
     - - HEADER:
           type: RUNINFO
@@ -298,8 +298,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 102
-          unpacked_size: 83
+          size: 110
+          unpacked_size: 91
           row_count: 3
           min_key: ['ёёё']
   - - 00000000000000000012.run
@@ -324,7 +324,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
+          row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
@@ -341,8 +341,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: [null, 'ёёё']
   - - 00000000000000000006.run
@@ -369,7 +369,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000010.index
     - - HEADER:
           type: RUNINFO
@@ -386,8 +386,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 90
-          unpacked_size: 71
+          size: 98
+          unpacked_size: 79
           row_count: 3
           min_key: [123, 'ёёё']
   - - 00000000000000000010.run
@@ -409,7 +409,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
 ...
 test_run:cmd("clear filter")
 ---
-- 
2.20.1





More information about the Tarantool-patches mailing list