Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-22 10:57 Georgy Kirichenko
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
  0 siblings, 2 replies; 16+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consist of two
commits:
 * the first one forms transaction boundaries in a xstream
 * the second one forms transactions in applier buffers and then
   applies them with correct begin/commit boundaries.

Note: this pathchset based on g.kirichenko/gh-980-disable-lsn-gaps

Note: distributed transaction are not supported so journal forms a
separate transaction for all local triggers effects.

Changes in v2:
 - Rebased against latest 2.1
 - Fixed local transaction extraction

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc            | 207 ++++++++++++++++++++++++----------
 src/box/iproto_constants.h    |   3 +
 src/box/wal.c                 |  36 +++++-
 src/box/xrow.c                |  38 +++++++
 src/box/xrow.h                |   5 +-
 test/unit/xrow.cc             |   3 +
 test/vinyl/errinj_stat.result |   8 +-
 test/vinyl/layout.result      |  24 ++--
 test/vinyl/stat.result        |  78 ++++++-------
 9 files changed, 286 insertions(+), 116 deletions(-)

-- 
2.20.1

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-22 10:57 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
@ 2019-01-22 10:57 ` Georgy Kirichenko
  2019-01-28 12:58   ` Vladimir Davydov
                     ` (2 more replies)
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
  1 sibling, 3 replies; 16+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Append txn_id, txn_replica_id and txn_last to xrow_header structure.
txn_replica_id identifies replica where transaction was started and
txn_id identifies transaction id on that replica. As transaction id
a lsn of the first row in this transaction is used.
txn_last set to true if it is the last row in a transaction, so we
could commit transaction with last row or use additional NOP requests
with txn_last = true ans valid txn_id and txn_replica_id.
For replication all local changes moved to xrows array tail to form
a separate transaction (like autonomous transaction) because it is not
possible to replicate such transaction back to it's creator.

As encoding/deconding rules assumed:
 1. txn_replica_id is encoded only if it is not equal with replica
    id. This might have point because of replication trigger
 2. txn_id and txn_last are encoded only for multi-row transaction.
    So if we do not have txn_id in a xstream then this means that it
    is a single row transaction.
This rules provides compatibility with previous xlog handling.

Needed for: 2798
---
 src/box/iproto_constants.h    |  3 ++
 src/box/wal.c                 | 36 +++++++++++++++-
 src/box/xrow.c                | 38 +++++++++++++++++
 src/box/xrow.h                |  5 ++-
 test/unit/xrow.cc             |  3 ++
 test/vinyl/errinj_stat.result |  8 ++--
 test/vinyl/layout.result      | 24 +++++------
 test/vinyl/stat.result        | 78 +++++++++++++++++------------------
 8 files changed, 138 insertions(+), 57 deletions(-)

diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..d01cdf840 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -60,6 +60,9 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TXN_ID = 0x08,
+	IPROTO_TXN_REPLICA_ID = 0x09,
+	IPROTO_TXN_LAST = 0x0a,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/wal.c b/src/box/wal.c
index 4c3537672..17ead08e7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 }
 
 static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	       struct xrow_header **end)
 {
+	struct xrow_header **row = begin;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 			vclock_follow_xrow(vclock, *row);
 		}
 	}
+	if ((*begin)->replica_id != instance_id) {
+		/*
+		 * Move all local changes to the end of rows array and
+		 * a fake local transaction (like an autonomous transaction)
+		 * because we could not replicate the transaction back.
+		 */
+		struct xrow_header **row = end - 1;
+		while (row >= begin) {
+			if (row[0]->replica_id != instance_id) {
+				--row;
+				continue;
+			}
+			/* Local row, move it back. */
+			struct xrow_header **local_row = row;
+			while (local_row < end - 1 &&
+			       local_row[1]->replica_id != instance_id) {
+				struct xrow_header *tmp = local_row[0];
+				local_row[0] = local_row[1];
+				local_row[1] = tmp;
+			}
+			--row;
+		}
+		while (begin < end && begin[0]->replica_id != instance_id)
+			++begin;
+	}
+	/* Setup txn_id and tnx_replica_id for localy generated rows. */
+	row = begin;
+	while (row < end) {
+		row[0]->txn_id = begin[0]->lsn;
+		row[0]->txn_replica_id = instance_id;
+		row[0]->txn_last = row == end - 1 ? 1 : 0;
+		++row;
+	}
 }
 
 static void
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ef3f81add..db524b3c8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -133,12 +133,32 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TXN_ID:
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_REPLICA_ID:
+			header->txn_replica_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_LAST:
+			header->txn_last = mp_decode_uint(pos);
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (header->txn_id == 0) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->txn_id = header->lsn;
+		header->txn_last = true;
+	}
+	if (header->txn_replica_id == 0)
+		header->txn_replica_id = header->replica_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != header->lsn || header->txn_last == 0) {
+		/* Encode txn id for multi row transaction members. */
+		d = mp_encode_uint(d, IPROTO_TXN_ID);
+		d = mp_encode_uint(d, header->txn_id);
+		map_size++;
+	}
+	if (header->txn_replica_id != header->replica_id) {
+		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
+		d = mp_encode_uint(d, header->txn_replica_id);
+		map_size++;
+	}
+	if (header->txn_last && !(header->txn_id == header->lsn &&
+				  header->txn_replica_id == header->replica_id)) {
+		/* Set last row for multi row transaction. */
+		d = mp_encode_uint(d, IPROTO_TXN_LAST);
+		d = mp_encode_uint(d, header->txn_last);
+		map_size++;
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 6bab0a1fd..4acd84d56 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 60,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -69,6 +69,9 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	uint32_t txn_replica_id;
+	uint32_t txn_last;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 165a543cf..0b796d728 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.txn_replica_id = header.replica_id;
+	header.txn_last = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
index 08801dbc6..361ddf5db 100644
--- a/test/vinyl/errinj_stat.result
+++ b/test/vinyl/errinj_stat.result
@@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
 - bytes_compressed: <bytes_compressed>
   pages: 3
   rows: 30
-  bytes: 411
+  bytes: 477
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
 - bytes_compressed: <bytes_compressed>
   pages: 4
   rows: 40
-  bytes: 548
+  bytes: 636
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -97,7 +97,7 @@ i:stat().disk.compaction.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 685
+  bytes: 795
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
@@ -111,7 +111,7 @@ i:stat().disk.compaction.queue -- 50 statements
 - bytes_compressed: <bytes_compressed>
   pages: 5
   rows: 50
-  bytes: 685
+  bytes: 795
 ...
 i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
 ---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 14201c5dd..a6b577dbc 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,8 +253,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: ['ёёё']
   - - 00000000000000000008.run
@@ -281,7 +281,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000012.index
     - - HEADER:
           type: RUNINFO
@@ -298,8 +298,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 102
-          unpacked_size: 83
+          size: 110
+          unpacked_size: 91
           row_count: 3
           min_key: ['ёёё']
   - - 00000000000000000012.run
@@ -324,7 +324,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
+          row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
@@ -341,8 +341,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 108
-          unpacked_size: 89
+          size: 118
+          unpacked_size: 99
           row_count: 4
           min_key: [null, 'ёёё']
   - - 00000000000000000006.run
@@ -369,7 +369,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
   - - 00000000000000000010.index
     - - HEADER:
           type: RUNINFO
@@ -386,8 +386,8 @@ result
         BODY:
           row_index_offset: <offset>
           offset: <offset>
-          size: 90
-          unpacked_size: 71
+          size: 98
+          unpacked_size: 79
           row_count: 3
           min_key: [123, 'ёёё']
   - - 00000000000000000010.run
@@ -409,7 +409,7 @@ result
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+          row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
 ...
 test_run:cmd("clear filter")
 ---
diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result
index 419d3e6c2..33b604b9f 100644
--- a/test/vinyl/stat.result
+++ b/test/vinyl/stat.result
@@ -299,7 +299,7 @@ stat_diff(istat(), st)
   run_count: 1
   disk:
     last_level:
-      bytes: 26049
+      bytes: 26113
       pages: 7
       bytes_compressed: <bytes_compressed>
       rows: 25
@@ -312,16 +312,16 @@ stat_diff(istat(), st)
         bytes: 26525
       count: 1
       output:
-        bytes: 26049
+        bytes: 26113
         pages: 7
         bytes_compressed: <bytes_compressed>
         rows: 25
-    bytes: 26049
+    bytes: 26113
     index_size: 294
     pages: 7
     bytes_compressed: <bytes_compressed>
     bloom_size: 70
-  bytes: 26049
+  bytes: 26113
   put:
     rows: 25
     bytes: 26525
@@ -344,7 +344,7 @@ stat_diff(istat(), st)
 ---
 - disk:
     last_level:
-      bytes: 26042
+      bytes: 26104
       pages: 6
       bytes_compressed: <bytes_compressed>
       rows: 25
@@ -357,23 +357,23 @@ stat_diff(istat(), st)
         bytes: 53050
       count: 1
       output:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
-    bytes: 26042
+    bytes: 26104
     index_size: 252
     pages: 6
     bytes_compressed: <bytes_compressed>
     compaction:
       input:
-        bytes: 78140
+        bytes: 78330
         pages: 20
         bytes_compressed: <bytes_compressed>
         rows: 75
       count: 1
       output:
-        bytes: 52091
+        bytes: 52217
         pages: 13
         bytes_compressed: <bytes_compressed>
         rows: 50
@@ -381,7 +381,7 @@ stat_diff(istat(), st)
     rows: 50
     bytes: 53050
   rows: 25
-  bytes: 26042
+  bytes: 26104
 ...
 -- point lookup from disk + cache put
 st = istat()
@@ -405,7 +405,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 4167
+        bytes: 4177
         pages: 1
         bytes_compressed: <bytes_compressed>
         rows: 4
@@ -655,7 +655,7 @@ stat_diff(istat(), st)
   disk:
     iterator:
       read:
-        bytes: 104300
+        bytes: 104550
         pages: 25
         bytes_compressed: <bytes_compressed>
         rows: 100
@@ -1000,7 +1000,7 @@ istat()
 ---
 - rows: 306
   run_avg: 1
-  bytes: 317731
+  bytes: 317981
   upsert:
     squashed: 0
     applied: 0
@@ -1032,7 +1032,7 @@ istat()
       bytes_compressed: <bytes_compressed>
       pages: 25
       rows: 100
-      bytes: 104300
+      bytes: 104550
     rows: 100
     statement:
       inserts: 0
@@ -1083,7 +1083,7 @@ istat()
       count: 0
     pages: 25
     bytes_compressed: <bytes_compressed>
-    bytes: 104300
+    bytes: 104550
   txw:
     bytes: 0
     rows: 0
@@ -1123,8 +1123,8 @@ gstat()
     page_index: 1050
     bloom_filter: 140
   disk:
-    data_compacted: 104300
-    data: 104300
+    data_compacted: 104550
+    data: 104550
     index: 1190
   scheduler:
     tasks_inprogress: 0
@@ -1163,7 +1163,7 @@ box.snapshot()
 stat_diff(gstat(), st, 'scheduler')
 ---
 - dump_input: 104200
-  dump_output: 103592
+  dump_output: 104018
   tasks_completed: 2
   dump_count: 1
 ...
@@ -1180,7 +1180,7 @@ box.snapshot()
 stat_diff(gstat(), st, 'scheduler')
 ---
 - dump_input: 10420
-  dump_output: 10371
+  dump_output: 10417
   tasks_completed: 2
   dump_count: 1
 ...
@@ -1195,9 +1195,9 @@ while i1:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
 ...
 stat_diff(gstat(), st, 'scheduler')
 ---
-- compaction_input: 112188
+- compaction_input: 112436
   tasks_completed: 1
-  compaction_output: 101984
+  compaction_output: 102208
 ...
 st = gstat()
 ---
@@ -1210,9 +1210,9 @@ while i2:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
 ...
 stat_diff(gstat(), st, 'scheduler')
 ---
-- compaction_input: 1775
+- compaction_input: 1999
   tasks_completed: 1
-  compaction_output: 1608
+  compaction_output: 1810
 ...
 s:drop()
 ---
@@ -1324,7 +1324,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1334,7 +1334,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1386,7 +1386,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 107449
+- 107563
 ...
 i1:len(), i2:len()
 ---
@@ -1396,7 +1396,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 49516
-- 50072
+- 50174
 ...
 s:bsize() == st1.memory.bytes + st1.disk.bytes
 ---
@@ -1451,7 +1451,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 52199
+- 52313
 ...
 i1:len(), i2:len()
 ---
@@ -1461,7 +1461,7 @@ i1:len(), i2:len()
 i1:bsize(), i2:bsize()
 ---
 - 364
-- 920
+- 1022
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1653,18 +1653,18 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11815
+  bytes: 12019
 ...
 i2:stat().disk.last_level
 ---
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 100
-  bytes: 1608
+  bytes: 1810
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11815
+- 12019
 ...
 for i = 1, 100, 10 do s:replace{i, i * 1000, digest.urandom(100)} end
 ---
@@ -1678,18 +1678,18 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11815
+  bytes: 12019
 ...
 i2:stat().disk.last_level
 ---
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 100
-  bytes: 1608
+  bytes: 1810
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11815
+- 12019
 ...
 i1:compact()
 ---
@@ -1702,11 +1702,11 @@ i1:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 2
   rows: 100
-  bytes: 11841
+  bytes: 12045
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11841
+- 12045
 ...
 i2:compact()
 ---
@@ -1719,11 +1719,11 @@ i2:stat().disk.last_level
 - bytes_compressed: <bytes_compressed>
   pages: 1
   rows: 110
-  bytes: 1794
+  bytes: 2016
 ...
 box.stat.vinyl().disk.data_compacted
 ---
-- 11841
+- 12045
 ...
 s:drop()
 ---
-- 
2.20.1

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-01-22 10:57 ` Georgy Kirichenko
  2019-01-28 13:35   ` Vladimir Davydov
  1 sibling, 1 reply; 16+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Applier fetch incoming rows to form a transaction and then apply it.
Implementation assumes that transaction could not mix in a
replication stream. Also distributed transaction are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 148 insertions(+), 59 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index adbe88679..0e3832ad8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+			   txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
-			if (row.replica_id == instance_id &&
+			       first_row->replica_id) < first_row->lsn) {
+			if (first_row->replica_id == instance_id &&
 			    vclock_get(&replicaset.vclock, instance_id) <
-			    row.lsn) {
+			    first_row->lsn) {
 				/* Local row returned back. */
 				goto done;
 			}
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
 done:
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-01-28 12:58   ` Vladimir Davydov
  2019-01-29 10:09     ` Георгий Кириченко
  2019-01-28 13:00   ` Vladimir Davydov
  2019-02-08 16:56   ` Konstantin Osipov
  2 siblings, 1 reply; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-28 12:58 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true ans valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to it's creator.
> 
> As encoding/deconding rules assumed:
>  1. txn_replica_id is encoded only if it is not equal with replica
>     id. This might have point because of replication trigger
>  2. txn_id and txn_last are encoded only for multi-row transaction.
>     So if we do not have txn_id in a xstream then this means that it
>     is a single row transaction.
> This rules provides compatibility with previous xlog handling.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.h    |  3 ++

Looks like you forgot to update iproto_constants.c.

>  src/box/wal.c                 | 36 +++++++++++++++-
>  src/box/xrow.c                | 38 +++++++++++++++++
>  src/box/xrow.h                |  5 ++-
>  test/unit/xrow.cc             |  3 ++
>  test/vinyl/errinj_stat.result |  8 ++--
>  test/vinyl/layout.result      | 24 +++++------
>  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
>  8 files changed, 138 insertions(+), 57 deletions(-)
> 
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..d01cdf840 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -60,6 +60,9 @@ enum iproto_key {
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
>  	IPROTO_GROUP_ID = 0x07,
> +	IPROTO_TXN_ID = 0x08,
> +	IPROTO_TXN_REPLICA_ID = 0x09,

Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
transaction identifiers from different instances don't overlap, because
there couldn't be a "multi-instance" transaction, could there?

> +	IPROTO_TXN_LAST = 0x0a,

I think we should instead introduce BEGIN and COMMIT commands, because:

 - We might need to attach some extra information to each transaction,
   e.g. mark transactions that were committed in parallel on the master
   so that they can be committed in parallel on a replica. Attaching
   such information to each row would be excessive.

 - We will need BEGIN and COMMIT for IPROTO transactions. It would be
   nice if we could share the code with them.

 - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
   easier to read.

>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 4c3537672..17ead08e7 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  }
>  
>  static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
>  	       struct xrow_header **end)
>  {
> +	struct xrow_header **row = begin;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
>  			vclock_follow_xrow(vclock, *row);
>  		}
>  	}
> +	if ((*begin)->replica_id != instance_id) {
> +		/*
> +		 * Move all local changes to the end of rows array and
> +		 * a fake local transaction (like an autonomous transaction)
> +		 * because we could not replicate the transaction back.
> +		 */
> +		struct xrow_header **row = end - 1;
> +		while (row >= begin) {
> +			if (row[0]->replica_id != instance_id) {
> +				--row;
> +				continue;
> +			}
> +			/* Local row, move it back. */
> +			struct xrow_header **local_row = row;
> +			while (local_row < end - 1 &&
> +			       local_row[1]->replica_id != instance_id) {
> +				struct xrow_header *tmp = local_row[0];
> +				local_row[0] = local_row[1];
> +				local_row[1] = tmp;

There's SWAP for this.

> +			}
> +			--row;
> +		}
> +		while (begin < end && begin[0]->replica_id != instance_id)
> +			++begin;
> +	}

I don't understand why we need to move rows generated locally (by
an on_replace trigger I surmise) to the end of a transaction. We
have TXN_ID attached to each row so we could leave the transactions
interleaved, couldn't we?

> +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> +	row = begin;
> +	while (row < end) {
> +		row[0]->txn_id = begin[0]->lsn;
> +		row[0]->txn_replica_id = instance_id;
> +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> +		++row;
> +	}

I think we better use txn->id for TXN_ID rather than LSN.
Why do you think LSN should be used? I don't see any rationale
for that anywhere in the comments. Also, setting TXN_ID looks
like a job that should be done by txn_add_redo...

>  }
>  
>  static void
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index ef3f81add..db524b3c8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -133,12 +133,32 @@ error:
>  		case IPROTO_SCHEMA_VERSION:
>  			header->schema_version = mp_decode_uint(pos);
>  			break;
> +		case IPROTO_TXN_ID:
> +			header->txn_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_REPLICA_ID:
> +			header->txn_replica_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_LAST:
> +			header->txn_last = mp_decode_uint(pos);

Should be bool?

> +			break;
>  		default:
>  			/* unknown header */
>  			mp_next(pos);
>  		}
>  	}
>  	assert(*pos <= end);
> +	if (header->txn_id == 0) {
> +		/*
> +		 * Transaction id is not set so it is a single statement
> +		 * transaction.
> +		 */
> +		header->txn_id = header->lsn;
> +		header->txn_last = true;
> +	}
> +	if (header->txn_replica_id == 0)
> +		header->txn_replica_id = header->replica_id;
> +
>  	/* Nop requests aren't supposed to have a body. */
>  	if (*pos < end && header->type != IPROTO_NOP) {
>  		const char *body = *pos;
> @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
>  		d = mp_encode_double(d, header->tm);
>  		map_size++;
>  	}
> +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> +		/* Encode txn id for multi row transaction members. */
> +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> +		d = mp_encode_uint(d, header->txn_id);
> +		map_size++;
> +	}
> +	if (header->txn_replica_id != header->replica_id) {
> +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> +		d = mp_encode_uint(d, header->txn_replica_id);
> +		map_size++;
> +	}
> +	if (header->txn_last && !(header->txn_id == header->lsn &&
> +				  header->txn_replica_id == header->replica_id)) {
> +		/* Set last row for multi row transaction. */
> +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> +		d = mp_encode_uint(d, header->txn_last);
> +		map_size++;
> +	}
>  	assert(d <= data + XROW_HEADER_LEN_MAX);
>  	mp_encode_map(data, map_size);
>  	out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 6bab0a1fd..4acd84d56 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
>  	XROW_HEADER_IOVMAX = 1,
>  	XROW_BODY_IOVMAX = 2,
>  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> -	XROW_HEADER_LEN_MAX = 40,
> +	XROW_HEADER_LEN_MAX = 60,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
>  	/** 7 = sizeof(iproto_body_bin). */
> @@ -69,6 +69,9 @@ struct xrow_header {
>  	uint64_t sync;
>  	int64_t lsn; /* LSN must be signed for correct comparison */
>  	double tm;
> +	int64_t txn_id;
> +	uint32_t txn_replica_id;
> +	uint32_t txn_last;
>  
>  	int bodycnt;
>  	uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 165a543cf..0b796d728 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
>  	header.lsn = 400;
>  	header.tm = 123.456;
>  	header.bodycnt = 0;
> +	header.txn_id = header.lsn;
> +	header.txn_replica_id = header.replica_id;
> +	header.txn_last = true;
>  	uint64_t sync = 100500;
>  	struct iovec vec[1];
>  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
> index 08801dbc6..361ddf5db 100644
> --- a/test/vinyl/errinj_stat.result
> +++ b/test/vinyl/errinj_stat.result
> @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 3
>    rows: 30
> -  bytes: 411
> +  bytes: 477
>  ...
>  i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
>  ---
> @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
>  - bytes_compressed: <bytes_compressed>
>    pages: 4
>    rows: 40
> -  bytes: 548
> +  bytes: 636

I wouldn't expect vinyl stats to be changed by this patch.
Why did it happen?

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
  2019-01-28 12:58   ` Vladimir Davydov
