[tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
Georgy Kirichenko
georgy at tarantool.org
Sun Jan 6 16:05:52 MSK 2019
Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
txn_replica_id identifies the replica where the transaction was started and
txn_id identifies the transaction id on that replica. The lsn of the first
row in the transaction is used as the transaction id.
txn_last is set to true if it is the last row in a transaction, so we could
commit a transaction with its last row or use additional NOP requests with
txn_last = true and valid txn_id and txn_replica_id.
For replication, all local changes are moved to the tail of the xrows array
to form a separate transaction (like an autonomous transaction) because it is
not possible to replicate such a transaction back to its creator.
The following encoding/decoding rules are assumed:
1. txn_replica_id is encoded only if it is not equal to the replica id. This
might make sense because of a replication trigger.
2. txn_id and txn_last are encoded only for multi-row transactions. So if we
do not have txn_id in an xstream, it means that it is a single-row
transaction.
These rules provide compatibility with previous xlog handling.
Needed for: 2798
---
src/box/iproto_constants.h | 3 +++
src/box/wal.c | 36 +++++++++++++++++++++++++++++++++++-
src/box/xrow.c | 38 ++++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 5 ++++-
test/unit/xrow.cc | 3 +++
test/vinyl/errinj.result | 8 ++++----
test/vinyl/info.result | 38 +++++++++++++++++++-------------------
test/vinyl/layout.result | 24 ++++++++++++------------
8 files changed, 118 insertions(+), 37 deletions(-)
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..d01cdf840 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -60,6 +60,9 @@ enum iproto_key {
IPROTO_SCHEMA_VERSION = 0x05,
IPROTO_SERVER_VERSION = 0x06,
IPROTO_GROUP_ID = 0x07,
+ IPROTO_TXN_ID = 0x08,
+ IPROTO_TXN_REPLICA_ID = 0x09,
+ IPROTO_TXN_LAST = 0x0a,
/* Leave a gap for other keys in the header. */
IPROTO_SPACE_ID = 0x10,
IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/wal.c b/src/box/wal.c
index 4c3537672..17ead08e7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
}
static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
struct xrow_header **end)
{
+ struct xrow_header **row = begin;
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
@@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
vclock_follow_xrow(vclock, *row);
}
}
+ if ((*begin)->replica_id != instance_id) {
+ /*
+ * Move all local changes to the end of rows array and
+ * a fake local transaction (like an autonomous transaction)
+ * because we could not replicate the transaction back.
+ */
+ struct xrow_header **row = end - 1;
+ while (row >= begin) {
+ if (row[0]->replica_id != instance_id) {
+ --row;
+ continue;
+ }
+ /* Local row, move it back. */
+ struct xrow_header **local_row = row;
+ while (local_row < end - 1 &&
+ local_row[1]->replica_id != instance_id) {
+ struct xrow_header *tmp = local_row[0];
+ local_row[0] = local_row[1];
+ local_row[1] = tmp;
+ }
+ --row;
+ }
+ while (begin < end && begin[0]->replica_id != instance_id)
+ ++begin;
+ }
+ /* Set up txn_id and txn_replica_id for locally generated rows. */
+ row = begin;
+ while (row < end) {
+ row[0]->txn_id = begin[0]->lsn;
+ row[0]->txn_replica_id = instance_id;
+ row[0]->txn_last = row == end - 1 ? 1 : 0;
+ ++row;
+ }
}
static void
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ef3f81add..db524b3c8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -133,12 +133,32 @@ error:
case IPROTO_SCHEMA_VERSION:
header->schema_version = mp_decode_uint(pos);
break;
+ case IPROTO_TXN_ID:
+ header->txn_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_REPLICA_ID:
+ header->txn_replica_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_LAST:
+ header->txn_last = mp_decode_uint(pos);
+ break;
default:
/* unknown header */
mp_next(pos);
}
}
assert(*pos <= end);
+ if (header->txn_id == 0) {
+ /*
+ * Transaction id is not set so it is a single statement
+ * transaction.
+ */
+ header->txn_id = header->lsn;
+ header->txn_last = true;
+ }
+ if (header->txn_replica_id == 0)
+ header->txn_replica_id = header->replica_id;
+
/* Nop requests aren't supposed to have a body. */
if (*pos < end && header->type != IPROTO_NOP) {
const char *body = *pos;
@@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
d = mp_encode_double(d, header->tm);
map_size++;
}
+ if (header->txn_id != header->lsn || header->txn_last == 0) {
+ /* Encode txn id for multi row transaction members. */
+ d = mp_encode_uint(d, IPROTO_TXN_ID);
+ d = mp_encode_uint(d, header->txn_id);
+ map_size++;
+ }
+ if (header->txn_replica_id != header->replica_id) {
+ d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
+ d = mp_encode_uint(d, header->txn_replica_id);
+ map_size++;
+ }
+ if (header->txn_last && !(header->txn_id == header->lsn &&
+ header->txn_replica_id == header->replica_id)) {
+ /* Set last row for multi row transaction. */
+ d = mp_encode_uint(d, IPROTO_TXN_LAST);
+ d = mp_encode_uint(d, header->txn_last);
+ map_size++;
+ }
assert(d <= data + XROW_HEADER_LEN_MAX);
mp_encode_map(data, map_size);
out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 6bab0a1fd..4acd84d56 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
XROW_HEADER_IOVMAX = 1,
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
- XROW_HEADER_LEN_MAX = 40,
+ XROW_HEADER_LEN_MAX = 60,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
@@ -69,6 +69,9 @@ struct xrow_header {
uint64_t sync;
int64_t lsn; /* LSN must be signed for correct comparison */
double tm;
+ int64_t txn_id;
+ uint32_t txn_replica_id;
+ uint32_t txn_last;
int bodycnt;
uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 165a543cf..0b796d728 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
header.lsn = 400;
header.tm = 123.456;
header.bodycnt = 0;
+ header.txn_id = header.lsn;
+ header.txn_replica_id = header.replica_id;
+ header.txn_last = true;
uint64_t sync = 100500;
struct iovec vec[1];
is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 23ab845b3..7ea5df777 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -2164,7 +2164,7 @@ i:stat().disk.compact.queue -- 30 statements
- bytes_compressed: <bytes_compressed>
pages: 3
rows: 30
- bytes: 471
+ bytes: 537
...
i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
---
@@ -2178,7 +2178,7 @@ i:stat().disk.compact.queue -- 40 statements
- bytes_compressed: <bytes_compressed>
pages: 4
rows: 40
- bytes: 628
+ bytes: 716
...
i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
---
@@ -2192,7 +2192,7 @@ i:stat().disk.compact.queue -- 50 statements
- bytes_compressed: <bytes_compressed>
pages: 5
rows: 50
- bytes: 785
+ bytes: 895
...
i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
---
@@ -2206,7 +2206,7 @@ i:stat().disk.compact.queue -- 50 statements
- bytes_compressed: <bytes_compressed>
pages: 5
rows: 50
- bytes: 785
+ bytes: 895
...
i:stat().disk.compact.queue.bytes == box.stat.vinyl().disk.compact.queue
---
diff --git a/test/vinyl/info.result b/test/vinyl/info.result
index 922728abe..4876ddc68 100644
--- a/test/vinyl/info.result
+++ b/test/vinyl/info.result
@@ -285,19 +285,19 @@ stat_diff(istat(), st)
bytes: 26525
count: 1
out:
- bytes: 26049
+ bytes: 26113
pages: 7
bytes_compressed: <bytes_compressed>
rows: 25
index_size: 294
rows: 25
- bytes: 26049
+ bytes: 26113
bytes_compressed: <bytes_compressed>
bloom_size: 70
statement:
replaces: 25
pages: 7
- bytes: 26049
+ bytes: 26113
put:
rows: 25
bytes: 26525
@@ -325,26 +325,26 @@ stat_diff(istat(), st)
bytes: 53050
count: 1
out:
- bytes: 52091
+ bytes: 52217
pages: 13
bytes_compressed: <bytes_compressed>
rows: 50
index_size: 252
rows: 25
- bytes: 26042
+ bytes: 26104
bytes_compressed: <bytes_compressed>
pages: 6
statement:
replaces: 25
compact:
in:
- bytes: 78140
+ bytes: 78330
pages: 20
bytes_compressed: <bytes_compressed>
rows: 75
count: 1
out:
- bytes: 52091
+ bytes: 52217
pages: 13
bytes_compressed: <bytes_compressed>
rows: 50
@@ -352,7 +352,7 @@ stat_diff(istat(), st)
rows: 50
bytes: 53050
rows: 25
- bytes: 26042
+ bytes: 26104
...
-- point lookup from disk + cache put
st = istat()
@@ -376,7 +376,7 @@ stat_diff(istat(), st)
disk:
iterator:
read:
- bytes: 4167
+ bytes: 4177
pages: 1
bytes_compressed: <bytes_compressed>
rows: 4
@@ -626,7 +626,7 @@ stat_diff(istat(), st)
disk:
iterator:
read:
- bytes: 104300
+ bytes: 104550
pages: 25
bytes_compressed: <bytes_compressed>
rows: 100
@@ -971,7 +971,7 @@ istat()
---
- rows: 306
run_avg: 1
- bytes: 317731
+ bytes: 317981
upsert:
squashed: 0
applied: 0
@@ -1049,7 +1049,7 @@ istat()
bloom_size: 140
pages: 25
bytes_compressed: <bytes_compressed>
- bytes: 104300
+ bytes: 104550
txw:
bytes: 0
rows: 0
@@ -1082,7 +1082,7 @@ gstat()
in: 0
queue: 0
out: 0
- data: 104300
+ data: 104550
index: 1190
memory:
tuple_cache: 14313
@@ -1209,7 +1209,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 52199
+- 52313
...
i1:len(), i2:len()
---
@@ -1219,7 +1219,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 364
-- 920
+- 1022
...
s:bsize() == st1.disk.bytes
---
@@ -1271,7 +1271,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 107449
+- 107563
...
i1:len(), i2:len()
---
@@ -1281,7 +1281,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 49516
-- 50072
+- 50174
...
s:bsize() == st1.memory.bytes + st1.disk.bytes
---
@@ -1336,7 +1336,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 52199
+- 52313
...
i1:len(), i2:len()
---
@@ -1346,7 +1346,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 364
-- 920
+- 1022
...
s:bsize() == st1.disk.bytes
---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 14201c5dd..a6b577dbc 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,8 +253,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 108
- unpacked_size: 89
+ size: 118
+ unpacked_size: 99
row_count: 4
min_key: ['ÑÑÑ']
- - 00000000000000000008.run
@@ -281,7 +281,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
- - 00000000000000000012.index
- - HEADER:
type: RUNINFO
@@ -298,8 +298,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 102
- unpacked_size: 83
+ size: 110
+ unpacked_size: 91
row_count: 3
min_key: ['ÑÑÑ']
- - 00000000000000000012.run
@@ -324,7 +324,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
+ row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
- - 00000000000000000006.index
- - HEADER:
type: RUNINFO
@@ -341,8 +341,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 108
- unpacked_size: 89
+ size: 118
+ unpacked_size: 99
row_count: 4
min_key: [null, 'ÑÑÑ']
- - 00000000000000000006.run
@@ -369,7 +369,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
- - 00000000000000000010.index
- - HEADER:
type: RUNINFO
@@ -386,8 +386,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 90
- unpacked_size: 71
+ size: 98
+ unpacked_size: 79
row_count: 3
min_key: [123, 'ÑÑÑ']
- - 00000000000000000010.run
@@ -409,7 +409,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
...
test_run:cmd("clear filter")
---
--
2.20.1
More information about the Tarantool-patches
mailing list