[tarantool-patches] [PATCH v4 1/2] Journal transaction boundaries
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 20 08:59:42 MSK 2019
Add txn_id and txn_commit fields to the xrow_header structure. txn_id
identifies the transaction on the replica where the transaction was started;
the lsn of the first row in the transaction is used as the transaction id.
txn_commit is set to true for the last row in a transaction, so a transaction
can be committed either by its last row or by an additional NOP request with
txn_commit set, and can likewise be started by a NOP with the corresponding txn_id.
The following encoding/decoding rules are assumed:
* txn_id and txn_commit are encoded only for multi-row transactions.
So if txn_id is absent when a row is decoded, the row is treated as
a single-row transaction.
* The TXN_ID field is differentially encoded as the value lsn - txn_id
* txn_commit packed into TXN_FLAGS field
These rules keep compatibility with the previous xlog format and also
give a compact encoding.
Needed for: 2798
---
src/box/iproto_constants.c | 8 ++++----
src/box/iproto_constants.h | 7 +++++++
src/box/lua/xlog.c | 10 ++++++++++
src/box/wal.c | 5 +++++
src/box/xrow.c | 33 +++++++++++++++++++++++++++++++++
src/box/xrow.h | 4 +++-
test/unit/xrow.cc | 2 ++
7 files changed, 64 insertions(+), 5 deletions(-)
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..28891f107 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
/* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */
/* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */
/* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */
+ /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */
+ /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */
/* }}} */
/* {{{ unused */
- /* 0x08 */ MP_UINT,
- /* 0x09 */ MP_UINT,
/* 0x0a */ MP_UINT,
/* 0x0b */ MP_UINT,
/* 0x0c */ MP_UINT,
@@ -136,8 +136,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
"schema version", /* 0x05 */
"server version", /* 0x06 */
"group id", /* 0x07 */
- NULL, /* 0x08 */
- NULL, /* 0x09 */
+ "txn stmt id", /* 0x08 */
+ "txn flags", /* 0x09 */
NULL, /* 0x0a */
NULL, /* 0x0b */
NULL, /* 0x0c */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..e31f8a799 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
XLOG_FIXHEADER_SIZE = 19
};
+enum {
+ /** Set for the last xrow in a transaction. */
+ IPROTO_TXN_FLAG_COMMIT = 0x01,
+};
+
enum iproto_key {
IPROTO_REQUEST_TYPE = 0x00,
IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
IPROTO_SCHEMA_VERSION = 0x05,
IPROTO_SERVER_VERSION = 0x06,
IPROTO_GROUP_ID = 0x07,
+ IPROTO_TXN_ID = 0x08,
+ IPROTO_TXN_FLAGS = 0x09,
/* Leave a gap for other keys in the header. */
IPROTO_SPACE_ID = 0x10,
IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c
index 3c7cab38c..430d9c27b 100644
--- a/src/box/lua/xlog.c
+++ b/src/box/lua/xlog.c
@@ -221,6 +221,16 @@ lbox_xlog_parser_iterate(struct lua_State *L)
lua_pushnumber(L, row.tm);
lua_settable(L, -3); /* timestamp */
}
+ if (row.txn_id != row.lsn || !row.txn_commit) {
+ lua_pushstring(L, "txn_id");
+ lua_pushnumber(L, row.txn_id);
+ lua_settable(L, -3); /* txn_id */
+ }
+ if (row.txn_commit && row.txn_id != row.lsn) {
+ lua_pushstring(L, "commit");
+ lua_pushboolean(L, true);
+ lua_settable(L, -3); /* txn_commit flag */
+ }
lua_settable(L, -3); /* HEADER */
diff --git a/src/box/wal.c b/src/box/wal.c
index b2652bb17..7a8b84d6a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -895,12 +895,17 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
struct xrow_header **row,
struct xrow_header **end)
{
+ int64_t txn_id = 0;
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
vclock_get(base, instance_id);
(*row)->replica_id = instance_id;
+ /* Use the lsn of the first local row as txn_id. */
+ txn_id = txn_id == 0? (*row)->lsn: txn_id;
+ (*row)->txn_id = txn_id;
+ (*row)->txn_commit = row == end - 1;
} else {
vclock_follow(vclock_diff, (*row)->replica_id,
(*row)->lsn - vclock_get(base,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..a96aea3ee 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
if (mp_typeof(**pos) != MP_MAP)
goto error;
+ bool txn_is_set = false;
+ uint32_t txn_flags = 0;
uint32_t size = mp_decode_map(pos);
for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,29 @@ error:
case IPROTO_SCHEMA_VERSION:
header->schema_version = mp_decode_uint(pos);
break;
+ case IPROTO_TXN_ID:
+ txn_is_set = true;
+ header->txn_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_FLAGS:
+ txn_flags = mp_decode_uint(pos);
+ header->txn_commit = txn_flags & IPROTO_TXN_FLAG_COMMIT;
+ break;
default:
/* unknown header */
mp_next(pos);
}
}
assert(*pos <= end);
+ if (!txn_is_set) {
+ /*
+ * Transaction id is not set so it is a single statement
+ * transaction.
+ */
+ header->txn_commit = true;
+ }
+ header->txn_id = header->lsn - header->txn_id;
+
/* Nop requests aren't supposed to have a body. */
if (*pos < end && header->type != IPROTO_NOP) {
const char *body = *pos;
@@ -223,6 +242,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
d = mp_encode_double(d, header->tm);
map_size++;
}
+ if (header->txn_id != 0) {
+ if (header->txn_id != header->lsn || header->txn_commit == 0) {
+ /* Encode txn id for multi row transaction members. */
+ d = mp_encode_uint(d, IPROTO_TXN_ID);
+ d = mp_encode_uint(d, header->lsn - header->txn_id);
+ map_size++;
+ }
+ if (header->txn_commit && header->txn_id != header->lsn) {
+ /* Setup last row for multi row transaction. */
+ d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
+ d = mp_encode_uint(d, IPROTO_TXN_FLAG_COMMIT);
+ map_size++;
+ }
+ }
assert(d <= data + XROW_HEADER_LEN_MAX);
mp_encode_map(data, map_size);
out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2fce83bbc..2ea11ee4a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
XROW_HEADER_IOVMAX = 1,
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
- XROW_HEADER_LEN_MAX = 40,
+ XROW_HEADER_LEN_MAX = 52,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
uint64_t sync;
int64_t lsn; /* LSN must be signed for correct comparison */
double tm;
+ int64_t txn_id;
+ bool txn_commit;
int bodycnt;
uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..bc99285de 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
header.lsn = 400;
header.tm = 123.456;
header.bodycnt = 0;
+ header.txn_id = header.lsn;
+ header.txn_commit = true;
uint64_t sync = 100500;
struct iovec vec[1];
is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
--
2.20.1
More information about the Tarantool-patches
mailing list