@ 2019-01-28 13:00   ` Vladimir Davydov
  2019-01-28 13:08     ` [tarantool-patches] " Vladislav Shpilevoy
  2019-02-08 16:56   ` Konstantin Osipov
  2 siblings, 1 reply; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-28 13:00 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true ans valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to it's creator.
> 
> As encoding/deconding rules assumed:
>  1. txn_replica_id is encoded only if it is not equal with replica
>     id. This might have point because of replication trigger
>  2. txn_id and txn_last are encoded only for multi-row transaction.
>     So if we do not have txn_id in a xstream then this means that it
>     is a single row transaction.
> This rules provides compatibility with previous xlog handling.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.h    |  3 ++
>  src/box/wal.c                 | 36 +++++++++++++++-
>  src/box/xrow.c                | 38 +++++++++++++++++
>  src/box/xrow.h                |  5 ++-
>  test/unit/xrow.cc             |  3 ++
>  test/vinyl/errinj_stat.result |  8 ++--
>  test/vinyl/layout.result      | 24 +++++------
>  test/vinyl/stat.result        | 78 +++++++++++++++++------------------

Also, forgot to say that this patch lacks a test case. We should
probably check that transaction boundaries are written using the
xlog reader module.

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 1/2] Journal transaction boundaries
  2019-01-28 13:00   ` Vladimir Davydov
@ 2019-01-28 13:08     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 16+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-28 13:08 UTC (permalink / raw)
  To: tarantool-patches, Vladimir Davydov, Georgy Kirichenko



On 28/01/2019 16:00, Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
>> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
>> txn_replica_id identifies replica where transaction was started and
>> txn_id identifies transaction id on that replica. As transaction id
>> a lsn of the first row in this transaction is used.
>> txn_last set to true if it is the last row in a transaction, so we
>> could commit transaction with last row or use additional NOP requests
>> with txn_last = true ans valid txn_id and txn_replica_id.
>> For replication all local changes moved to xrows array tail to form
>> a separate transaction (like autonomous transaction) because it is not
>> possible to replicate such transaction back to it's creator.
>>
>> As encoding/deconding rules assumed:
>>   1. txn_replica_id is encoded only if it is not equal with replica
>>      id. This might have point because of replication trigger
>>   2. txn_id and txn_last are encoded only for multi-row transaction.
>>      So if we do not have txn_id in a xstream then this means that it
>>      is a single row transaction.
>> This rules provides compatibility with previous xlog handling.
>>
>> Needed for: 2798
>> ---
>>   src/box/iproto_constants.h    |  3 ++
>>   src/box/wal.c                 | 36 +++++++++++++++-
>>   src/box/xrow.c                | 38 +++++++++++++++++
>>   src/box/xrow.h                |  5 ++-
>>   test/unit/xrow.cc             |  3 ++
>>   test/vinyl/errinj_stat.result |  8 ++--
>>   test/vinyl/layout.result      | 24 +++++------
>>   test/vinyl/stat.result        | 78 +++++++++++++++++------------------
> 
> Also, forgot to say that this patch lacks a test case. We should
> probably check that transaction boundaries are written using the
> xlog reader module.
> 

