From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id E1B51270F1 for ; Tue, 12 Feb 2019 15:02:27 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id cYg-DgzglnyQ for ; Tue, 12 Feb 2019 15:02:27 -0500 (EST) Received: from smtp45.i.mail.ru (smtp45.i.mail.ru [94.100.177.105]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6EA81270CA for ; Tue, 12 Feb 2019 15:02:27 -0500 (EST) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Date: Tue, 12 Feb 2019 23:04:31 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Append txn_id and txn_commit to the xrow_header structure. txn_id identifies the transaction on the replica where the transaction was started. The lsn of the first row in the transaction is used as the transaction id. txn_commit is set to true if it is the last row in a transaction, so we can commit a transaction by its last row or by an additional NOP request with txn_commit set, as well as start a transaction with a NOP and the corresponding txn_id. In case of replication, all local changes are moved to the tail of a journal entry to form a separate transaction (like an autonomous transaction) so that the changes can be replicated back. The following encoding/decoding rules are assumed: * txn_id and txn_commit are encoded only for multi-row transactions. 
So if txn_id is absent when a row is decoded, the row is a single-row transaction. * The TXN_ID field is differentially encoded as the lsn - txn_id value * txn_commit is packed into the TXN_FLAGS field These rules provide compatibility with the previous xlog format as well as a good compaction level. Needed for: 2798 --- src/box/iproto_constants.c | 4 ++-- src/box/iproto_constants.h | 7 +++++++ src/box/txn.c | 21 +++++++++++++++++---- src/box/txn.h | 2 ++ src/box/wal.c | 28 ++++++++++++++++++++++++---- src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++ src/box/xrow.h | 4 +++- test/unit/xrow.cc | 2 ++ 8 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 7fd295775..4d2e21752 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ /* }}} */ /* {{{ unused */ - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, /* 0x0b */ MP_UINT, /* 0x0c */ MP_UINT, diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..fd80e3111 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -49,6 +49,11 @@ enum { XLOG_FIXHEADER_SIZE = 19 }; +enum { + /** Set for the last xrow in a transaction. */ + TXN_FLAG_COMMIT = 0x01, +}; + enum iproto_key { IPROTO_REQUEST_TYPE = 0x00, IPROTO_SYNC = 0x01, @@ -60,6 +65,8 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TXN_ID = 0x08, + IPROTO_TXN_FLAGS = 0x09, /* Leave a gap for other keys in the header. 
*/ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/txn.c b/src/box/txn.c index 7f4e85b47..de0152706 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -34,6 +34,7 @@ #include "journal.h" #include #include "xrow.h" +#include "replication.h" double too_long_threshold; @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit) txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->remote_row_count = 0; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); return txn; @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) * stmt->space can be NULL for IRPOTO_NOP. */ if (stmt->space == NULL || !space_is_temporary(stmt->space)) { + if (request->header && + request->header->replica_id != instance_id && + request->header->replica_id != 0) + ++txn->remote_row_count; if (txn_add_redo(stmt, request) != 0) goto fail; ++txn->n_rows; @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn) return -1; struct txn_stmt *stmt; - struct xrow_header **row = req->rows; + struct xrow_header **remote_row = req->rows; + struct xrow_header **local_row = req->rows + txn->remote_row_count; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) - continue; /* A read (e.g. select) request */ - *row++ = stmt->row; + /* A read (e.g. 
select) request */ + continue; + if (stmt->row->replica_id == instance_id || + stmt->row->replica_id == 0) + *local_row++ = stmt->row; + else + *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(row == req->rows + req->n_rows); + assert(remote_row == req->rows + txn->remote_row_count); + assert(local_row == req->rows + req->n_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); diff --git a/src/box/txn.h b/src/box/txn.h index de5cb0de4..143f21715 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -180,6 +180,8 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** Count of remote rows. */ + uint32_t remote_row_count; }; /* Pointer to the current transaction (if any) */ diff --git a/src/box/wal.c b/src/box/wal.c index cdcaabc00..0ea3be68d 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer) cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback); } -static void -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, +static int +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, struct xrow_header **end) { + struct xrow_header **row = begin; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, vclock_follow_xrow(vclock, *row); } } + while (begin < end && begin[0]->replica_id != instance_id) + ++begin; + /* Setup txn_id and tnx_replica_id for locally generated rows. */ + row = begin; + while (row < end) { + if (row[0]->replica_id != instance_id) { + diag_set(ClientError, ER_UNSUPPORTED, + "Interleaved transactions"); + return -1; + } + row[0]->txn_id = begin[0]->lsn; + row[0]->txn_commit = row == end - 1 ? 
1 : 0; + ++row; + } + return 0; } static void @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg) struct journal_entry *entry; struct stailq_entry *last_committed = NULL; stailq_foreach_entry(entry, &wal_msg->commit, fifo) { - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows); + if (wal_assign_lsn(&vclock, entry->rows, + entry->rows + entry->n_rows) < 0) + goto done; entry->res = vclock_sum(&vclock); rc = xlog_write_entry(l, entry); if (rc < 0) @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); + if (wal_assign_lsn(&writer->vclock, entry->rows, + entry->rows + entry->n_rows) != 0) + return -1; vclock_copy(&replicaset.vclock, &writer->vclock); return vclock_sum(&writer->vclock); } diff --git a/src/box/xrow.c b/src/box/xrow.c index fec8873d0..29fa75de4 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -102,6 +102,8 @@ error: if (mp_typeof(**pos) != MP_MAP) goto error; + bool txn_is_set = false; + uint32_t txn_flags = 0; uint32_t size = mp_decode_map(pos); for (uint32_t i = 0; i < size; i++) { @@ -133,12 +135,32 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TXN_ID: + txn_is_set = true; + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_FLAGS: + txn_flags = mp_decode_uint(pos); + header->txn_commit = txn_flags & TXN_FLAG_COMMIT; + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0) + /* Unknow flags. */ + goto error; + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (!txn_is_set) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->txn_commit = true; + } + header->txn_id = header->lsn + header->txn_id; + /* Nop requests aren't supposed to have a body. 
*/ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != 0) { + if (header->txn_id != header->lsn || header->txn_commit == 0) { + /* Encode txn id for multi row transaction members. */ + d = mp_encode_uint(d, IPROTO_TXN_ID); + d = mp_encode_uint(d, header->lsn - header->txn_id); + map_size++; + } + if (header->txn_commit && header->txn_id != header->lsn) { + /* Setup last row for multi row transaction. */ + d = mp_encode_uint(d, IPROTO_TXN_FLAGS); + d = mp_encode_uint(d, TXN_FLAG_COMMIT); + map_size++; + } + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 719add4f0..bc4c4a2d7 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 52, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ @@ -63,6 +63,8 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + bool txn_commit; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 022d1f998..bc99285de 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.txn_commit = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); -- 2.20.1