[tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes
Georgy Kirichenko
georgy at tarantool.org
Fri Mar 22 15:06:10 MSK 2019
Disallow changes to non-local spaces while a replication stream is
being applied. As we do not support distributed transactions yet, we
cannot provide transactional replication for such side effects unless
they are NOPed.
Needed for: #2798
Follow up for: 27283debc327a1ef87e025badeed97d9ac264ac6
---
src/box/applier.cc | 18 ++++++++++--
src/box/txn.c | 15 ++++++++++
src/box/txn.h | 9 ++++++
test/replication/on_replace.result | 44 ++++++++++++++++++++++++----
test/replication/on_replace.test.lua | 20 +++++++++++--
5 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 08ad4a6a8..2a528b856 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -571,9 +571,23 @@ applier_apply_tx(struct stailq *rows)
if (res != 0)
break;
}
- if (res == 0)
+ if (res == 0) {
+ /*
+ * We are going to commit, so it's high time to check if
+ * the current transaction has non-local effects.
+ */
+ if (txn_is_distributed(txn)) {
+ /*
+ * A transaction mixes remote and local rows and
+ * couldn't be replicated back because we don't
+ * support distributed transactions yet.
+ */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "Applier", "distributed transactions");
+ return -1;
+ }
res = txn_commit(txn);
- else
+ } else
txn_rollback();
return res;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 31e19951f..97f076f22 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -224,6 +224,21 @@ fail:
return NULL;
}
+bool
+txn_is_distributed(struct txn *txn)
+{
+ assert(txn == in_txn());
+ if (txn->n_local_rows == 0 || txn->n_remote_rows == 0)
+ return false;
+ struct txn_stmt *stmt;
+ /* Search for new non local group rows. */
+ stailq_foreach_entry(stmt, &txn->stmts, next)
+ if (stmt->row->replica_id == 0 &&
+ stmt->space->def->opts.group_id != GROUP_LOCAL)
+ return true;
+ return false;
+}
+
/**
* End a statement. In autocommit mode, end
* the current transaction as well.
diff --git a/src/box/txn.h b/src/box/txn.h
index 3572b005d..c00eb28a0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -296,6 +296,15 @@ txn_commit_ro_stmt(struct txn *txn)
}
}
+/*
+ * Check whether the transaction is distributed or not.
+ * This is essential for replication because we cannot
+ * replicate a transaction that has both remote and local
+ * non-NOP statements.
+ */
+bool
+txn_is_distributed(struct txn *txn);
+
/**
* End a statement. In autocommit mode, end
* the current transaction as well.
diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
index 2e95b90ea..a02b90f7e 100644
--- a/test/replication/on_replace.result
+++ b/test/replication/on_replace.result
@@ -104,7 +104,7 @@ box.space.test:drop()
box.schema.user.revoke('guest', 'replication')
---
...
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
SERVERS = { 'on_replace1', 'on_replace2' }
---
...
@@ -143,7 +143,7 @@ fiber = require'fiber'
while box.space.s2 == nil do fiber.sleep(0.00001) end
---
...
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
---
...
test_run:cmd('switch on_replace1')
@@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
---
- [1, 2, 3, 4]
...
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
---
...
test_run:cmd('switch on_replace2')
---
- true
...
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+---
+...
+box.info.replication[3 - box.info.id].upstream.message
+---
+- Applier does not support distributed transactions
+...
box.space.s1:select()
---
-- - [1, 2, 3, 4]
+- []
...
box.space.s2:select()
---
-- - [1, 2, 3, 4]
+- []
...
test_run:cmd('switch on_replace1')
---
@@ -179,6 +186,33 @@ box.space.s1:select()
...
box.space.s2:select()
---
+- []
+...
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+---
+- true
+...
+s3 = box.schema.space.create('s3', {is_local = true})
+---
+...
+_ = s3:create_index('pk')
+---
+...
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+---
+...
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+s3:select()
+---
- - [1, 2, 3, 4]
...
_ = test_run:cmd('switch default')
diff --git a/test/replication/on_replace.test.lua b/test/replication/on_replace.test.lua
index e34832103..779dbf768 100644
--- a/test/replication/on_replace.test.lua
+++ b/test/replication/on_replace.test.lua
@@ -44,7 +44,7 @@ box.space.test:drop()
box.schema.user.revoke('guest', 'replication')
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
SERVERS = { 'on_replace1', 'on_replace2' }
test_run:create_cluster(SERVERS, "replication", {args="0.2"})
@@ -60,13 +60,15 @@ _ = s2:create_index('pk')
test_run:cmd('switch on_replace2')
fiber = require'fiber'
while box.space.s2 == nil do fiber.sleep(0.00001) end
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
test_run:cmd('switch on_replace1')
box.space.s1:replace({1, 2, 3, 4})
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
test_run:cmd('switch on_replace2')
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+box.info.replication[3 - box.info.id].upstream.message
box.space.s1:select()
box.space.s2:select()
@@ -74,6 +76,18 @@ test_run:cmd('switch on_replace1')
box.space.s1:select()
box.space.s2:select()
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+s3 = box.schema.space.create('s3', {is_local = true})
+_ = s3:create_index('pk')
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+
+s3:select()
+
_ = test_run:cmd('switch default')
test_run:drop_cluster(SERVERS)
test_run:cleanup_cluster()
--
2.21.0
More information about the Tarantool-patches
mailing list