I have a test in my old branch on this issue. You can find it either
in the branches list, or in the issue GitHub webpage, because I referred
to it in the commit.

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-28 13:35   ` Vladimir Davydov
  0 siblings, 0 replies; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-28 13:35 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:37PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Implementation assumes that transaction could not mix in a
> replication stream. Also distributed transaction are not supported yet.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
>  1 file changed, 148 insertions(+), 59 deletions(-)

Without a test, this patch is inadmissible. Vlad mentioned that he has
some tests left from his old implementation. Please salvage those.

> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index adbe88679..0e3832ad8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +381,102 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +	uint32_t txn_replica_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));
> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			txn_id = row->lsn;
> +			txn_replica_id = row->replica_id;
> +		}
> +		if (txn_id != row->txn_id ||
> +			   txn_replica_id != row->txn_replica_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}

Accumulating rows feels like the iproto realm. I don't think that it's a
good idea to implement a dirty ad-hoc solution for this. Instead we
should move applier to iproto IMO. This would probably allow us to reuse
the code for interactive iproto transactions - the two issues look very
similar to me and I think we should use the same protocol and code to
get them both working.

> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->txn_last == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);
> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;
> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);
>  
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>  
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->txn_replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set
> -		 * of changes may arrive via two
> -		 * concurrently running appliers. Thanks
> -		 * to vclock_follow() above, the first row
> -		 * in the set will be skipped - but the
> -		 * remaining may execute out of order,
> -		 * when the following xstream_write()
> -		 * yields on WAL. Hence we need a latch to
> -		 * strictly order all changes which belong
> -		 * to the same server id.
> -		 */
>  		latch_lock(latch);
> +		/* First row identifies a transaction. */
> +		assert(first_row->lsn == first_row->txn_id);
> +		assert(first_row->replica_id == first_row->txn_replica_id);
>  		if (vclock_get(&replicaset.applier.vclock,
> -			       row.replica_id) < row.lsn) {
> -			if (row.replica_id == instance_id &&
> +			       first_row->replica_id) < first_row->lsn) {
> +			if (first_row->replica_id == instance_id &&
>  			    vclock_get(&replicaset.vclock, instance_id) <
> -			    row.lsn) {
> +			    first_row->lsn) {
>  				/* Local row returned back. */
>  				goto done;
>  			}
>  			/* Preserve old lsn value. */
>  			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> -						     row.replica_id);
> -			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			struct error *e = diag_last_error(diag_get());
> -			if (res != 0 && e->type == &type_ClientError &&
> -			    box_error_code(e) == ER_TUPLE_FOUND &&
> -			    replication_skip_conflict) {
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				diag_clear(diag_get());
> -				row.type = IPROTO_NOP;
> -				row.bodycnt = 0;
> -				res = xstream_write(applier->subscribe_stream,
> -						    &row);
> +						     first_row->replica_id);
> +
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

So we have xstream_write to hide box internals, but we still use
txn_begin/commit. This looks ugly. We should encapsulate those somehow
as well, I guess.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				vclock_follow_xrow(&replicaset.applier.vclock, row);
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
> +					diag_clear(diag_get());
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
> +				}
> +				++row;
>  			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
>  			if (res != 0) {
>  				/* Rollback lsn to have a chance for a retry. */
>  				vclock_set(&replicaset.applier.vclock,
> -					   row.replica_id, old_lsn);
> +					   first_row->replica_id, old_lsn);
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
>  				latch_unlock(latch);
>  				diag_raise();
>  			}
>  		}
>  done:
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  		/*
>  		 * Stay 'orphan' until appliers catch up with

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-28 12:58   ` Vladimir Davydov
@ 2019-01-29 10:09     ` Георгий Кириченко
  2019-01-29 11:00       ` Vladimir Davydov
  0 siblings, 1 reply; 16+ messages in thread
From: Георгий Кириченко @ 2019-01-29 10:09 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 11012 bytes --]

