[tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes

Georgy Kirichenko georgy at tarantool.org
Fri Mar 22 15:06:10 MSK 2019


Disallow changes to non-local spaces while a replication stream is
being applied. As we do not support distributed transactions yet, we
cannot provide transactional replication for such side effects if they
are not NOPed.

Needed for: #2798
Follow up for: 27283debc327a1ef87e025badeed97d9ac264ac6
---
 src/box/applier.cc                   | 18 ++++++++++--
 src/box/txn.c                        | 15 ++++++++++
 src/box/txn.h                        |  9 ++++++
 test/replication/on_replace.result   | 44 ++++++++++++++++++++++++----
 test/replication/on_replace.test.lua | 20 +++++++++++--
 5 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 08ad4a6a8..2a528b856 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -571,9 +571,23 @@ applier_apply_tx(struct stailq *rows)
 		if (res != 0)
 			break;
 	}
-	if (res == 0)
+	if (res == 0) {
+		/*
+		 * We are going to commit, so it's high time to check if
+		 * the current transaction has non-local effects.
+		 */
+		if (txn_is_distributed(txn)) {
+			/*
+			 * A transaction mixes remote and local rows and
+			 * cannot be replicated back because we don't
+			 * support distributed transactions yet.
+			 */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Applier", "distributed transactions");
+			return -1;
+		}
 		res = txn_commit(txn);
-	else
+	} else
 		txn_rollback();
 	return res;
 }
diff --git a/src/box/txn.c b/src/box/txn.c
index 31e19951f..97f076f22 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -224,6 +224,21 @@ fail:
 	return NULL;
 }
 
+bool
+txn_is_distributed(struct txn *txn)
+{
+	assert(txn == in_txn());
+	if (txn->n_local_rows == 0 || txn->n_remote_rows == 0)
+		return false;
+	struct txn_stmt *stmt;
+	/* Search for new rows that belong to a non-local group. */
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		if (stmt->row->replica_id == 0 &&
+		    stmt->space->def->opts.group_id != GROUP_LOCAL)
+			return true;
+	return false;
+}
+
 /**
  * End a statement. In autocommit mode, end
  * the current transaction as well.
diff --git a/src/box/txn.h b/src/box/txn.h
index 3572b005d..c00eb28a0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -296,6 +296,15 @@ txn_commit_ro_stmt(struct txn *txn)
 	}
 }
 
+/*
+ * Check whether a transaction is distributed or not.
+ * This is essential for replication because we cannot
+ * replicate a transaction with both remote and local non-NOP
+ * statements.
+ */
+bool
+txn_is_distributed(struct txn *txn);
+
 /**
  * End a statement. In autocommit mode, end
  * the current transaction as well.
diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
index 2e95b90ea..a02b90f7e 100644
--- a/test/replication/on_replace.result
+++ b/test/replication/on_replace.result
@@ -104,7 +104,7 @@ box.space.test:drop()
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
 SERVERS = { 'on_replace1', 'on_replace2' }
 ---
 ...
@@ -143,7 +143,7 @@ fiber = require'fiber'
 while box.space.s2 == nil do fiber.sleep(0.00001) end
 ---
 ...
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
 ---
 ...
 test_run:cmd('switch on_replace1')
@@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
 ---
 - [1, 2, 3, 4]
 ...
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
 ---
 ...
 test_run:cmd('switch on_replace2')
 ---
 - true
 ...
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+---
+...
+box.info.replication[3 - box.info.id].upstream.message
+---
+- Applier does not support distributed transactions
+...
 box.space.s1:select()
 ---
-- - [1, 2, 3, 4]
+- []
 ...
 box.space.s2:select()
 ---
-- - [1, 2, 3, 4]
+- []
 ...
 test_run:cmd('switch on_replace1')
 ---
@@ -179,6 +186,33 @@ box.space.s1:select()
 ...
 box.space.s2:select()
 ---
+- []
+...
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+---
+- true
+...
+s3 = box.schema.space.create('s3', {is_local = true})
+---
+...
+_ = s3:create_index('pk')
+---
+...
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+---
+...
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+s3:select()
+---
 - - [1, 2, 3, 4]
 ...
 _ = test_run:cmd('switch default')
diff --git a/test/replication/on_replace.test.lua b/test/replication/on_replace.test.lua
index e34832103..779dbf768 100644
--- a/test/replication/on_replace.test.lua
+++ b/test/replication/on_replace.test.lua
@@ -44,7 +44,7 @@ box.space.test:drop()
 box.schema.user.revoke('guest', 'replication')
 
 
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
 
 SERVERS = { 'on_replace1', 'on_replace2' }
 test_run:create_cluster(SERVERS, "replication", {args="0.2"})
@@ -60,13 +60,15 @@ _ = s2:create_index('pk')
 test_run:cmd('switch on_replace2')
 fiber = require'fiber'
 while box.space.s2 == nil do fiber.sleep(0.00001) end
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
 
 test_run:cmd('switch on_replace1')
 box.space.s1:replace({1, 2, 3, 4})
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
 
 test_run:cmd('switch on_replace2')
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+box.info.replication[3 - box.info.id].upstream.message
 box.space.s1:select()
 box.space.s2:select()
 
@@ -74,6 +76,18 @@ test_run:cmd('switch on_replace1')
 box.space.s1:select()
 box.space.s2:select()
 
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+s3 = box.schema.space.create('s3', {is_local = true})
+_ = s3:create_index('pk')
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+
+s3:select()
+
 _ = test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)
 test_run:cleanup_cluster()
-- 
2.21.0





More information about the Tarantool-patches mailing list