From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Fri, 15 Feb 2019 16:15:41 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Message-ID: <20190215131541.u64sjmsrjkkxqixy@esperanza> References: MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: To: Georgy Kirichenko Cc: tarantool-patches@freelists.org List-ID: On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote: > Append txn_id and txn_commit to xrow_header structure, txn_id identifies > transaction id on replica where transaction was started. As transaction id > a lsn of the first row in the transaction is used. txn_commit is set to true > if it is the last row in a transaction, so we could commit transaction by the > last row or by additional NOP requests with txn_commit set as well as > start transaction with NOP and corresponding txn_id. In case of replication > all local changes moved to an journal entry tail to form a separate transaction > (like autonomous transaction) to be able to replicate changes back. > > As encoding/deconding rule assumed: > * txn_id and txn_commit are encoded only for multi-row transactions. > So if we do not have txn_id while row decoding then this means that it > is a single row transaction. > * TXN_ID field is differential encoded as lsn - txn_id value > * txn_commit packed into TXN_FLAGS field > > These rules provide compatibility with previous xlog format as well > as good compaction level. 
> > Needed for: 2798 > --- > src/box/iproto_constants.c | 4 ++-- > src/box/iproto_constants.h | 7 +++++++ > src/box/txn.c | 21 +++++++++++++++++---- > src/box/txn.h | 2 ++ > src/box/wal.c | 28 ++++++++++++++++++++++++---- > src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++ > src/box/xrow.h | 4 +++- > test/unit/xrow.cc | 2 ++ > 8 files changed, 93 insertions(+), 11 deletions(-) > > diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c > index 7fd295775..4d2e21752 100644 > --- a/src/box/iproto_constants.c > +++ b/src/box/iproto_constants.c > @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = > /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ > /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ > /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ > + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ > + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ I don't quite like the name, because we encode not a txn id, but the statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ? xrow_header::txn_id is OK though. > /* }}} */ > > /* {{{ unused */ > - /* 0x08 */ MP_UINT, > - /* 0x09 */ MP_UINT, > /* 0x0a */ MP_UINT, > /* 0x0b */ MP_UINT, > /* 0x0c */ MP_UINT, You forgot to patch iproto_key_strs. Let's please add a test that ensures that those new headers are written to xlogs. You could use xlog reader for that. > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 728514297..fd80e3111 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -49,6 +49,11 @@ enum { > XLOG_FIXHEADER_SIZE = 19 > }; > /* IPROTO_TXN_FLAGS bits. */ > +enum { > + /** Set for the last xrow in a transaction. */ > + TXN_FLAG_COMMIT = 0x01, IPROTO_TXN_COMMIT? 
> +}; > + > enum iproto_key { > IPROTO_REQUEST_TYPE = 0x00, > IPROTO_SYNC = 0x01, > @@ -60,6 +65,8 @@ enum iproto_key { > IPROTO_SCHEMA_VERSION = 0x05, > IPROTO_SERVER_VERSION = 0x06, > IPROTO_GROUP_ID = 0x07, > + IPROTO_TXN_ID = 0x08, > + IPROTO_TXN_FLAGS = 0x09, > /* Leave a gap for other keys in the header. */ > IPROTO_SPACE_ID = 0x10, > IPROTO_INDEX_ID = 0x11, > diff --git a/src/box/txn.c b/src/box/txn.c > index 7f4e85b47..de0152706 100644 > --- a/src/box/txn.c > +++ b/src/box/txn.c > @@ -34,6 +34,7 @@ > #include "journal.h" > #include > #include "xrow.h" > +#include "replication.h" > > double too_long_threshold; > > @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit) > txn->engine = NULL; > txn->engine_tx = NULL; > txn->psql_txn = NULL; > + txn->remote_row_count = 0; Nit: let's rename it to n_remote_rows to match n_rows and keep the two member initializers together. > /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ > fiber_set_txn(fiber(), txn); > return txn; > @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) > * stmt->space can be NULL for IRPOTO_NOP. > */ > if (stmt->space == NULL || !space_is_temporary(stmt->space)) { > + if (request->header && > + request->header->replica_id != instance_id && > + request->header->replica_id != 0) > + ++txn->remote_row_count; Nit: if we moved this after txn_add_redo(), then we wouldn't have to check if request->header is set. > if (txn_add_redo(stmt, request) != 0) > goto fail; > ++txn->n_rows; > @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn) > return -1; > > struct txn_stmt *stmt; > - struct xrow_header **row = req->rows; > + struct xrow_header **remote_row = req->rows; > + struct xrow_header **local_row = req->rows + txn->remote_row_count; > stailq_foreach_entry(stmt, &txn->stmts, next) { > if (stmt->row == NULL) > - continue; /* A read (e.g. select) request */ > - *row++ = stmt->row; > + /* A read (e.g. 
select) request */ > + continue; Nit: pointless change, please remove. > + if (stmt->row->replica_id == instance_id || > + stmt->row->replica_id == 0) > + *local_row++ = stmt->row; > + else > + *remote_row++ = stmt->row; This piece of code looks nice, but it definitely needs a comment: what we do, why we do that... Anyway, we should add a test for this change. Maybe it's even worth submitting this change in a separate patch. > req->approx_len += xrow_approx_len(stmt->row); > } > - assert(row == req->rows + req->n_rows); > + assert(remote_row == req->rows + txn->remote_row_count); > + assert(local_row == req->rows + req->n_rows); > > ev_tstamp start = ev_monotonic_now(loop()); > int64_t res = journal_write(req); > diff --git a/src/box/txn.h b/src/box/txn.h > index de5cb0de4..143f21715 100644 > --- a/src/box/txn.h > +++ b/src/box/txn.h > @@ -180,6 +180,8 @@ struct txn { > /** Commit and rollback triggers */ > struct rlist on_commit, on_rollback; > struct sql_txn *psql_txn; > + /** Count of remote rows. */ > + uint32_t remote_row_count; Nit: please use int rather than uint32_t and move the definition after n_rows, because those are closely related. > }; > > /* Pointer to the current transaction (if any) */ > diff --git a/src/box/wal.c b/src/box/wal.c > index cdcaabc00..0ea3be68d 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer) > cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback); > } > > -static void > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > +static int > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > struct xrow_header **end) > { > + struct xrow_header **row = begin; > /** Assign LSN to all local rows. 
*/ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > vclock_follow_xrow(vclock, *row); > } > } > + while (begin < end && begin[0]->replica_id != instance_id) > + ++begin; > + /* Setup txn_id and tnx_replica_id for locally generated rows. */ > + row = begin; > + while (row < end) { > + if (row[0]->replica_id != instance_id) { > + diag_set(ClientError, ER_UNSUPPORTED, > + "Interleaved transactions"); > + return -1; Do we really need to bother about it here, in WAL? IMO a check in applier would be enough. > + } > + row[0]->txn_id = begin[0]->lsn; > + row[0]->txn_commit = row == end - 1 ? 1 : 0; > + ++row; Why can't we do this while we are iterating over rows just a few lines above, assigning LSNs? > + } > + return 0; > } > > static void > @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg) > struct journal_entry *entry; > struct stailq_entry *last_committed = NULL; > stailq_foreach_entry(entry, &wal_msg->commit, fifo) { > - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows); > + if (wal_assign_lsn(&vclock, entry->rows, > + entry->rows + entry->n_rows) < 0) > + goto done; > entry->res = vclock_sum(&vclock); > rc = xlog_write_entry(l, entry); > if (rc < 0) > @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal, > struct journal_entry *entry) > { > struct wal_writer *writer = (struct wal_writer *) journal; > - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); > + if (wal_assign_lsn(&writer->vclock, entry->rows, > + entry->rows + entry->n_rows) != 0) > + return -1; > vclock_copy(&replicaset.vclock, &writer->vclock); > return vclock_sum(&writer->vclock); > } > diff --git a/src/box/xrow.c b/src/box/xrow.c > index fec8873d0..29fa75de4 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -102,6 +102,8 @@ error: > > if (mp_typeof(**pos) != MP_MAP) > goto error; > + bool txn_is_set = false; > + uint32_t 
txn_flags = 0; > > uint32_t size = mp_decode_map(pos); > for (uint32_t i = 0; i < size; i++) { > @@ -133,12 +135,32 @@ error: > case IPROTO_SCHEMA_VERSION: > header->schema_version = mp_decode_uint(pos); > break; > + case IPROTO_TXN_ID: > + txn_is_set = true; > + header->txn_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_FLAGS: > + txn_flags = mp_decode_uint(pos); > + header->txn_commit = txn_flags & TXN_FLAG_COMMIT; > + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0) > + /* Unknow flags. */ > + goto error; We silently ignore unknown headers. I think we can silently ignore unknown flags as well. > + break; > default: > /* unknown header */ > mp_next(pos); > } > } > assert(*pos <= end); > + if (!txn_is_set) { > + /* > + * Transaction id is not set so it is a single statement > + * transaction. > + */ > + header->txn_commit = true; > + } > + header->txn_id = header->lsn + header->txn_id; > + > /* Nop requests aren't supposed to have a body. */ > if (*pos < end && header->type != IPROTO_NOP) { > const char *body = *pos; > @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, > d = mp_encode_double(d, header->tm); > map_size++; > } > + if (header->txn_id != 0) { > + if (header->txn_id != header->lsn || header->txn_commit == 0) { Nit: txn_commit is a bool so s/header->txn_commit == 0/!header->txn_commit > + /* Encode txn id for multi row transaction members. */ > + d = mp_encode_uint(d, IPROTO_TXN_ID); > + d = mp_encode_uint(d, header->lsn - header->txn_id); > + map_size++; > + } > + if (header->txn_commit && header->txn_id != header->lsn) { > + /* Setup last row for multi row transaction. 
*/ > + d = mp_encode_uint(d, IPROTO_TXN_FLAGS); > + d = mp_encode_uint(d, TXN_FLAG_COMMIT); > + map_size++; > + } > + } > assert(d <= data + XROW_HEADER_LEN_MAX); > mp_encode_map(data, map_size); > out->iov_len = d - (char *) out->iov_base; > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 719add4f0..bc4c4a2d7 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -47,7 +47,7 @@ enum { > XROW_HEADER_IOVMAX = 1, > XROW_BODY_IOVMAX = 2, > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > - XROW_HEADER_LEN_MAX = 40, > + XROW_HEADER_LEN_MAX = 52, > XROW_BODY_LEN_MAX = 128, > IPROTO_HEADER_LEN = 28, > /** 7 = sizeof(iproto_body_bin). */ > @@ -63,6 +63,8 @@ struct xrow_header { > uint64_t sync; > int64_t lsn; /* LSN must be signed for correct comparison */ > double tm; > + int64_t txn_id; > + bool txn_commit; Please add a comment explaining how these are mapped to IPROTO headers. > > int bodycnt; > uint32_t schema_version; > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > index 022d1f998..bc99285de 100644 > --- a/test/unit/xrow.cc > +++ b/test/unit/xrow.cc > @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() > header.lsn = 400; > header.tm = 123.456; > header.bodycnt = 0; > + header.txn_id = header.lsn; > + header.txn_commit = true; > uint64_t sync = 100500; > struct iovec vec[1]; > is(1, xrow_header_encode(&header, sync, vec, 200), "encode");