On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> > txn_replica_id identifies replica where transaction was started and
> > txn_id identifies transaction id on that replica. As transaction id
> > a lsn of the first row in this transaction is used.
> > txn_last set to true if it is the last row in a transaction, so we
> > could commit transaction with last row or use additional NOP requests
> > with txn_last = true ans valid txn_id and txn_replica_id.
> > For replication all local changes moved to xrows array tail to form
> > a separate transaction (like autonomous transaction) because it is not
> > possible to replicate such transaction back to it's creator.
> > 
> > As encoding/deconding rules assumed:
> >  1. txn_replica_id is encoded only if it is not equal with replica
> >  
> >     id. This might have point because of replication trigger
> >  
> >  2. txn_id and txn_last are encoded only for multi-row transaction.
> >  
> >     So if we do not have txn_id in a xstream then this means that it
> >     is a single row transaction.
> > 
> > This rules provides compatibility with previous xlog handling.
> > 
> > Needed for: 2798
> > ---
> > 
> >  src/box/iproto_constants.h    |  3 ++
> 
> Looks like you forgot to update iproto_constants.c.
> 
> >  src/box/wal.c                 | 36 +++++++++++++++-
> >  src/box/xrow.c                | 38 +++++++++++++++++
> >  src/box/xrow.h                |  5 ++-
> >  test/unit/xrow.cc             |  3 ++
> >  test/vinyl/errinj_stat.result |  8 ++--
> >  test/vinyl/layout.result      | 24 +++++------
> >  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
> >  8 files changed, 138 insertions(+), 57 deletions(-)
> > 
> > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > index 728514297..d01cdf840 100644
> > --- a/src/box/iproto_constants.h
> > +++ b/src/box/iproto_constants.h
> > @@ -60,6 +60,9 @@ enum iproto_key {
> > 
> >  	IPROTO_SCHEMA_VERSION = 0x05,
> >  	IPROTO_SERVER_VERSION = 0x06,
> >  	IPROTO_GROUP_ID = 0x07,
> > 
> > +	IPROTO_TXN_ID = 0x08,
> > +	IPROTO_TXN_REPLICA_ID = 0x09,
> 
> Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> transaction identifiers from different instances don't overlap, because
> there couldn't be a "multi-instance" transaction, could there?
> 
> > +	IPROTO_TXN_LAST = 0x0a,
> 
> I think we should instead introduce BEGIN and COMMIT commands, because:
I completely do not like any auto-commit logic in a xlog file. You suggestion 
breaks backward compatibility because previous logs do not have any BEGIN/
COMMIT. Also separate BEGIN and COMMIT messages increase transaction size.
It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates
BEGIN or COMMIT.

> 
>  - We might need to attach some extra information to each transaction,
>    e.g. mark transactions that were committed in parallel on the master
>    so that they can be committed in parallel on a replica. Attaching
>    such information to each row would be excessive.
The patch is not about this.
> 
>  - We will need BEGIN and COMMIT for IPROTO transactions. It would be
>    nice if we could share the code with them.
The biggest issue we could not know transaction identifier in case of IPROTO.
Iproto is single stream proto, but wal might be not as it is multiplexing a 
lot of transactions in a one output, so it might be bad be in paradigm of 
universally format for both IPROTO and WAL.
> 
>  - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
>    easier to read.
> 
> >  	/* Leave a gap for other keys in the header. */
> >  	IPROTO_SPACE_ID = 0x10,
> >  	IPROTO_INDEX_ID = 0x11,
> > 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 4c3537672..17ead08e7 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> > 
> >  }
> >  
> >  static void
> > 
> > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > 
> >  	       struct xrow_header **end)
> >  
> >  {
> > 
> > +	struct xrow_header **row = begin;
> > 
> >  	/** Assign LSN to all local rows. */
> >  	for ( ; row < end; row++) {
> >  	
> >  		if ((*row)->replica_id == 0) {
> > 
> > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > xrow_header **row,> 
> >  			vclock_follow_xrow(vclock, *row);
> >  		
> >  		}
> >  	
> >  	}
> > 
> > +	if ((*begin)->replica_id != instance_id) {
> > +		/*
> > +		 * Move all local changes to the end of rows array and
> > +		 * a fake local transaction (like an autonomous transaction)
> > +		 * because we could not replicate the transaction back.
> > +		 */
> > +		struct xrow_header **row = end - 1;
> > +		while (row >= begin) {
> > +			if (row[0]->replica_id != instance_id) {
> > +				--row;
> > +				continue;
> > +			}
> > +			/* Local row, move it back. */
> > +			struct xrow_header **local_row = row;
> > +			while (local_row < end - 1 &&
> > +			       local_row[1]->replica_id != instance_id) {
> > +				struct xrow_header *tmp = local_row[0];
> > +				local_row[0] = local_row[1];
> > +				local_row[1] = tmp;
> 
> There's SWAP for this.
> 
> > +			}
> > +			--row;
> > +		}
> > +		while (begin < end && begin[0]->replica_id != instance_id)
> > +			++begin;
> > +	}
> 
> I don't understand why we need to move rows generated locally (by
> an on_replace trigger I surmise) to the end of a transaction. We
> have TXN_ID attached to each row so we could leave the transactions
> interleaved, couldn't we?
You are right, but in that case applier should track a lot of transaction 
simultaneously. Also it complicates recovery too. I hope it will be fixed while 
parallel applier implementing.
> 
> > +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> > +	row = begin;
> > +	while (row < end) {
> > +		row[0]->txn_id = begin[0]->lsn;
> > +		row[0]->txn_replica_id = instance_id;
> > +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> > +		++row;
> > +	}
> 
> I think we better use txn->id for TXN_ID rather than LSN.
> Why do you think LSN should be used? I don't see any rationale
> for that anywhere in the comments. Also, setting TXN_ID looks
> like a job that should be done by txn_add_redo...
> 
> >  }
> >  
> >  static void
> > 
> > diff --git a/src/box/xrow.c b/src/box/xrow.c
> > index ef3f81add..db524b3c8 100644
> > --- a/src/box/xrow.c
> > +++ b/src/box/xrow.c
> > 
> > @@ -133,12 +133,32 @@ error:
> >  		case IPROTO_SCHEMA_VERSION:
> >  			header->schema_version = mp_decode_uint(pos);
> >  			break;
> > 
> > +		case IPROTO_TXN_ID:
> > +			header->txn_id = mp_decode_uint(pos);
> > +			break;
> > +		case IPROTO_TXN_REPLICA_ID:
> > +			header->txn_replica_id = mp_decode_uint(pos);
> > +			break;
> > +		case IPROTO_TXN_LAST:
> > +			header->txn_last = mp_decode_uint(pos);
> 
> Should be bool?
> 
> > +			break;
> > 
> >  		default:
> >  			/* unknown header */
> >  			mp_next(pos);
> >  		
> >  		}
> >  	
> >  	}
> >  	assert(*pos <= end);
> > 
> > +	if (header->txn_id == 0) {
> > +		/*
> > +		 * Transaction id is not set so it is a single statement
> > +		 * transaction.
> > +		 */
> > +		header->txn_id = header->lsn;
> > +		header->txn_last = true;
> > +	}
> > +	if (header->txn_replica_id == 0)
> > +		header->txn_replica_id = header->replica_id;
> > +
> > 
> >  	/* Nop requests aren't supposed to have a body. */
> >  	if (*pos < end && header->type != IPROTO_NOP) {
> >  	
> >  		const char *body = *pos;
> > 
> > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header,
> > uint64_t sync,> 
> >  		d = mp_encode_double(d, header->tm);
> >  		map_size++;
> >  	
> >  	}
> > 
> > +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> > +		/* Encode txn id for multi row transaction members. */
> > +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> > +		d = mp_encode_uint(d, header->txn_id);
> > +		map_size++;
> > +	}
> > +	if (header->txn_replica_id != header->replica_id) {
> > +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> > +		d = mp_encode_uint(d, header->txn_replica_id);
> > +		map_size++;
> > +	}
> > +	if (header->txn_last && !(header->txn_id == header->lsn &&
> > +				  header->txn_replica_id == header->replica_id)) 
{
> > +		/* Set last row for multi row transaction. */
> > +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> > +		d = mp_encode_uint(d, header->txn_last);
> > +		map_size++;
> > +	}
> > 
> >  	assert(d <= data + XROW_HEADER_LEN_MAX);
> >  	mp_encode_map(data, map_size);
> >  	out->iov_len = d - (char *) out->iov_base;
> > 
> > diff --git a/src/box/xrow.h b/src/box/xrow.h
> > index 6bab0a1fd..4acd84d56 100644
> > --- a/src/box/xrow.h
> > +++ b/src/box/xrow.h
> > @@ -47,7 +47,7 @@ enum {
> > 
> >  	XROW_HEADER_IOVMAX = 1,
> >  	XROW_BODY_IOVMAX = 2,
> >  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> > 
> > -	XROW_HEADER_LEN_MAX = 40,
> > +	XROW_HEADER_LEN_MAX = 60,
> > 
> >  	XROW_BODY_LEN_MAX = 128,
> >  	IPROTO_HEADER_LEN = 28,
> >  	/** 7 = sizeof(iproto_body_bin). */
> > 
> > @@ -69,6 +69,9 @@ struct xrow_header {
> > 
> >  	uint64_t sync;
> >  	int64_t lsn; /* LSN must be signed for correct comparison */
> >  	double tm;
> > 
> > +	int64_t txn_id;
> > +	uint32_t txn_replica_id;
> > +	uint32_t txn_last;
> > 
> >  	int bodycnt;
> >  	uint32_t schema_version;
> > 
> > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> > index 165a543cf..0b796d728 100644
> > --- a/test/unit/xrow.cc
> > +++ b/test/unit/xrow.cc
> > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
> > 
> >  	header.lsn = 400;
> >  	header.tm = 123.456;
> >  	header.bodycnt = 0;
> > 
> > +	header.txn_id = header.lsn;
> > +	header.txn_replica_id = header.replica_id;
> > +	header.txn_last = true;
> > 
> >  	uint64_t sync = 100500;
> >  	struct iovec vec[1];
> >  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> > 
> > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
> > index 08801dbc6..361ddf5db 100644
> > --- a/test/vinyl/errinj_stat.result
> > +++ b/test/vinyl/errinj_stat.result
> > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > 
> >  - bytes_compressed: <bytes_compressed>
> >  
> >    pages: 3
> >    rows: 30
> > 
> > -  bytes: 411
> > +  bytes: 477
> > 
> >  ...
> >  i:stat().disk.compaction.queue.bytes ==
> >  box.stat.vinyl().scheduler.compaction_queue ---
> > 
> > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > 
> >  - bytes_compressed: <bytes_compressed>
> >  
> >    pages: 4
> >    rows: 40
> > 
> > -  bytes: 548
> > +  bytes: 636
> 
> I wouldn't expect vinyl stats to be changed by this patch.
> Why did it happen?
Because if rows were written in an one entry, then wal creates transaction.

[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-29 10:09     ` Георгий Кириченко
@ 2019-01-29 11:00       ` Vladimir Davydov
  2019-01-31  7:34         ` Георгий Кириченко
  0 siblings, 1 reply; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-29 11:00 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> > > txn_replica_id identifies replica where transaction was started and
> > > txn_id identifies transaction id on that replica. As transaction id
> > > a lsn of the first row in this transaction is used.
> > > txn_last set to true if it is the last row in a transaction, so we
> > > could commit transaction with last row or use additional NOP requests
> > > with txn_last = true ans valid txn_id and txn_replica_id.
> > > For replication all local changes moved to xrows array tail to form
> > > a separate transaction (like autonomous transaction) because it is not
> > > possible to replicate such transaction back to it's creator.
> > > 
> > > As encoding/deconding rules assumed:
> > >  1. txn_replica_id is encoded only if it is not equal with replica
> > >  
> > >     id. This might have point because of replication trigger
> > >  
> > >  2. txn_id and txn_last are encoded only for multi-row transaction.
> > >  
> > >     So if we do not have txn_id in a xstream then this means that it
> > >     is a single row transaction.
> > > 
> > > This rules provides compatibility with previous xlog handling.
> > > 
> > > Needed for: 2798
> > > ---
> > > 
> > >  src/box/iproto_constants.h    |  3 ++
> > 
> > Looks like you forgot to update iproto_constants.c.
> > 
> > >  src/box/wal.c                 | 36 +++++++++++++++-
> > >  src/box/xrow.c                | 38 +++++++++++++++++
> > >  src/box/xrow.h                |  5 ++-
> > >  test/unit/xrow.cc             |  3 ++
> > >  test/vinyl/errinj_stat.result |  8 ++--
> > >  test/vinyl/layout.result      | 24 +++++------
> > >  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
> > >  8 files changed, 138 insertions(+), 57 deletions(-)
> > > 
> > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > > index 728514297..d01cdf840 100644
> > > --- a/src/box/iproto_constants.h
> > > +++ b/src/box/iproto_constants.h
> > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > 
> > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > >  	IPROTO_SERVER_VERSION = 0x06,
> > >  	IPROTO_GROUP_ID = 0x07,
> > > 
> > > +	IPROTO_TXN_ID = 0x08,
> > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > 
> > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> > transaction identifiers from different instances don't overlap, because
> > there couldn't be a "multi-instance" transaction, could there?
> > 
> > > +	IPROTO_TXN_LAST = 0x0a,
> > 
> > I think we should instead introduce BEGIN and COMMIT commands, because:
> I completely do not like any auto-commit logic in a xlog file. You suggestion 
> breaks backward compatibility because previous logs do not have any BEGIN/

It wouldn't break backward compatibility. It might break forward
compatibility, which is fine by me (we do it all the time).

> COMMIT. Also separate BEGIN and COMMIT messages increase transaction size.

I doubt that after compression you'll see a difference.

> It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates
> BEGIN or COMMIT.

Yeah, but that would look weird.

> 
> > 
> >  - We might need to attach some extra information to each transaction,
> >    e.g. mark transactions that were committed in parallel on the master
> >    so that they can be committed in parallel on a replica. Attaching
> >    such information to each row would be excessive.
> The patch is not about this.

But we have to think about that in advance, don't we?

> > 
> >  - We will need BEGIN and COMMIT for IPROTO transactions. It would be
> >    nice if we could share the code with them.
> The biggest issue we could not know transaction identifier in case of IPROTO.
> Iproto is single stream proto, but wal might be not as it is multiplexing a 
> lot of transactions in a one output, so it might be bad be in paradigm of 
> universally format for both IPROTO and WAL.

OK. I think we need to discuss the options with Kostja.

> > 
> >  - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
> >    easier to read.
> > 
> > >  	/* Leave a gap for other keys in the header. */
> > >  	IPROTO_SPACE_ID = 0x10,
> > >  	IPROTO_INDEX_ID = 0x11,
> > > 
> > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > index 4c3537672..17ead08e7 100644
> > > --- a/src/box/wal.c
> > > +++ b/src/box/wal.c
> > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> > > 
> > >  }
> > >  
> > >  static void
> > > 
> > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > > 
> > >  	       struct xrow_header **end)
> > >  
> > >  {
> > > 
> > > +	struct xrow_header **row = begin;
> > > 
> > >  	/** Assign LSN to all local rows. */
> > >  	for ( ; row < end; row++) {
> > >  	
> > >  		if ((*row)->replica_id == 0) {
> > > 
> > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > > xrow_header **row,> 
> > >  			vclock_follow_xrow(vclock, *row);
> > >  		
> > >  		}
> > >  	
> > >  	}
> > > 
> > > +	if ((*begin)->replica_id != instance_id) {
> > > +		/*
> > > +		 * Move all local changes to the end of rows array and
> > > +		 * a fake local transaction (like an autonomous transaction)
> > > +		 * because we could not replicate the transaction back.
> > > +		 */
> > > +		struct xrow_header **row = end - 1;
> > > +		while (row >= begin) {
> > > +			if (row[0]->replica_id != instance_id) {
> > > +				--row;
> > > +				continue;
> > > +			}
> > > +			/* Local row, move it back. */
> > > +			struct xrow_header **local_row = row;
> > > +			while (local_row < end - 1 &&
> > > +			       local_row[1]->replica_id != instance_id) {
> > > +				struct xrow_header *tmp = local_row[0];
> > > +				local_row[0] = local_row[1];
> > > +				local_row[1] = tmp;
> > 
> > There's SWAP for this.
> > 
> > > +			}
> > > +			--row;
> > > +		}
> > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > +			++begin;
> > > +	}
> > 
> > I don't understand why we need to move rows generated locally (by
> > an on_replace trigger I surmise) to the end of a transaction. We
> > have TXN_ID attached to each row so we could leave the transactions
> > interleaved, couldn't we?
> You are right, but in that case applier should track a lot of transaction 
> simultaneously. Also it complicates recovery too. I hope it will be fixed while 
> parallel applier implementing.

May be, we should implement parallel applier in the scope of this issue
then? Anyway, without it, sync replication won't scale with the number
of parallel transactions.

> > 
> > > +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> > > +	row = begin;
> > > +	while (row < end) {
> > > +		row[0]->txn_id = begin[0]->lsn;
> > > +		row[0]->txn_replica_id = instance_id;
> > > +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> > > +		++row;
> > > +	}
> > 
> > I think we better use txn->id for TXN_ID rather than LSN.
> > Why do you think LSN should be used? I don't see any rationale
> > for that anywhere in the comments. Also, setting TXN_ID looks
> > like a job that should be done by txn_add_redo...
> > 
> > >  }
> > >  
> > >  static void
> > > 
> > > diff --git a/src/box/xrow.c b/src/box/xrow.c
> > > index ef3f81add..db524b3c8 100644
> > > --- a/src/box/xrow.c
> > > +++ b/src/box/xrow.c
> > > 
> > > @@ -133,12 +133,32 @@ error:
> > >  		case IPROTO_SCHEMA_VERSION:
> > >  			header->schema_version = mp_decode_uint(pos);
> > >  			break;
> > > 
> > > +		case IPROTO_TXN_ID:
> > > +			header->txn_id = mp_decode_uint(pos);
> > > +			break;
> > > +		case IPROTO_TXN_REPLICA_ID:
> > > +			header->txn_replica_id = mp_decode_uint(pos);
> > > +			break;
> > > +		case IPROTO_TXN_LAST:
> > > +			header->txn_last = mp_decode_uint(pos);
> > 
> > Should be bool?
> > 
> > > +			break;
> > > 
> > >  		default:
> > >  			/* unknown header */
> > >  			mp_next(pos);
> > >  		
> > >  		}
> > >  	
> > >  	}
> > >  	assert(*pos <= end);
> > > 
> > > +	if (header->txn_id == 0) {
> > > +		/*
> > > +		 * Transaction id is not set so it is a single statement
> > > +		 * transaction.
> > > +		 */
> > > +		header->txn_id = header->lsn;
> > > +		header->txn_last = true;
> > > +	}
> > > +	if (header->txn_replica_id == 0)
> > > +		header->txn_replica_id = header->replica_id;
> > > +
> > > 
> > >  	/* Nop requests aren't supposed to have a body. */
> > >  	if (*pos < end && header->type != IPROTO_NOP) {
> > >  	
> > >  		const char *body = *pos;
> > > 
> > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header,
> > > uint64_t sync,> 
> > >  		d = mp_encode_double(d, header->tm);
> > >  		map_size++;
> > >  	
> > >  	}
> > > 
> > > +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> > > +		/* Encode txn id for multi row transaction members. */
> > > +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> > > +		d = mp_encode_uint(d, header->txn_id);
> > > +		map_size++;
> > > +	}
> > > +	if (header->txn_replica_id != header->replica_id) {
> > > +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> > > +		d = mp_encode_uint(d, header->txn_replica_id);
> > > +		map_size++;
> > > +	}
> > > +	if (header->txn_last && !(header->txn_id == header->lsn &&
> > > +				  header->txn_replica_id == header->replica_id)) 
> {
> > > +		/* Set last row for multi row transaction. */
> > > +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> > > +		d = mp_encode_uint(d, header->txn_last);
> > > +		map_size++;
> > > +	}
> > > 
> > >  	assert(d <= data + XROW_HEADER_LEN_MAX);
> > >  	mp_encode_map(data, map_size);
> > >  	out->iov_len = d - (char *) out->iov_base;
> > > 
> > > diff --git a/src/box/xrow.h b/src/box/xrow.h
> > > index 6bab0a1fd..4acd84d56 100644
> > > --- a/src/box/xrow.h
> > > +++ b/src/box/xrow.h
> > > @@ -47,7 +47,7 @@ enum {
> > > 
> > >  	XROW_HEADER_IOVMAX = 1,
> > >  	XROW_BODY_IOVMAX = 2,
> > >  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> > > 
> > > -	XROW_HEADER_LEN_MAX = 40,
> > > +	XROW_HEADER_LEN_MAX = 60,
> > > 
> > >  	XROW_BODY_LEN_MAX = 128,
> > >  	IPROTO_HEADER_LEN = 28,
> > >  	/** 7 = sizeof(iproto_body_bin). */
> > > 
> > > @@ -69,6 +69,9 @@ struct xrow_header {
> > > 
> > >  	uint64_t sync;
> > >  	int64_t lsn; /* LSN must be signed for correct comparison */
> > >  	double tm;
> > > 
> > > +	int64_t txn_id;
> > > +	uint32_t txn_replica_id;
> > > +	uint32_t txn_last;
> > > 
> > >  	int bodycnt;
> > >  	uint32_t schema_version;
> > > 
> > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> > > index 165a543cf..0b796d728 100644
> > > --- a/test/unit/xrow.cc
> > > +++ b/test/unit/xrow.cc
> > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
> > > 
> > >  	header.lsn = 400;
> > >  	header.tm = 123.456;
> > >  	header.bodycnt = 0;
> > > 
> > > +	header.txn_id = header.lsn;
> > > +	header.txn_replica_id = header.replica_id;
> > > +	header.txn_last = true;
> > > 
> > >  	uint64_t sync = 100500;
> > >  	struct iovec vec[1];
> > >  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> > > 
> > > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
> > > index 08801dbc6..361ddf5db 100644
> > > --- a/test/vinyl/errinj_stat.result
> > > +++ b/test/vinyl/errinj_stat.result
> > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > > 
> > >  - bytes_compressed: <bytes_compressed>
> > >  
> > >    pages: 3
> > >    rows: 30
> > > 
> > > -  bytes: 411
> > > +  bytes: 477
> > > 
> > >  ...
> > >  i:stat().disk.compaction.queue.bytes ==
> > >  box.stat.vinyl().scheduler.compaction_queue ---
> > > 
> > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > > 
> > >  - bytes_compressed: <bytes_compressed>
> > >  
> > >    pages: 4
> > >    rows: 40
> > > 
> > > -  bytes: 548
> > > +  bytes: 636
> > 
> > I wouldn't expect vinyl stats to be changed by this patch.
> > Why did it happen?
> Because if rows were written in an one entry, then wal creates transaction.

But those are vinyl files (run, index). They shouldn't be affected by
this, should they?

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-29 11:00       ` Vladimir Davydov
@ 2019-01-31  7:34         ` Георгий Кириченко
  2019-01-31  8:19           ` Vladimir Davydov
  0 siblings, 1 reply; 16+ messages in thread
From: Георгий Кириченко @ 2019-01-31  7:34 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 14635 bytes --]

On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > > Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> > > > txn_replica_id identifies replica where transaction was started and
> > > > txn_id identifies transaction id on that replica. As transaction id
> > > > a lsn of the first row in this transaction is used.
> > > > txn_last set to true if it is the last row in a transaction, so we
> > > > could commit transaction with last row or use additional NOP requests
> > > > with txn_last = true ans valid txn_id and txn_replica_id.
> > > > For replication all local changes moved to xrows array tail to form
> > > > a separate transaction (like autonomous transaction) because it is not
> > > > possible to replicate such transaction back to it's creator.
> > > > 
> > > > As encoding/deconding rules assumed:
> > > >  1. txn_replica_id is encoded only if it is not equal with replica
> > > >  
> > > >     id. This might have point because of replication trigger
> > > >  
> > > >  2. txn_id and txn_last are encoded only for multi-row transaction.
> > > >  
> > > >     So if we do not have txn_id in a xstream then this means that it
> > > >     is a single row transaction.
> > > > 
> > > > This rules provides compatibility with previous xlog handling.
> > > > 
> > > > Needed for: 2798
> > > > ---
> > > > 
> > > >  src/box/iproto_constants.h    |  3 ++
> > > 
> > > Looks like you forgot to update iproto_constants.c.
> > > 
> > > >  src/box/wal.c                 | 36 +++++++++++++++-
> > > >  src/box/xrow.c                | 38 +++++++++++++++++
> > > >  src/box/xrow.h                |  5 ++-
> > > >  test/unit/xrow.cc             |  3 ++
> > > >  test/vinyl/errinj_stat.result |  8 ++--
> > > >  test/vinyl/layout.result      | 24 +++++------
> > > >  test/vinyl/stat.result        | 78
> > > >  +++++++++++++++++------------------
> > > >  8 files changed, 138 insertions(+), 57 deletions(-)
> > > > 
> > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > > > index 728514297..d01cdf840 100644
> > > > --- a/src/box/iproto_constants.h
> > > > +++ b/src/box/iproto_constants.h
> > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > > 
> > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > >  	IPROTO_GROUP_ID = 0x07,
> > > > 
> > > > +	IPROTO_TXN_ID = 0x08,
> > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > > 
> > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> > > transaction identifiers from different instances don't overlap, because
> > > there couldn't be a "multi-instance" transaction, could there?
txn_replica_id might be differ from replica_id in case when transaction would 
be finished on other node, e.g. after recovery.
> > > 
> > > > +	IPROTO_TXN_LAST = 0x0a,
> > > 
> > > I think we should instead introduce BEGIN and COMMIT commands, because:
> > I completely do not like any auto-commit logic in a xlog file. You
> > suggestion breaks backward compatibility because previous logs do not
> > have any BEGIN/
> It wouldn't break backward compatibility. It might break forward
> compatibility, which is fine by me (we do it all the time).
Suggested xrow encoding/decoding rules means that any xrow without txn_id, 
txn_replica_id, txn_last should be processed as a single statement transaction 
as it was before.
If we would require explicit begin/commit then previous logs turns into an 
invalid stream without autocommit semantic.
But I think, that txn_last should be renamed into txn_commit.
Also explicit begin operation is redundant because a new one pair txn_id/
txn_replica_id already means begin of an transaction. 
> 
> > COMMIT. Also separate BEGIN and COMMIT messages increase transaction size.
> 
> I doubt that after compression you'll see a difference.
replication stream does not have any compression.
> 
> > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates
> > BEGIN or COMMIT.
> 
> Yeah, but that would look weird.
> 
> > >  - We might need to attach some extra information to each transaction,
> > >  
> > >    e.g. mark transactions that were committed in parallel on the master
> > >    so that they can be committed in parallel on a replica. Attaching
> > >    such information to each row would be excessive.
> > 
> > The patch is not about this.
> 
> But we have to think about that in advance, don't we?
We do not have clear view how it should be done.
> 
> > >  - We will need BEGIN and COMMIT for IPROTO transactions. It would be
> > >  
> > >    nice if we could share the code with them.
> > 
> > The biggest issue we could not know transaction identifier in case of
> > IPROTO. Iproto is single stream proto, but wal might be not as it is
> > multiplexing a lot of transactions in a one output, so it might be bad be
> > in paradigm of universally format for both IPROTO and WAL.
> 
> OK. I think we need to discuss the options with Kostja.
> 
> > >  - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
> > >  
> > >    easier to read.
> > >    
> > > >  	/* Leave a gap for other keys in the header. */
> > > >  	IPROTO_SPACE_ID = 0x10,
> > > >  	IPROTO_INDEX_ID = 0x11,
> > > > 
> > > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > > index 4c3537672..17ead08e7 100644
> > > > --- a/src/box/wal.c
> > > > +++ b/src/box/wal.c
> > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer
> > > > *writer)
> > > > 
> > > >  }
> > > >  
> > > >  static void
> > > > 
> > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > > > 
> > > >  	       struct xrow_header **end)
> > > >  
> > > >  {
> > > > 
> > > > +	struct xrow_header **row = begin;
> > > > 
> > > >  	/** Assign LSN to all local rows. */
> > > >  	for ( ; row < end; row++) {
> > > >  	
> > > >  		if ((*row)->replica_id == 0) {
> > > > 
> > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > > > xrow_header **row,>
> > > > 
> > > >  			vclock_follow_xrow(vclock, *row);
> > > >  		
> > > >  		}
> > > >  	
> > > >  	}
> > > > 
> > > > +	if ((*begin)->replica_id != instance_id) {
> > > > +		/*
> > > > +		 * Move all local changes to the end of rows array and
> > > > +		 * a fake local transaction (like an autonomous transaction)
> > > > +		 * because we could not replicate the transaction back.
> > > > +		 */
> > > > +		struct xrow_header **row = end - 1;
> > > > +		while (row >= begin) {
> > > > +			if (row[0]->replica_id != instance_id) {
> > > > +				--row;
> > > > +				continue;
> > > > +			}
> > > > +			/* Local row, move it back. */
> > > > +			struct xrow_header **local_row = row;
> > > > +			while (local_row < end - 1 &&
> > > > +			       local_row[1]->replica_id != instance_id) {
> > > > +				struct xrow_header *tmp = local_row[0];
> > > > +				local_row[0] = local_row[1];
> > > > +				local_row[1] = tmp;
> > > 
> > > There's SWAP for this.
> > > 
> > > > +			}
> > > > +			--row;
> > > > +		}
> > > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > > +			++begin;
> > > > +	}
> > > 
> > > I don't understand why we need to move rows generated locally (by
> > > an on_replace trigger I surmise) to the end of a transaction. We
> > > have TXN_ID attached to each row so we could leave the transactions
> > > interleaved, couldn't we?
> > 
> > You are right, but in that case applier should track a lot of transaction
> > simultaneously. Also it complicates recovery too. I hope it will be fixed
> > while parallel applier implementing.
> 
> May be, we should implement parallel applier in the scope of this issue
> then? Anyway, without it, sync replication won't scale with the number
> of parallel transactions.
I do not think so. Parallel applier depends on that feature, but it is 
completely different issue. Also parallel applier is ignorant on a xlog format.
> 
> > > > +	/* Setup txn_id and tnx_replica_id for localy generated rows. */
> > > > +	row = begin;
> > > > +	while (row < end) {
> > > > +		row[0]->txn_id = begin[0]->lsn;
> > > > +		row[0]->txn_replica_id = instance_id;
> > > > +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> > > > +		++row;
> > > > +	}
> > > 
> > > I think we better use txn->id for TXN_ID rather than LSN.
> > > Why do you think LSN should be used? I don't see any rationale
> > > for that anywhere in the comments. Also, setting TXN_ID looks
> > > like a job that should be done by txn_add_redo...
> > > 
> > > >  }
> > > >  
> > > >  static void
> > > > 
> > > > diff --git a/src/box/xrow.c b/src/box/xrow.c
> > > > index ef3f81add..db524b3c8 100644
> > > > --- a/src/box/xrow.c
> > > > +++ b/src/box/xrow.c
> > > > 
> > > > @@ -133,12 +133,32 @@ error:
> > > >  		case IPROTO_SCHEMA_VERSION:
> > > >  			header->schema_version = mp_decode_uint(pos);
> > > >  			break;
> > > > 
> > > > +		case IPROTO_TXN_ID:
> > > > +			header->txn_id = mp_decode_uint(pos);
> > > > +			break;
> > > > +		case IPROTO_TXN_REPLICA_ID:
> > > > +			header->txn_replica_id = mp_decode_uint(pos);
> > > > +			break;
> > > > +		case IPROTO_TXN_LAST:
> > > > +			header->txn_last = mp_decode_uint(pos);
> > > 
> > > Should be bool?
> > > 
> > > > +			break;
> > > > 
> > > >  		default:
> > > >  			/* unknown header */
> > > >  			mp_next(pos);
> > > >  		
> > > >  		}
> > > >  	
> > > >  	}
> > > >  	assert(*pos <= end);
> > > > 
> > > > +	if (header->txn_id == 0) {
> > > > +		/*
> > > > +		 * Transaction id is not set so it is a single statement
> > > > +		 * transaction.
> > > > +		 */
> > > > +		header->txn_id = header->lsn;
> > > > +		header->txn_last = true;
> > > > +	}
> > > > +	if (header->txn_replica_id == 0)
> > > > +		header->txn_replica_id = header->replica_id;
> > > > +
> > > > 
> > > >  	/* Nop requests aren't supposed to have a body. */
> > > >  	if (*pos < end && header->type != IPROTO_NOP) {
> > > >  	
> > > >  		const char *body = *pos;
> > > > 
> > > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header
> > > > *header,
> > > > uint64_t sync,>
> > > > 
> > > >  		d = mp_encode_double(d, header->tm);
> > > >  		map_size++;
> > > >  	
> > > >  	}
> > > > 
> > > > +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> > > > +		/* Encode txn id for multi row transaction members. */
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> > > > +		d = mp_encode_uint(d, header->txn_id);
> > > > +		map_size++;
> > > > +	}
> > > > +	if (header->txn_replica_id != header->replica_id) {
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> > > > +		d = mp_encode_uint(d, header->txn_replica_id);
> > > > +		map_size++;
> > > > +	}
> > > > +	if (header->txn_last && !(header->txn_id == header->lsn &&
> > > > +				  header->txn_replica_id == header->replica_id))
> > 
> > {
> > 
> > > > +		/* Set last row for multi row transaction. */
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> > > > +		d = mp_encode_uint(d, header->txn_last);
> > > > +		map_size++;
> > > > +	}
> > > > 
> > > >  	assert(d <= data + XROW_HEADER_LEN_MAX);
> > > >  	mp_encode_map(data, map_size);
> > > >  	out->iov_len = d - (char *) out->iov_base;
> > > > 
> > > > diff --git a/src/box/xrow.h b/src/box/xrow.h
> > > > index 6bab0a1fd..4acd84d56 100644
> > > > --- a/src/box/xrow.h
> > > > +++ b/src/box/xrow.h
> > > > @@ -47,7 +47,7 @@ enum {
> > > > 
> > > >  	XROW_HEADER_IOVMAX = 1,
> > > >  	XROW_BODY_IOVMAX = 2,
> > > >  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> > > > 
> > > > -	XROW_HEADER_LEN_MAX = 40,
> > > > +	XROW_HEADER_LEN_MAX = 60,
> > > > 
> > > >  	XROW_BODY_LEN_MAX = 128,
> > > >  	IPROTO_HEADER_LEN = 28,
> > > >  	/** 7 = sizeof(iproto_body_bin). */
> > > > 
> > > > @@ -69,6 +69,9 @@ struct xrow_header {
> > > > 
> > > >  	uint64_t sync;
> > > >  	int64_t lsn; /* LSN must be signed for correct comparison */
> > > >  	double tm;
> > > > 
> > > > +	int64_t txn_id;
> > > > +	uint32_t txn_replica_id;
> > > > +	uint32_t txn_last;
> > > > 
> > > >  	int bodycnt;
> > > >  	uint32_t schema_version;
> > > > 
> > > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> > > > index 165a543cf..0b796d728 100644
> > > > --- a/test/unit/xrow.cc
> > > > +++ b/test/unit/xrow.cc
> > > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
> > > > 
> > > >  	header.lsn = 400;
> > > >  	header.tm = 123.456;
> > > >  	header.bodycnt = 0;
> > > > 
> > > > +	header.txn_id = header.lsn;
> > > > +	header.txn_replica_id = header.replica_id;
> > > > +	header.txn_last = true;
> > > > 
> > > >  	uint64_t sync = 100500;
> > > >  	struct iovec vec[1];
> > > >  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> > > > 
> > > > diff --git a/test/vinyl/errinj_stat.result
> > > > b/test/vinyl/errinj_stat.result
> > > > index 08801dbc6..361ddf5db 100644
> > > > --- a/test/vinyl/errinj_stat.result
> > > > +++ b/test/vinyl/errinj_stat.result
> > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > > > 
> > > >  - bytes_compressed: <bytes_compressed>
> > > >  
> > > >    pages: 3
> > > >    rows: 30
> > > > 
> > > > -  bytes: 411
> > > > +  bytes: 477
> > > > 
> > > >  ...
> > > >  i:stat().disk.compaction.queue.bytes ==
> > > >  box.stat.vinyl().scheduler.compaction_queue ---
> > > > 
> > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > > > 
> > > >  - bytes_compressed: <bytes_compressed>
> > > >  
> > > >    pages: 4
> > > >    rows: 40
> > > > 
> > > > -  bytes: 548
> > > > +  bytes: 636
> > > 
> > > I wouldn't expect vinyl stats to be changed by this patch.
> > > Why did it happen?
> > 
> > Because if rows were written in an one entry, then wal creates
> > transaction.
> 
> But those are vinyl files (run, index). They shouldn't be affected by
> this, should they?
vinyl uses xlog_write_entry that forms a transaction. I do not think that 
introducing a new version of xlog_write_entry with/without transaction is a 
good idea.

[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 484 bytes --]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-31  7:34         ` Георгий Кириченко
@ 2019-01-31  8:19           ` Vladimir Davydov
  2019-01-31 14:25             ` Георгий Кириченко
  0 siblings, 1 reply; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-31  8:19 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote:
> On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > > > > index 728514297..d01cdf840 100644
> > > > > --- a/src/box/iproto_constants.h
> > > > > +++ b/src/box/iproto_constants.h
> > > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > > > 
> > > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > > >  	IPROTO_GROUP_ID = 0x07,
> > > > > 
> > > > > +	IPROTO_TXN_ID = 0x08,
> > > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > > > 
> > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> > > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> > > > transaction identifiers from different instances don't overlap, because
> > > > there couldn't be a "multi-instance" transaction, could there?
> txn_replica_id might be differ from replica_id in case when transaction would 
> be finished on other node, e.g. after recovery.

I'm kinda out of scope here. I think you need to write an RFC or
something explaining all the design decisions you made when you
introduced transaction boundaries. Or we should get together with
Kostja and discuss it. Or both. Neither the commit message nor
comments shed the light on the big picture.

> > > > 
> > > > > +	IPROTO_TXN_LAST = 0x0a,
> > > > 
> > > > I think we should instead introduce BEGIN and COMMIT commands, because:
> > > I completely do not like any auto-commit logic in a xlog file. You
> > > suggestion breaks backward compatibility because previous logs do not
> > > have any BEGIN/
> > It wouldn't break backward compatibility. It might break forward
> > compatibility, which is fine by me (we do it all the time).
> Suggested xrow encoding/decoding rules means that any xrow without txn_id, 
> txn_replica_id, txn_last should be processed as a single statement transaction 
> as it was before.
> If we would require explicit begin/commit then previous logs turns into an 
> invalid stream without autocommit semantic.

Yeah, we would have to add autocommit.

> But I think, that txn_last should be renamed into txn_commit.
> Also explicit begin operation is redundant because a new one pair txn_id/
> txn_replica_id already means begin of an transaction. 
> > 
> > > COMMIT. Also separate BEGIN and COMMIT messages increase transaction size.
> > 
> > I doubt that after compression you'll see a difference.
> replication stream does not have any compression.

OK.

> > 
> > > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last emulates
> > > BEGIN or COMMIT.
> > 
> > Yeah, but that would look weird.
> > 
> > > >  - We might need to attach some extra information to each transaction,
> > > >  
> > > >    e.g. mark transactions that were committed in parallel on the master
> > > >    so that they can be committed in parallel on a replica. Attaching
> > > >    such information to each row would be excessive.
> > > 
> > > The patch is not about this.
> > 
> > But we have to think about that in advance, don't we?
> We do not have clear view how it should be done.

Which means we should think it through IMO. The two issues are closely
related.

> > 
> > > >  - We will need BEGIN and COMMIT for IPROTO transactions. It would be
> > > >  
> > > >    nice if we could share the code with them.
> > > 
> > > The biggest issue we could not know transaction identifier in case of
> > > IPROTO. Iproto is single stream proto, but wal might be not as it is
> > > multiplexing a lot of transactions in a one output, so it might be bad be
> > > in paradigm of universally format for both IPROTO and WAL.
> > 
> > OK. I think we need to discuss the options with Kostja.
> > 
> > > >  - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
> > > >  
> > > >    easier to read.
> > > >    
> > > > >  	/* Leave a gap for other keys in the header. */
> > > > >  	IPROTO_SPACE_ID = 0x10,
> > > > >  	IPROTO_INDEX_ID = 0x11,
> > > > > 
> > > > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > > > index 4c3537672..17ead08e7 100644
> > > > > --- a/src/box/wal.c
> > > > > +++ b/src/box/wal.c
> > > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer
> > > > > *writer)
> > > > > 
> > > > >  }
> > > > >  
> > > > >  static void
> > > > > 
> > > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > > > > 
> > > > >  	       struct xrow_header **end)
> > > > >  
> > > > >  {
> > > > > 
> > > > > +	struct xrow_header **row = begin;
> > > > > 
> > > > >  	/** Assign LSN to all local rows. */
> > > > >  	for ( ; row < end; row++) {
> > > > >  	
> > > > >  		if ((*row)->replica_id == 0) {
> > > > > 
> > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > > > > xrow_header **row,>
> > > > > 
> > > > >  			vclock_follow_xrow(vclock, *row);
> > > > >  		
> > > > >  		}
> > > > >  	
> > > > >  	}
> > > > > 
> > > > > +	if ((*begin)->replica_id != instance_id) {
> > > > > +		/*
> > > > > +		 * Move all local changes to the end of rows array and
> > > > > +		 * a fake local transaction (like an autonomous transaction)
> > > > > +		 * because we could not replicate the transaction back.
> > > > > +		 */
> > > > > +		struct xrow_header **row = end - 1;
> > > > > +		while (row >= begin) {
> > > > > +			if (row[0]->replica_id != instance_id) {
> > > > > +				--row;
> > > > > +				continue;
> > > > > +			}
> > > > > +			/* Local row, move it back. */
> > > > > +			struct xrow_header **local_row = row;
> > > > > +			while (local_row < end - 1 &&
> > > > > +			       local_row[1]->replica_id != instance_id) {
> > > > > +				struct xrow_header *tmp = local_row[0];
> > > > > +				local_row[0] = local_row[1];
> > > > > +				local_row[1] = tmp;
> > > > 
> > > > There's SWAP for this.
> > > > 
> > > > > +			}
> > > > > +			--row;
> > > > > +		}
> > > > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > > > +			++begin;
> > > > > +	}
> > > > 
> > > > I don't understand why we need to move rows generated locally (by
> > > > an on_replace trigger I surmise) to the end of a transaction. We
> > > > have TXN_ID attached to each row so we could leave the transactions
> > > > interleaved, couldn't we?
> > > 
> > > You are right, but in that case applier should track a lot of transaction
> > > simultaneously. Also it complicates recovery too. I hope it will be fixed
> > > while parallel applier implementing.
> > 
> > May be, we should implement parallel applier in the scope of this issue
> > then? Anyway, without it, sync replication won't scale with the number
> > of parallel transactions.
> I do not think so. Parallel applier depends on that feature, but it is 
> completely different issue.

But as I said, sync replication won't scale without it.

> Also parallel applier is ignorant on a xlog format.

Depends on how you implement it. For example, MySQL marks all
transactions that were committed in parallel on the master so that
they can be committed in parallel on replicas.

> > > > > diff --git a/test/vinyl/errinj_stat.result
> > > > > b/test/vinyl/errinj_stat.result
> > > > > index 08801dbc6..361ddf5db 100644
> > > > > --- a/test/vinyl/errinj_stat.result
> > > > > +++ b/test/vinyl/errinj_stat.result
> > > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > > > > 
> > > > >  - bytes_compressed: <bytes_compressed>
> > > > >  
> > > > >    pages: 3
> > > > >    rows: 30
> > > > > 
> > > > > -  bytes: 411
> > > > > +  bytes: 477
> > > > > 
> > > > >  ...
> > > > >  i:stat().disk.compaction.queue.bytes ==
> > > > >  box.stat.vinyl().scheduler.compaction_queue ---
> > > > > 
> > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > > > > 
> > > > >  - bytes_compressed: <bytes_compressed>
> > > > >  
> > > > >    pages: 4
> > > > >    rows: 40
> > > > > 
> > > > > -  bytes: 548
> > > > > +  bytes: 636
> > > > 
> > > > I wouldn't expect vinyl stats to be changed by this patch.
> > > > Why did it happen?
> > > 
> > > Because if rows were written in an one entry, then wal creates
> > > transaction.
> > 
> > But those are vinyl files (run, index). They shouldn't be affected by
> > this, should they?
> vinyl uses xlog_write_entry that forms a transaction.

No, not for run/index files, it does not. Take a look at
vy_run_dump_stmt. There's no point in writing any extra
transaction information to run/index files.

> I do not think that introducing a new version of xlog_write_entry
> with/without transaction is a good idea.

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-31  8:19           ` Vladimir Davydov
@ 2019-01-31 14:25             ` Георгий Кириченко
  2019-01-31 14:54               ` Vladimir Davydov
  0 siblings, 1 reply; 16+ messages in thread
From: Георгий Кириченко @ 2019-01-31 14:25 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 11127 bytes --]

On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote:
> On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote:
> > On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> > > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > > > > diff --git a/src/box/iproto_constants.h
> > > > > > b/src/box/iproto_constants.h
> > > > > > index 728514297..d01cdf840 100644
> > > > > > --- a/src/box/iproto_constants.h
> > > > > > +++ b/src/box/iproto_constants.h
> > > > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > > > > 
> > > > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > > > >  	IPROTO_GROUP_ID = 0x07,
> > > > > > 
> > > > > > +	IPROTO_TXN_ID = 0x08,
> > > > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > > > > 
> > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I
> > > > > would
> > > > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure
> > > > > that
> > > > > transaction identifiers from different instances don't overlap,
> > > > > because
> > > > > there couldn't be a "multi-instance" transaction, could there?
> > 
> > txn_replica_id might be differ from replica_id in case when transaction
> > would be finished on other node, e.g. after recovery.
> 
> I'm kinda out of scope here. I think you need to write an RFC or
> something explaining all the design decisions you made when you
> introduced transaction boundaries. Or we should get together with
> Kostja and discuss it. Or both. Neither the commit message nor
> comments shed the light on the big picture.
> 
> > > > > > +	IPROTO_TXN_LAST = 0x0a,
> > > > > 
> > > > > I think we should instead introduce BEGIN and COMMIT commands, 
because:
> > > > I completely do not like any auto-commit logic in a xlog file. You
> > > > suggestion breaks backward compatibility because previous logs do not
> > > > have any BEGIN/
> > > 
> > > It wouldn't break backward compatibility. It might break forward
> > > compatibility, which is fine by me (we do it all the time).
> > 
> > Suggested xrow encoding/decoding rules means that any xrow without txn_id,
> > txn_replica_id, txn_last should be processed as a single statement
> > transaction as it was before.
> > If we would require explicit begin/commit then previous logs turns into an
> > invalid stream without autocommit semantic.
> 
> Yeah, we would have to add autocommit.
I am disagreed with autocommit. There are my key points:
1. We could not interleave autocommit transactions with others, or we would 
have to have special mark for this transaction but this is no autocommit after 
all.
2. Applier would not able to check transaction boundaries.
3. There is no difference between an autocommit transaction and a single-row 
transaction.
4. Autocommit is about interface behavior but not about log format.

Also let me explain xrow encoding rules:
If we have only one row in transaction, then we do not encode txn_id and 
txn_last. So single-row transaction (or autocommit if you wish) does not 
change its' representation. And then while decoding we set txn_id = lsn and 
txn_last if txn_id was not set. In that case each row from previous xlog acts 
as one single-row transaction.
For multi-row transaction we have only penalty in transaction identifiers for 
each row and one commit flag for the last.

> 
> > But I think, that txn_last should be renamed into txn_commit.
> > Also explicit begin operation is redundant because a new one pair txn_id/
> > txn_replica_id already means begin of an transaction.
> > 
> > > > COMMIT. Also separate BEGIN and COMMIT messages increase transaction
> > > > size.
> > > 
> > > I doubt that after compression you'll see a difference.
> > 
> > replication stream does not have any compression.
> 
> OK.
> 
> > > > It is worth noting, that IPROTO_NOP with a new txn_id or txn_last
> > > > emulates
> > > > BEGIN or COMMIT.
> > > 
> > > Yeah, but that would look weird.
> > > 
> > > > >  - We might need to attach some extra information to each
> > > > >  transaction,
> > > > >  
> > > > >    e.g. mark transactions that were committed in parallel on the
> > > > >    master
> > > > >    so that they can be committed in parallel on a replica. Attaching
> > > > >    such information to each row would be excessive.
> > > > 
> > > > The patch is not about this.
> > > 
> > > But we have to think about that in advance, don't we?
> > 
> > We do not have clear view how it should be done.
> 
> Which means we should think it through IMO. The two issues are closely
> related.
The main issue - proposed decision (track which transaction were batched) is 
not applicable, because vinyl might yield in a completely unpredictable 
manner. Also I have full-parallel solution but it does not require any 
information except transaction boundaries.

> 
> > > > >  - We will need BEGIN and COMMIT for IPROTO transactions. It would
> > > > >  be
> > > > >  
> > > > >    nice if we could share the code with them.
> > > > 
> > > > The biggest issue we could not know transaction identifier in case of
> > > > IPROTO. Iproto is single stream proto, but wal might be not as it is
> > > > multiplexing a lot of transactions in a one output, so it might be bad
> > > > be
> > > > in paradigm of universally format for both IPROTO and WAL.
> > > 
> > > OK. I think we need to discuss the options with Kostja.
> > > 
> > > > >  - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat
> > > > >  output
> > > > >  
> > > > >    easier to read.
> > > > >    
> > > > > >  	/* Leave a gap for other keys in the header. */
> > > > > >  	IPROTO_SPACE_ID = 0x10,
> > > > > >  	IPROTO_INDEX_ID = 0x11,
> > > > > > 
> > > > > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > > > > index 4c3537672..17ead08e7 100644
> > > > > > --- a/src/box/wal.c
> > > > > > +++ b/src/box/wal.c
> > > > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer
> > > > > > *writer)
> > > > > > 
> > > > > >  }
> > > > > >  
> > > > > >  static void
> > > > > > 
> > > > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > > > > > 
> > > > > >  	       struct xrow_header **end)
> > > > > >  
> > > > > >  {
> > > > > > 
> > > > > > +	struct xrow_header **row = begin;
> > > > > > 
> > > > > >  	/** Assign LSN to all local rows. */
> > > > > >  	for ( ; row < end; row++) {
> > > > > >  	
> > > > > >  		if ((*row)->replica_id == 0) {
> > > > > > 
> > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > > > > > xrow_header **row,>
> > > > > > 
> > > > > >  			vclock_follow_xrow(vclock, *row);
> > > > > >  		
> > > > > >  		}
> > > > > >  	
> > > > > >  	}
> > > > > > 
> > > > > > +	if ((*begin)->replica_id != instance_id) {
> > > > > > +		/*
> > > > > > +		 * Move all local changes to the end of rows array and
> > > > > > +		 * a fake local transaction (like an autonomous 
transaction)
> > > > > > +		 * because we could not replicate the transaction 
back.
> > > > > > +		 */
> > > > > > +		struct xrow_header **row = end - 1;
> > > > > > +		while (row >= begin) {
> > > > > > +			if (row[0]->replica_id != instance_id) {
> > > > > > +				--row;
> > > > > > +				continue;
> > > > > > +			}
> > > > > > +			/* Local row, move it back. */
> > > > > > +			struct xrow_header **local_row = row;
> > > > > > +			while (local_row < end - 1 &&
> > > > > > +			       local_row[1]->replica_id != instance_id) 
{
> > > > > > +				struct xrow_header *tmp = local_row[0];
> > > > > > +				local_row[0] = local_row[1];
> > > > > > +				local_row[1] = tmp;
> > > > > 
> > > > > There's SWAP for this.
> > > > > 
> > > > > > +			}
> > > > > > +			--row;
> > > > > > +		}
> > > > > > +		while (begin < end && begin[0]->replica_id != 
instance_id)
> > > > > > +			++begin;
> > > > > > +	}
> > > > > 
> > > > > I don't understand why we need to move rows generated locally (by
> > > > > an on_replace trigger I surmise) to the end of a transaction. We
> > > > > have TXN_ID attached to each row so we could leave the transactions
> > > > > interleaved, couldn't we?
> > > > 
> > > > You are right, but in that case applier should track a lot of
> > > > transaction
> > > > simultaneously. Also it complicates recovery too. I hope it will be
> > > > fixed
> > > > while parallel applier implementing.
> > > 
> > > May be, we should implement parallel applier in the scope of this issue
> > > then? Anyway, without it, sync replication won't scale with the number
> > > of parallel transactions.
> > 
> > I do not think so. Parallel applier depends on that feature, but it is
> > completely different issue.
> 
> But as I said, sync replication won't scale without it.
> 
> > Also parallel applier is ignorant on a xlog format.
> 
> Depends on how you implement it. For example, MySQL marks all
> transactions that were committed in parallel on the master so that
> they can be committed in parallel on replicas.
In case of vinyl it is not true.
> 
> > > > > > diff --git a/test/vinyl/errinj_stat.result
> > > > > > b/test/vinyl/errinj_stat.result
> > > > > > index 08801dbc6..361ddf5db 100644
> > > > > > --- a/test/vinyl/errinj_stat.result
> > > > > > +++ b/test/vinyl/errinj_stat.result
> > > > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > > > > > 
> > > > > >  - bytes_compressed: <bytes_compressed>
> > > > > >  
> > > > > >    pages: 3
> > > > > >    rows: 30
> > > > > > 
> > > > > > -  bytes: 411
> > > > > > +  bytes: 477
> > > > > > 
> > > > > >  ...
> > > > > >  i:stat().disk.compaction.queue.bytes ==
> > > > > >  box.stat.vinyl().scheduler.compaction_queue ---
> > > > > > 
> > > > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > > > > > 
> > > > > >  - bytes_compressed: <bytes_compressed>
> > > > > >  
> > > > > >    pages: 4
> > > > > >    rows: 40
> > > > > > 
> > > > > > -  bytes: 548
> > > > > > +  bytes: 636
> > > > > 
> > > > > I wouldn't expect vinyl stats to be changed by this patch.
> > > > > Why did it happen?
> > > > 
> > > > Because if rows were written in an one entry, then wal creates
> > > > transaction.
> > > 
> > > But those are vinyl files (run, index). They shouldn't be affected by
> > > this, should they?
> > 
> > vinyl uses xlog_write_entry that forms a transaction.
> 
> No, not for run/index files, it does not. Take a look at
> vy_run_dump_stmt. There's no point in writing any extra
> transaction information to run/index files.
Thanks, will check it
> 
> > I do not think that introducing a new version of xlog_write_entry
> > with/without transaction is a good idea.


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-31 14:25             ` Георгий Кириченко
@ 2019-01-31 14:54               ` Vladimir Davydov
  2019-02-01  9:31                 ` Георгий Кириченко
  0 siblings, 1 reply; 16+ messages in thread
From: Vladimir Davydov @ 2019-01-31 14:54 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jan 31, 2019 at 05:25:24PM +0300, Георгий Кириченко wrote:
> On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote:
> > On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote:
> > > On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> > > > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > > > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > > > > > diff --git a/src/box/iproto_constants.h
> > > > > > > b/src/box/iproto_constants.h
> > > > > > > index 728514297..d01cdf840 100644
> > > > > > > --- a/src/box/iproto_constants.h
> > > > > > > +++ b/src/box/iproto_constants.h
> > > > > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > > > > > 
> > > > > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > > > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > > > > >  	IPROTO_GROUP_ID = 0x07,
> > > > > > > 
> > > > > > > +	IPROTO_TXN_ID = 0x08,
> > > > > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > > > > > 
> > > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I
> > > > > > would
> > > > > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure
> > > > > > that
> > > > > > transaction identifiers from different instances don't overlap,
> > > > > > because
> > > > > > there couldn't be a "multi-instance" transaction, could there?
> > > 
> > > txn_replica_id might be differ from replica_id in case when transaction
> > > would be finished on other node, e.g. after recovery.
> > 
> > I'm kinda out of scope here. I think you need to write an RFC or
> > something explaining all the design decisions you made when you
> > introduced transaction boundaries. Or we should get together with
> > Kostja and discuss it. Or both. Neither the commit message nor
> > comments shed the light on the big picture.
> > 
> > > > > > > +	IPROTO_TXN_LAST = 0x0a,
> > > > > > 
> > > > > > I think we should instead introduce BEGIN and COMMIT commands, because:
> > > > > I completely do not like any auto-commit logic in a xlog file. You
> > > > > suggestion breaks backward compatibility because previous logs do not
> > > > > have any BEGIN/
> > > > 
> > > > It wouldn't break backward compatibility. It might break forward
> > > > compatibility, which is fine by me (we do it all the time).
> > > 
> > > Suggested xrow encoding/decoding rules means that any xrow without txn_id,
> > > txn_replica_id, txn_last should be processed as a single statement
> > > transaction as it was before.
> > > If we would require explicit begin/commit then previous logs turns into an
> > > invalid stream without autocommit semantic.
> > 
> > Yeah, we would have to add autocommit.
> I am disagreed with autocommit. There are my key points:
> 1. We could not interleave autocommit transactions with others, or we would 
> have to have special mark for this transaction but this is no autocommit after 
> all.

Do we need to support interleaving transactions in xlog? I'm not sure.
May be, for sync replication we do.

Anyway, we could add BEGIN/COMMIT *in addition* to TXN_ID stored in each
row. They would serve as a header and footer. This would give us a place
holder to store transaction-wide information if we ever need it.
Besides, it would make the stream more robust: if we loose BEGIN, we
will detect it while if we loose the first transaction row in your case
we won't. Yeah, explicit BEGIN/COMMIT will add a few bytes to each
transaction, but IMO it's not critical.

Let's discuss it f2f when we can, because we seem to repeat ourselves
without moving forward.

> 2. Applier would not able to check transaction boundaries.
> 3. There is no difference between an autocommit transaction and a single-row 
> transaction.
> 4. Autocommit is about interface behavior but not about log format.
> 
> Also let me explain xrow encoding rules:
> If we have only one row in transaction, then we do not encode txn_id and 
> txn_last. So single-row transaction (or autocommit if you wish) does not 
> change its' representation. And then while decoding we set txn_id = lsn and 
> txn_last if txn_id was not set. In that case each row from previous xlog acts 
> as one single-row transaction.
> For multi-row transaction we have only penalty in transaction identifiers for 
> each row and one commit flag for the last.
> 
> > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct
> > > > > > > xrow_header **row,>
> > > > > > > 
> > > > > > >  			vclock_follow_xrow(vclock, *row);
> > > > > > >  		
> > > > > > >  		}
> > > > > > >  	
> > > > > > >  	}
> > > > > > > 
> > > > > > > +	if ((*begin)->replica_id != instance_id) {
> > > > > > > +		/*
> > > > > > > +		 * Move all local changes to the end of rows array and
> > > > > > > +		 * a fake local transaction (like an autonomous transaction)
> > > > > > > +		 * because we could not replicate the transaction back.
> > > > > > > +		 */
> > > > > > > +		struct xrow_header **row = end - 1;
> > > > > > > +		while (row >= begin) {
> > > > > > > +			if (row[0]->replica_id != instance_id) {
> > > > > > > +				--row;
> > > > > > > +				continue;
> > > > > > > +			}
> > > > > > > +			/* Local row, move it back. */
> > > > > > > +			struct xrow_header **local_row = row;
> > > > > > > +			while (local_row < end - 1 &&
> > > > > > > +			       local_row[1]->replica_id != instance_id) {
> > > > > > > +				struct xrow_header *tmp = local_row[0];
> > > > > > > +				local_row[0] = local_row[1];
> > > > > > > +				local_row[1] = tmp;
> > > > > > 
> > > > > > There's SWAP for this.
> > > > > > 
> > > > > > > +			}
> > > > > > > +			--row;
> > > > > > > +		}
> > > > > > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > > > > > +			++begin;
> > > > > > > +	}
> > > > > > 
> > > > > > I don't understand why we need to move rows generated locally (by
> > > > > > an on_replace trigger I surmise) to the end of a transaction. We
> > > > > > have TXN_ID attached to each row so we could leave the transactions
> > > > > > interleaved, couldn't we?
> > > > > 
> > > > > You are right, but in that case applier should track a lot of
> > > > > transaction
> > > > > simultaneously. Also it complicates recovery too. I hope it will be
> > > > > fixed
> > > > > while parallel applier implementing.
> > > > 
> > > > May be, we should implement parallel applier in the scope of this issue
> > > > then? Anyway, without it, sync replication won't scale with the number
> > > > of parallel transactions.
> > > 
> > > I do not think so. Parallel applier depends on that feature, but it is
> > > completely different issue.
> > 
> > But as I said, sync replication won't scale without it.
> > 
> > > Also parallel applier is ignorant on a xlog format.
> > 
> > Depends on how you implement it. For example, MySQL marks all
> > transactions that were committed in parallel on the master so that
> > they can be committed in parallel on replicas.
> In case of vinyl it is not true.

In case of vinyl we can mark transactions that overlapped in time on the
master. In fact, that's what MySQL actually does. I guess I misleaded
you by saying "committed in parallel". I meant "executed in parallel".

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
  2019-01-31 14:54               ` Vladimir Davydov
