[tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries
Vladimir Davydov
vdavydov.dev at gmail.com
Fri Feb 15 16:15:41 MSK 2019
On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote:
> Append txn_id and txn_commit to xrow_header structure, txn_id identifies
> transaction id on replica where transaction was started. As transaction id
> a lsn of the first row in the transaction is used. txn_commit is set to true
> if it is the last row in a transaction, so we could commit transaction by the
> last row or by additional NOP requests with txn_commit set as well as
> start transaction with NOP and corresponding txn_id. In case of replication
> all local changes are moved to the journal entry tail to form a separate transaction
> (like autonomous transaction) to be able to replicate changes back.
>
> The following encoding/decoding rules are assumed:
> * txn_id and txn_commit are encoded only for multi-row transactions.
> So if we do not have txn_id while row decoding then this means that it
> is a single row transaction.
> * TXN_ID field is differential encoded as lsn - txn_id value
> * txn_commit packed into TXN_FLAGS field
>
> These rules provide compatibility with previous xlog format as well
> as good compaction level.
>
> Needed for: 2798
> ---
> src/box/iproto_constants.c | 4 ++--
> src/box/iproto_constants.h | 7 +++++++
> src/box/txn.c | 21 +++++++++++++++++----
> src/box/txn.h | 2 ++
> src/box/wal.c | 28 ++++++++++++++++++++++++----
> src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++
> src/box/xrow.h | 4 +++-
> test/unit/xrow.cc | 2 ++
> 8 files changed, 93 insertions(+), 11 deletions(-)
>
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index 7fd295775..4d2e21752 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
> /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */
> /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */
> /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */
> + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */
> + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */
I don't quite like the name IPROTO_TXN_ID, because what we encode is not
a txn id, but the statement number within a txn. Let's rename it to
IPROTO_TXN_STMT_NO or IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ?
xrow_header::txn_id is OK though.
> /* }}} */
>
> /* {{{ unused */
> - /* 0x08 */ MP_UINT,
> - /* 0x09 */ MP_UINT,
> /* 0x0a */ MP_UINT,
> /* 0x0b */ MP_UINT,
> /* 0x0c */ MP_UINT,
You forgot to patch iproto_key_strs.
Let's please add a test that ensures that those new headers are written
to xlogs. You could use xlog reader for that.
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..fd80e3111 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -49,6 +49,11 @@ enum {
> XLOG_FIXHEADER_SIZE = 19
> };
>
/* IPROTO_TXN_FLAGS bits. */
> +enum {
> + /** Set for the last xrow in a transaction. */
> + TXN_FLAG_COMMIT = 0x01,
IPROTO_TXN_COMMIT?
> +};
> +
> enum iproto_key {
> IPROTO_REQUEST_TYPE = 0x00,
> IPROTO_SYNC = 0x01,
> @@ -60,6 +65,8 @@ enum iproto_key {
> IPROTO_SCHEMA_VERSION = 0x05,
> IPROTO_SERVER_VERSION = 0x06,
> IPROTO_GROUP_ID = 0x07,
> + IPROTO_TXN_ID = 0x08,
> + IPROTO_TXN_FLAGS = 0x09,
> /* Leave a gap for other keys in the header. */
> IPROTO_SPACE_ID = 0x10,
> IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7f4e85b47..de0152706 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
> #include "journal.h"
> #include <fiber.h>
> #include "xrow.h"
> +#include "replication.h"
>
> double too_long_threshold;
>
> @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
> txn->engine = NULL;
> txn->engine_tx = NULL;
> txn->psql_txn = NULL;
> + txn->remote_row_count = 0;
Nit: let's rename it to n_remote_rows to match n_rows and keep the two
member initializers together.
> /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
> fiber_set_txn(fiber(), txn);
> return txn;
> @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
> * stmt->space can be NULL for IRPOTO_NOP.
> */
> if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
> + if (request->header &&
> + request->header->replica_id != instance_id &&
> + request->header->replica_id != 0)
> + ++txn->remote_row_count;
Nit: if we moved this after txn_add_redo(), then we wouldn't have to
check if request->header is set.
> if (txn_add_redo(stmt, request) != 0)
> goto fail;
> ++txn->n_rows;
> @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
> return -1;
>
> struct txn_stmt *stmt;
> - struct xrow_header **row = req->rows;
> + struct xrow_header **remote_row = req->rows;
> + struct xrow_header **local_row = req->rows + txn->remote_row_count;
> stailq_foreach_entry(stmt, &txn->stmts, next) {
> if (stmt->row == NULL)
> - continue; /* A read (e.g. select) request */
> - *row++ = stmt->row;
> + /* A read (e.g. select) request */
> + continue;
Nit: pointless change, please remove.
> + if (stmt->row->replica_id == instance_id ||
> + stmt->row->replica_id == 0)
> + *local_row++ = stmt->row;
> + else
> + *remote_row++ = stmt->row;
This piece of code looks nice, but it definitely needs a comment: what
we do, why we do that...
Anyway, we should add a test for this change. Maybe it's even worth
submitting this change in a separate patch.
> req->approx_len += xrow_approx_len(stmt->row);
> }
> - assert(row == req->rows + req->n_rows);
> + assert(remote_row == req->rows + txn->remote_row_count);
> + assert(local_row == req->rows + req->n_rows);
>
> ev_tstamp start = ev_monotonic_now(loop());
> int64_t res = journal_write(req);
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..143f21715 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -180,6 +180,8 @@ struct txn {
> /** Commit and rollback triggers */
> struct rlist on_commit, on_rollback;
> struct sql_txn *psql_txn;
> + /** Count of remote rows. */
> + uint32_t remote_row_count;
Nit: please use int rather than uint32_t and move the definition after
n_rows, because those are closely related.
> };
>
> /* Pointer to the current transaction (if any) */
> diff --git a/src/box/wal.c b/src/box/wal.c
> index cdcaabc00..0ea3be68d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
> }
>
> -static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +static int
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> struct xrow_header **end)
> {
> + struct xrow_header **row = begin;
> /** Assign LSN to all local rows. */
> for ( ; row < end; row++) {
> if ((*row)->replica_id == 0) {
> @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> vclock_follow_xrow(vclock, *row);
> }
> }
> + while (begin < end && begin[0]->replica_id != instance_id)
> + ++begin;
> + /* Setup txn_id and tnx_replica_id for locally generated rows. */
> + row = begin;
> + while (row < end) {
> + if (row[0]->replica_id != instance_id) {
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Interleaved transactions");
> + return -1;
Do we really need to bother about it here, in WAL? IMO a check in
applier would be enough.
> + }
> + row[0]->txn_id = begin[0]->lsn;
> + row[0]->txn_commit = row == end - 1 ? 1 : 0;
> + ++row;
Why can't we do this while we are iterating over rows just a few lines
above, assigning LSNs?
> + }
> + return 0;
> }
>
> static void
> @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
> struct journal_entry *entry;
> struct stailq_entry *last_committed = NULL;
> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
> + if (wal_assign_lsn(&vclock, entry->rows,
> + entry->rows + entry->n_rows) < 0)
> + goto done;
> entry->res = vclock_sum(&vclock);
> rc = xlog_write_entry(l, entry);
> if (rc < 0)
> @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
> struct journal_entry *entry)
> {
> struct wal_writer *writer = (struct wal_writer *) journal;
> - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> + if (wal_assign_lsn(&writer->vclock, entry->rows,
> + entry->rows + entry->n_rows) != 0)
> + return -1;
> vclock_copy(&replicaset.vclock, &writer->vclock);
> return vclock_sum(&writer->vclock);
> }
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index fec8873d0..29fa75de4 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -102,6 +102,8 @@ error:
>
> if (mp_typeof(**pos) != MP_MAP)
> goto error;
> + bool txn_is_set = false;
> + uint32_t txn_flags = 0;
>
> uint32_t size = mp_decode_map(pos);
> for (uint32_t i = 0; i < size; i++) {
> @@ -133,12 +135,32 @@ error:
> case IPROTO_SCHEMA_VERSION:
> header->schema_version = mp_decode_uint(pos);
> break;
> + case IPROTO_TXN_ID:
> + txn_is_set = true;
> + header->txn_id = mp_decode_uint(pos);
> + break;
> + case IPROTO_TXN_FLAGS:
> + txn_flags = mp_decode_uint(pos);
> + header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
> + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
> + /* Unknow flags. */
> + goto error;
We silently ignore unknown headers. I think we can silently ignore
unknown flags as well.
> + break;
> default:
> /* unknown header */
> mp_next(pos);
> }
> }
> assert(*pos <= end);
> + if (!txn_is_set) {
> + /*
> + * Transaction id is not set so it is a single statement
> + * transaction.
> + */
> + header->txn_commit = true;
> + }
> + header->txn_id = header->lsn + header->txn_id;
> +
> /* Nop requests aren't supposed to have a body. */
> if (*pos < end && header->type != IPROTO_NOP) {
> const char *body = *pos;
> @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
> d = mp_encode_double(d, header->tm);
> map_size++;
> }
> + if (header->txn_id != 0) {
> + if (header->txn_id != header->lsn || header->txn_commit == 0) {
Nit: txn_commit is a bool, so
s/header->txn_commit == 0/!header->txn_commit/
> + /* Encode txn id for multi row transaction members. */
> + d = mp_encode_uint(d, IPROTO_TXN_ID);
> + d = mp_encode_uint(d, header->lsn - header->txn_id);
> + map_size++;
> + }
> + if (header->txn_commit && header->txn_id != header->lsn) {
> + /* Setup last row for multi row transaction. */
> + d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
> + d = mp_encode_uint(d, TXN_FLAG_COMMIT);
> + map_size++;
> + }
> + }
> assert(d <= data + XROW_HEADER_LEN_MAX);
> mp_encode_map(data, map_size);
> out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 719add4f0..bc4c4a2d7 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
> XROW_HEADER_IOVMAX = 1,
> XROW_BODY_IOVMAX = 2,
> XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> - XROW_HEADER_LEN_MAX = 40,
> + XROW_HEADER_LEN_MAX = 52,
> XROW_BODY_LEN_MAX = 128,
> IPROTO_HEADER_LEN = 28,
> /** 7 = sizeof(iproto_body_bin). */
> @@ -63,6 +63,8 @@ struct xrow_header {
> uint64_t sync;
> int64_t lsn; /* LSN must be signed for correct comparison */
> double tm;
> + int64_t txn_id;
> + bool txn_commit;
Please add a comment explaining how these are mapped to IPROTO headers.
>
> int bodycnt;
> uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 022d1f998..bc99285de 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
> header.lsn = 400;
> header.tm = 123.456;
> header.bodycnt = 0;
> + header.txn_id = header.lsn;
> + header.txn_commit = true;
> uint64_t sync = 100500;
> struct iovec vec[1];
> is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
More information about the Tarantool-patches
mailing list