On Tuesday, January 29, 2019 2:00:10 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 29, 2019 at 01:09:50PM +0300, Георгий Кириченко wrote:
> > On Monday, January 28, 2019 3:58:59 PM MSK Vladimir Davydov wrote:
> > > On Tue, Jan 22, 2019 at 01:57:36PM +0300, Georgy Kirichenko wrote:
> > > > Append txn_id, txn_replica_id and txn_last to the xrow_header structure.
> > > > txn_replica_id identifies the replica where the transaction was started,
> > > > and txn_id identifies the transaction on that replica. The LSN of the
> > > > first row of the transaction is used as the transaction id.
> > > > txn_last is set to true for the last row of a transaction, so we
> > > > could commit a transaction with its last row or use an additional NOP
> > > > request with txn_last = true and valid txn_id and txn_replica_id.
> > > > For replication, all local changes are moved to the tail of the xrows
> > > > array to form a separate transaction (like an autonomous transaction),
> > > > because it is not possible to replicate such a transaction back to its
> > > > creator.
> > > >
> > > > The following encoding/decoding rules are assumed:
> > > > 1. txn_replica_id is encoded only if it is not equal to replica_id.
> > > >    This may be needed because of replication triggers.
> > > > 2. txn_id and txn_last are encoded only for multi-row transactions.
> > > >    So if a row in an xstream has no txn_id, it is a single-row
> > > >    transaction.
> > > >
> > > > These rules provide compatibility with previous xlog handling.
> > > >
> > > > Needed for: 2798
> > > > ---
> > > >  src/box/iproto_constants.h    |  3 ++
> > >
> > > Looks like you forgot to update iproto_constants.c.
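Here is a minimal sketch of encoding rules 1 and 2 from the commit message above. It writes them as standalone predicates that mirror the conditions in the quoted xrow_header_encode() hunk further down. struct txn_meta and the need_* helpers are hypothetical names used only for this illustration. They are not Tarantool code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical stand-in for the header fields involved in the rules;
 * not the real struct xrow_header.
 */
struct txn_meta {
	int64_t lsn;
	uint32_t replica_id;
	int64_t txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
};

/*
 * Rule 2: TXN_ID is written unless the row is a whole single-row
 * transaction (txn_id == lsn and txn_last set).
 */
static bool
need_txn_id(const struct txn_meta *m)
{
	return m->txn_id != m->lsn || !m->txn_last;
}

/* Rule 1: TXN_REPLICA_ID is written only if it differs from REPLICA_ID. */
static bool
need_txn_replica_id(const struct txn_meta *m)
{
	return m->txn_replica_id != m->replica_id;
}

/*
 * Rule 2: TXN_LAST is written on a last row unless that row already
 * looks like a single-row transaction of its own replica.
 */
static bool
need_txn_last(const struct txn_meta *m)
{
	bool own_single_row = m->txn_id == m->lsn &&
			      m->txn_replica_id == m->replica_id;
	return m->txn_last && !own_single_row;
}
```

Take a row with lsn 10, replica_id 1, txn_id 10, txn_replica_id 1 and txn_last set. All three predicates are false for it, so none of the new keys is written for an ordinary single-row transaction.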
> > > >  src/box/wal.c                 | 36 +++++++++++++++-
> > > >  src/box/xrow.c                | 38 +++++++++++++++++
> > > >  src/box/xrow.h                |  5 ++-
> > > >  test/unit/xrow.cc             |  3 ++
> > > >  test/vinyl/errinj_stat.result |  8 ++--
> > > >  test/vinyl/layout.result      | 24 +++++------
> > > >  test/vinyl/stat.result        | 78 +++++++++++++++++------------------
> > > >  8 files changed, 138 insertions(+), 57 deletions(-)
> > > >
> > > > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> > > > index 728514297..d01cdf840 100644
> > > > --- a/src/box/iproto_constants.h
> > > > +++ b/src/box/iproto_constants.h
> > > > @@ -60,6 +60,9 @@ enum iproto_key {
> > > >  	IPROTO_SCHEMA_VERSION = 0x05,
> > > >  	IPROTO_SERVER_VERSION = 0x06,
> > > >  	IPROTO_GROUP_ID = 0x07,
> > > > +	IPROTO_TXN_ID = 0x08,
> > > > +	IPROTO_TXN_REPLICA_ID = 0x09,
> > >
> > > Do we really need to introduce both TXN_ID and TXN_REPLICA_ID? I would
> > > expect TXN_ID to be enough - we could use REPLICA_ID to make sure that
> > > transaction identifiers from different instances don't overlap, because
> > > there couldn't be a "multi-instance" transaction, could there?

txn_replica_id might differ from replica_id when a transaction is finished
on another node, e.g. after recovery.

> > >
> > > > +	IPROTO_TXN_LAST = 0x0a,
> > >
> > > I think we should instead introduce BEGIN and COMMIT commands, because:
> >
> > I do not like any auto-commit logic in an xlog file at all. Your
> > suggestion breaks backward compatibility because previous logs do not
> > have any BEGIN/
>
> It wouldn't break backward compatibility. It might break forward
> compatibility, which is fine by me (we do it all the time).

The suggested xrow encoding/decoding rules mean that any xrow without
txn_id, txn_replica_id and txn_last is processed as a single-statement
transaction, as before. If we required explicit BEGIN/COMMIT, previous
logs would turn into an invalid stream, because they have no autocommit
semantics.
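The sketch below illustrates the compatibility argument. It applies the defaults from the quoted xrow.c decoding hunk further down. It assumes a hypothetical struct decoded_row whose txn fields are zero when the corresponding keys were absent. A row from an old xlog, which has none of the new keys, then comes out as a complete single-statement transaction without any BEGIN/COMMIT row.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical decoded header; a txn field is 0 when its key was absent. */
struct decoded_row {
	int64_t lsn;
	uint32_t replica_id;
	int64_t txn_id;          /* 0 if IPROTO_TXN_ID was absent */
	uint32_t txn_replica_id; /* 0 if IPROTO_TXN_REPLICA_ID was absent */
	bool txn_last;           /* false if IPROTO_TXN_LAST was absent */
};

/*
 * Fill in the defaults from the quoted decoding hunk: no TXN_ID means
 * a single-statement transaction, no TXN_REPLICA_ID means the row's own
 * replica started it.
 */
static void
apply_txn_defaults(struct decoded_row *row)
{
	if (row->txn_id == 0) {
		row->txn_id = row->lsn;
		row->txn_last = true;
	}
	if (row->txn_replica_id == 0)
		row->txn_replica_id = row->replica_id;
}
```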
But I think txn_last should be renamed to txn_commit. Also, an explicit
BEGIN operation is redundant, because a new txn_id/txn_replica_id pair
already marks the beginning of a transaction.

>
> > COMMIT. Also, separate BEGIN and COMMIT messages increase the transaction
> > size.
>
> I doubt that after compression you'll see a difference.

The replication stream does not use any compression.

>
> > It is worth noting that an IPROTO_NOP with a new txn_id or with txn_last
> > emulates BEGIN or COMMIT.
>
> Yeah, but that would look weird.
>
> > > - We might need to attach some extra information to each transaction,
> > >   e.g. mark transactions that were committed in parallel on the master
> > >   so that they can be committed in parallel on a replica. Attaching
> > >   such information to each row would be excessive.
> >
> > The patch is not about this.
>
> But we have to think about that in advance, don't we?

We do not have a clear view of how it should be done.

> > > - We will need BEGIN and COMMIT for IPROTO transactions. It would be
> > >   nice if we could share the code with them.
> >
> > The biggest issue is that we cannot know the transaction identifier in
> > the case of IPROTO. IPROTO is a single-stream protocol, but WAL might not
> > be, as it multiplexes a lot of transactions into one output, so a single
> > universal format for both IPROTO and WAL might be a bad idea.
>
> OK. I think we need to discuss the options with Kostja.
>
> > > - Having separate BEGIN/COMMIT rows would make tarantoolctl-cat output
> > >   easier to read.
> > >
> > > >  	/* Leave a gap for other keys in the header. */
> > > >  	IPROTO_SPACE_ID = 0x10,
> > > >  	IPROTO_INDEX_ID = 0x11,
> > > >
> > > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > > index 4c3537672..17ead08e7 100644
> > > > --- a/src/box/wal.c
> > > > +++ b/src/box/wal.c
> > > > @@ -905,9 +905,10 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> > > >  }
> > > >
> > > >  static void
> > > > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> > > >  	       struct xrow_header **end)
> > > >  {
> > > > +	struct xrow_header **row = begin;
> > > >  	/** Assign LSN to all local rows. */
> > > >  	for ( ; row < end; row++) {
> > > >  		if ((*row)->replica_id == 0) {
> > > > @@ -917,6 +918,39 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > >  			vclock_follow_xrow(vclock, *row);
> > > >  		}
> > > >  	}
> > > > +	if ((*begin)->replica_id != instance_id) {
> > > > +		/*
> > > > +		 * Move all local changes to the end of the rows array
> > > > +		 * to form a fake local transaction (like an autonomous
> > > > +		 * transaction), because we cannot replicate the
> > > > +		 * transaction back.
> > > > +		 */
> > > > +		struct xrow_header **row = end - 1;
> > > > +		while (row >= begin) {
> > > > +			if (row[0]->replica_id != instance_id) {
> > > > +				--row;
> > > > +				continue;
> > > > +			}
> > > > +			/* Local row, move it back. */
> > > > +			struct xrow_header **local_row = row;
> > > > +			while (local_row < end - 1 &&
> > > > +			       local_row[1]->replica_id != instance_id) {
> > > > +				struct xrow_header *tmp = local_row[0];
> > > > +				local_row[0] = local_row[1];
> > > > +				local_row[1] = tmp;
> > >
> > > There's SWAP for this.
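The next sketch shows the reordering with the swap factored into a macro. It assumes a hypothetical struct row that carries only replica_id and lsn. SWAP_ROWS is a local stand-in, not the SWAP macro mentioned above. As quoted, the inner while loop never advances local_row. After one swap, local_row[1] is the local row itself, so the loop stops and each local row moves at most one slot. The sketch advances its cursor after each swap, so the local rows end up at the tail in their original order.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical row type: only the fields this sketch needs. */
struct row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Local stand-in for a swap macro; not Tarantool's SWAP definition. */
#define SWAP_ROWS(a, b) do {			\
	struct row *swap_tmp_ = (a);		\
	(a) = (b);				\
	(b) = swap_tmp_;			\
} while (0)

/*
 * Stable partition: move every row whose replica_id equals local_id to
 * the tail of rows[0..count), keeping the relative order of the local
 * rows and of the remote rows.
 */
static void
move_local_rows_to_tail(struct row **rows, size_t count, uint32_t local_id)
{
	assert(rows != NULL || count == 0);
	/* Walk right to left, like the quoted loop. */
	for (size_t i = count; i-- > 0; ) {
		if (rows[i]->replica_id != local_id)
			continue;
		/*
		 * Bubble this local row rightwards past the remote rows
		 * after it, advancing the cursor after each swap; it stops
		 * in front of the local rows already moved to the tail.
		 */
		for (size_t j = i; j + 1 < count &&
		     rows[j + 1]->replica_id != local_id; ++j)
			SWAP_ROWS(rows[j], rows[j + 1]);
	}
}
```

Like the quoted loop, this bubbling is quadratic in the worst case. A copy through a scratch array would be linear, at the cost of an allocation.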
> > > > +			}
> > > > +			--row;
> > > > +		}
> > > > +		while (begin < end && begin[0]->replica_id != instance_id)
> > > > +			++begin;
> > > > +	}
> > >
> > > I don't understand why we need to move rows generated locally (by
> > > an on_replace trigger I surmise) to the end of a transaction. We
> > > have TXN_ID attached to each row so we could leave the transactions
> > > interleaved, couldn't we?
> >
> > You are right, but in that case the applier would have to track a lot
> > of transactions simultaneously. It also complicates recovery. I hope it
> > will be fixed while implementing the parallel applier.
>
> Maybe we should implement the parallel applier in the scope of this
> issue then? Anyway, without it, sync replication won't scale with the
> number of parallel transactions.

I do not think so. The parallel applier depends on this feature, but it
is a completely different issue. Also, the parallel applier is
independent of the xlog format.

> > > > +	/* Set up txn_id and txn_replica_id for locally generated rows. */
> > > > +	row = begin;
> > > > +	while (row < end) {
> > > > +		row[0]->txn_id = begin[0]->lsn;
> > > > +		row[0]->txn_replica_id = instance_id;
> > > > +		row[0]->txn_last = row == end - 1 ? 1 : 0;
> > > > +		++row;
> > > > +	}
> > >
> > > I think we'd better use txn->id for TXN_ID rather than LSN.
> > > Why do you think LSN should be used? I don't see any rationale
> > > for that anywhere in the comments. Also, setting TXN_ID looks
> > > like a job that should be done by txn_add_redo...
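Here is a minimal sketch of why a separate trailing transaction for the local rows helps the consumer. Without interleaving, a reader keeps only one transaction open. It treats a new txn_id/txn_replica_id pair as BEGIN and txn_last as COMMIT. struct txn_row and count_transactions() are hypothetical, and this is not the applier code. Interleaved input is rejected here rather than tracked.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical row: only the transaction fields from the patch. */
struct txn_row {
	int64_t txn_id;
	uint32_t txn_replica_id;
	bool txn_last;
};

/*
 * Count complete transactions in a row stream. Returns -1 if two
 * transactions are interleaved or the stream ends mid-transaction,
 * because this reader keeps only one transaction open at a time.
 */
static int
count_transactions(const struct txn_row *rows, size_t count)
{
	int committed = 0;
	const struct txn_row *open = NULL; /* first row of the open txn */
	for (size_t i = 0; i < count; i++) {
		const struct txn_row *row = &rows[i];
		if (open == NULL) {
			open = row; /* a new id pair acts as BEGIN */
		} else if (row->txn_id != open->txn_id ||
			   row->txn_replica_id != open->txn_replica_id) {
			return -1; /* interleaved with another transaction */
		}
		if (row->txn_last) {
			committed++; /* txn_last acts as COMMIT */
			open = NULL;
		}
	}
	return open == NULL ? committed : -1;
}
```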
> > > >  }
> > > >
> > > >  static void
> > > >
> > > > diff --git a/src/box/xrow.c b/src/box/xrow.c
> > > > index ef3f81add..db524b3c8 100644
> > > > --- a/src/box/xrow.c
> > > > +++ b/src/box/xrow.c
> > > > @@ -133,12 +133,32 @@ error:
> > > >  		case IPROTO_SCHEMA_VERSION:
> > > >  			header->schema_version = mp_decode_uint(pos);
> > > >  			break;
> > > > +		case IPROTO_TXN_ID:
> > > > +			header->txn_id = mp_decode_uint(pos);
> > > > +			break;
> > > > +		case IPROTO_TXN_REPLICA_ID:
> > > > +			header->txn_replica_id = mp_decode_uint(pos);
> > > > +			break;
> > > > +		case IPROTO_TXN_LAST:
> > > > +			header->txn_last = mp_decode_uint(pos);
> > >
> > > Should be bool?
> > >
> > > > +			break;
> > > >  		default:
> > > >  			/* unknown header */
> > > >  			mp_next(pos);
> > > >  		}
> > > >  	}
> > > >  	assert(*pos <= end);
> > > > +	if (header->txn_id == 0) {
> > > > +		/*
> > > > +		 * Transaction id is not set, so it is a single-statement
> > > > +		 * transaction.
> > > > +		 */
> > > > +		header->txn_id = header->lsn;
> > > > +		header->txn_last = true;
> > > > +	}
> > > > +	if (header->txn_replica_id == 0)
> > > > +		header->txn_replica_id = header->replica_id;
> > > > +
> > > >  	/* Nop requests aren't supposed to have a body. */
> > > >  	if (*pos < end && header->type != IPROTO_NOP) {
> > > >  		const char *body = *pos;
> > > > @@ -223,6 +243,24 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
> > > >  		d = mp_encode_double(d, header->tm);
> > > >  		map_size++;
> > > >  	}
> > > > +	if (header->txn_id != header->lsn || header->txn_last == 0) {
> > > > +		/* Encode txn id for multi-row transaction members. */
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_ID);
> > > > +		d = mp_encode_uint(d, header->txn_id);
> > > > +		map_size++;
> > > > +	}
> > > > +	if (header->txn_replica_id != header->replica_id) {
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_REPLICA_ID);
> > > > +		d = mp_encode_uint(d, header->txn_replica_id);
> > > > +		map_size++;
> > > > +	}
> > > > +	if (header->txn_last && !(header->txn_id == header->lsn &&
> > > > +	    header->txn_replica_id == header->replica_id)) {
> > > > +		/* Mark the last row of a multi-row transaction. */
> > > > +		d = mp_encode_uint(d, IPROTO_TXN_LAST);
> > > > +		d = mp_encode_uint(d, header->txn_last);
> > > > +		map_size++;
> > > > +	}
> > > >  	assert(d <= data + XROW_HEADER_LEN_MAX);
> > > >  	mp_encode_map(data, map_size);
> > > >  	out->iov_len = d - (char *) out->iov_base;
> > > >
> > > > diff --git a/src/box/xrow.h b/src/box/xrow.h
> > > > index 6bab0a1fd..4acd84d56 100644
> > > > --- a/src/box/xrow.h
> > > > +++ b/src/box/xrow.h
> > > > @@ -47,7 +47,7 @@ enum {
> > > >  	XROW_HEADER_IOVMAX = 1,
> > > >  	XROW_BODY_IOVMAX = 2,
> > > >  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> > > > -	XROW_HEADER_LEN_MAX = 40,
> > > > +	XROW_HEADER_LEN_MAX = 60,
> > > >  	XROW_BODY_LEN_MAX = 128,
> > > >  	IPROTO_HEADER_LEN = 28,
> > > >  	/** 7 = sizeof(iproto_body_bin). */
> > > > @@ -69,6 +69,9 @@ struct xrow_header {
> > > >  	uint64_t sync;
> > > >  	int64_t lsn; /* LSN must be signed for correct comparison */
> > > >  	double tm;
> > > > +	int64_t txn_id;
> > > > +	uint32_t txn_replica_id;
> > > > +	uint32_t txn_last;
> > > >  	int bodycnt;
> > > >  	uint32_t schema_version;
> > > >
> > > > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> > > > index 165a543cf..0b796d728 100644
> > > > --- a/test/unit/xrow.cc
> > > > +++ b/test/unit/xrow.cc
> > > > @@ -215,6 +215,9 @@ test_xrow_header_encode_decode()
> > > >  	header.lsn = 400;
> > > >  	header.tm = 123.456;
> > > >  	header.bodycnt = 0;
> > > > +	header.txn_id = header.lsn;
> > > > +	header.txn_replica_id = header.replica_id;
> > > > +	header.txn_last = true;
> > > >  	uint64_t sync = 100500;
> > > >  	struct iovec vec[1];
> > > >  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
> > > >
> > > > diff --git a/test/vinyl/errinj_stat.result b/test/vinyl/errinj_stat.result
> > > > index 08801dbc6..361ddf5db 100644
> > > > --- a/test/vinyl/errinj_stat.result
> > > > +++ b/test/vinyl/errinj_stat.result
> > > > @@ -69,7 +69,7 @@ i:stat().disk.compaction.queue -- 30 statements
> > > >  - bytes_compressed:
> > > >    pages: 3
> > > >    rows: 30
> > > > -  bytes: 411
> > > > +  bytes: 477
> > > >  ...
> > > >  i:stat().disk.compaction.queue.bytes == box.stat.vinyl().scheduler.compaction_queue
> > > >  ---
> > > > @@ -83,7 +83,7 @@ i:stat().disk.compaction.queue -- 40 statements
> > > >  - bytes_compressed:
> > > >    pages: 4
> > > >    rows: 40
> > > > -  bytes: 548
> > > > +  bytes: 636
> > >
> > > I wouldn't expect vinyl stats to be changed by this patch.
> > > Why did it happen?
> >
> > Because if rows are written in one entry, then WAL creates a
> > transaction.
>
> But those are vinyl files (run, index). They shouldn't be affected by
> this, should they?
Vinyl uses xlog_write_entry, which forms a transaction. I do not think
that introducing separate versions of xlog_write_entry with and without
a transaction is a good idea.
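For reference, the byte deltas in the quoted test/vinyl/errinj_stat.result hunks are the same per row in both cases: 66/30 = 88/40 = 2.2 bytes. One decomposition fits both numbers, under three assumptions that have not been checked against the run files:

- every row gains an IPROTO_TXN_ID key (a one-byte MsgPack fixint) with a one-byte fixint value;
- each page is written as one transaction;
- the last row of each page also gains IPROTO_TXN_LAST = 1, another two bytes.

```latex
\begin{aligned}
477 - 411 &= 66 = 30 \cdot 2 + 3 \cdot 2 \quad (30\ \text{rows},\ 3\ \text{pages})\\
636 - 548 &= 88 = 40 \cdot 2 + 4 \cdot 2 \quad (40\ \text{rows},\ 4\ \text{pages})
\end{aligned}
```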