@ 2019-02-01  9:31                 ` Георгий Кириченко
  0 siblings, 0 replies; 16+ messages in thread
From: Георгий Кириченко @ 2019-02-01  9:31 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 8757 bytes --]

On Thursday, January 31, 2019 5:54:54 PM MSK Vladimir Davydov wrote:
> On Thu, Jan 31, 2019 at 05:25:24PM +0300, Георгий Кириченко wrote:
> > On Thursday, January 31, 2019 11:19:51 AM MSK Vladimir Davydov wrote:
> > > On Thu, Jan 31, 2019 at 10:34:43AM +0300, Георгий Кириченко wrote:
> > > > On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> > > > > On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > > > > > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > > > > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko 
wrote:
> > > > > > > > diff --git a/src/box/iproto_constants.h
> > > > > > > > b/src/box/iproto_constants.h
> > > > > > > > index 728514297..d01cdf840 100644
> > > > > > > > --- a/src/box/iproto_constants.h
> > > > > > > > +++ b/src/box/iproto_constants.h
> > > > > > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > > > > > > 
> > > > > > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > > > > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > > > > > >  	IPROTO_GROUP_ID = 0x07,
> > > > > > > > 
> > > > > > > > +	IPROTO_TXN_ID = 0x08,
> > > > > > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > > > > > > 
> > > > > > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I
> > > > > > > would
> > > > > > > expect TXN_ID to be enough - we could use REPLICA_ID to make
> > > > > > > sure
> > > > > > > that
> > > > > > > transaction identifiers from different instances don't overlap,
> > > > > > > because
> > > > > > > there couldn't be a "multi-instance" transaction, could there?
> > > > 
> > > > txn_replica_id might be differ from replica_id in case when
> > > > transaction
> > > > would be finished on other node, e.g. after recovery.
> > > 
> > > I'm kinda out of scope here. I think you need to write an RFC or
> > > something explaining all the design decisions you made when you
> > > introduced transaction boundaries. Or we should get together with
> > > Kostja and discuss it. Or both. Neither the commit message nor
> > > comments shed the light on the big picture.
> > > 
> > > > > > > > +	IPROTO_TXN_LAST = 0x0a,
> > > > > > > 
> > > > > > > I think we should instead introduce BEGIN and COMMIT commands, 
because:
> > > > > > I completely do not like any auto-commit logic in a xlog file. You
> > > > > > suggestion breaks backward compatibility because previous logs do
> > > > > > not
> > > > > > have any BEGIN/
> > > > > 
> > > > > It wouldn't break backward compatibility. It might break forward
> > > > > compatibility, which is fine by me (we do it all the time).
> > > > 
> > > > Suggested xrow encoding/decoding rules means that any xrow without
> > > > txn_id,
> > > > txn_replica_id, txn_last should be processed as a single statement
> > > > transaction as it was before.
> > > > If we would require explicit begin/commit then previous logs turns
> > > > into an
> > > > invalid stream without autocommit semantic.
> > > 
> > > Yeah, we would have to add autocommit.
> > 
> > I am disagreed with autocommit. There are my key points:
> > 1. We could not interleave autocommit transactions with others, or we
> > would
> > have to have special mark for this transaction but this is no autocommit
> > after all.
> 
> Do we need to support interleaving transactions in xlog? I'm not sure.
> May be, for sync replication we do.
> 
> Anyway, we could add BEGIN/COMMIT *in addition* to TXN_ID stored in each
> row. They would serve as a header and footer. This would give us a place
> holder to store transaction-wide information if we ever need it.
> Besides, it would make the stream more robust: if we loose BEGIN, we
> will detect it while if we loose the first transaction row in your case
> we won't. Yeah, explicit BEGIN/COMMIT will add a few bytes to each
> transaction, but IMO it's not critical.
> 
> Let's discuss it f2f when we can, because we seem to repeat ourselves
> without moving forward.
> 
> > 2. Applier would not able to check transaction boundaries.
> > 3. There is no difference between an autocommit transaction and a
> > single-row transaction.
> > 4. Autocommit is about interface behavior but not about log format.
> > 
> > Also let me explain xrow encoding rules:
> > If we have only one row in transaction, then we do not encode txn_id and
> > txn_last. So single-row transaction (or autocommit if you wish) does not
> > change its' representation. And then while decoding we set txn_id = lsn
> > and
> > txn_last if txn_id was not set. In that case each row from previous xlog
> > acts as one single-row transaction.
> > For multi-row transaction we have only penalty in transaction identifiers
> > for each row and one commit flag for the last.
> > 
> > > > > > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock,
> > > > > > > > struct
> > > > > > > > xrow_header **row,>
> > > > > > > > 
> > > > > > > >  			vclock_follow_xrow(vclock, *row);
> > > > > > > >  		
> > > > > > > >  		}
> > > > > > > >  	
> > > > > > > >  	}
> > > > > > > > 
> > > > > > > > +	if ((*begin)->replica_id != instance_id) {
> > > > > > > > +		/*
> > > > > > > > +		 * Move all local changes to the end of rows array 
and
> > > > > > > > +		 * a fake local transaction (like an autonomous
> > > > > > > > transaction)
> > > > > > > > +		 * because we could not replicate the transaction 
back.
> > > > > > > > +		 */
> > > > > > > > +		struct xrow_header **row = end - 1;
> > > > > > > > +		while (row >= begin) {
> > > > > > > > +			if (row[0]->replica_id != instance_id) {
> > > > > > > > +				--row;
> > > > > > > > +				continue;
> > > > > > > > +			}
> > > > > > > > +			/* Local row, move it back. */
> > > > > > > > +			struct xrow_header **local_row = row;
> > > > > > > > +			while (local_row < end - 1 &&
> > > > > > > > +			       local_row[1]->replica_id != 
instance_id) {
> > > > > > > > +				struct xrow_header *tmp = local_row[0];
> > > > > > > > +				local_row[0] = local_row[1];
> > > > > > > > +				local_row[1] = tmp;
> > > > > > > 
> > > > > > > There's SWAP for this.
> > > > > > > 
> > > > > > > > +			}
> > > > > > > > +			--row;
> > > > > > > > +		}
> > > > > > > > +		while (begin < end && begin[0]->replica_id != 
instance_id)
> > > > > > > > +			++begin;
> > > > > > > > +	}
> > > > > > > 
> > > > > > > I don't understand why we need to move rows generated locally
> > > > > > > (by
> > > > > > > an on_replace trigger I surmise) to the end of a transaction. We
> > > > > > > have TXN_ID attached to each row so we could leave the
> > > > > > > transactions
> > > > > > > interleaved, couldn't we?
> > > > > > 
> > > > > > You are right, but in that case applier should track a lot of
> > > > > > transaction
> > > > > > simultaneously. Also it complicates recovery too. I hope it will
> > > > > > be
> > > > > > fixed
> > > > > > while parallel applier implementing.
> > > > > 
> > > > > May be, we should implement parallel applier in the scope of this
> > > > > issue
> > > > > then? Anyway, without it, sync replication won't scale with the
> > > > > number
> > > > > of parallel transactions.
> > > > 
> > > > I do not think so. Parallel applier depends on that feature, but it is
> > > > completely different issue.
> > > 
> > > But as I said, sync replication won't scale without it.
> > > 
> > > > Also parallel applier is ignorant on a xlog format.
> > > 
> > > Depends on how you implement it. For example, MySQL marks all
> > > transactions that were committed in parallel on the master so that
> > > they can be committed in parallel on replicas.
> > 
> > In case of vinyl it is not true.
> 
> In case of vinyl we can mark transactions that overlapped in time on the
> master. In fact, that's what MySQL actually does. I guess I misleaded
> you by saying "committed in parallel". I meant "executed in parallel".
The only thing we need is to send all transaction to commit in right order and 
restart a transaction in case of conflict. For example master did transactions 
t1, t2, t3. Replica starts t1, t2 and t3 in received order. But if t3 finished 
earlier that t2 then we should hold t3 commit until t2 sent to wal. After t3 
resume there are two possibilities: t3 is conflicting or not. In the second 
case we will send it to wal, in the case of conflict - just restart - we have 
all info to do that.
And we do not have to have any information about transaction batches on master 
- because all job will be done by local transaction manager.
In comparison with MySQL we also could process transaction in parallel event 
they ware done in a sequence.

[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [tarantool-patches] Re: [PATCH v2 1/2] Journal transaction boundaries
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
  2019-01-28 12:58   ` Vladimir Davydov
  2019-01-28 13:00   ` Vladimir Davydov
@ 2019-02-08 16:56   ` Konstantin Osipov
  2 siblings, 0 replies; 16+ messages in thread
