From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id E0F32279DD for ; Thu, 21 Feb 2019 10:27:05 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id LC7n8CX5kQjA for ; Thu, 21 Feb 2019 10:27:05 -0500 (EST) Received: from smtp63.i.mail.ru (smtp63.i.mail.ru [217.69.128.43]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6CC13279EE for ; Thu, 21 Feb 2019 10:27:05 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries Date: Thu, 21 Feb 2019 18:29:16 +0300 Message-Id: <11df785115d046a43de09476e51302a00b1f3486.1550762885.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Append txn_id and is_commit to the xrow_header structure. txn_id identifies the transaction on the replica where the transaction was started; the lsn of the first row in the transaction is used as the transaction id. is_commit is set to true for the last row in a transaction. The encoding/decoding rules are: * txn_id is encoded using the transaction sequence number iproto field as IPROTO_TSN = lsn - txn_id, * is_commit is packed into the IPROTO_FLAGS field with a bit mask, * txn_id and is_commit are encoded only for multi-row transactions. So if we do not have txn_id after row decoding, then this means that it is a single-row transaction. 
These rules provide compatibility with previous xlog format as well as good compaction level. Needed for: 2798 --- src/box/iproto_constants.c | 8 +-- src/box/iproto_constants.h | 7 +++ src/box/lua/xlog.c | 10 +++ src/box/wal.c | 5 ++ src/box/xrow.c | 41 +++++++++++++ src/box/xrow.h | 4 +- test/unit/xrow.cc | 2 + test/xlog/transaction.result | 108 +++++++++++++++++++++++++++++++++ test/xlog/transaction.test.lua | 46 ++++++++++++++ 9 files changed, 226 insertions(+), 5 deletions(-) create mode 100644 test/xlog/transaction.result create mode 100644 test/xlog/transaction.test.lua diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 7fd295775..09ded1ecb 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ + /* 0x08 */ MP_UINT, /* IPROTO_TSN */ + /* 0x09 */ MP_UINT, /* IPROTO_FLAGS */ /* }}} */ /* {{{ unused */ - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, /* 0x0b */ MP_UINT, /* 0x0c */ MP_UINT, @@ -136,8 +136,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "schema version", /* 0x05 */ "server version", /* 0x06 */ "group id", /* 0x07 */ - NULL, /* 0x08 */ - NULL, /* 0x09 */ + "tsn", /* 0x08 */ + "flags", /* 0x09 */ NULL, /* 0x0a */ NULL, /* 0x0b */ NULL, /* 0x0c */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..126d73352 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -49,6 +49,11 @@ enum { XLOG_FIXHEADER_SIZE = 19 }; +enum { + /** Set for the last xrow in a transaction. 
*/ + IPROTO_FLAG_COMMIT = 0x01, +}; + enum iproto_key { IPROTO_REQUEST_TYPE = 0x00, IPROTO_SYNC = 0x01, @@ -60,6 +65,8 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TSN = 0x08, + IPROTO_FLAGS = 0x09, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c index 3c7cab38c..bd30187ae 100644 --- a/src/box/lua/xlog.c +++ b/src/box/lua/xlog.c @@ -221,6 +221,16 @@ lbox_xlog_parser_iterate(struct lua_State *L) lua_pushnumber(L, row.tm); lua_settable(L, -3); /* timestamp */ } + if (row.txn_id != row.lsn || !row.is_commit) { + lua_pushstring(L, "txn_id"); + lua_pushnumber(L, row.txn_id); + lua_settable(L, -3); /* txn_id */ + } + if (row.is_commit && row.txn_id != row.lsn) { + lua_pushstring(L, "commit"); + lua_pushboolean(L, true); + lua_settable(L, -3); /* txn_commit flag */ + } lua_settable(L, -3); /* HEADER */ diff --git a/src/box/wal.c b/src/box/wal.c index b2652bb17..78112da77 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -895,12 +895,17 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, struct xrow_header **row, struct xrow_header **end) { + int64_t txn_id = 0; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { (*row)->lsn = vclock_inc(vclock_diff, instance_id) + vclock_get(base, instance_id); (*row)->replica_id = instance_id; + /* Use the lsn of the first local row as txn_id. */ + txn_id = txn_id == 0? 
(*row)->lsn: txn_id; + (*row)->txn_id = txn_id; + (*row)->is_commit = row == end - 1; } else { vclock_follow(vclock_diff, (*row)->replica_id, (*row)->lsn - vclock_get(base, diff --git a/src/box/xrow.c b/src/box/xrow.c index fec8873d0..8496a694f 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -102,6 +102,8 @@ error: if (mp_typeof(**pos) != MP_MAP) goto error; + bool has_tsn = false; + uint32_t flags = 0; uint32_t size = mp_decode_map(pos); for (uint32_t i = 0; i < size; i++) { @@ -133,12 +135,30 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TSN: + has_tsn = true; + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_FLAGS: + flags = mp_decode_uint(pos); + header->is_commit = flags & IPROTO_FLAG_COMMIT; + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (!has_tsn) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->is_commit = true; + } + /* Restore transaction id from lsn and transaction serial number. */ + header->txn_id = header->lsn - header->txn_id; + /* Nop requests aren't supposed to have a body. */ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +243,27 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != 0) { + if (header->txn_id != header->lsn || header->is_commit == 0) { + /* + * Encode a transaction identifier for multi row + * transaction members. + */ + d = mp_encode_uint(d, IPROTO_TSN); + /* + * Differential encoding: write a transaction serial + * number (it is equal to lsn - transaction id) instead. + */ + d = mp_encode_uint(d, header->lsn - header->txn_id); + map_size++; + } + if (header->is_commit && header->txn_id != header->lsn) { + /* Setup last row for multi row transaction. 
*/ + d = mp_encode_uint(d, IPROTO_FLAGS); + d = mp_encode_uint(d, IPROTO_FLAG_COMMIT); + map_size++; + } + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 2fce83bbc..ccd57f8b2 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 52, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ @@ -63,6 +63,8 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + bool is_commit; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 022d1f998..e0e7f1279 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.is_commit = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); diff --git a/test/xlog/transaction.result b/test/xlog/transaction.result new file mode 100644 index 000000000..d2d9053ae --- /dev/null +++ b/test/xlog/transaction.result @@ -0,0 +1,108 @@ +fio = require('fio') +--- +... +xlog = require('xlog').pairs +--- +... +env = require('test_run') +--- +... +test_run = env.new() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function read_xlog(file) + local val = {} + for k, v in xlog(file) do + table.insert(val, setmetatable(v, { __serialize = "map"})) + end + return val +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- gh-2798 check for journal transaction encoding +_ = box.schema.space.create('test'):create_index('pk') +--- +... +-- generate a new xlog +box.snapshot() +--- +- ok +... 
+lsn = box.info.lsn +--- +... +-- autocommit transaction +box.space.test:replace({1}) +--- +- [1] +... +-- one row transaction +box.begin() box.space.test:replace({2}) box.commit() +--- +... +-- two row transaction +box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit() +--- +... +-- four row transaction +box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit() +--- +... +-- open a new xlog +box.snapshot() +--- +- ok +... +-- read a previous one +lsn_str = tostring(lsn) +--- +... +data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog')) +--- +... +-- check nothing changed for single row transactions +data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil +--- +- true +... +data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil +--- +- true +... +-- check two row transaction +data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil +--- +- true +... +data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true +--- +- true +... +-- check four row transaction +data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil +--- +- true +... +data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil +--- +- true +... +data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil +--- +- true +... +data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true +--- +- true +... +box.space.test:drop() +--- +... 
diff --git a/test/xlog/transaction.test.lua b/test/xlog/transaction.test.lua new file mode 100644 index 000000000..062635098 --- /dev/null +++ b/test/xlog/transaction.test.lua @@ -0,0 +1,46 @@ +fio = require('fio') +xlog = require('xlog').pairs +env = require('test_run') +test_run = env.new() + +test_run:cmd("setopt delimiter ';'") +function read_xlog(file) + local val = {} + for k, v in xlog(file) do + table.insert(val, setmetatable(v, { __serialize = "map"})) + end + return val +end; +test_run:cmd("setopt delimiter ''"); + + +-- gh-2798 check for journal transaction encoding +_ = box.schema.space.create('test'):create_index('pk') +-- generate a new xlog +box.snapshot() +lsn = box.info.lsn +-- autocommit transaction +box.space.test:replace({1}) +-- one row transaction +box.begin() box.space.test:replace({2}) box.commit() +-- two row transaction +box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit() +-- four row transaction +box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit() +-- open a new xlog +box.snapshot() +-- read a previous one +lsn_str = tostring(lsn) +data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog')) +-- check nothing changed for single row transactions +data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil +data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil +-- check two row transaction +data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil +data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true +-- check four row transaction +data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil +data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil +data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil +data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true +box.space.test:drop() -- 2.20.1