From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 056F82242E for ; Wed, 20 Feb 2019 00:57:33 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id QLGhJybR3jYA for ; Wed, 20 Feb 2019 00:57:32 -0500 (EST) Received: from smtp52.i.mail.ru (smtp52.i.mail.ru [94.100.177.112]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id A5EDB2208F for ; Wed, 20 Feb 2019 00:57:32 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v4 1/2] Journal transaction boundaries Date: Wed, 20 Feb 2019 08:59:42 +0300 Message-Id: <5cdf365f3e59a6fe4664506d5111d6c463119917.1550641903.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Append txn_id and txn_commit to the xrow_header structure. txn_id identifies the transaction on the replica where the transaction was started; the lsn of the first row in the transaction is used as the transaction id. txn_commit is set to true if the row is the last one in a transaction, so we can commit a transaction by its last row or by an additional NOP request with txn_commit set, as well as start a transaction with a NOP and the corresponding txn_id. The following encoding/decoding rules are assumed: * txn_id and txn_commit are encoded only for multi-row transactions. 
So if txn_id is absent when a row is decoded, the row is a single-row transaction. * The TXN_ID field is differentially encoded as the value lsn - txn_id. * txn_commit is packed into the TXN_FLAGS field. These rules provide compatibility with the previous xlog format as well as a good compaction level. Needed for: 2798 --- src/box/iproto_constants.c | 8 ++++---- src/box/iproto_constants.h | 7 +++++++ src/box/lua/xlog.c | 10 ++++++++++ src/box/wal.c | 5 +++++ src/box/xrow.c | 33 +++++++++++++++++++++++++++++++++ src/box/xrow.h | 4 +++- test/unit/xrow.cc | 2 ++ 7 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 7fd295775..28891f107 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ /* }}} */ /* {{{ unused */ - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, /* 0x0b */ MP_UINT, /* 0x0c */ MP_UINT, @@ -136,8 +136,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "schema version", /* 0x05 */ "server version", /* 0x06 */ "group id", /* 0x07 */ - NULL, /* 0x08 */ - NULL, /* 0x09 */ + "txn stmt id", /* 0x08 */ + "txn flags", /* 0x09 */ NULL, /* 0x0a */ NULL, /* 0x0b */ NULL, /* 0x0c */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..e31f8a799 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -49,6 +49,11 @@ enum { XLOG_FIXHEADER_SIZE = 19 }; +enum { + /** Set for the last xrow in a transaction. 
*/ + IPROTO_TXN_FLAG_COMMIT = 0x01, +}; + enum iproto_key { IPROTO_REQUEST_TYPE = 0x00, IPROTO_SYNC = 0x01, @@ -60,6 +65,8 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TXN_ID = 0x08, + IPROTO_TXN_FLAGS = 0x09, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c index 3c7cab38c..430d9c27b 100644 --- a/src/box/lua/xlog.c +++ b/src/box/lua/xlog.c @@ -221,6 +221,16 @@ lbox_xlog_parser_iterate(struct lua_State *L) lua_pushnumber(L, row.tm); lua_settable(L, -3); /* timestamp */ } + if (row.txn_id != row.lsn || !row.txn_commit) { + lua_pushstring(L, "txn_id"); + lua_pushnumber(L, row.txn_id); + lua_settable(L, -3); /* txn_id */ + } + if (row.txn_commit && row.txn_id != row.lsn) { + lua_pushstring(L, "commit"); + lua_pushboolean(L, true); + lua_settable(L, -3); /* txn_commit flag */ + } lua_settable(L, -3); /* HEADER */ diff --git a/src/box/wal.c b/src/box/wal.c index b2652bb17..7a8b84d6a 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -895,12 +895,17 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, struct xrow_header **row, struct xrow_header **end) { + int64_t txn_id = 0; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { (*row)->lsn = vclock_inc(vclock_diff, instance_id) + vclock_get(base, instance_id); (*row)->replica_id = instance_id; + /* Use the lsn of the first local row as txn_id. */ + txn_id = txn_id == 0? 
(*row)->lsn: txn_id; + (*row)->txn_id = txn_id; + (*row)->txn_commit = row == end - 1; } else { vclock_follow(vclock_diff, (*row)->replica_id, (*row)->lsn - vclock_get(base, diff --git a/src/box/xrow.c b/src/box/xrow.c index fec8873d0..a96aea3ee 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -102,6 +102,8 @@ error: if (mp_typeof(**pos) != MP_MAP) goto error; + bool txn_is_set = false; + uint32_t txn_flags = 0; uint32_t size = mp_decode_map(pos); for (uint32_t i = 0; i < size; i++) { @@ -133,12 +135,29 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TXN_ID: + txn_is_set = true; + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_FLAGS: + txn_flags = mp_decode_uint(pos); + header->txn_commit = txn_flags & IPROTO_TXN_FLAG_COMMIT; + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (!txn_is_set) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->txn_commit = true; + } + header->txn_id = header->lsn - header->txn_id; + /* Nop requests aren't supposed to have a body. */ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +242,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != 0) { + if (header->txn_id != header->lsn || header->txn_commit == 0) { + /* Encode txn id for multi row transaction members. */ + d = mp_encode_uint(d, IPROTO_TXN_ID); + d = mp_encode_uint(d, header->lsn - header->txn_id); + map_size++; + } + if (header->txn_commit && header->txn_id != header->lsn) { + /* Setup last row for multi row transaction. 
*/ + d = mp_encode_uint(d, IPROTO_TXN_FLAGS); + d = mp_encode_uint(d, IPROTO_TXN_FLAG_COMMIT); + map_size++; + } + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 2fce83bbc..2ea11ee4a 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 52, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ @@ -63,6 +63,8 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + bool txn_commit; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 022d1f998..bc99285de 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.txn_commit = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); -- 2.20.1