From: Konstantin Osipov @ 2019-02-08 16:56 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/01/22 15:45]:
> Append txn_id, txn_replica_id and txn_last to xrow_header structure.
> txn_replica_id identifies replica where transaction was started and
> txn_id identifies transaction id on that replica. As transaction id
> a lsn of the first row in this transaction is used.
> txn_last set to true if it is the last row in a transaction, so we
> could commit transaction with last row or use additional NOP requests
> with txn_last = true ans valid txn_id and txn_replica_id.
> For replication all local changes moved to xrows array tail to form
> a separate transaction (like autonomous transaction) because it is not
> possible to replicate such transaction back to it's creator.


I think we should not need txn_replica_id at all. Let's discuss.

And I also thought we decided to drop txn_last? Having both
txn_last and txn_id seems redundant. We could set txn_id to the
last LSN of this txn, that would make txn_last unnecessary too,
while giving us easy to track transaction boundaries. 

What about adding something like "write concern" to xrow header at
the same time, so that we can select sync property individually
for each transaction?

> 
> As encoding/deconding rules assumed:
>  1. txn_replica_id is encoded only if it is not equal with replica
>     id. This might have point because of replication trigger

>  2. txn_id and txn_last are encoded only for multi-row transaction.
>     So if we do not have txn_id in a xstream then this means that it
>     is a single row transaction.
> This rules provides compatibility with previous xlog handling.
> 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
@ 2019-01-06 13:05 ` Georgy Kirichenko
  0 siblings, 0 replies; 16+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Applier fetch incoming rows to form a transaction and then apply it.
Implementation assumes that transaction could not mix in a replication
stream. Also distributed transaction are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 202 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 57 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6c0eb45d5..7e208aaa2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+			   txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,80 +576,67 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
+			       first_row->replica_id) < first_row->lsn) {
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

^ permalink raw reply	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2019-02-08 16:56 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-22 10:57 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries Georgy Kirichenko
2019-01-28 12:58   ` Vladimir Davydov
2019-01-29 10:09     ` Георгий Кириченко
2019-01-29 11:00       ` Vladimir Davydov
2019-01-31  7:34         ` Георгий Кириченко
2019-01-31  8:19           ` Vladimir Davydov
2019-01-31 14:25             ` Георгий Кириченко
2019-01-31 14:54               ` Vladimir Davydov
2019-02-01  9:31                 ` Георгий Кириченко
2019-01-28 13:00   ` Vladimir Davydov
2019-01-28 13:08     ` [tarantool-patches] " Vladislav Shpilevoy
2019-02-08 16:56   ` Konstantin Osipov
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-28 13:35   ` Vladimir Davydov
  -- strict thread matches above, loose matches on Subject: below --
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox