* [tarantool-patches] [PATCH v3 0/2] Transaction boundaries for applier
From: Georgy Kirichenko @ 2019-03-10 20:21 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset consists of two patches:

* The first one puts all local effects into a separate journal
  transaction in case of replication, which is needed to be able to
  replicate such effects back.
* The second one turns the applier into transaction mode - the applier
  first fetches a whole transaction and then applies all of its rows
  within transaction boundaries.

Changes in v3:
* rebased against latest 2.1.
* use n_local_rows/n_remote_rows counters.
* fixes and refactoring according to review.

Changes in v2:
* Get rid of apply_initial_journal_row and apply_row from box.cc and
  purge the box dependency from the applier.
* txn.c and txn.h changes moved into a separate commit.
* applier_read_tx uses a stailq to form the list of rows in a
  transaction.
* use exceptions for applier routines.
* slight changes according to review.

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (2):
  Write rows without a lsn to the transaction tail
  Transaction support for applier

 src/box/applier.cc                    | 219 +++++++++++++++++------
 src/box/txn.c                         |  42 +++--
 src/box/txn.h                         |   6 +-
 test/replication/transaction.result   | 242 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 5 files changed, 525 insertions(+), 70 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

--
2.21.0
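For illustration, the user-visible behavior the series aims at can be
sketched as follows. This is a minimal sketch assuming the master/replica
pair and the 'test' space set up in test/replication/transaction.test.lua
from the second patch; it is not itself part of the patchset.

-- master: a multi-statement transaction
box.begin()
box.space.test:insert{1, 'm'}
box.space.test:insert{2, 'm'}
box.commit()

-- replica: with transaction boundaries the applier either applies both
-- rows atomically or rejects the whole transaction; before this series
-- each row was applied (and could fail) independently.
box.space.test:select()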
* [tarantool-patches] [PATCH] Don't free fiber gc on while txn_rollback
From: Georgy Kirichenko @ 2019-03-10 20:21 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

In some cases volatile memory should be usable after a txn_rollback()
invocation. So move fiber_gc() from txn_rollback() to
box_txn_rollback().

Follow-up 77fa1736dbb9 box: factor fiber_gc out of txn_commit

Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2618-txn-rollback-gc-follow-up
---
 src/box/txn.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 7900fb3ab..1f488bbcc 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -399,8 +399,6 @@ txn_rollback()
 		txn_stmt_unref_tuples(stmt);
 
 	TRASH(txn);
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 }
 
@@ -480,6 +478,8 @@ box_txn_rollback()
 		return -1;
 	}
 	txn_rollback(); /* doesn't throw */
+	/** Free volatile txn memory. */
+	fiber_gc();
 	return 0;
 }
 
--
2.21.0
* [tarantool-patches] Re: [PATCH] Don't free fiber gc on while txn_rollback
From: Konstantin Osipov @ 2019-03-11 8:12 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/11 09:55]:
> In some cases volatile memory should be usable after a txn_rollback()
> invocation. So move fiber_gc() from txn_rollback() to
> box_txn_rollback().

txn_rollback() is used in a bunch of places - call.c, transaction
rollback in case of error. Are we going to leave the memory behind in
such cases now?

What exactly do you need this change for, and for what case? Could you
please describe it in the changeset comment?

It would also be nice if you turned on spell checking in your git
commit message editor.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail 2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries for applier Georgy Kirichenko 2019-03-10 20:21 ` [tarantool-patches] [PATCH] Don't free fiber gc on while txn_rollback Georgy Kirichenko @ 2019-03-10 20:21 ` Georgy Kirichenko 2019-03-11 8:14 ` [tarantool-patches] " Konstantin Osipov ` (2 more replies) 2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko 2 siblings, 3 replies; 14+ messages in thread From: Georgy Kirichenko @ 2019-03-10 20:21 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Form a separate transaction with local changes in case of replication. This is important because we should be able to replicate such changes (e.g. made within an on_replace trigger) back. In the opposite case local changes will be incorporated into originating transaction and would be skipped by the originator replica. Needed for: #2798 --- src/box/txn.c | 42 +++++++++++++++++++++++++++++------------- src/box/txn.h | 6 ++++-- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/box/txn.c b/src/box/txn.c index 1f488bbcc..d3d008c25 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) stailq_foreach_entry(stmt, &rollback, next) { if (txn->engine != NULL && stmt->space != NULL) engine_rollback_statement(txn->engine, txn, stmt); - if (stmt->row != NULL) { - assert(txn->n_rows > 0); - txn->n_rows--; + if (stmt->row != NULL && stmt->row->replica_id == 0) { + assert(txn->n_local_rows > 0); + txn->n_local_rows--; + } + if (stmt->row != NULL && stmt->row->replica_id != 0) { + assert(txn->n_remote_rows > 0); + txn->n_remote_rows--; } txn_stmt_unref_tuples(stmt); stmt->space = NULL; @@ -140,7 +144,8 @@ txn_begin(bool is_autocommit) } /* Initialize members explicitly to save time on memset() */ stailq_create(&txn->stmts); - txn->n_rows = 0; + txn->n_local_rows = 0; + txn->n_remote_rows = 0; txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; @@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request) if (stmt->space == NULL || !space_is_temporary(stmt->space)) { if (txn_add_redo(stmt, request) != 0) goto fail; - ++txn->n_rows; + assert(stmt->row != NULL); + if (stmt->row->replica_id == 0) + ++txn->n_local_rows; + else + ++txn->n_remote_rows; } /* * If there are triggers, and they are not disabled, and @@ -264,21 +273,27 @@ fail: static int64_t txn_write_to_wal(struct txn *txn) { - assert(txn->n_rows > 0); + assert(txn->n_local_rows + txn->n_remote_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_rows); + struct journal_entry *req = journal_entry_new(txn->n_local_rows + + txn->n_remote_rows); if (req == NULL) return -1; struct txn_stmt *stmt; - struct xrow_header **row = req->rows; + struct xrow_header **remote_row = req->rows; + struct xrow_header **local_row = req->rows + txn->n_remote_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. 
select) request */ - *row++ = stmt->row; + if (stmt->row->replica_id == 0) + *local_row++ = stmt->row; + else + *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(row == req->rows + req->n_rows); + assert(remote_row == req->rows + txn->n_remote_rows); + assert(local_row == remote_row + txn->n_local_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); @@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn) diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { + int n_rows = txn->n_local_rows + txn->n_remote_rows; say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", txn->n_rows, - res - txn->n_rows + 1, stop - start); + "LSN %lld: %.3f sec", n_rows, + res - n_rows + 1, stop - start); } /* * Use vclock_sum() from WAL writer as transaction signature. @@ -331,7 +347,7 @@ txn_commit(struct txn *txn) goto fail; } - if (txn->n_rows > 0) { + if (txn->n_local_rows + txn->n_remote_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) goto fail; diff --git a/src/box/txn.h b/src/box/txn.h index de5cb0de4..c9829da9e 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -140,8 +140,10 @@ struct txn { int64_t id; /** List of statements in a transaction. */ struct stailq stmts; - /** Total number of WAL rows in this txn. */ - int n_rows; + /** Number of new rows without an assigned lsn. */ + int n_local_rows; + /** Number of rows with an already assigned lsn. */ + int n_remote_rows; /** * True if this transaction is running in autocommit mode * (statement end causes an automatic transaction commit). -- 2.21.0 ^ permalink raw reply [flat|nested] 14+ messages in thread
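For context, the "changes made within an on_replace trigger" case that
the commit message of this patch refers to can be sketched like this on
a replica. The space names, the is_local option and the trigger body are
illustrative assumptions, not part of the patch.

-- replica-local space; its rows get no LSN from the master
audit = box.schema.space.create('audit', {is_local = true})
_ = audit:create_index('pk')
-- rows produced here while the applier replays the master's rows have
-- no assigned LSN, so with this patch they are collected at the tail of
-- the journal entry and written as a separate transaction instead of
-- being mixed into the originating one.
box.space.test:on_replace(function(old, new)
    if new ~= nil then
        box.space.audit:auto_increment{new[1]}
    end
end)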
* [tarantool-patches] Re: [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Konstantin Osipov @ 2019-03-11 8:14 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/11 09:55]:
> Form a separate transaction with local changes in case of replication.
> This is important because we should be able to replicate such changes
> (e.g. made within an on_replace trigger) back. In the opposite case
> local changes will be incorporated into originating transaction and
> would be skipped by the originator replica.

Looks good to me.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* Re: [tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Vladimir Davydov @ 2019-03-11 8:40 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Mar 10, 2019 at 11:21:27PM +0300, Georgy Kirichenko wrote:
> Form a separate transaction with local changes in case of replication.
> This is important because we should be able to replicate such changes
> (e.g. made within an on_replace trigger) back. In the opposite case
> local changes will be incorporated into originating transaction and
> would be skipped by the originator replica.
>
> Needed for: #2798
> ---
>  src/box/txn.c | 42 +++++++++++++++++++++++++++++-------------
>  src/box/txn.h |  6 ++++--
>  2 files changed, 33 insertions(+), 15 deletions(-)

Pushed to 2.1.
* [tarantool-patches] Re: [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Konstantin Osipov @ 2019-03-11 9:59 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/11 09:55]:
> Form a separate transaction with local changes in case of replication.
> This is important because we should be able to replicate such changes
> (e.g. made within an on_replace trigger) back. In the opposite case
> local changes will be incorporated into originating transaction and
> would be skipped by the originator replica.

I wonder whether we might have some recovery issues, since in fact
we're performing a reordering of execution here.

Imagine local and remote statements change the same set of rows.
During initial execution these changes are intermixed, during
recovery they are serialized.

It seems we clearly have a problem here. We can either open a bug,
support multiple txn ids in the same stream, support multiple
server ids in the same transaction, or ban triggers in
multi-statement transactions.

Can we attribute local changes to the same server id? We don't have to
replicate them back - this is a gray zone and we can do it in any way
we want.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Georgy Kirichenko @ 2019-03-11 10:54 UTC
To: tarantool-patches; +Cc: Konstantin Osipov

On Monday, March 11, 2019 12:59:26 PM MSK Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/03/11 09:55]:
> > Form a separate transaction with local changes in case of replication.
> > This is important because we should be able to replicate such changes
> > (e.g. made within an on_replace trigger) back. In the opposite case
> > local changes will be incorporated into originating transaction and
> > would be skipped by the originator replica.
>
> I wonder whether we might have some recovery issues, since in fact
> we're performing a reordering of execution here.
>
> Imagine local and remote statements change the same set of rows.
> During initial execution these changes are intermixed, during
> recovery they are serialized.

If you remember, we agreed that only local spaces are allowed to be
changed by replication triggers.

> It seems we clearly have a problem here. We can either open a bug,
> support multiple txn ids in the same stream, support multiple
> server ids in the same transaction, or ban triggers in
> multi-statement transactions.

You pushed me to remove txn_replica_id, but it was one of the
instruments I planned to use in order to support distributed
transactions (with multiple replica ids in the same transaction) in the
future. So I would prefer that we just disable changing non-local
spaces during replication. In that case we won't have any issues with
reordering.

> Can we attribute local changes to the same server id?

It is impossible because of the lsn.

> We don't have to replicate them back - this is a gray zone and we can
> do it in any way we want.

I'm afraid not, because we already have this functionality and it is
even covered with tests.

So we have to make a high-level decision about what the expected
behavior is. In any case I will agree with your decision on what we
should do: disable non-local replication changes, change the behavior
of replication for such changes, or start further investigation of
distributed transactions.
* [tarantool-patches] Re: [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Konstantin Osipov @ 2019-03-11 14:04 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

* Georgy Kirichenko <georgy@tarantool.org> [19/03/11 13:54]:

It seems we should be able to allow using statements with different
server_id in the same transaction. When deciding which transactions to
send back in multi-master mode, we should only look at the first
statement to find out the source (origin) of the transaction and either
send all statements in the transaction or skip all statements.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH v3 1/2] Write rows without a lsn to the transaction tail
From: Georgy Kirichenko @ 2019-03-11 19:52 UTC
To: Konstantin Osipov; +Cc: tarantool-patches

On Monday, March 11, 2019 5:04:39 PM MSK Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/03/11 13:54]:
>
> It seems we should be able to allow using statements with different
> server_id in the same transaction. When deciding which transactions to
> send back in multi-master mode, we should only look at the first
> statement to find out the source (origin) of the transaction and either
> send all statements in the transaction or skip all statements.

I don't think this is a good approach, because:

1. If replica A produces a transaction and replica B writes this
   transaction together with its local changes, then the state of
   replica C is unknown (we cannot predict which of replica A or B
   replicates faster). Also, what should happen if C replicates from A
   and D replicates from B - they will end up with different data.

2. In the case of synchronous replication, how should replica B confirm
   its local changes? Using replica A's confirmation? But replica A
   doesn't know anything about them, and neither do the other replica
   set members that replicate from A first. So if these local changes
   cannot even be replicated, should we allow such changes only for
   local spaces?

3. This breaks the row format - now each row carries full information
   about its transaction (replica id and tsn), but without a separate
   tsn_replica_id item we would have to rely on external information
   (the row's number in the transaction, previous rows and so on).
   Please keep in mind the possibility of interleaving transactions.
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier 2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries for applier Georgy Kirichenko 2019-03-10 20:21 ` [tarantool-patches] [PATCH] Don't free fiber gc on while txn_rollback Georgy Kirichenko 2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail Georgy Kirichenko @ 2019-03-10 20:21 ` Georgy Kirichenko 2019-03-11 8:18 ` [tarantool-patches] " Konstantin Osipov 2 siblings, 1 reply; 14+ messages in thread From: Georgy Kirichenko @ 2019-03-10 20:21 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Applier fetch incoming rows to form a transaction and then apply it. Rows are fetched and stored on fiber gc region until last transaction row with is_commit was fetched. After fetch a multi row transaction is going to be applied into txn_begin/txn_commit/txn_rolback boundaries. At this time we could not apply single row transaction in such boundaries because of ddl which does not support non auto commit transactions. Closes: #2798 Needed for: #980 --- src/box/applier.cc | 219 +++++++++++++++++------ test/replication/transaction.result | 242 ++++++++++++++++++++++++++ test/replication/transaction.test.lua | 86 +++++++++ 3 files changed, 492 insertions(+), 55 deletions(-) create mode 100644 test/replication/transaction.result create mode 100644 test/replication/transaction.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 94c07aac7..5af132377 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -426,6 +426,159 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } +/** + * Helper struct to bind rows in a list. + */ +struct applier_tx_row { + /* Next transaction row. */ + struct stailq_entry next; + /* xrow_header struct for the current transaction row. */ + struct xrow_header row; +}; + +static struct applier_tx_row * +applier_read_tx_row(struct applier *applier) +{ + struct ev_io *coio = &applier->io; + struct ibuf *ibuf = &applier->ibuf; + + struct applier_tx_row *tx_row = (struct applier_tx_row *) + region_alloc(&fiber()->gc, sizeof(struct applier_tx_row)); + + if (tx_row == NULL) + tnt_raise(OutOfMemory, sizeof(struct applier_tx_row), + "region", "struct applier_tx_row"); + + struct xrow_header *row = &tx_row->row; + + double timeout = replication_disconnect_timeout(); + /* + * Tarantool < 1.7.7 does not send periodic heartbeat + * messages so we can't assume that if we haven't heard + * from the master for quite a while the connection is + * broken - the master might just be idle. + */ + if (applier->version_id < version_id(1, 7, 7)) + coio_read_xrow(coio, ibuf, row); + else + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + + applier->lag = ev_now(loop()) - row->tm; + applier->last_row_time = ev_monotonic_now(loop()); + return tx_row; +} + +/** + * Read one transaction from network using applier's input buffer. + * Transaction rows are placed onto fiber gc region. + * We could not use applier input buffer for that because rpos is adjusted + * after each xrow decoding and corresponding network input space is going + * to be reused. + */ +static void +applier_read_tx(struct applier *applier, struct stailq *rows) +{ + int64_t tsn = 0; + + stailq_create(rows); + do { + struct applier_tx_row *tx_row = applier_read_tx_row(applier); + struct xrow_header *row = &tx_row->row; + + if (iproto_type_is_error(row->type)) + xrow_decode_error_xc(row); + + /* Replication request. 
*/ + if (row->replica_id == REPLICA_ID_NIL || + row->replica_id >= VCLOCK_MAX) { + /* + * A safety net, this can only occur + * if we're fed a strangely broken xlog. + */ + tnt_raise(ClientError, ER_UNKNOWN_REPLICA, + int2str(row->replica_id), + tt_uuid_str(&REPLICASET_UUID)); + } + if (tsn == 0) { + /* + * Transaction id must be derived from the log sequence + * number of the first row in the transaction. + */ + tsn = row->tsn; + if (row->lsn != tsn) + tnt_raise(ClientError, ER_PROTOCOL, + "Transaction id must be derived from " + "the lsn of the first row in the " + "transaction."); + } + if (tsn != row->tsn) + tnt_raise(ClientError, ER_UNSUPPORTED, + "replication", + "interleaving transactions"); + + assert(row->bodycnt <= 1); + if (row->bodycnt == 1 && !row->is_commit) { + /* Save row body to gc region. */ + void *new_base = region_alloc(&fiber()->gc, + row->body->iov_len); + if (new_base == NULL) + tnt_raise(OutOfMemory, row->body->iov_len, + "region", "xrow body"); + memcpy(new_base, row->body->iov_base, row->body->iov_len); + /* Adjust row body pointers. */ + row->body->iov_base = new_base; + } + stailq_add_tail(rows, &tx_row->next); + + } while (!stailq_last_entry(rows, struct applier_tx_row, + next)->row.is_commit); +} + +/** + * Apply all rows in the rows queue as a single transaction. + * + * Return 0 for success or -1 in case of an error. + */ +static int +applier_apply_tx(struct stailq *rows) +{ + int res = 0; + struct txn *txn = NULL; + struct applier_tx_row *item; + if (stailq_first(rows) != stailq_last(rows) && + (txn = txn_begin(false)) == NULL) + diag_raise(); + stailq_foreach_entry(item, rows, next) { + struct xrow_header *row = &item->row; + res = apply_row(row); + if (res != 0) { + struct error *e = diag_last_error(diag_get()); + /* + * In case of ER_TUPLE_FOUND error and enabled + * replication_skip_conflict configuration + * option, skip applying the foreign row and + * replace it with NOP in the local write ahead + * log. + */ + if (e->type == &type_ClientError && + box_error_code(e) == ER_TUPLE_FOUND && + replication_skip_conflict) { + diag_clear(diag_get()); + row->type = IPROTO_NOP; + row->bodycnt = 0; + res = apply_row(row); + } + } + if (res != 0) + break; + } + if (res == 0 && txn != NULL) + res = txn_commit(txn); + if (res != 0) + txn_rollback(); + return res; +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -555,36 +708,14 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Tarantool < 1.7.7 does not send periodic heartbeat - * messages so we can't assume that if we haven't heard - * from the master for quite a while the connection is - * broken - the master might just be idle. - */ - if (applier->version_id < version_id(1, 7, 7)) { - coio_read_xrow(coio, ibuf, &row); - } else { - double timeout = replication_disconnect_timeout(); - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); - } - - if (iproto_type_is_error(row.type)) - xrow_decode_error_xc(&row); /* error */ - /* Replication request. */ - if (row.replica_id == REPLICA_ID_NIL || - row.replica_id >= VCLOCK_MAX) { - /* - * A safety net, this can only occur - * if we're fed a strangely broken xlog. 
- */ - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, - int2str(row.replica_id), - tt_uuid_str(&REPLICASET_UUID)); - } + struct stailq rows; + applier_read_tx(applier, &rows); - applier->lag = ev_now(loop()) - row.tm; + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(row.replica_id); + struct replica *replica = replica_by_id(first_row->replica_id); struct latch *latch = (replica ? &replica->order_latch : &replicaset.applier.order_latch); /* @@ -594,33 +725,11 @@ applier_subscribe(struct applier *applier) * that belong to the same server id. */ latch_lock(latch); - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = apply_row(&row); - if (res != 0) { - struct error *e = diag_last_error(diag_get()); - /* - * In case of ER_TUPLE_FOUND error and enabled - * replication_skip_conflict configuration - * option, skip applying the foreign row and - * replace it with NOP in the local write ahead - * log. - */ - if (e->type == &type_ClientError && - box_error_code(e) == ER_TUPLE_FOUND && - replication_skip_conflict) { - diag_clear(diag_get()); - struct xrow_header nop; - nop.type = IPROTO_NOP; - nop.bodycnt = 0; - nop.replica_id = row.replica_id; - nop.lsn = row.lsn; - res = apply_row(&nop); - } - } - if (res != 0) { - latch_unlock(latch); - diag_raise(); - } + if (vclock_get(&replicaset.vclock, first_row->replica_id) < + first_row->lsn && + applier_apply_tx(&rows) != 0) { + latch_unlock(latch); + diag_raise(); } latch_unlock(latch); diff --git a/test/replication/transaction.result b/test/replication/transaction.result new file mode 100644 index 000000000..8c2ac6ee4 --- /dev/null +++ b/test/replication/transaction.result @@ -0,0 +1,242 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +--- +... +_ = s:create_index('pk') +--- +... +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +--- +- [4, 'r'] +... +v1 = box.info.vclock +--- +... +test_run:cmd("switch default") +--- +- true +... +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +--- +... +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- nothing was applied +v1[1] == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- set conflict to third transaction +_ = box.space.test:delete({4}) +--- +... +box.space.test:replace({6, 'r'}) +--- +- [6, 'r'] +... +-- restart replication +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +box.cfg{replication = replication} +--- +... 
+-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [6, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- check restart does not help +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [6, 'r'] +... +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +--- +... +box.cfg{replication = {}, replication_skip_conflict = true} +--- +... +box.cfg{replication = replication} +--- +... +-- check last transaction applied without conflicting row +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] +... +box.info.replication[1].upstream.status +--- +- follow +... +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +--- +- [8, 'r'] +... +box.space.test:replace({9, 'r'}) +--- +- [9, 'r'] +... +-- issue a conflicting tx +test_run:cmd("switch default") +--- +- true +... +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- vclock should be increased but rows skipped +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +-- check restart does not change something +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'm'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +box.info.replication[1].upstream.status +--- +- follow +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +s:drop() +--- +... 
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua new file mode 100644 index 000000000..f25a4737d --- /dev/null +++ b/test/replication/transaction.test.lua @@ -0,0 +1,86 @@ +env = require('test_run') +test_run = env.new() +box.schema.user.grant('guest', 'replication') + +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +_ = s:create_index('pk') + +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") + +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +v1 = box.info.vclock + +test_run:cmd("switch default") +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- nothing was applied +v1[1] == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message +-- set conflict to third transaction +_ = box.space.test:delete({4}) +box.space.test:replace({6, 'r'}) +-- restart replication +replication = box.cfg.replication +box.cfg{replication = {}} +box.cfg{replication = replication} +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message + +-- check restart does not help +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +box.cfg{replication = {}, replication_skip_conflict = true} +box.cfg{replication = replication} + +-- check last transaction applied without conflicting row +box.space.test:select() +box.info.replication[1].upstream.status + +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +box.space.test:replace({9, 'r'}) + +-- issue a conflicting tx +test_run:cmd("switch default") +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- vclock should be increased but rows skipped +box.space.test:select() + +-- check restart does not change something +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +box.info.replication[1].upstream.status + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") + +box.schema.user.revoke('guest', 'replication') +s:drop() -- 2.21.0 ^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH v3 2/2] Transaction support for applier
From: Konstantin Osipov @ 2019-03-11 8:18 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/11 09:55]:
> Applier fetch incoming rows to form a transaction and then apply it.
> Rows are fetched and stored on fiber gc region until last transaction row
> with is_commit was fetched. After fetch a multi row transaction is going to be
> applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
> we could not apply single row transaction in such boundaries because of
> ddl which does not support non auto commit transactions.

The patch is generally looking good to me, but I assume it depends on
the first patch, which changes the gc memory life cycle boundaries. Is
it possible to fix this patch so that it doesn't depend on the first
patch?

Besides, I asked in the previous review for a test case involving
on_replace/before_replace triggers.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
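A rough sketch of the kind of trigger test being asked for here, reusing
the fixture from test/replication/transaction.test.lua (the 'test'
space, the 's' handle on the master and the test_run helpers); the 'log'
space, its is_local option and the expected results are assumptions, not
an actual test shipped with the series:

test_run:cmd("switch replica")
-- a replica-local space populated from an on_replace trigger
log = box.schema.space.create('log', {is_local = true})
_ = log:create_index('pk')
box.space.test:on_replace(function(old, new) log:auto_increment{new[1]} end)

test_run:cmd("switch default")
-- a multi-row transaction on the master fires the trigger twice on the replica
box.begin() s:insert({20, 'm'}) s:insert({21, 'm'}) box.commit()

test_run:cmd("switch replica")
-- both replicated rows and both trigger-produced local rows are expected
-- to be visible, and the local rows should survive a replica restart
box.space.test:select({20})
box.space.test:select({21})
box.space.log:select()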
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier [not found] <cover.1550001848.git.georgy@tarantool.org> @ 2019-02-12 20:04 ` Georgy Kirichenko 2019-02-18 9:36 ` Vladimir Davydov 0 siblings, 1 reply; 14+ messages in thread From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw) To: tarantool-patches; +Cc: Georgy Kirichenko Applier fetch incoming rows to form a transaction and then apply it. Implementation assumes that transaction could not mix in a replication stream. Also distributed transaction are not supported yet. Closes: #2798 Needed for: #980 --- src/box/applier.cc | 185 +++++++++++++++----- test/replication/transaction.result | 240 ++++++++++++++++++++++++++ test/replication/transaction.test.lua | 86 +++++++++ 3 files changed, 471 insertions(+), 40 deletions(-) create mode 100644 test/replication/transaction.result create mode 100644 test/replication/transaction.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index 7f37fe2ee..59c33bb84 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -48,6 +48,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "txn.h" STRS(applier_state, applier_STATE); @@ -378,6 +379,105 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } +/** + * Read one transaction from network. + * Transaction rows are placed into row_buf as an array, row's bodies are + * placed into obuf because it is not allowed to relocate row's bodies. + * Also we could not use applier input buffer because rpos adjusted after xrow + * decoding and corresponding space going to reuse. + * + * Note: current implementation grants that transaction could not be mixed, so + * we read each transaction from first xrow until xrow with txn_last = true. + */ +static int64_t +applier_read_tx(struct applier *applier, struct ibuf *row_buf, + struct obuf *data_buf) +{ + struct xrow_header *row; + struct ev_io *coio = &applier->io; + struct ibuf *ibuf = &applier->ibuf; + int64_t txn_id = 0; + + do { + row = (struct xrow_header *)ibuf_alloc(row_buf, + sizeof(struct xrow_header)); + if (row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "slab", "struct xrow_header"); + goto error; + } + + double timeout = replication_disconnect_timeout(); + try { + /* TODO: we should have a C version of this function. */ + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + } catch (...) { + goto error; + } + + if (iproto_type_is_error(row->type)) { + xrow_decode_error(row); + goto error; + } + + /* Replication request. */ + if (row->replica_id == REPLICA_ID_NIL || + row->replica_id >= VCLOCK_MAX) { + /* + * A safety net, this can only occur + * if we're fed a strangely broken xlog. + */ + diag_set(ClientError, ER_UNKNOWN_REPLICA, + int2str(row->replica_id), + tt_uuid_str(&REPLICASET_UUID)); + goto error; + } + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) { + /* + * First row in a transaction. In order to enforce + * consistency check that first row lsn and replica id + * match with transaction. + */ + txn_id = row->txn_id; + if (row->lsn != txn_id) { + /* There is not a first row in the transactions. */ + diag_set(ClientError, ER_PROTOCOL, + "Not a first row in a transaction"); + goto error; + } + } + if (txn_id != row->txn_id) { + /* We are not able to handle interleaving transactions. 
*/ + diag_set(ClientError, ER_UNSUPPORTED, + "replications", + "interleaving transactions"); + goto error; + } + + + applier->lag = ev_now(loop()) - row->tm; + applier->last_row_time = ev_monotonic_now(loop()); + + if (row->body->iov_base != NULL) { + void *new_base = obuf_alloc(data_buf, row->body->iov_len); + if (new_base == NULL) { + diag_set(OutOfMemory, row->body->iov_len, + "slab", "xrow_data"); + goto error; + } + memcpy(new_base, row->body->iov_base, row->body->iov_len); + row->body->iov_base = new_base; + } + + } while (row->txn_commit == 0); + + return 0; +error: + ibuf_reset(row_buf); + obuf_reset(data_buf); + return -1; +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier) struct xrow_header row; struct vclock remote_vclock_at_subscribe; struct tt_uuid cluster_id = uuid_nil; + struct ibuf row_buf; + struct obuf data_buf; + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header)); + obuf_create(&data_buf, &cord()->slabc, 0x10000); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, &replicaset.vclock); @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Tarantool < 1.7.7 does not send periodic heartbeat - * messages so we can't assume that if we haven't heard - * from the master for quite a while the connection is - * broken - the master might just be idle. - */ - if (applier->version_id < version_id(1, 7, 7)) { - coio_read_xrow(coio, ibuf, &row); - } else { - double timeout = replication_disconnect_timeout(); - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); - } + if (applier_read_tx(applier, &row_buf, &data_buf) != 0) + diag_raise(); - if (iproto_type_is_error(row.type)) - xrow_decode_error_xc(&row); /* error */ - /* Replication request. */ - if (row.replica_id == REPLICA_ID_NIL || - row.replica_id >= VCLOCK_MAX) { - /* - * A safety net, this can only occur - * if we're fed a strangely broken xlog. - */ - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, - int2str(row.replica_id), - tt_uuid_str(&REPLICASET_UUID)); - } + struct txn *txn = NULL; + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos; + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1; - applier->lag = ev_now(loop()) - row.tm; + applier->lag = ev_now(loop()) - last_row->tm; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(row.replica_id); + struct replica *replica = replica_by_id(first_row->replica_id); struct latch *latch = (replica ? &replica->order_latch : &replicaset.applier.order_latch); /* @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier) * that belong to the same server id. */ latch_lock(latch); - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - int res = xstream_write(applier->subscribe_stream, &row); - if (res != 0) { - struct error *e = diag_last_error(diag_get()); - /** - * Silently skip ER_TUPLE_FOUND error if such - * option is set in config. 
- */ - if (e->type == &type_ClientError && + if (vclock_get(&replicaset.vclock, + first_row->replica_id) < first_row->lsn) { + struct xrow_header *row = first_row; + if (first_row != last_row) + txn = txn_begin(false); + int res = 0; + while (row <= last_row && res == 0) { + res = xstream_write(applier->subscribe_stream, row); + struct error *e; + if (res != 0 && + (e = diag_last_error(diag_get()))->type == + &type_ClientError && box_error_code(e) == ER_TUPLE_FOUND && - replication_skip_conflict) + replication_skip_conflict) { + /** + * Silently skip ER_TUPLE_FOUND error + * if such option is set in config. + */ diag_clear(diag_get()); - else { - latch_unlock(latch); - diag_raise(); + row->type = IPROTO_NOP; + row->bodycnt = 0; + res = xstream_write(applier->subscribe_stream, + row); } + ++row; + } + if (res == 0 && txn != NULL) + res = txn_commit(txn); + + if (res != 0) { + txn_rollback(); + obuf_reset(&data_buf); + ibuf_reset(&row_buf); + latch_unlock(latch); + diag_raise(); } } + obuf_reset(&data_buf); + ibuf_reset(&row_buf); latch_unlock(latch); if (applier->state == APPLIER_SYNC || diff --git a/test/replication/transaction.result b/test/replication/transaction.result new file mode 100644 index 000000000..009f84430 --- /dev/null +++ b/test/replication/transaction.result @@ -0,0 +1,240 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +--- +... +_ = s:create_index('pk') +--- +... +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +--- +- [4, 'r'] +... +v1 = box.info.vclock +--- +... +test_run:cmd("switch default") +--- +- true +... +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +--- +... +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- nothing was applied +v1[1] == box.info.vclock[1] +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- set conflict to third transaction +box.space.test:delete({3}) +--- +... +box.space.test:replace({6, 'r'}) +--- +- [6, 'r'] +... +-- restart replication +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +box.cfg{replication = replication} +--- +... +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +--- +- false +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] + - [6, 'r'] +... +-- check replication status +box.info.replication[1].upstream.status +--- +- stopped +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- check restart does not help +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... 
+test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [4, 'r'] + - [6, 'r'] +... +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +--- +... +box.cfg{replication = {}, replication_skip_conflict = true} +--- +... +box.cfg{replication = replication} +--- +... +-- check last transaction applied without conflicting row +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] +... +box.info.replication[1].upstream.status +--- +- follow +... +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +--- +- [8, 'r'] +... +box.space.test:replace({9, 'r'}) +--- +- [9, 'r'] +... +-- issue a conflicting tx +test_run:cmd("switch default") +--- +- true +... +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- vclock should be increased but rows skipped +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +-- check restart does not change something +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:select() +--- +- - [1, 'm'] + - [2, 'm'] + - [3, 'm'] + - [4, 'r'] + - [5, 'm'] + - [6, 'r'] + - [7, 'm'] + - [8, 'r'] + - [9, 'r'] +... +box.info.replication[1].upstream.status +--- +- follow +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +s:drop() +--- +... 
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua new file mode 100644 index 000000000..47003c644 --- /dev/null +++ b/test/replication/transaction.test.lua @@ -0,0 +1,86 @@ +env = require('test_run') +test_run = env.new() +box.schema.user.grant('guest', 'replication') + +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) +_ = s:create_index('pk') + +-- transaction w/o conflict +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() + +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") + +-- insert a conflicting row +box.space.test:replace({4, 'r'}) +v1 = box.info.vclock + +test_run:cmd("switch default") +-- create a two-row transaction with conflicting second +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit() +-- create a third transaction +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- nothing was applied +v1[1] == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message +-- set conflict to third transaction +box.space.test:delete({3}) +box.space.test:replace({6, 'r'}) +-- restart replication +replication = box.cfg.replication +box.cfg{replication = {}} +box.cfg{replication = replication} +-- replication stopped of third transaction +v1[1] + 2 == box.info.vclock[1] +box.space.test:select() +-- check replication status +box.info.replication[1].upstream.status +box.info.replication[1].upstream.message + +-- check restart does not help +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +-- set skip conflict rows and check that non-conflicting were applied +replication = box.cfg.replication +box.cfg{replication = {}, replication_skip_conflict = true} +box.cfg{replication = replication} + +-- check last transaction applied without conflicting row +box.space.test:select() +box.info.replication[1].upstream.status + +-- make some new conflicting rows with skip-conflicts +box.space.test:replace({8, 'r'}) +box.space.test:replace({9, 'r'}) + +-- issue a conflicting tx +test_run:cmd("switch default") +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit() + +test_run:cmd("switch replica") +-- vclock should be increased but rows skipped +box.space.test:select() + +-- check restart does not change something +test_run:cmd("switch default") +test_run:cmd("restart server replica") +test_run:cmd("switch replica") + +box.space.test:select() +box.info.replication[1].upstream.status + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") + +box.schema.user.revoke('guest', 'replication') +s:drop() -- 2.20.1 ^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/2] Transaction support for applier 2019-02-12 20:04 ` [tarantool-patches] " Georgy Kirichenko @ 2019-02-18 9:36 ` Vladimir Davydov 0 siblings, 0 replies; 14+ messages in thread From: Vladimir Davydov @ 2019-02-18 9:36 UTC (permalink / raw) To: Georgy Kirichenko; +Cc: tarantool-patches On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote: > Applier fetch incoming rows to form a transaction and then apply it. > Implementation assumes that transaction could not mix in a > replication stream. Also distributed transaction are not supported yet. > > Closes: #2798 > Needed for: #980 > --- > src/box/applier.cc | 185 +++++++++++++++----- > test/replication/transaction.result | 240 ++++++++++++++++++++++++++ > test/replication/transaction.test.lua | 86 +++++++++ > 3 files changed, 471 insertions(+), 40 deletions(-) > create mode 100644 test/replication/transaction.result > create mode 100644 test/replication/transaction.test.lua > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 7f37fe2ee..59c33bb84 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -48,6 +48,7 @@ > #include "error.h" > #include "session.h" > #include "cfg.h" > +#include "txn.h" > > STRS(applier_state, applier_STATE); > > @@ -378,6 +379,105 @@ applier_join(struct applier *applier) > applier_set_state(applier, APPLIER_READY); > } > > +/** > + * Read one transaction from network. > + * Transaction rows are placed into row_buf as an array, row's bodies are > + * placed into obuf because it is not allowed to relocate row's bodies. > + * Also we could not use applier input buffer because rpos adjusted after xrow > + * decoding and corresponding space going to reuse. > + * > + * Note: current implementation grants that transaction could not be mixed, so > + * we read each transaction from first xrow until xrow with txn_last = true. > + */ > +static int64_t > +applier_read_tx(struct applier *applier, struct ibuf *row_buf, > + struct obuf *data_buf) > +{ > + struct xrow_header *row; > + struct ev_io *coio = &applier->io; > + struct ibuf *ibuf = &applier->ibuf; > + int64_t txn_id = 0; > + > + do { > + row = (struct xrow_header *)ibuf_alloc(row_buf, > + sizeof(struct xrow_header)); Nit: the line's too for no reason, and there are more lines like that. Please fix them where you can without making the code look ugly. > + if (row == NULL) { > + diag_set(OutOfMemory, sizeof(struct xrow_header), > + "slab", "struct xrow_header"); > + goto error; > + } > + > + double timeout = replication_disconnect_timeout(); > + try { > + /* TODO: we should have a C version of this function. */ > + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); Nit: IMO better use guards to free resources, if any. > + } catch (...) { > + goto error; > + } > + > + if (iproto_type_is_error(row->type)) { > + xrow_decode_error(row); > + goto error; > + } > + > + /* Replication request. */ > + if (row->replica_id == REPLICA_ID_NIL || > + row->replica_id >= VCLOCK_MAX) { > + /* > + * A safety net, this can only occur > + * if we're fed a strangely broken xlog. > + */ > + diag_set(ClientError, ER_UNKNOWN_REPLICA, > + int2str(row->replica_id), > + tt_uuid_str(&REPLICASET_UUID)); > + goto error; > + } > + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) { > + /* > + * First row in a transaction. In order to enforce > + * consistency check that first row lsn and replica id > + * match with transaction. 
> +			 */
> +			txn_id = row->txn_id;
> +			if (row->lsn != txn_id) {
> +				/* There is not a first row in the transactions. */
> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");
> +				goto error;
> +			}
> +		}
> +		if (txn_id != row->txn_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;

So we first read a row into an ibuf and then copy it to an obuf. I
understand that you do this because xrow_header::body has pointers in
it. Still, it looks rather awkward. Maybe we'd better temporarily fix
up those pointers to store relative offsets instead?

> +		}
> +
> +	} while (row->txn_commit == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);

obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy,
ibuf_destroy.

> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
>  	struct tt_uuid cluster_id = uuid_nil;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;

IMO the buffers should be part of the applier struct. Then you wouldn't
need to use a label to free them up in applier_read_tx and hence could
simply let the exception thrown by coio_read_xrow_timeout_xc travel up
the stack without wrapping it in that awkward try-catch block.

> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);

This constant looks like magic to me.

>
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);

You silently dropped this branch. Please leave it be.

> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row); /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
>  		/*
> @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				if (e->type == &type_ClientError &&
> +		if (vclock_get(&replicaset.vclock,
> +			       first_row->replica_id) < first_row->lsn) {
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

We have a nice level of abstraction implemented by xstream, but now you
bluntly break it by calling txn_begin/commit directly. Please come up
with a better solution, e.g. you could extend the xstream interface
with a write_tx method.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
>  					diag_clear(diag_get());
> -				else {
> -					latch_unlock(latch);
> -					diag_raise();
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
>  				}
> +				++row;
> +			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
> +			if (res != 0) {
> +				txn_rollback();
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
> +				latch_unlock(latch);
> +				diag_raise();
>  			}
>  		}
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>
>  		if (applier->state == APPLIER_SYNC ||
> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()

Whenever you add a test, please write a few words about what it does
and which ticket's fix it is supposed to verify.
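[Editor's note] To make the "write_tx" suggestion from the review above more concrete, here is a minimal sketch of what such an xstream extension could look like. It is an illustration only, not part of the patch: the xstream_write_tx() name and signature are hypothetical, while txn_begin(), txn_commit(), txn_rollback() and xstream_write() are used with the signatures that appear in the quoted diff.

#include "xstream.h"
#include "xrow.h"
#include "txn.h"

/*
 * Hypothetical sketch: apply a whole transaction through the stream so
 * that applier_subscribe() does not have to call txn_begin()/txn_commit()
 * itself. Rows are assumed to be laid out as a contiguous array, exactly
 * as applier_read_tx() stores them in row_buf.
 */
int
xstream_write_tx(struct xstream *stream, struct xrow_header *first_row,
		 struct xrow_header *last_row)
{
	/* A single-row transaction can keep the old autocommit path. */
	if (first_row == last_row)
		return xstream_write(stream, first_row);
	struct txn *txn = txn_begin(false);
	if (txn == NULL)
		return -1;
	for (struct xrow_header *row = first_row; row <= last_row; row++) {
		if (xstream_write(stream, row) != 0) {
			txn_rollback();
			return -1;
		}
	}
	return txn_commit(txn);
}

With something along these lines the applier would only hand the row range to the stream; the skip-conflict NOP substitution could stay in the caller's loop or move behind the same interface, and applier.cc would no longer need to include txn.h directly.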
Thread overview: 14+ messages (end of thread; newest: 2019-03-11 19:52 UTC)
2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries for applier Georgy Kirichenko
2019-03-10 20:21 ` [tarantool-patches] [PATCH] Don't free fiber gc on while txn_rollback Georgy Kirichenko
2019-03-11  8:12   ` [tarantool-patches] " Konstantin Osipov
2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 1/2] Write rows without a lsn to the transaction tail Georgy Kirichenko
2019-03-11  8:14   ` [tarantool-patches] " Konstantin Osipov
2019-03-11  8:40   ` [tarantool-patches] " Vladimir Davydov
2019-03-11  9:59     ` [tarantool-patches] " Konstantin Osipov
2019-03-11 10:54       ` Georgy Kirichenko
2019-03-11 14:04         ` Konstantin Osipov
2019-03-11 19:52           ` Georgy Kirichenko
2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
2019-03-11  8:18   ` [tarantool-patches] " Konstantin Osipov
     [not found] <cover.1550001848.git.georgy@tarantool.org>
2019-02-12 20:04 ` [tarantool-patches] " Georgy Kirichenko
2019-02-18  9:36   ` Vladimir Davydov