[tarantool-patches] [PATCH v2 1/2] Journal transaction boundaries
Georgy Kirichenko
georgy at tarantool.org
Tue Jan 22 13:57:36 MSK 2019
Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
txn_replica_id identifies the replica where the transaction was started and
txn_id identifies the transaction on that replica. The lsn of the first
row in the transaction is used as the transaction id.
txn_last is set to true if the row is the last one in a transaction, so we
can commit the transaction with the last row or use additional NOP requests
with txn_last = true and valid txn_id and txn_replica_id.
For replication, all local changes are moved to the tail of the xrows array
to form a separate transaction (like an autonomous transaction) because it
is not possible to replicate such a transaction back to its creator.
The following encoding/decoding rules are assumed:
1. txn_replica_id is encoded only if it is not equal to the row's replica
id. This can be the case because of a replication trigger.
2. txn_id and txn_last are encoded only for multi-row transactions.
So if there is no txn_id in an xstream, it means that the row is
a single-row transaction.
These rules provide compatibility with previous xlog handling.
Needed for: 2798
---
src/box/iproto_constants.h | 3 ++
src/box/wal.c | 36 +++++++++++++++-
src/box/xrow.c | 38 +++++++++++++++++
src/box/xrow.h | 5 ++-
test/unit/xrow.cc | 3 ++
test/vinyl/errinj_stat.result | 8 ++--
test/vinyl/layout.result | 24 +++++------
test/vinyl/stat.result | 78 +++++++++++++++++------------------
8 files changed, 138 insertions(+), 57 deletions(-)
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..d01cdf840 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -60,6 +60,9 @@ enum iproto_key {
IPROTO_SCHEMA_VERSION = 0x05,
IPROTO_SERVER_VERSION = 0x06,
IPROTO_GROUP_ID = 0x07,
+ IPROTO_TXN_ID = 0x08,
+ IPROTO_TXN_REPLICA_ID = 0x09,
+ IPROTO_TXN_LAST = 0x0a,
/* Leave a gap for other keys in the header. */
IPROTO_SPACE_ID = 0x10,
IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/wal.c b/src/box/wal.c
index 4c3537672..17ead08e7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
}
static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
struct xrow_header **end)
{
+ struct xrow_header **row = begin;
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
@@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
vclock_follow_xrow(vclock, *row);
}
}
+ if ((*begin)->replica_id != instance_id) {
+ /*
+ * Move all local changes to the end of rows array and
+ * a fake local transaction (like an autonomous transaction)
+ * because we could not replicate the transaction back.
+ */
+ struct xrow_header **row = end - 1;
+ while (row >= begin) {
+ if (row[0]->replica_id != instance_id) {
+ --row;
+ continue;
+ }
+ /* Local row, move it back. */
+ struct xrow_header **local_row = row;
+ while (local_row < end - 1 &&
+ local_row[1]->replica_id != instance_id) {
+ struct xrow_header *tmp = local_row[0];
+ local_row[0] = local_row[1];
+ local_row[1] = tmp;
+ }
+ --row;
+ }
+ while (begin < end && begin[0]->replica_id != instance_id)
+ ++begin;
+ }
+ /* Setup txn_id and tnx_replica_id for localy generated rows. */
+ row = begin;
+ while (row < end) {
+ row[0]->txn_id = begin[0]->lsn;
+ row[0]->txn_replica_id = instance_id;
+ row[0]->txn_last = row == end - 1 ? 1 : 0;
+ ++row;
+ }
}
static void
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ef3f81add..db524b3c8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -133,12 +133,32 @@ error:
case IPROTO_SCHEMA_VERSION:
header->schema_version = mp_decode_uint(pos);
break;
+ case IPROTO_TXN_ID:
+ header->txn_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_REPLICA_ID:
+ header->txn_replica_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_LAST:
+ header->txn_last = mp_decode_uint(pos);
+ break;
default:
/* unknown header */
mp_next(pos);
}
}
assert(*pos <= end);
+ if (header->txn_id == 0) {
+ /*
+ * Transaction id is not set so it is a single statement
+ * transaction.
+ */
+ header->txn_id = header->lsn;
+ header->txn_last = true;
+ }
+ if (header->txn_replica_id == 0)
+ header->txn_replica_id = header->replica_id;
+
/* Nop requests aren't supposed to have a body. */
if (*pos < end && header->type != IPROTO_NOP) {
const char *body = *pos;
@@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
d = mp_encode_double(d, header->tm);
map_size++;
}
+ if (header->txn_id != header->lsn || header->txn_last == 0) {
+ /* Encode txn id for multi row transaction members. */
+ d = mp_encode_uint(d, IPROTO_TXN_ID);
+ d = mp_encode_uint(d, header->txn_id);
+ map_size++;
+ }
+ if (header->txn_replica_id != header->replica_id) {
+ d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
+ d = mp_encode_uint(d, header->txn_replica_id);
+ map_size++;
+ }
+ if (header->txn_last && !(header->txn_id == header->lsn &&
+ header->txn_replica_id == header->replica_id)) {
+ /* Set last row for multi row transaction. */
+ d = mp_encode_uint(d, IPROTO_TXN_LAST);
+ d = mp_encode_uint(d, header->txn_last);
+ map_size++;
+ }
assert(d <= data + XROW_HEADER_LEN_MAX);
mp_encode_map(data, map_size);
out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 6bab0a1fd..4acd84d56 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
XROW_HEADER_IOVMAX = 1,
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
- XROW_HEADER_LEN_MAX = 40,
+ XROW_HEADER_LEN_MAX = 60,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
@@ -69,6 +69,9 @@ struct xrow_header {
uint64_t sync;
int64_t lsn; /* LSN must be signed for correct comparison */
double tm;
+ int64_t txn_id;
+ uint32_t txn_replica_id;
+ uint32_t txn_last;
int bodycnt;
uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 165a543cf..0b796d728 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
header.lsn = 400;
header.tm = 123.456;
header.bodycnt = 0;
+ header.txn_id = header.lsn;
+ header.txn_replica_id = header.replica_id;
+ header.txn_last = true;
uint64_t sync = 100500;
struct iovec vec[1];
is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
index 08801dbc6..361ddf5db 100644
--- a/test/vinyl/errinj_stat.result
+++ b/test/vinyl/errinj_stat.result
@@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
- bytes_compressed: <bytes_compressed>
pages: 3
rows: 30
- bytes: 411
+ bytes: 477
...
i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
---
@@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
- bytes_compressed: <bytes_compressed>
pages: 4
rows: 40
- bytes: 548
+ bytes: 636
...
i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
---
@@ -97,7 +97,7 @@ i:stat().disk.compaction.queue -- 50 statements
- bytes_compressed: <bytes_compressed>
pages: 5
rows: 50
- bytes: 685
+ bytes: 795
...
i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
---
@@ -111,7 +111,7 @@ i:stat().disk.compaction.queue -- 50 statements
- bytes_compressed: <bytes_compressed>
pages: 5
rows: 50
- bytes: 685
+ bytes: 795
...
i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 14201c5dd..a6b577dbc 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,8 +253,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 108
- unpacked_size: 89
+ size: 118
+ unpacked_size: 99
row_count: 4
min_key: ['ёёё']
- - 00000000000000000008.run
@@ -281,7 +281,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
- - 00000000000000000012.index
- - HEADER:
type: RUNINFO
@@ -298,8 +298,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 102
- unpacked_size: 83
+ size: 110
+ unpacked_size: 91
row_count: 3
min_key: ['ёёё']
- - 00000000000000000012.run
@@ -324,7 +324,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
+ row_index: "\0\0\0\0\0\0\0\x16\0\0\0."
- - 00000000000000000006.index
- - HEADER:
type: RUNINFO
@@ -341,8 +341,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 108
- unpacked_size: 89
+ size: 118
+ unpacked_size: 99
row_count: 4
min_key: [null, 'ёёё']
- - 00000000000000000006.run
@@ -369,7 +369,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\00"
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0$\0\0\06"
- - 00000000000000000010.index
- - HEADER:
type: RUNINFO
@@ -386,8 +386,8 @@ result
BODY:
row_index_offset: <offset>
offset: <offset>
- size: 90
- unpacked_size: 71
+ size: 98
+ unpacked_size: 79
row_count: 3
min_key: [123, 'ёёё']
- - 00000000000000000010.run
@@ -409,7 +409,7 @@ result
- HEADER:
type: ROWINDEX
BODY:
- row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+ row_index: "\0\0\0\0\0\0\0\x12\0\0\0&"
...
test_run:cmd("clear filter")
---
diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result
index 419d3e6c2..33b604b9f 100644
--- a/test/vinyl/stat.result
+++ b/test/vinyl/stat.result
@@ -299,7 +299,7 @@ stat_diff(istat(), st)
run_count: 1
disk:
last_level:
- bytes: 26049
+ bytes: 26113
pages: 7
bytes_compressed: <bytes_compressed>
rows: 25
@@ -312,16 +312,16 @@ stat_diff(istat(), st)
bytes: 26525
count: 1
output:
- bytes: 26049
+ bytes: 26113
pages: 7
bytes_compressed: <bytes_compressed>
rows: 25
- bytes: 26049
+ bytes: 26113
index_size: 294
pages: 7
bytes_compressed: <bytes_compressed>
bloom_size: 70
- bytes: 26049
+ bytes: 26113
put:
rows: 25
bytes: 26525
@@ -344,7 +344,7 @@ stat_diff(istat(), st)
---
- disk:
last_level:
- bytes: 26042
+ bytes: 26104
pages: 6
bytes_compressed: <bytes_compressed>
rows: 25
@@ -357,23 +357,23 @@ stat_diff(istat(), st)
bytes: 53050
count: 1
output:
- bytes: 52091
+ bytes: 52217
pages: 13
bytes_compressed: <bytes_compressed>
rows: 50
- bytes: 26042
+ bytes: 26104
index_size: 252
pages: 6
bytes_compressed: <bytes_compressed>
compaction:
input:
- bytes: 78140
+ bytes: 78330
pages: 20
bytes_compressed: <bytes_compressed>
rows: 75
count: 1
output:
- bytes: 52091
+ bytes: 52217
pages: 13
bytes_compressed: <bytes_compressed>
rows: 50
@@ -381,7 +381,7 @@ stat_diff(istat(), st)
rows: 50
bytes: 53050
rows: 25
- bytes: 26042
+ bytes: 26104
...
-- point lookup from disk + cache put
st = istat()
@@ -405,7 +405,7 @@ stat_diff(istat(), st)
disk:
iterator:
read:
- bytes: 4167
+ bytes: 4177
pages: 1
bytes_compressed: <bytes_compressed>
rows: 4
@@ -655,7 +655,7 @@ stat_diff(istat(), st)
disk:
iterator:
read:
- bytes: 104300
+ bytes: 104550
pages: 25
bytes_compressed: <bytes_compressed>
rows: 100
@@ -1000,7 +1000,7 @@ istat()
---
- rows: 306
run_avg: 1
- bytes: 317731
+ bytes: 317981
upsert:
squashed: 0
applied: 0
@@ -1032,7 +1032,7 @@ istat()
bytes_compressed: <bytes_compressed>
pages: 25
rows: 100
- bytes: 104300
+ bytes: 104550
rows: 100
statement:
inserts: 0
@@ -1083,7 +1083,7 @@ istat()
count: 0
pages: 25
bytes_compressed: <bytes_compressed>
- bytes: 104300
+ bytes: 104550
txw:
bytes: 0
rows: 0
@@ -1123,8 +1123,8 @@ gstat()
page_index: 1050
bloom_filter: 140
disk:
- data_compacted: 104300
- data: 104300
+ data_compacted: 104550
+ data: 104550
index: 1190
scheduler:
tasks_inprogress: 0
@@ -1163,7 +1163,7 @@ box.snapshot()
stat_diff(gstat(), st, 'scheduler')
---
- dump_input: 104200
- dump_output: 103592
+ dump_output: 104018
tasks_completed: 2
dump_count: 1
...
@@ -1180,7 +1180,7 @@ box.snapshot()
stat_diff(gstat(), st, 'scheduler')
---
- dump_input: 10420
- dump_output: 10371
+ dump_output: 10417
tasks_completed: 2
dump_count: 1
...
@@ -1195,9 +1195,9 @@ while i1:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
...
stat_diff(gstat(), st, 'scheduler')
---
-- compaction_input: 112188
+- compaction_input: 112436
tasks_completed: 1
- compaction_output: 101984
+ compaction_output: 102208
...
st = gstat()
---
@@ -1210,9 +1210,9 @@ while i2:stat().disk.compaction.count == 0 do fiber.sleep(0.01) end
...
stat_diff(gstat(), st, 'scheduler')
---
-- compaction_input: 1775
+- compaction_input: 1999
tasks_completed: 1
- compaction_output: 1608
+ compaction_output: 1810
...
s:drop()
---
@@ -1324,7 +1324,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 52199
+- 52313
...
i1:len(), i2:len()
---
@@ -1334,7 +1334,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 364
-- 920
+- 1022
...
s:bsize() == st1.disk.bytes
---
@@ -1386,7 +1386,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 107449
+- 107563
...
i1:len(), i2:len()
---
@@ -1396,7 +1396,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 49516
-- 50072
+- 50174
...
s:bsize() == st1.memory.bytes + st1.disk.bytes
---
@@ -1451,7 +1451,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 52199
+- 52313
...
i1:len(), i2:len()
---
@@ -1461,7 +1461,7 @@ i1:len(), i2:len()
i1:bsize(), i2:bsize()
---
- 364
-- 920
+- 1022
...
s:bsize() == st1.disk.bytes
---
@@ -1653,18 +1653,18 @@ i1:stat().disk.last_level
- bytes_compressed: <bytes_compressed>
pages: 2
rows: 100
- bytes: 11815
+ bytes: 12019
...
i2:stat().disk.last_level
---
- bytes_compressed: <bytes_compressed>
pages: 1
rows: 100
- bytes: 1608
+ bytes: 1810
...
box.stat.vinyl().disk.data_compacted
---
-- 11815
+- 12019
...
for i = 1, 100, 10 do s:replace{i, i * 1000, digest.urandom(100)} end
---
@@ -1678,18 +1678,18 @@ i1:stat().disk.last_level
- bytes_compressed: <bytes_compressed>
pages: 2
rows: 100
- bytes: 11815
+ bytes: 12019
...
i2:stat().disk.last_level
---
- bytes_compressed: <bytes_compressed>
pages: 1
rows: 100
- bytes: 1608
+ bytes: 1810
...
box.stat.vinyl().disk.data_compacted
---
-- 11815
+- 12019
...
i1:compact()
---
@@ -1702,11 +1702,11 @@ i1:stat().disk.last_level
- bytes_compressed: <bytes_compressed>
pages: 2
rows: 100
- bytes: 11841
+ bytes: 12045
...
box.stat.vinyl().disk.data_compacted
---
-- 11841
+- 12045
...
i2:compact()
---
@@ -1719,11 +1719,11 @@ i2:stat().disk.last_level
- bytes_compressed: <bytes_compressed>
pages: 1
rows: 110
- bytes: 1794
+ bytes: 2016
...
box.stat.vinyl().disk.data_compacted
---
-- 11841
+- 12045
...
s:drop()
---
--
2.20.1
More information about the Tarantool-patches
mailing list