From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id D8D4D2AC3A for ; Fri, 22 Mar 2019 08:06:17 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id yXqk1ytOO2lk for ; Fri, 22 Mar 2019 08:06:17 -0400 (EDT) Received: from smtp16.mail.ru (smtp16.mail.ru [94.100.176.153]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 876F72AC2A for ; Fri, 22 Mar 2019 08:06:17 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Date: Fri, 22 Mar 2019 15:06:10 +0300 Message-Id: <36c90408b1cf967adf2d8a433e13ef4f8d061e27.1553255718.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Disallow changes for non-local spaces during replication stream applying. As we do not support distributed transaction yet we could not provide a transactional replication for such side effects if there are not NOPed. Needed for: #2798 Follow up for: 27283debc327a1ef87e025badeed97d9ac264ac6 --- src/box/applier.cc | 18 ++++++++++-- src/box/txn.c | 15 ++++++++++ src/box/txn.h | 9 ++++++ test/replication/on_replace.result | 44 ++++++++++++++++++++++++---- test/replication/on_replace.test.lua | 20 +++++++++++-- 5 files changed, 96 insertions(+), 10 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 08ad4a6a8..2a528b856 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -571,9 +571,23 @@ applier_apply_tx(struct stailq *rows) if (res != 0) break; } - if (res == 0) + if (res == 0) { + /* + * We are going to commit so it's a high time to check if + * the current transaction has non-local effects. + */ + if (txn_is_distributed(txn)) { + /* + * A transaction mixes remote and local rows and + * countn't be replicated back because we don't + * support distributed transactions yet. + */ + diag_set(ClientError, ER_UNSUPPORTED, + "Applier", "distributed transactions"); + return -1; + } res = txn_commit(txn); - else + } else txn_rollback(); return res; } diff --git a/src/box/txn.c b/src/box/txn.c index 31e19951f..97f076f22 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -224,6 +224,21 @@ fail: return NULL; } +bool +txn_is_distributed(struct txn *txn) +{ + assert(txn == in_txn()); + if (txn->n_local_rows == 0 || txn->n_remote_rows == 0) + return false; + struct txn_stmt *stmt; + /* Search for new non local group rows. */ + stailq_foreach_entry(stmt, &txn->stmts, next) + if (stmt->row->replica_id == 0 && + stmt->space->def->opts.group_id != GROUP_LOCAL) + return true; + return false; +} + /** * End a statement. In autocommit mode, end * the current transaction as well. diff --git a/src/box/txn.h b/src/box/txn.h index 3572b005d..c00eb28a0 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -296,6 +296,15 @@ txn_commit_ro_stmt(struct txn *txn) } } +/* + * Check whether transaction is distributed or not. + * It's essential in case of replication because we couldn't + * replicate a transaction with both remote and local non NOP + * statements. + */ +bool +txn_is_distributed(struct txn *txn); + /** * End a statement. In autocommit mode, end * the current transaction as well. diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result index 2e95b90ea..a02b90f7e 100644 --- a/test/replication/on_replace.result +++ b/test/replication/on_replace.result @@ -104,7 +104,7 @@ box.space.test:drop() box.schema.user.revoke('guest', 'replication') --- ... --- gh-2682 on_replace on slave server with data change +-- gh-2798 on_replace on slave server with non-local data change should fail SERVERS = { 'on_replace1', 'on_replace2' } --- ... @@ -143,7 +143,7 @@ fiber = require'fiber' while box.space.s2 == nil do fiber.sleep(0.00001) end --- ... -_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end) +tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end) --- ... test_run:cmd('switch on_replace1') @@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4}) --- - [1, 2, 3, 4] ... -while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end +while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end --- ... test_run:cmd('switch on_replace2') --- - true ... +while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end +--- +... +box.info.replication[3 - box.info.id].upstream.message +--- +- Applier does not support distributed transactions +... box.space.s1:select() --- -- - [1, 2, 3, 4] +- [] ... box.space.s2:select() --- -- - [1, 2, 3, 4] +- [] ... test_run:cmd('switch on_replace1') --- @@ -179,6 +186,33 @@ box.space.s1:select() ... box.space.s2:select() --- +- [] +... +-- gh-2798 on_replace on slave server with local data change is allowed +test_run:cmd('switch on_replace2') +--- +- true +... +s3 = box.schema.space.create('s3', {is_local = true}) +--- +... +_ = s3:create_index('pk') +--- +... +tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg) +--- +... +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +box.cfg{replication = replication} +--- +... +s3:select() +--- - - [1, 2, 3, 4] ... _ = test_run:cmd('switch default') diff --git a/test/replication/on_replace.test.lua b/test/replication/on_replace.test.lua index e34832103..779dbf768 100644 --- a/test/replication/on_replace.test.lua +++ b/test/replication/on_replace.test.lua @@ -44,7 +44,7 @@ box.space.test:drop() box.schema.user.revoke('guest', 'replication') --- gh-2682 on_replace on slave server with data change +-- gh-2798 on_replace on slave server with non-local data change should fail SERVERS = { 'on_replace1', 'on_replace2' } test_run:create_cluster(SERVERS, "replication", {args="0.2"}) @@ -60,13 +60,15 @@ _ = s2:create_index('pk') test_run:cmd('switch on_replace2') fiber = require'fiber' while box.space.s2 == nil do fiber.sleep(0.00001) end -_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end) +tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end) test_run:cmd('switch on_replace1') box.space.s1:replace({1, 2, 3, 4}) -while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end +while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end test_run:cmd('switch on_replace2') +while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end +box.info.replication[3 - box.info.id].upstream.message box.space.s1:select() box.space.s2:select() @@ -74,6 +76,18 @@ test_run:cmd('switch on_replace1') box.space.s1:select() box.space.s2:select() +-- gh-2798 on_replace on slave server with local data change is allowed +test_run:cmd('switch on_replace2') +s3 = box.schema.space.create('s3', {is_local = true}) +_ = s3:create_index('pk') +tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg) + +replication = box.cfg.replication +box.cfg{replication = {}} +box.cfg{replication = replication} + +s3:select() + _ = test_run:cmd('switch default') test_run:drop_cluster(SERVERS) test_run:cleanup_cluster() -- 2.21.0