* [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries [not found] <cover.1550001848.git.georgy@tarantool.org> @ 2019-02-12 20:04 ` Georgy Kirichenko 2019-02-15 13:15 ` Vladimir Davydov 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko 1 sibling, 1 reply; 6+ messages in thread From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Append txn_id and txn_commit to the xrow_header structure. txn_id identifies a transaction on the replica where the transaction was started: the lsn of the first row in the transaction is used as the transaction id. txn_commit is set to true for the last row in a transaction, so a transaction can be committed either by its last row or by an additional NOP request with txn_commit set; likewise, a transaction can be started with a NOP carrying the corresponding txn_id. In case of replication, all local changes are moved to the tail of the journal entry to form a separate transaction (like an autonomous transaction), so that these changes can be replicated back. The following encoding/decoding rules are assumed: * txn_id and txn_commit are encoded only for multi-row transactions, so if a row is decoded without a txn_id, it belongs to a single-row transaction. * The TXN_ID field is differentially encoded as the value lsn - txn_id. * txn_commit is packed into the TXN_FLAGS field. These rules provide compatibility with the previous xlog format as well as a compact encoding. 
Needed for: 2798 --- src/box/iproto_constants.c | 4 ++-- src/box/iproto_constants.h | 7 +++++++ src/box/txn.c | 21 +++++++++++++++++---- src/box/txn.h | 2 ++ src/box/wal.c | 28 ++++++++++++++++++++++++---- src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++ src/box/xrow.h | 4 +++- test/unit/xrow.cc | 2 ++ 8 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 7fd295775..4d2e21752 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ /* }}} */ /* {{{ unused */ - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, /* 0x0b */ MP_UINT, /* 0x0c */ MP_UINT, diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 728514297..fd80e3111 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -49,6 +49,11 @@ enum { XLOG_FIXHEADER_SIZE = 19 }; +enum { + /** Set for the last xrow in a transaction. */ + TXN_FLAG_COMMIT = 0x01, +}; + enum iproto_key { IPROTO_REQUEST_TYPE = 0x00, IPROTO_SYNC = 0x01, @@ -60,6 +65,8 @@ enum iproto_key { IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, IPROTO_GROUP_ID = 0x07, + IPROTO_TXN_ID = 0x08, + IPROTO_TXN_FLAGS = 0x09, /* Leave a gap for other keys in the header. 
*/ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/txn.c b/src/box/txn.c index 7f4e85b47..de0152706 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -34,6 +34,7 @@ #include "journal.h" #include <fiber.h> #include "xrow.h" +#include "replication.h" double too_long_threshold; @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit) txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->remote_row_count = 0; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); return txn; @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) * stmt->space can be NULL for IRPOTO_NOP. */ if (stmt->space == NULL || !space_is_temporary(stmt->space)) { + if (request->header && + request->header->replica_id != instance_id && + request->header->replica_id != 0) + ++txn->remote_row_count; if (txn_add_redo(stmt, request) != 0) goto fail; ++txn->n_rows; @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn) return -1; struct txn_stmt *stmt; - struct xrow_header **row = req->rows; + struct xrow_header **remote_row = req->rows; + struct xrow_header **local_row = req->rows + txn->remote_row_count; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) - continue; /* A read (e.g. select) request */ - *row++ = stmt->row; + /* A read (e.g. 
select) request */ + continue; + if (stmt->row->replica_id == instance_id || + stmt->row->replica_id == 0) + *local_row++ = stmt->row; + else + *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(row == req->rows + req->n_rows); + assert(remote_row == req->rows + txn->remote_row_count); + assert(local_row == req->rows + req->n_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); diff --git a/src/box/txn.h b/src/box/txn.h index de5cb0de4..143f21715 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -180,6 +180,8 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** Count of remote rows. */ + uint32_t remote_row_count; }; /* Pointer to the current transaction (if any) */ diff --git a/src/box/wal.c b/src/box/wal.c index cdcaabc00..0ea3be68d 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer) cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback); } -static void -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, +static int +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, struct xrow_header **end) { + struct xrow_header **row = begin; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, vclock_follow_xrow(vclock, *row); } } + while (begin < end && begin[0]->replica_id != instance_id) + ++begin; + /* Setup txn_id and tnx_replica_id for locally generated rows. */ + row = begin; + while (row < end) { + if (row[0]->replica_id != instance_id) { + diag_set(ClientError, ER_UNSUPPORTED, + "Interleaved transactions"); + return -1; + } + row[0]->txn_id = begin[0]->lsn; + row[0]->txn_commit = row == end - 1 ? 
1 : 0; + ++row; + } + return 0; } static void @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg) struct journal_entry *entry; struct stailq_entry *last_committed = NULL; stailq_foreach_entry(entry, &wal_msg->commit, fifo) { - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows); + if (wal_assign_lsn(&vclock, entry->rows, + entry->rows + entry->n_rows) < 0) + goto done; entry->res = vclock_sum(&vclock); rc = xlog_write_entry(l, entry); if (rc < 0) @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); + if (wal_assign_lsn(&writer->vclock, entry->rows, + entry->rows + entry->n_rows) != 0) + return -1; vclock_copy(&replicaset.vclock, &writer->vclock); return vclock_sum(&writer->vclock); } diff --git a/src/box/xrow.c b/src/box/xrow.c index fec8873d0..29fa75de4 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -102,6 +102,8 @@ error: if (mp_typeof(**pos) != MP_MAP) goto error; + bool txn_is_set = false; + uint32_t txn_flags = 0; uint32_t size = mp_decode_map(pos); for (uint32_t i = 0; i < size; i++) { @@ -133,12 +135,32 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_TXN_ID: + txn_is_set = true; + header->txn_id = mp_decode_uint(pos); + break; + case IPROTO_TXN_FLAGS: + txn_flags = mp_decode_uint(pos); + header->txn_commit = txn_flags & TXN_FLAG_COMMIT; + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0) + /* Unknow flags. */ + goto error; + break; default: /* unknown header */ mp_next(pos); } } assert(*pos <= end); + if (!txn_is_set) { + /* + * Transaction id is not set so it is a single statement + * transaction. + */ + header->txn_commit = true; + } + header->txn_id = header->lsn + header->txn_id; + /* Nop requests aren't supposed to have a body. 
*/ if (*pos < end && header->type != IPROTO_NOP) { const char *body = *pos; @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, d = mp_encode_double(d, header->tm); map_size++; } + if (header->txn_id != 0) { + if (header->txn_id != header->lsn || header->txn_commit == 0) { + /* Encode txn id for multi row transaction members. */ + d = mp_encode_uint(d, IPROTO_TXN_ID); + d = mp_encode_uint(d, header->lsn - header->txn_id); + map_size++; + } + if (header->txn_commit && header->txn_id != header->lsn) { + /* Setup last row for multi row transaction. */ + d = mp_encode_uint(d, IPROTO_TXN_FLAGS); + d = mp_encode_uint(d, TXN_FLAG_COMMIT); + map_size++; + } + } assert(d <= data + XROW_HEADER_LEN_MAX); mp_encode_map(data, map_size); out->iov_len = d - (char *) out->iov_base; diff --git a/src/box/xrow.h b/src/box/xrow.h index 719add4f0..bc4c4a2d7 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -47,7 +47,7 @@ enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, - XROW_HEADER_LEN_MAX = 40, + XROW_HEADER_LEN_MAX = 52, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ @@ -63,6 +63,8 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + int64_t txn_id; + bool txn_commit; int bodycnt; uint32_t schema_version; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index 022d1f998..bc99285de 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() header.lsn = 400; header.tm = 123.456; header.bodycnt = 0; + header.txn_id = header.lsn; + header.txn_commit = true; uint64_t sync = 100500; struct iovec vec[1]; is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); -- 2.20.1 ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko @ 2019-02-15 13:15 ` Vladimir Davydov 2019-02-19 14:59 ` [tarantool-patches] " Konstantin Osipov 0 siblings, 1 reply; 6+ messages in thread From: Vladimir Davydov @ 2019-02-15 13:15 UTC (permalink / raw) To: Georgy Kirichenko; +Cc: tarantool-patches On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote: > Append txn_id and txn_commit to xrow_header structure, txn_id identifies > transaction id on replica where transaction was started. As transaction id > a lsn of the first row in the transaction is used. txn_commit is set to true > if it is the last row in a transaction, so we could commit transaction by the > last row or by additional NOP requests with txn_commit set as well as > start transaction with NOP and corresponding txn_id. In case of replication > all local changes moved to an journal entry tail to form a separate transaction > (like autonomous transaction) to be able to replicate changes back. > > As encoding/deconding rule assumed: > * txn_id and txn_commit are encoded only for multi-row transactions. > So if we do not have txn_id while row decoding then this means that it > is a single row transaction. > * TXN_ID field is differential encoded as lsn - txn_id value > * txn_commit packed into TXN_FLAGS field > > These rules provide compatibility with previous xlog format as well > as good compaction level. 
> > Needed for: 2798 > --- > src/box/iproto_constants.c | 4 ++-- > src/box/iproto_constants.h | 7 +++++++ > src/box/txn.c | 21 +++++++++++++++++---- > src/box/txn.h | 2 ++ > src/box/wal.c | 28 ++++++++++++++++++++++++---- > src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++ > src/box/xrow.h | 4 +++- > test/unit/xrow.cc | 2 ++ > 8 files changed, 93 insertions(+), 11 deletions(-) > > diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c > index 7fd295775..4d2e21752 100644 > --- a/src/box/iproto_constants.c > +++ b/src/box/iproto_constants.c > @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = > /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ > /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ > /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ > + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ > + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ I don't quite like the name, because we encode not a txn id, but the statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ? xrow_header::txn_id is OK though. > /* }}} */ > > /* {{{ unused */ > - /* 0x08 */ MP_UINT, > - /* 0x09 */ MP_UINT, > /* 0x0a */ MP_UINT, > /* 0x0b */ MP_UINT, > /* 0x0c */ MP_UINT, You forgot to patch iproto_key_strs. Let's please add a test that ensures that those new headers are written to xlogs. You could use xlog reader for that. > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 728514297..fd80e3111 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -49,6 +49,11 @@ enum { > XLOG_FIXHEADER_SIZE = 19 > }; > /* IPROTO_TXN_FLAGS bits. */ > +enum { > + /** Set for the last xrow in a transaction. */ > + TXN_FLAG_COMMIT = 0x01, IPROTO_TXN_COMMIT? 
> +}; > + > enum iproto_key { > IPROTO_REQUEST_TYPE = 0x00, > IPROTO_SYNC = 0x01, > @@ -60,6 +65,8 @@ enum iproto_key { > IPROTO_SCHEMA_VERSION = 0x05, > IPROTO_SERVER_VERSION = 0x06, > IPROTO_GROUP_ID = 0x07, > + IPROTO_TXN_ID = 0x08, > + IPROTO_TXN_FLAGS = 0x09, > /* Leave a gap for other keys in the header. */ > IPROTO_SPACE_ID = 0x10, > IPROTO_INDEX_ID = 0x11, > diff --git a/src/box/txn.c b/src/box/txn.c > index 7f4e85b47..de0152706 100644 > --- a/src/box/txn.c > +++ b/src/box/txn.c > @@ -34,6 +34,7 @@ > #include "journal.h" > #include <fiber.h> > #include "xrow.h" > +#include "replication.h" > > double too_long_threshold; > > @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit) > txn->engine = NULL; > txn->engine_tx = NULL; > txn->psql_txn = NULL; > + txn->remote_row_count = 0; Nit: let's rename it to n_remote_rows to match n_rows and keep the two member initializers together. > /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ > fiber_set_txn(fiber(), txn); > return txn; > @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request) > * stmt->space can be NULL for IRPOTO_NOP. > */ > if (stmt->space == NULL || !space_is_temporary(stmt->space)) { > + if (request->header && > + request->header->replica_id != instance_id && > + request->header->replica_id != 0) > + ++txn->remote_row_count; Nit: if we moved this after txn_add_redo(), then we wouldn't have to check if request->header is set. > if (txn_add_redo(stmt, request) != 0) > goto fail; > ++txn->n_rows; > @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn) > return -1; > > struct txn_stmt *stmt; > - struct xrow_header **row = req->rows; > + struct xrow_header **remote_row = req->rows; > + struct xrow_header **local_row = req->rows + txn->remote_row_count; > stailq_foreach_entry(stmt, &txn->stmts, next) { > if (stmt->row == NULL) > - continue; /* A read (e.g. select) request */ > - *row++ = stmt->row; > + /* A read (e.g. 
select) request */ > + continue; Nit: pointless change, please remove. > + if (stmt->row->replica_id == instance_id || > + stmt->row->replica_id == 0) > + *local_row++ = stmt->row; > + else > + *remote_row++ = stmt->row; This piece of code looks nice, but it definitely needs a comment: what we do, why we do that... Anyway, we should add a test for this change. May be, it's even worth submitting this change in a separate patch. > req->approx_len += xrow_approx_len(stmt->row); > } > - assert(row == req->rows + req->n_rows); > + assert(remote_row == req->rows + txn->remote_row_count); > + assert(local_row == req->rows + req->n_rows); > > ev_tstamp start = ev_monotonic_now(loop()); > int64_t res = journal_write(req); > diff --git a/src/box/txn.h b/src/box/txn.h > index de5cb0de4..143f21715 100644 > --- a/src/box/txn.h > +++ b/src/box/txn.h > @@ -180,6 +180,8 @@ struct txn { > /** Commit and rollback triggers */ > struct rlist on_commit, on_rollback; > struct sql_txn *psql_txn; > + /** Count of remote rows. */ > + uint32_t remote_row_count; Nit: please use int rather than uint32_t and move the definition after n_rows, because those are closely related. > }; > > /* Pointer to the current transaction (if any) */ > diff --git a/src/box/wal.c b/src/box/wal.c > index cdcaabc00..0ea3be68d 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer) > cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback); > } > > -static void > -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > +static int > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin, > struct xrow_header **end) > { > + struct xrow_header **row = begin; > /** Assign LSN to all local rows. 
*/ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row, > vclock_follow_xrow(vclock, *row); > } > } > + while (begin < end && begin[0]->replica_id != instance_id) > + ++begin; > + /* Setup txn_id and tnx_replica_id for locally generated rows. */ > + row = begin; > + while (row < end) { > + if (row[0]->replica_id != instance_id) { > + diag_set(ClientError, ER_UNSUPPORTED, > + "Interleaved transactions"); > + return -1; Do we really need to bother about it here, in WAL? IMO a check in applier would be enough. > + } > + row[0]->txn_id = begin[0]->lsn; > + row[0]->txn_commit = row == end - 1 ? 1 : 0; > + ++row; Why can't we do this while we are iterating over rows just a few lines above, assigning LSNs? > + } > + return 0; > } > > static void > @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg) > struct journal_entry *entry; > struct stailq_entry *last_committed = NULL; > stailq_foreach_entry(entry, &wal_msg->commit, fifo) { > - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows); > + if (wal_assign_lsn(&vclock, entry->rows, > + entry->rows + entry->n_rows) < 0) > + goto done; > entry->res = vclock_sum(&vclock); > rc = xlog_write_entry(l, entry); > if (rc < 0) > @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal, > struct journal_entry *entry) > { > struct wal_writer *writer = (struct wal_writer *) journal; > - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); > + if (wal_assign_lsn(&writer->vclock, entry->rows, > + entry->rows + entry->n_rows) != 0) > + return -1; > vclock_copy(&replicaset.vclock, &writer->vclock); > return vclock_sum(&writer->vclock); > } > diff --git a/src/box/xrow.c b/src/box/xrow.c > index fec8873d0..29fa75de4 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -102,6 +102,8 @@ error: > > if (mp_typeof(**pos) != MP_MAP) > goto error; > + bool txn_is_set = false; > + uint32_t 
txn_flags = 0; > > uint32_t size = mp_decode_map(pos); > for (uint32_t i = 0; i < size; i++) { > @@ -133,12 +135,32 @@ error: > case IPROTO_SCHEMA_VERSION: > header->schema_version = mp_decode_uint(pos); > break; > + case IPROTO_TXN_ID: > + txn_is_set = true; > + header->txn_id = mp_decode_uint(pos); > + break; > + case IPROTO_TXN_FLAGS: > + txn_flags = mp_decode_uint(pos); > + header->txn_commit = txn_flags & TXN_FLAG_COMMIT; > + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0) > + /* Unknow flags. */ > + goto error; We silently ignore unknown headers. I think we can silently ignore unknown flags as well. > + break; > default: > /* unknown header */ > mp_next(pos); > } > } > assert(*pos <= end); > + if (!txn_is_set) { > + /* > + * Transaction id is not set so it is a single statement > + * transaction. > + */ > + header->txn_commit = true; > + } > + header->txn_id = header->lsn + header->txn_id; > + > /* Nop requests aren't supposed to have a body. */ > if (*pos < end && header->type != IPROTO_NOP) { > const char *body = *pos; > @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, > d = mp_encode_double(d, header->tm); > map_size++; > } > + if (header->txn_id != 0) { > + if (header->txn_id != header->lsn || header->txn_commit == 0) { Nit: txn_commit is a bool so s/header->txn_commit == 0/!header->txn_commit > + /* Encode txn id for multi row transaction members. */ > + d = mp_encode_uint(d, IPROTO_TXN_ID); > + d = mp_encode_uint(d, header->lsn - header->txn_id); > + map_size++; > + } > + if (header->txn_commit && header->txn_id != header->lsn) { > + /* Setup last row for multi row transaction. 
*/ > + d = mp_encode_uint(d, IPROTO_TXN_FLAGS); > + d = mp_encode_uint(d, TXN_FLAG_COMMIT); > + map_size++; > + } > + } > assert(d <= data + XROW_HEADER_LEN_MAX); > mp_encode_map(data, map_size); > out->iov_len = d - (char *) out->iov_base; > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 719add4f0..bc4c4a2d7 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -47,7 +47,7 @@ enum { > XROW_HEADER_IOVMAX = 1, > XROW_BODY_IOVMAX = 2, > XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, > - XROW_HEADER_LEN_MAX = 40, > + XROW_HEADER_LEN_MAX = 52, > XROW_BODY_LEN_MAX = 128, > IPROTO_HEADER_LEN = 28, > /** 7 = sizeof(iproto_body_bin). */ > @@ -63,6 +63,8 @@ struct xrow_header { > uint64_t sync; > int64_t lsn; /* LSN must be signed for correct comparison */ > double tm; > + int64_t txn_id; > + bool txn_commit; Please add a comment explaining how these are mapped to IPROTO headers. > > int bodycnt; > uint32_t schema_version; > diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc > index 022d1f998..bc99285de 100644 > --- a/test/unit/xrow.cc > +++ b/test/unit/xrow.cc > @@ -215,6 +215,8 @@ test_xrow_header_encode_decode() > header.lsn = 400; > header.tm = 123.456; > header.bodycnt = 0; > + header.txn_id = header.lsn; > + header.txn_commit = true; > uint64_t sync = 100500; > struct iovec vec[1]; > is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] Re: [PATCH v3 1/2] Journal transaction boundaries 2019-02-15 13:15 ` Vladimir Davydov @ 2019-02-19 14:59 ` Konstantin Osipov 0 siblings, 0 replies; 6+ messages in thread From: Konstantin Osipov @ 2019-02-19 14:59 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko * Vladimir Davydov <vdavydov.dev@gmail.com> [19/02/15 16:17]: > > + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */ > > + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */ > > I don't quite like the name, because we encode not a txn id, but the > statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or > IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ? How about IRPOTO_TSN and IPROTO_FLAGS? > > +enum { > > + /** Set for the last xrow in a transaction. */ > > + TXN_FLAG_COMMIT = 0x01, > > IPROTO_TXN_COMMIT? IRPOTO_FLAG_COMMIT? -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier [not found] <cover.1550001848.git.georgy@tarantool.org> 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko @ 2019-02-12 20:04 ` Georgy Kirichenko 2019-02-18 9:36 ` Vladimir Davydov 1 sibling, 1 reply; 6+ messages in thread From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko The applier fetches incoming rows to form a transaction and then applies it. The implementation assumes that transactions are not interleaved in a replication stream. Distributed transactions are not supported yet. Closes: #2798 Needed for: #980 --- src/box/applier.cc | 185 +++++++++++++++----- test/replication/transaction.result | 240 ++++++++++++++++++++++++++ test/replication/transaction.test.lua | 86 +++++++++ 3 files changed, 471 insertions(+), 40 deletions(-) create mode 100644 test/replication/transaction.result create mode 100644 test/replication/transaction.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 7f37fe2ee..59c33bb84 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -48,6 +48,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "txn.h" STRS(applier_state, applier_STATE); @@ -378,6 +379,105 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } +/** + * Read one transaction from network. + * Transaction rows are placed into row_buf as an array, row's bodies are + * placed into obuf because it is not allowed to relocate row's bodies. + * Also we could not use applier input buffer because rpos adjusted after xrow + * decoding and corresponding space going to reuse. + * + * Note: current implementation grants that transaction could not be mixed, so + * we read each transaction from first xrow until xrow with txn_last = true.
+ */ +static int64_t +applier_read_tx(struct applier *applier, struct ibuf *row_buf, + struct obuf *data_buf) +{ + struct xrow_header *row; + struct ev_io *coio = &applier->io; + struct ibuf *ibuf = &applier->ibuf; + int64_t txn_id = 0; + + do { + row = (struct xrow_header *)ibuf_alloc(row_buf, + sizeof(struct xrow_header)); + if (row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "slab", "struct xrow_header"); + goto error; + } + + double timeout = replication_disconnect_timeout(); + try { + /* TODO: we should have a C version of this function. */ + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + } catch (...) { + goto error; + } + + if (iproto_type_is_error(row->type)) { + xrow_decode_error(row); + goto error; + } + + /* Replication request. */ + if (row->replica_id == REPLICA_ID_NIL || + row->replica_id >= VCLOCK_MAX) { + /* + * A safety net, this can only occur + * if we're fed a strangely broken xlog. + */ + diag_set(ClientError, ER_UNKNOWN_REPLICA, + int2str(row->replica_id), + tt_uuid_str(&REPLICASET_UUID)); + goto error; + } + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) { + /* + * First row in a transaction. In order to enforce + * consistency check that first row lsn and replica id + * match with transaction. + */ + txn_id = row->txn_id; + if (row->lsn != txn_id) { + /* There is not a first row in the transactions. */ + diag_set(ClientError, ER_PROTOCOL, + "Not a first row in a transaction"); + goto error; + } + } + if (txn_id != row->txn_id) { + /* We are not able to handle interleaving transactions. 
*/ + diag_set(ClientError, ER_UNSUPPORTED, + "replications", + "interleaving transactions"); + goto error; + } + + + applier->lag = ev_now(loop()) - row->tm; + applier->last_row_time = ev_monotonic_now(loop()); + + if (row->body->iov_base != NULL) { + void *new_base = obuf_alloc(data_buf, row->body->iov_len); + if (new_base == NULL) { + diag_set(OutOfMemory, row->body->iov_len, + "slab", "xrow_data"); + goto error; + } + memcpy(new_base, row->body->iov_base, row->body->iov_len); + row->body->iov_base = new_base; + } + + } while (row->txn_commit == 0); + + return 0; +error: + ibuf_reset(row_buf); + obuf_reset(data_buf); + return -1; +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier) struct xrow_header row; struct vclock remote_vclock_at_subscribe; struct tt_uuid cluster_id = uuid_nil; + struct ibuf row_buf; + struct obuf data_buf; + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header)); + obuf_create(&data_buf, &cord()->slabc, 0x10000); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, &replicaset.vclock); @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Tarantool < 1.7.7 does not send periodic heartbeat - * messages so we can't assume that if we haven't heard - * from the master for quite a while the connection is - * broken - the master might just be idle. - */ - if (applier->version_id < version_id(1, 7, 7)) { - coio_read_xrow(coio, ibuf, &row); - } else { - double timeout = replication_disconnect_timeout(); - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); - } + if (applier_read_tx(applier, &row_buf, &data_buf) != 0) + diag_raise(); - if (iproto_type_is_error(row.type)) - xrow_decode_error_xc(&row); /* error */ - /* Replication request. 
*/ - if (row.replica_id == REPLICA_ID_NIL || - row.replica_id >= VCLOCK_MAX) { - /* - * A safety net, this can only occur - * if we're fed a strangely broken xlog. - */ - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, - int2str(row.replica_id), - tt_uuid_str(&REPLICASET_UUID)); - } + struct txn *txn = NULL; + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos; + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1; - applier->lag = ev_now(loop()) - row.tm; + applier->lag = ev_now(loop()) - last_row->tm; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(row.replica_id); + struct replica *replica = replica_by_id(first_row->replica_id); struct latch *latch = (replica ? &replica->order_latch : &replicaset.applier.order_latch); /* @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier) * that belong to the same server id. */ latch_lock(latch); - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = xstream_write(applier->subscribe_stream, &row); - if (res != 0) { - struct error *e = diag_last_error(diag_get()); - /** - * Silently skip ER_TUPLE_FOUND error if such - * option is set in config. - */ - if (e->type == &type_ClientError && + if (vclock_get(&replicaset.vclock, + first_row->replica_id) < first_row->lsn) { + struct xrow_header *row = first_row; + if (first_row != last_row) + txn = txn_begin(false); + int res = 0; + while (row <= last_row && res == 0) { + res = xstream_write(applier->subscribe_stream, row); + struct error *e; + if (res != 0 && + (e = diag_last_error(diag_get()))->type == + &type_ClientError && box_error_code(e) == ER_TUPLE_FOUND && - replication_skip_conflict) + replication_skip_conflict) { + /** + * Silently skip ER_TUPLE_FOUND error + * if such option is set in config. 
+ */ diag_clear(diag_get()); - else { - latch_unlock(latch); - diag_raise(); + row->type = IPROTO_NOP; + row->bodycnt = 0; + res = xstream_write(applier->subscribe_stream, + row); } + ++row; + } + if (res == 0 && txn != NULL) + res = txn_commit(txn); + + if (res != 0) { + txn_rollback(); + obuf_reset(&data_buf); + ibuf_reset(&row_buf); + latch_unlock(latch); + diag_raise(); } } + obuf_reset(&data_buf); + ibuf_reset(&row_buf); latch_unlock(latch); if (applier->state == APPLIER_SYNC || diff --git a/test/replication/transaction.result b/test/replication/transaction.result new file mode 100644 index 000000000..009f84430 --- /dev/null +++ b/test/replication/transaction.result @@ -0,0 +1,240 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +--- +... +_ = s:create_index('pk') +--- +... +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +--- +- [4, 'r'] +... +v1 = box.info.vclock +--- +... +test_run:cmd("switch default") +--- +- true +... +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +--- +... +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- nothing was applied +v1[1] == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... 
+box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- set conflict to third transaction +box.space.test:delete({3}) +--- +... +box.space.test:replace({6, 'r'}) +--- +- [6, 'r'] +... +-- restart replication +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +box.cfg{replication = replication} +--- +... +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +--- +- false +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] + - [6, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- check restart does not help +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] + - [6, 'r'] +... +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +--- +... +box.cfg{replication = {}, replication_skip_conflict = true} +--- +... +box.cfg{replication = replication} +--- +... +-- check last transaction applied without conflicting row +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] +... +box.info.replication[1].upstream.status +--- +- follow +... +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +--- +- [8, 'r'] +... +box.space.test:replace({9, 'r'}) +--- +- [9, 'r'] +... +-- issue a conflicting tx +test_run:cmd("switch default") +--- +- true +... +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... 
+-- vclock should be increased but rows skipped +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +-- check restart does not change something +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +box.info.replication[1].upstream.status +--- +- follow +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +s:drop() +--- +... diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua new file mode 100644 index 000000000..47003c644 --- /dev/null +++ b/test/replication/transaction.test.lua @@ -0,0 +1,86 @@ +env = require('test_run') +test_run = env.new() +box.schema.user.grant('guest', 'replication') + +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +_ = s:create_index('pk') + +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") + +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +v1 = box.info.vclock + +test_run:cmd("switch default") +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- nothing was applied +v1[1] == box.info.vclock[1] 
+box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message +-- set conflict to third transaction +box.space.test:delete({3}) +box.space.test:replace({6, 'r'}) +-- restart replication +replication = box.cfg.replication +box.cfg{replication = {}} +box.cfg{replication = replication} +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message + +-- check restart does not help +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +box.cfg{replication = {}, replication_skip_conflict = true} +box.cfg{replication = replication} + +-- check last transaction applied without conflicting row +box.space.test:select() +box.info.replication[1].upstream.status + +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +box.space.test:replace({9, 'r'}) + +-- issue a conflicting tx +test_run:cmd("switch default") +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- vclock should be increased but rows skipped +box.space.test:select() + +-- check restart does not change something +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +box.info.replication[1].upstream.status + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") + +box.schema.user.revoke('guest', 'replication') +s:drop() -- 2.20.1 ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/2] Transaction support for applier 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko @ 2019-02-18 9:36 ` Vladimir Davydov 0 siblings, 0 replies; 6+ messages in thread From: Vladimir Davydov @ 2019-02-18 9:36 UTC (permalink / raw) To: Georgy Kirichenko; +Cc: tarantool-patches On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote: > Applier fetch incoming rows to form a transaction and then apply it. > Implementation assumes that transaction could not mix in a > replication stream. Also distributed transaction are not supported yet. > > Closes: #2798 > Needed for: #980 > --- > src/box/applier.cc | 185 +++++++++++++++----- > test/replication/transaction.result | 240 ++++++++++++++++++++++++++ > test/replication/transaction.test.lua | 86 +++++++++ > 3 files changed, 471 insertions(+), 40 deletions(-) > create mode 100644 test/replication/transaction.result > create mode 100644 test/replication/transaction.test.lua > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 7f37fe2ee..59c33bb84 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -48,6 +48,7 @@ > #include "error.h" > #include "session.h" > #include "cfg.h" > +#include "txn.h" > > STRS(applier_state, applier_STATE); > > @@ -378,6 +379,105 @@ applier_join(struct applier *applier) > applier_set_state(applier, APPLIER_READY); > } > > +/** > + * Read one transaction from network. > + * Transaction rows are placed into row_buf as an array, row's bodies are > + * placed into obuf because it is not allowed to relocate row's bodies. > + * Also we could not use applier input buffer because rpos adjusted after xrow > + * decoding and corresponding space going to reuse. > + * > + * Note: current implementation grants that transaction could not be mixed, so > + * we read each transaction from first xrow until xrow with txn_last = true. 
> + */ > +static int64_t > +applier_read_tx(struct applier *applier, struct ibuf *row_buf, > + struct obuf *data_buf) > +{ > + struct xrow_header *row; > + struct ev_io *coio = &applier->io; > + struct ibuf *ibuf = &applier->ibuf; > + int64_t txn_id = 0; > + > + do { > + row = (struct xrow_header *)ibuf_alloc(row_buf, > + sizeof(struct xrow_header)); Nit: the line's too for no reason, and there are more lines like that. Please fix them where you can without making the code look ugly. > + if (row == NULL) { > + diag_set(OutOfMemory, sizeof(struct xrow_header), > + "slab", "struct xrow_header"); > + goto error; > + } > + > + double timeout = replication_disconnect_timeout(); > + try { > + /* TODO: we should have a C version of this function. */ > + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); Nit: IMO better use guards to free resources, if any. > + } catch (...) { > + goto error; > + } > + > + if (iproto_type_is_error(row->type)) { > + xrow_decode_error(row); > + goto error; > + } > + > + /* Replication request. */ > + if (row->replica_id == REPLICA_ID_NIL || > + row->replica_id >= VCLOCK_MAX) { > + /* > + * A safety net, this can only occur > + * if we're fed a strangely broken xlog. > + */ > + diag_set(ClientError, ER_UNKNOWN_REPLICA, > + int2str(row->replica_id), > + tt_uuid_str(&REPLICASET_UUID)); > + goto error; > + } > + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) { > + /* > + * First row in a transaction. In order to enforce > + * consistency check that first row lsn and replica id > + * match with transaction. > + */ > + txn_id = row->txn_id; > + if (row->lsn != txn_id) { > + /* There is not a first row in the transactions. */ > + diag_set(ClientError, ER_PROTOCOL, > + "Not a first row in a transaction"); > + goto error; > + } > + } > + if (txn_id != row->txn_id) { > + /* We are not able to handle interleaving transactions. 
*/ > + diag_set(ClientError, ER_UNSUPPORTED, > + "replications", > + "interleaving transactions"); > + goto error; > + } > + > + > + applier->lag = ev_now(loop()) - row->tm; > + applier->last_row_time = ev_monotonic_now(loop()); > + > + if (row->body->iov_base != NULL) { > + void *new_base = obuf_alloc(data_buf, row->body->iov_len); > + if (new_base == NULL) { > + diag_set(OutOfMemory, row->body->iov_len, > + "slab", "xrow_data"); > + goto error; > + } > + memcpy(new_base, row->body->iov_base, row->body->iov_len); > + row->body->iov_base = new_base; So we first read a row to an ibuf, then copy it to an obuf. I understand that you do this, because xrow_header::body has pointers in it. Still it looks rather awkward. May be, we'd better temporarily fix up those pointers to store relative offsets instead? > + } > + > + } while (row->txn_commit == 0); > + > + return 0; > +error: > + ibuf_reset(row_buf); > + obuf_reset(data_buf); obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy, ibuf_destroy. > + return -1; > +} > + > /** > * Execute and process SUBSCRIBE request (follow updates from a master). > */ > @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier) > struct xrow_header row; > struct vclock remote_vclock_at_subscribe; > struct tt_uuid cluster_id = uuid_nil; > + struct ibuf row_buf; > + struct obuf data_buf; IMO the buffers better be a part of the applier struct. Then you wouldn't need to use a label to free them up in applier_read_tx and hence could simply let the exception thrown by coio_read_xrow_timeout_xc travel up the stack without wrapping it in that awkward try-catch block. > + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header)); > + obuf_create(&data_buf, &cord()->slabc, 0x10000); This constant look like magic to me. 
> > xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, > &replicaset.vclock); > @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier) > applier_set_state(applier, APPLIER_FOLLOW); > } > > - /* > - * Tarantool < 1.7.7 does not send periodic heartbeat > - * messages so we can't assume that if we haven't heard > - * from the master for quite a while the connection is > - * broken - the master might just be idle. > - */ > - if (applier->version_id < version_id(1, 7, 7)) { > - coio_read_xrow(coio, ibuf, &row); You silently dropped this branch. Please leave it be. > - } else { > - double timeout = replication_disconnect_timeout(); > - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); > - } > + if (applier_read_tx(applier, &row_buf, &data_buf) != 0) > + diag_raise(); > > - if (iproto_type_is_error(row.type)) > - xrow_decode_error_xc(&row); /* error */ > - /* Replication request. */ > - if (row.replica_id == REPLICA_ID_NIL || > - row.replica_id >= VCLOCK_MAX) { > - /* > - * A safety net, this can only occur > - * if we're fed a strangely broken xlog. > - */ > - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, > - int2str(row.replica_id), > - tt_uuid_str(&REPLICASET_UUID)); > - } > + struct txn *txn = NULL; > + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos; > + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1; > > - applier->lag = ev_now(loop()) - row.tm; > + applier->lag = ev_now(loop()) - last_row->tm; > applier->last_row_time = ev_monotonic_now(loop()); > - struct replica *replica = replica_by_id(row.replica_id); > + struct replica *replica = replica_by_id(first_row->replica_id); > struct latch *latch = (replica ? &replica->order_latch : > &replicaset.applier.order_latch); > /* > @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier) > * that belong to the same server id. 
> */ > latch_lock(latch); > - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { > - int res = xstream_write(applier->subscribe_stream, &row); > - if (res != 0) { > - struct error *e = diag_last_error(diag_get()); > - /** > - * Silently skip ER_TUPLE_FOUND error if such > - * option is set in config. > - */ > - if (e->type == &type_ClientError && > + if (vclock_get(&replicaset.vclock, > + first_row->replica_id) < first_row->lsn) { > + struct xrow_header *row = first_row; > + if (first_row != last_row) > + txn = txn_begin(false); We have a nice level of abstraction implemented by xstream, but now you bluntly break it by calling txn_begin/commit directly. Please come up with a better solution, e.g. you could extend the xstream interface with write_tx method. > + int res = 0; > + while (row <= last_row && res == 0) { > + res = xstream_write(applier->subscribe_stream, row); > + struct error *e; > + if (res != 0 && > + (e = diag_last_error(diag_get()))->type == > + &type_ClientError && > box_error_code(e) == ER_TUPLE_FOUND && > - replication_skip_conflict) > + replication_skip_conflict) { > + /** > + * Silently skip ER_TUPLE_FOUND error > + * if such option is set in config. 
> + */ > diag_clear(diag_get()); > - else { > - latch_unlock(latch); > - diag_raise(); > + row->type = IPROTO_NOP; > + row->bodycnt = 0; > + res = xstream_write(applier->subscribe_stream, > + row); > } > + ++row; > + } > + if (res == 0 && txn != NULL) > + res = txn_commit(txn); > + > + if (res != 0) { > + txn_rollback(); > + obuf_reset(&data_buf); > + ibuf_reset(&row_buf); > + latch_unlock(latch); > + diag_raise(); > } > } > + obuf_reset(&data_buf); > + ibuf_reset(&row_buf); > latch_unlock(latch); > > if (applier->state == APPLIER_SYNC || > diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua > new file mode 100644 > index 000000000..47003c644 > --- /dev/null > +++ b/test/replication/transaction.test.lua > @@ -0,0 +1,86 @@ > +env = require('test_run') > +test_run = env.new() > +box.schema.user.grant('guest', 'replication') > + > +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) > +_ = s:create_index('pk') > + > +-- transaction w/o conflict > +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() Whenever you add a test, please write a few words about what it does and the resolution of which ticket it is supposed to test. ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v3 0/2] Transaction boundaries for applier @ 2019-03-10 20:21 Georgy Kirichenko 2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 2/2] Transaction support " Georgy Kirichenko 0 siblings, 1 reply; 6+ messages in thread From: Georgy Kirichenko @ 2019-03-10 20:21 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko This patchset consists of two patches. * The first one creates a separate journal transaction for all local effects in case of replication, which is needed to be able to replicate such effects back. * The second one turns applier into transaction mode - an applier first fetches the whole transaction and then applies all rows within transaction boundaries. Changes in v3: * rebased against latest 2.1. * use n_local_rows/n_remote_rows counters. * fixes and refactoring according to review. Changes in v2: * Get rid of apply_initial_journal_row and apply_row from box.cc and purge box dependency from applier. * txn.cc and txn.h changes moved into a separate commit. * applier_read_tx uses stailq to form a list of rows in a transaction. * use exceptions for applier routines. * slight changes according to review. Issue: https://github.com/tarantool/tarantool/issues/2798 Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries Georgy Kirichenko (2): Write rows without a lsn to the transaction tail Transaction support for applier src/box/applier.cc | 219 +++++++++++++++++------ src/box/txn.c | 42 +++-- src/box/txn.h | 6 +- test/replication/transaction.result | 242 ++++++++++++++++++++++++++ test/replication/transaction.test.lua | 86 +++++++++ 5 files changed, 525 insertions(+), 70 deletions(-) create mode 100644 test/replication/transaction.result create mode 100644 test/replication/transaction.test.lua -- 2.21.0 ^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier 2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries " Georgy Kirichenko @ 2019-03-10 20:21 ` Georgy Kirichenko 0 siblings, 0 replies; 6+ messages in thread From: Georgy Kirichenko @ 2019-03-10 20:21 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Applier fetch incoming rows to form a transaction and then apply it. Rows are fetched and stored on fiber gc region until last transaction row with is_commit was fetched. After fetch a multi row transaction is going to be applied into txn_begin/txn_commit/txn_rolback boundaries. At this time we could not apply single row transaction in such boundaries because of ddl which does not support non auto commit transactions. Closes: #2798 Needed for: #980 --- src/box/applier.cc | 219 +++++++++++++++++------ test/replication/transaction.result | 242 ++++++++++++++++++++++++++ test/replication/transaction.test.lua | 86 +++++++++ 3 files changed, 492 insertions(+), 55 deletions(-) create mode 100644 test/replication/transaction.result create mode 100644 test/replication/transaction.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 94c07aac7..5af132377 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -426,6 +426,159 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } +/** + * Helper struct to bind rows in a list. + */ +struct applier_tx_row { + /* Next transaction row. */ + struct stailq_entry next; + /* xrow_header struct for the current transaction row. 
*/ + struct xrow_header row; +}; + +static struct applier_tx_row * +applier_read_tx_row(struct applier *applier) +{ + struct ev_io *coio = &applier->io; + struct ibuf *ibuf = &applier->ibuf; + + struct applier_tx_row *tx_row = (struct applier_tx_row *) + region_alloc(&fiber()->gc, sizeof(struct applier_tx_row)); + + if (tx_row == NULL) + tnt_raise(OutOfMemory, sizeof(struct applier_tx_row), + "region", "struct applier_tx_row"); + + struct xrow_header *row = &tx_row->row; + + double timeout = replication_disconnect_timeout(); + /* + * Tarantool < 1.7.7 does not send periodic heartbeat + * messages so we can't assume that if we haven't heard + * from the master for quite a while the connection is + * broken - the master might just be idle. + */ + if (applier->version_id < version_id(1, 7, 7)) + coio_read_xrow(coio, ibuf, row); + else + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + + applier->lag = ev_now(loop()) - row->tm; + applier->last_row_time = ev_monotonic_now(loop()); + return tx_row; +} + +/** + * Read one transaction from network using applier's input buffer. + * Transaction rows are placed onto fiber gc region. + * We could not use applier input buffer for that because rpos is adjusted + * after each xrow decoding and corresponding network input space is going + * to be reused. + */ +static void +applier_read_tx(struct applier *applier, struct stailq *rows) +{ + int64_t tsn = 0; + + stailq_create(rows); + do { + struct applier_tx_row *tx_row = applier_read_tx_row(applier); + struct xrow_header *row = &tx_row->row; + + if (iproto_type_is_error(row->type)) + xrow_decode_error_xc(row); + + /* Replication request. */ + if (row->replica_id == REPLICA_ID_NIL || + row->replica_id >= VCLOCK_MAX) { + /* + * A safety net, this can only occur + * if we're fed a strangely broken xlog. 
+ */ + tnt_raise(ClientError, ER_UNKNOWN_REPLICA, + int2str(row->replica_id), + tt_uuid_str(&REPLICASET_UUID)); + } + if (tsn == 0) { + /* + * Transaction id must be derived from the log sequence + * number of the first row in the transaction. + */ + tsn = row->tsn; + if (row->lsn != tsn) + tnt_raise(ClientError, ER_PROTOCOL, + "Transaction id must be derived from " + "the lsn of the first row in the " + "transaction."); + } + if (tsn != row->tsn) + tnt_raise(ClientError, ER_UNSUPPORTED, + "replication", + "interleaving transactions"); + + assert(row->bodycnt <= 1); + if (row->bodycnt == 1 && !row->is_commit) { + /* Save row body to gc region. */ + void *new_base = region_alloc(&fiber()->gc, + row->body->iov_len); + if (new_base == NULL) + tnt_raise(OutOfMemory, row->body->iov_len, + "region", "xrow body"); + memcpy(new_base, row->body->iov_base, row->body->iov_len); + /* Adjust row body pointers. */ + row->body->iov_base = new_base; + } + stailq_add_tail(rows, &tx_row->next); + + } while (!stailq_last_entry(rows, struct applier_tx_row, + next)->row.is_commit); +} + +/** + * Apply all rows in the rows queue as a single transaction. + * + * Return 0 for success or -1 in case of an error. + */ +static int +applier_apply_tx(struct stailq *rows) +{ + int res = 0; + struct txn *txn = NULL; + struct applier_tx_row *item; + if (stailq_first(rows) != stailq_last(rows) && + (txn = txn_begin(false)) == NULL) + diag_raise(); + stailq_foreach_entry(item, rows, next) { + struct xrow_header *row = &item->row; + res = apply_row(row); + if (res != 0) { + struct error *e = diag_last_error(diag_get()); + /* + * In case of ER_TUPLE_FOUND error and enabled + * replication_skip_conflict configuration + * option, skip applying the foreign row and + * replace it with NOP in the local write ahead + * log. 
+ */ + if (e->type == &type_ClientError && + box_error_code(e) == ER_TUPLE_FOUND && + replication_skip_conflict) { + diag_clear(diag_get()); + row->type = IPROTO_NOP; + row->bodycnt = 0; + res = apply_row(row); + } + } + if (res != 0) + break; + } + if (res == 0 && txn != NULL) + res = txn_commit(txn); + if (res != 0) + txn_rollback(); + return res; +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -555,36 +708,14 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Tarantool < 1.7.7 does not send periodic heartbeat - * messages so we can't assume that if we haven't heard - * from the master for quite a while the connection is - * broken - the master might just be idle. - */ - if (applier->version_id < version_id(1, 7, 7)) { - coio_read_xrow(coio, ibuf, &row); - } else { - double timeout = replication_disconnect_timeout(); - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); - } - - if (iproto_type_is_error(row.type)) - xrow_decode_error_xc(&row); /* error */ - /* Replication request. */ - if (row.replica_id == REPLICA_ID_NIL || - row.replica_id >= VCLOCK_MAX) { - /* - * A safety net, this can only occur - * if we're fed a strangely broken xlog. - */ - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, - int2str(row.replica_id), - tt_uuid_str(&REPLICASET_UUID)); - } + struct stailq rows; + applier_read_tx(applier, &rows); - applier->lag = ev_now(loop()) - row.tm; + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(row.replica_id); + struct replica *replica = replica_by_id(first_row->replica_id); struct latch *latch = (replica ? &replica->order_latch : &replicaset.applier.order_latch); /* @@ -594,33 +725,11 @@ applier_subscribe(struct applier *applier) * that belong to the same server id. 
*/ latch_lock(latch); - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = apply_row(&row); - if (res != 0) { - struct error *e = diag_last_error(diag_get()); - /* - * In case of ER_TUPLE_FOUND error and enabled - * replication_skip_conflict configuration - * option, skip applying the foreign row and - * replace it with NOP in the local write ahead - * log. - */ - if (e->type == &type_ClientError && - box_error_code(e) == ER_TUPLE_FOUND && - replication_skip_conflict) { - diag_clear(diag_get()); - struct xrow_header nop; - nop.type = IPROTO_NOP; - nop.bodycnt = 0; - nop.replica_id = row.replica_id; - nop.lsn = row.lsn; - res = apply_row(&nop); - } - } - if (res != 0) { - latch_unlock(latch); - diag_raise(); - } + if (vclock_get(&replicaset.vclock, first_row->replica_id) < + first_row->lsn && + applier_apply_tx(&rows) != 0) { + latch_unlock(latch); + diag_raise(); } latch_unlock(latch); diff --git a/test/replication/transaction.result b/test/replication/transaction.result new file mode 100644 index 000000000..8c2ac6ee4 --- /dev/null +++ b/test/replication/transaction.result @@ -0,0 +1,242 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +--- +... +_ = s:create_index('pk') +--- +... +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +--- +- [4, 'r'] +... +v1 = box.info.vclock +--- +... +test_run:cmd("switch default") +--- +- true +... 
+-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +--- +... +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- nothing was applied +v1[1] == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- set conflict to third transaction +_ = box.space.test:delete({4}) +--- +... +box.space.test:replace({6, 'r'}) +--- +- [6, 'r'] +... +-- restart replication +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +box.cfg{replication = replication} +--- +... +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [6, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- check restart does not help +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [6, 'r'] +... +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +--- +... +box.cfg{replication = {}, replication_skip_conflict = true} +--- +... +box.cfg{replication = replication} +--- +... 
+-- check last transaction applied without conflicting row +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] +... +box.info.replication[1].upstream.status +--- +- follow +... +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +--- +- [8, 'r'] +... +box.space.test:replace({9, 'r'}) +--- +- [9, 'r'] +... +-- issue a conflicting tx +test_run:cmd("switch default") +--- +- true +... +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- vclock should be increased but rows skipped +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +-- check restart does not change something +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +box.info.replication[1].upstream.status +--- +- follow +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +s:drop() +--- +... 
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua new file mode 100644 index 000000000..f25a4737d --- /dev/null +++ b/test/replication/transaction.test.lua @@ -0,0 +1,86 @@ +env = require('test_run') +test_run = env.new() +box.schema.user.grant('guest', 'replication') + +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +_ = s:create_index('pk') + +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") + +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +v1 = box.info.vclock + +test_run:cmd("switch default") +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- nothing was applied +v1[1] == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message +-- set conflict to third transaction +_ = box.space.test:delete({4}) +box.space.test:replace({6, 'r'}) +-- restart replication +replication = box.cfg.replication +box.cfg{replication = {}} +box.cfg{replication = replication} +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message + +-- check restart does not help +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +box.cfg{replication = {}, 
replication_skip_conflict = true} +box.cfg{replication = replication} + +-- check last transaction applied without conflicting row +box.space.test:select() +box.info.replication[1].upstream.status + +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +box.space.test:replace({9, 'r'}) + +-- issue a conflicting tx +test_run:cmd("switch default") +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- vclock should be increased but rows skipped +box.space.test:select() + +-- check restart does not change something +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +box.info.replication[1].upstream.status + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") + +box.schema.user.revoke('guest', 'replication') +s:drop() -- 2.21.0 ^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2019-03-10 20:21 UTC | newest] Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- [not found] <cover.1550001848.git.georgy@tarantool.org> 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko 2019-02-15 13:15 ` Vladimir Davydov 2019-02-19 14:59 ` [tarantool-patches] " Konstantin Osipov 2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko 2019-02-18 9:36 ` Vladimir Davydov 2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries " Georgy Kirichenko 2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 2/2] Transaction support " Georgy Kirichenko
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.