Tarantool development patches archive
* [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol
@ 2019-03-22 12:06 Georgy Kirichenko
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
                   ` (4 more replies)
  0 siblings, 5 replies; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patch set introduces a transactional replication mode in the
applier. There are five patches:
 * Fix truncate rollback in case of failures
 * Update the Lua schema as soon as the space cache is updated
 * Allow DDL operations between begin and commit
 * Apply a whole transaction within begin/commit boundaries
 * Raise an error in case of a distributed transaction (follow-up for
   27283debc)

This is a complete redesign, except for the applier patch, which was
refactored according to the last review comments.

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (5):
  Abort vinyl index creation in case of truncation rollback
  Synchronize lua schema update with space cache
  Require for single statement not autocommit in case of ddl
  Transaction support for applier
  Raise an error if remote transaction produces non-local changes

 src/box/alter.cc                      |  26 ++-
 src/box/applier.cc                    | 232 ++++++++++++++++++------
 src/box/memtx_engine.c                |  20 +--
 src/box/txn.c                         |  24 ++-
 src/box/txn.h                         |  14 ++
 test/box/ddl.result                   |   8 +-
 test/box/ddl.test.lua                 |   4 +-
 test/box/transaction.result           |  53 ++----
 test/box/transaction.test.lua         |  27 ++-
 test/engine/ddl.result                |  92 ++++++++++
 test/engine/ddl.test.lua              |  55 ++++++
 test/engine/errinj.result             |  53 ++++++
 test/engine/errinj.test.lua           |  15 ++
 test/engine/suite.ini                 |   1 +
 test/engine/truncate.result           |   3 +-
 test/replication/on_replace.result    |  44 ++++-
 test/replication/on_replace.test.lua  |  20 ++-
 test/replication/transaction.result   | 242 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 test/sql-tap/trigger1.test.lua        |  12 +-
 test/sql/delete.result                |   8 +-
 test/sql/delete.test.lua              |   3 +-
 22 files changed, 877 insertions(+), 165 deletions(-)
 create mode 100644 test/engine/errinj.result
 create mode 100644 test/engine/errinj.test.lua
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

-- 
2.21.0

* [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback
  2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
  2019-03-27  9:59   ` Vladimir Davydov
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC
  To: tarantool-patches; +Cc: Georgy Kirichenko

Abort creation of the new index if truncate could not be completed
because of a rollback or an error. Without this, vinyl fails on an
internal scheduler assertion.
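
The idea can be sketched outside of Tarantool as follows. This is only
an illustration of the "abort in the destructor unless commit cleared
the pointer" pattern; Index, TruncateOp, event_log and the two helper
functions are simplified stand-ins invented for this example, not the
real engine API.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for an engine index (not Tarantool's struct index).
struct Index {
    std::string name;
};

// Illustration only: records what happened to each index.
static std::vector<std::string> event_log;

// Simplified stand-ins for the engine callbacks named in the patch.
static void index_commit_create(Index *index) {
    event_log.push_back("commit:" + index->name);
}

static void index_abort_create(Index *index) {
    event_log.push_back("abort:" + index->name);
}

// Models the truncate operation: it owns the "pending" state of the
// new index until commit() runs.
class TruncateOp {
public:
    explicit TruncateOp(Index *index) : new_index(index) {}

    // On commit the index is finalized and the pointer is cleared,
    // so the destructor has nothing left to abort.
    void commit() {
        index_commit_create(new_index);
        new_index = nullptr;
    }

    // If commit() never ran (rollback or error), abort the creation.
    ~TruncateOp() {
        if (new_index != nullptr)
            index_abort_create(new_index);
    }

private:
    Index *new_index;
};
```

Destroying a TruncateOp without calling commit() leaves exactly one
"abort" entry in event_log; calling commit() first leaves a "commit"
entry and no abort.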

Needed for: #2798
---
 src/box/alter.cc            | 13 +++++++--
 test/engine/errinj.result   | 53 +++++++++++++++++++++++++++++++++++++
 test/engine/errinj.test.lua | 15 +++++++++++
 test/engine/suite.ini       |  1 +
 4 files changed, 80 insertions(+), 2 deletions(-)
 create mode 100644 test/engine/errinj.result
 create mode 100644 test/engine/errinj.test.lua

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 080a72b9f..daaa9cd57 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -1310,13 +1310,16 @@ public:
 		: AlterSpaceOp(alter), iid(iid) {}
 	/** id of the index to truncate. */
 	uint32_t iid;
+	struct index *new_index;
 	virtual void prepare(struct alter_space *alter);
 	virtual void commit(struct alter_space *alter, int64_t signature);
+	virtual ~TruncateIndex();
 };
 
 void
 TruncateIndex::prepare(struct alter_space *alter)
 {
+	new_index = space_index(alter->new_space, iid);
 	if (iid == 0) {
 		/*
 		 * Notify the engine that the primary index
@@ -1333,7 +1336,6 @@ TruncateIndex::prepare(struct alter_space *alter)
 	 * index was recreated. For example, Vinyl uses this
 	 * callback to load indexes during local recovery.
 	 */
-	struct index *new_index = space_index(alter->new_space, iid);
 	assert(new_index != NULL);
 	space_build_index_xc(alter->new_space, new_index,
 			     alter->new_space->format);
@@ -1343,10 +1345,17 @@ void
 TruncateIndex::commit(struct alter_space *alter, int64_t signature)
 {
 	struct index *old_index = space_index(alter->old_space, iid);
-	struct index *new_index = space_index(alter->new_space, iid);
 
 	index_commit_drop(old_index, signature);
 	index_commit_create(new_index, signature);
+	new_index = NULL;
+}
+
+TruncateIndex::~TruncateIndex()
+{
+	if (new_index == NULL)
+		return;
+	index_abort_create(new_index);
 }
 
 /**
diff --git a/test/engine/errinj.result b/test/engine/errinj.result
new file mode 100644
index 000000000..d244c334a
--- /dev/null
+++ b/test/engine/errinj.result
@@ -0,0 +1,53 @@
+test_run = require('test_run')
+---
+...
+inspector = test_run.new()
+---
+...
+engine = inspector:get_cfg('engine')
+---
+...
+errinj = box.error.injection
+---
+...
+-- truncation rollback should not crash
+s = box.schema.space.create('truncate_rollback', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+_ = s:create_index('sk', {parts = {1, 'int'}})
+---
+...
+for i = 1, 10 do s:replace({i, i}) end
+---
+...
+errinj.set('ERRINJ_WAL_IO', true)
+---
+- ok
+...
+s:truncate()
+---
+- error: Failed to write to disk
+...
+errinj.set('ERRINJ_WAL_IO', false)
+---
+- ok
+...
+s:select()
+---
+- - [1, 1]
+  - [2, 2]
+  - [3, 3]
+  - [4, 4]
+  - [5, 5]
+  - [6, 6]
+  - [7, 7]
+  - [8, 8]
+  - [9, 9]
+  - [10, 10]
+...
+s:drop()
+---
+...
diff --git a/test/engine/errinj.test.lua b/test/engine/errinj.test.lua
new file mode 100644
index 000000000..57f3a962c
--- /dev/null
+++ b/test/engine/errinj.test.lua
@@ -0,0 +1,15 @@
+test_run = require('test_run')
+inspector = test_run.new()
+engine = inspector:get_cfg('engine')
+errinj = box.error.injection
+
+-- truncation rollback should not crash
+s = box.schema.space.create('truncate_rollback', {engine = engine})
+_ = s:create_index('pk')
+_ = s:create_index('sk', {parts = {1, 'int'}})
+for i = 1, 10 do s:replace({i, i}) end
+errinj.set('ERRINJ_WAL_IO', true)
+s:truncate()
+errinj.set('ERRINJ_WAL_IO', false)
+s:select()
+s:drop()
diff --git a/test/engine/suite.ini b/test/engine/suite.ini
index 3f82a1325..3db02ab6f 100644
--- a/test/engine/suite.ini
+++ b/test/engine/suite.ini
@@ -3,6 +3,7 @@ core = tarantool
 description = tarantool multiengine tests
 script = box.lua
 use_unix_sockets = True
+release_disabled = errinj.test.lua
 config = engine.cfg
 #disabled = replica_join.test.lua
 lua_libs = conflict.lua ../box/lua/utils.lua ../box/lua/push.lua
-- 
2.21.0

* [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
  2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
  2019-03-27 10:03   ` Vladimir Davydov
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC
  To: tarantool-patches; +Cc: Georgy Kirichenko

Update the Lua schema as soon as the space cache is replaced instead of
doing so in the on_commit trigger. Otherwise, schema changes would not
be visible until the commit has finished.
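
A toy model of the ordering, assuming a cache that notifies its
observers on every replace. SpaceCache and LuaSchemaMirror are
hypothetical names made up for this sketch; the real code runs the
on_alter_space trigger list right after space_cache_replace().

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of the space cache: id -> space name.
struct SpaceCache {
    std::map<int, std::string> spaces;
    // Observers are called with the id of the space that changed.
    std::vector<std::function<void(int)>> on_alter;

    // Insert or update a space (erase it when name is empty) and
    // notify observers immediately, not at commit time.
    void replace(int id, const std::string &name) {
        if (name.empty())
            spaces.erase(id);
        else
            spaces[id] = name;
        for (auto &cb : on_alter)
            cb(id);
    }
};

// Hypothetical mirror of what the Lua side would see in box.space.
struct LuaSchemaMirror {
    std::map<int, std::string> visible;

    // Subscribe to the cache; the cache must outlive this mirror.
    void attach(SpaceCache &cache) {
        cache.on_alter.push_back([this, &cache](int id) {
            auto it = cache.spaces.find(id);
            if (it == cache.spaces.end())
                visible.erase(id);
            else
                visible[id] = it->second;
        });
    }
};
```

In this model a space becomes visible to the mirror as soon as
replace() returns, and a rollback is just another replace() that
restores the previous state and notifies the mirror again.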

Needed for: #2798
---
 src/box/alter.cc | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index daaa9cd57..275e39dd5 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
 		op->commit(alter, txn->signature);
 	}
 
-	trigger_run_xc(&on_alter_space, alter->new_space);
-
 	alter->new_space = NULL; /* for alter_space_delete(). */
 	/*
 	 * Delete the old version of the space, we are not
@@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
 	space_swap_triggers(alter->new_space, alter->old_space);
 	space_swap_fk_constraints(alter->new_space, alter->old_space);
 	space_cache_replace(alter->new_space, alter->old_space);
+	trigger_run(&on_alter_space, alter->old_space);
+
 	alter_space_delete(alter);
 }
 
@@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
 	 * cache with it.
 	 */
 	space_cache_replace(alter->old_space, alter->new_space);
+	trigger_run_xc(&on_alter_space, alter->new_space);
 
 	/*
 	 * Install transaction commit/rollback triggers to either
@@ -1388,7 +1389,6 @@ on_drop_space_commit(struct trigger *trigger, void *event)
 {
 	(void) event;
 	struct space *space = (struct space *)trigger->data;
-	trigger_run_xc(&on_alter_space, space);
 	space_delete(space);
 }
 
@@ -1403,6 +1403,7 @@ on_drop_space_rollback(struct trigger *trigger, void *event)
 	(void) event;
 	struct space *space = (struct space *)trigger->data;
 	space_cache_replace(NULL, space);
+	trigger_run(&on_alter_space, space);
 }
 
 /**
@@ -1412,8 +1413,7 @@ static void
 on_create_space_commit(struct trigger *trigger, void *event)
 {
 	(void) event;
-	struct space *space = (struct space *)trigger->data;
-	trigger_run_xc(&on_alter_space, space);
+	(void) trigger;
 }
 
 /**
@@ -1429,6 +1429,7 @@ on_create_space_rollback(struct trigger *trigger, void *event)
 	(void) event;
 	struct space *space = (struct space *)trigger->data;
 	space_cache_replace(space, NULL);
+	trigger_run(&on_alter_space, space);
 	space_delete(space);
 }
 
@@ -1672,6 +1673,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		 * execution on a replica.
 		 */
 		space_cache_replace(NULL, space);
+		trigger_run_xc(&on_alter_space, space);
 		/*
 		 * Do not forget to update schema_version right after
 		 * inserting the space to the space_cache, since no
@@ -1764,6 +1766,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		 * execution on a replica.
 		 */
 		space_cache_replace(old_space, NULL);
+		trigger_run_xc(&on_alter_space, old_space);
 		/*
 		 * Do not forget to update schema_version right after
 		 * deleting the space from the space_cache, since no
-- 
2.21.0

* [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl
  2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
  2019-03-27 10:49   ` Vladimir Davydov
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
  4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC
  To: tarantool-patches; +Cc: Georgy Kirichenko

Allow a DDL operation inside begin/commit as long as the transaction
consists of a single statement, instead of requiring autocommit mode.
This is essential for a transactional applier.
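
The rule can be modelled roughly like this. Txn here is a reduced,
hypothetical struct with a statement counter and an error string; the
real struct txn keeps a statement list and reports errors through
diag_set().

```cpp
#include <cassert>
#include <string>

// Reduced, hypothetical model of a transaction.
struct Txn {
    int n_stmts = 0;                  // statements started so far
    bool is_single_statement = false; // set once a DDL statement ran
    std::string error;                // last error message, if any
};

// Start a new statement. Fails if the transaction was already marked
// single-statement and has a statement in it.
bool txn_begin_stmt(Txn &txn) {
    if (txn.is_single_statement && txn.n_stmts > 0) {
        txn.error = "DDL does not support multi-statement transactions";
        return false;
    }
    ++txn.n_stmts;
    return true;
}

// Called by a DDL statement after it has started. It must be the
// first statement; autocommit mode is no longer required.
bool txn_check_singlestatement(Txn &txn, const std::string &where) {
    if (txn.n_stmts > 1) {
        txn.error = where + " does not support multi-statement transactions";
        return false;
    }
    txn.is_single_statement = true;
    return true;
}
```

With this model, one DDL statement between begin and commit passes,
a statement after the DDL is rejected by txn_begin_stmt(), and a DDL
issued after another statement is rejected by
txn_check_singlestatement().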

Needed for: #2798
---
 src/box/memtx_engine.c         | 20 ++------
 src/box/txn.c                  |  9 +++-
 src/box/txn.h                  |  5 ++
 test/box/ddl.result            |  8 +--
 test/box/ddl.test.lua          |  4 +-
 test/box/transaction.result    | 53 +++++---------------
 test/box/transaction.test.lua  | 27 ++++------
 test/engine/ddl.result         | 92 ++++++++++++++++++++++++++++++++++
 test/engine/ddl.test.lua       | 55 ++++++++++++++++++++
 test/engine/truncate.result    |  3 +-
 test/sql-tap/trigger1.test.lua | 12 ++---
 test/sql/delete.result         |  8 ++-
 test/sql/delete.test.lua       |  3 +-
 13 files changed, 204 insertions(+), 95 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index d468d1cd8..924f8bbc4 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -384,15 +384,7 @@ static int
 memtx_engine_begin(struct engine *engine, struct txn *txn)
 {
 	(void)engine;
-	/*
-	 * Register a trigger to rollback transaction on yield.
-	 * This must be done in begin(), since it's
-	 * the first thing txn invokes after txn->n_stmts++,
-	 * to match with trigger_clear() in rollbackStatement().
-	 */
-	if (txn->is_autocommit == false) {
-		memtx_init_txn(txn);
-	}
+	(void)txn;
 	return 0;
 }
 
@@ -404,15 +396,9 @@ memtx_engine_begin_statement(struct engine *engine, struct txn *txn)
 	if (txn->engine_tx == NULL) {
 		struct space *space = txn_last_stmt(txn)->space;
 
-		if (space->def->id > BOX_SYSTEM_ID_MAX &&
-		    ! rlist_empty(&space->on_replace)) {
-			/**
-			 * A space on_replace trigger may initiate
-			 * a yield.
-			 */
-			assert(txn->is_autocommit);
+		if (space->def->id > BOX_SYSTEM_ID_MAX)
+			/* Setup triggers for non-ddl transactions. */
 			memtx_init_txn(txn);
-		}
 	}
 	return 0;
 }
diff --git a/src/box/txn.c b/src/box/txn.c
index deb4fac47..31e19951f 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -147,6 +147,7 @@ txn_begin(bool is_autocommit)
 	txn->n_local_rows = 0;
 	txn->n_remote_rows = 0;
 	txn->is_autocommit = is_autocommit;
+	txn->is_single_statement = false;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
@@ -191,6 +192,11 @@ txn_begin_stmt(struct space *space)
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return NULL;
 	}
+	if (txn->is_single_statement && !stailq_empty(&txn->stmts)) {
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "DDL", "multi-statement transactions");
+		return NULL;
+	}
 
 	struct txn_stmt *stmt = txn_stmt_new(txn);
 	if (stmt == NULL) {
@@ -430,11 +436,12 @@ txn_abort(struct txn *txn)
 int
 txn_check_singlestatement(struct txn *txn, const char *where)
 {
-	if (!txn->is_autocommit || !txn_is_first_statement(txn)) {
+	if (!txn_is_first_statement(txn)) {
 		diag_set(ClientError, ER_UNSUPPORTED,
 			 where, "multi-statement transactions");
 		return -1;
 	}
+	txn->is_single_statement = true;
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index c9829da9e..3572b005d 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -149,6 +149,11 @@ struct txn {
 	 * (statement end causes an automatic transaction commit).
 	 */
 	bool is_autocommit;
+	/**
+	 * True if this transaction is allowed to have only one statement.
+	 * Used for ddl operations.
+	 */
+	bool is_single_statement;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
diff --git a/test/box/ddl.result b/test/box/ddl.result
index 3d6d07f43..e48284ffd 100644
--- a/test/box/ddl.result
+++ b/test/box/ddl.result
@@ -299,11 +299,7 @@ box.space._collation:replace(c)
 ---
 - error: collation does not support alter
 ...
-box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU')
----
-- error: Space _collation does not support multi-statement transactions
-...
-box.rollback()
+box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU') box.rollback()
 ---
 ...
 box.internal.collation.create('test', 'ICU', 'ru_RU')
@@ -645,11 +641,11 @@ _ = fiber.create(function()
 end);
 ---
 ...
+-- Should be Ok for now
 box.begin()
     test_latch:create_index("sec2", {unique = true, parts = {2, 'unsigned'}})
 box.commit();
 ---
-- error: DDL does not support multi-statement transactions
 ...
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/ddl.test.lua b/test/box/ddl.test.lua
index 5c147cdfb..101bc6f9b 100644
--- a/test/box/ddl.test.lua
+++ b/test/box/ddl.test.lua
@@ -131,8 +131,7 @@ c = box.space._collation:get{1}:totable()
 c[2] = 'unicode_test'
 box.space._collation:replace(c)
 
-box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU')
-box.rollback()
+box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU') box.rollback()
 
 box.internal.collation.create('test', 'ICU', 'ru_RU')
 box.internal.collation.exists('test')
@@ -260,6 +259,7 @@ _ = fiber.create(function()
     c:put(true)
 end);
 
+-- Should be Ok for now
 box.begin()
     test_latch:create_index("sec2", {unique = true, parts = {2, 'unsigned'}})
 box.commit();
diff --git a/test/box/transaction.result b/test/box/transaction.result
index 8a4d11d3b..7def44d5d 100644
--- a/test/box/transaction.result
+++ b/test/box/transaction.result
@@ -84,77 +84,50 @@ while f:status() ~= 'dead' do fiber.sleep(0) end;
 ---
 ...
 -- transactions and system spaces
+-- some operation involves more than one ddl spaces, so they should fail
 box.begin() box.schema.space.create('test');
 ---
-- error: Space _schema does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.schema.func.create('test');
----
-- error: Space _func does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
 ...
 box.rollback();
 ---
 ...
 box.begin() box.schema.user.create('test');
 ---
-- error: Space _user does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv');
----
-- error: Space _priv does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.space._user:delete{box.schema.GUEST_ID};
----
-- error: Space _user does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
 ...
 box.rollback();
 ---
 ...
-box.begin() box.space._space:delete{box.schema.CLUSTER_ID};
+-- but this is Ok now
+box.begin() box.schema.func.create('test') box.rollback();
 ---
-- error: DDL does not support multi-statement transactions
 ...
-box.rollback();
+box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv') box.rollback();
 ---
 ...
-box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false};
+space = box.schema.space.create('test');
 ---
-- error: Space _sequence does not support multi-statement transactions
 ...
-box.rollback();
+box.begin() box.space._space:delete{space.id} box.rollback();
 ---
 ...
-box.begin() box.space._schema:insert{'test'};
+box.begin() box.space._space:delete{space.id} box.commit();
 ---
-- error: Space _schema does not support multi-statement transactions
 ...
-box.rollback();
+box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false} box.rollback();
 ---
 ...
-box.begin() box.space._cluster:insert{123456789, 'abc'};
+box.begin() box.space._schema:insert{'test'} box.rollback();
 ---
-- error: Space _cluster does not support multi-statement transactions
 ...
-box.rollback();
+box.begin() box.space._cluster:insert{30, '00000000-0000-0000-0000-000000000001'} box.rollback();
 ---
 ...
 s = box.schema.space.create('test');
 ---
 ...
-box.begin() index = s:create_index('primary');
----
-- error: DDL does not support multi-statement transactions
-...
-box.rollback();
+box.begin() index = s:create_index('primary') box.rollback();
 ---
 ...
 index = s:create_index('primary');
diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua
index 8f7dfedec..0d212ca29 100644
--- a/test/box/transaction.test.lua
+++ b/test/box/transaction.test.lua
@@ -41,27 +41,22 @@ f = fiber.create(sloppy);
 -- ensure it's rolled back automatically
 while f:status() ~= 'dead' do fiber.sleep(0) end;
 -- transactions and system spaces
+-- some operation involves more than one ddl spaces, so they should fail
 box.begin() box.schema.space.create('test');
 box.rollback();
-box.begin() box.schema.func.create('test');
-box.rollback();
 box.begin() box.schema.user.create('test');
 box.rollback();
-box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv');
-box.rollback();
-box.begin() box.space._user:delete{box.schema.GUEST_ID};
-box.rollback();
-box.begin() box.space._space:delete{box.schema.CLUSTER_ID};
-box.rollback();
-box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false};
-box.rollback();
-box.begin() box.space._schema:insert{'test'};
-box.rollback();
-box.begin() box.space._cluster:insert{123456789, 'abc'};
-box.rollback();
+-- but this is Ok now
+box.begin() box.schema.func.create('test') box.rollback();
+box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv') box.rollback();
+space = box.schema.space.create('test');
+box.begin() box.space._space:delete{space.id} box.rollback();
+box.begin() box.space._space:delete{space.id} box.commit();
+box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false} box.rollback();
+box.begin() box.space._schema:insert{'test'} box.rollback();
+box.begin() box.space._cluster:insert{30, '00000000-0000-0000-0000-000000000001'} box.rollback();
 s = box.schema.space.create('test');
-box.begin() index = s:create_index('primary');
-box.rollback();
+box.begin() index = s:create_index('primary') box.rollback();
 index = s:create_index('primary');
 t = nil
 function multi()
diff --git a/test/engine/ddl.result b/test/engine/ddl.result
index 8d34d5ef4..c493bd4ac 100644
--- a/test/engine/ddl.result
+++ b/test/engine/ddl.result
@@ -2075,6 +2075,15 @@ i3:select()
 ...
 -- Check that recovery works.
 inspector:cmd("restart server default")
+test_run = require('test_run')
+---
+...
+inspector = test_run.new()
+---
+...
+engine = inspector:get_cfg('engine')
+---
+...
 s = box.space.test
 ---
 ...
@@ -2130,3 +2139,86 @@ box.snapshot()
 s:drop()
 ---
 ...
+-- test ddl operation within begin/commit/rollback
+-- acquire free space id
+space = box.schema.space.create('ddl_test', {engine = engine})
+---
+...
+id = space.id
+---
+...
+space:drop()
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.rollback();
+---
+...
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.commit();
+---
+...
+box.begin()
+s:create_index('pk')
+box.rollback();
+---
+...
+box.begin()
+s:create_index('pk')
+box.commit();
+---
+...
+s:replace({1});
+---
+- [1]
+...
+s:replace({2});
+---
+- [2]
+...
+s:replace({3});
+---
+- [3]
+...
+box.begin()
+s:truncate()
+box.commit();
+---
+...
+s:select();
+---
+- []
+...
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+---
+...
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.commit();
+---
+...
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+---
+...
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.commit();
+---
+...
+-- index and space drop are not currently supported (because of truncate)
+s.index.pk:drop();
+---
+...
+s:drop();
+---
+...
diff --git a/test/engine/ddl.test.lua b/test/engine/ddl.test.lua
index cdaf7a5bf..636f6c3b9 100644
--- a/test/engine/ddl.test.lua
+++ b/test/engine/ddl.test.lua
@@ -759,6 +759,9 @@ i3:select()
 
 -- Check that recovery works.
 inspector:cmd("restart server default")
+test_run = require('test_run')
+inspector = test_run.new()
+engine = inspector:get_cfg('engine')
 
 s = box.space.test
 s.index.i1:select()
@@ -775,3 +778,55 @@ s.index.i1:select()
 box.snapshot()
 
 s:drop()
+
+-- test ddl operation within begin/commit/rollback
+-- acquire free space id
+space = box.schema.space.create('ddl_test', {engine = engine})
+id = space.id
+space:drop()
+
+inspector:cmd("setopt delimiter ';'")
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.rollback();
+
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.commit();
+
+box.begin()
+s:create_index('pk')
+box.rollback();
+
+box.begin()
+s:create_index('pk')
+box.commit();
+
+s:replace({1});
+s:replace({2});
+s:replace({3});
+
+box.begin()
+s:truncate()
+box.commit();
+s:select();
+
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.commit();
+
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.commit();
+
+-- index and space drop are not currently supported (because of truncate)
+s.index.pk:drop();
+s:drop();
diff --git a/test/engine/truncate.result b/test/engine/truncate.result
index b4de787fb..277e7bda6 100644
--- a/test/engine/truncate.result
+++ b/test/engine/truncate.result
@@ -24,14 +24,13 @@ box.begin()
 ...
 s:truncate()
 ---
-- error: DDL does not support multi-statement transactions
 ...
 box.commit()
 ---
 ...
 s:select()
 ---
-- - [123]
+- []
 ...
 s:drop()
 ---
diff --git a/test/sql-tap/trigger1.test.lua b/test/sql-tap/trigger1.test.lua
index 2984d4c21..924e57b58 100755
--- a/test/sql-tap/trigger1.test.lua
+++ b/test/sql-tap/trigger1.test.lua
@@ -849,28 +849,24 @@ test:do_catchsql_test(
         -- </trigger1-16.7>
     })
 
-test:do_catchsql_test(
+test:do_execsql_test(
     "trigger1-16.8",
     [[
         START TRANSACTION;
           CREATE TRIGGER tr168 INSERT ON tA BEGIN
             INSERT INTO t16 values(1);
           END;
+        ROLLBACK;
    ]], {
-        1, [[Space _trigger does not support multi-statement transactions]]
 })
 
-test:execsql [[
-    ROLLBACK;
-]]
-
-test:do_catchsql_test(
+test:do_execsql_test(
     "trigger1-16.9",
     [[
         START TRANSACTION;
           DROP TRIGGER t16err3;
+        ROLLBACK;
    ]], {
-        1, [[Space _trigger does not support multi-statement transactions]]
 })
 -- MUST_WORK_TEST
 -- #-------------------------------------------------------------------------
diff --git a/test/sql/delete.result b/test/sql/delete.result
index e024dd697..dcefa8d5f 100644
--- a/test/sql/delete.result
+++ b/test/sql/delete.result
@@ -76,17 +76,21 @@ box.sql.execute("INSERT INTO t1 VALUES(1, 1, 'one');")
 box.sql.execute("INSERT INTO t1 VALUES(2, 2, 'two');")
 ---
 ...
--- Can't truncate in transaction.
+-- Truncate rollback
 box.sql.execute("START TRANSACTION")
 ---
 ...
 box.sql.execute("TRUNCATE TABLE t1;")
 ---
-- error: DDL does not support multi-statement transactions
 ...
 box.sql.execute("ROLLBACK")
 ---
 ...
+box.sql.execute("SELECT * FROM t1")
+---
+- - [1, 1, 'one']
+  - [2, 2, 'two']
+...
 -- Can't truncate view.
 box.sql.execute("CREATE VIEW v1 AS SELECT * FROM t1;")
 ---
diff --git a/test/sql/delete.test.lua b/test/sql/delete.test.lua
index 5a0813071..b61a993a8 100644
--- a/test/sql/delete.test.lua
+++ b/test/sql/delete.test.lua
@@ -50,10 +50,11 @@ box.sql.execute("CREATE TABLE t1(id INT PRIMARY KEY, a INT, b TEXT);")
 box.sql.execute("INSERT INTO t1 VALUES(1, 1, 'one');")
 box.sql.execute("INSERT INTO t1 VALUES(2, 2, 'two');")
 
--- Can't truncate in transaction.
+-- Truncate rollback
 box.sql.execute("START TRANSACTION")
 box.sql.execute("TRUNCATE TABLE t1;")
 box.sql.execute("ROLLBACK")
+box.sql.execute("SELECT * FROM t1")
 
 -- Can't truncate view.
 box.sql.execute("CREATE VIEW v1 AS SELECT * FROM t1;")
-- 
2.21.0

* [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
  2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
  2019-03-27 11:41   ` Vladimir Davydov
  2019-03-27 11:48   ` Vladimir Davydov
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
  4 siblings, 2 replies; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies
it. Rows are fetched and stored on the fiber gc region until the last
transaction row, marked with is_commit, has been fetched. After that, a
multi-row transaction is applied within txn_begin/txn_commit/txn_rollback
boundaries. At this time we cannot apply a single-row transaction within
such boundaries because DDL does not support non-autocommit transactions.
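
The row-collection step can be sketched as below. This is a simplified
model, not the applier code: Row and read_tx are hypothetical, rows come
from an in-memory vector instead of the network, and errors are plain
exceptions rather than tnt_raise().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical, reduced row header: only the fields used for grouping.
struct Row {
    int64_t lsn;    // log sequence number of this row
    int64_t tsn;    // transaction id: lsn of the transaction's first row
    bool is_commit; // true on the last row of the transaction
};

// Collect the rows of one transaction from `stream`, starting at `pos`.
// `pos` is advanced past the row that carries is_commit.
std::vector<Row> read_tx(const std::vector<Row> &stream, std::size_t &pos) {
    std::vector<Row> rows;
    int64_t tsn = 0;
    while (true) {
        if (pos >= stream.size())
            throw std::runtime_error("stream ended inside a transaction");
        const Row &row = stream[pos++];
        if (tsn == 0) {
            // The transaction id is taken from the first row and must
            // match that row's lsn.
            tsn = row.tsn;
            if (row.lsn != tsn)
                throw std::runtime_error(
                    "tsn must equal the lsn of the first row");
        }
        // Rows of different transactions must not be mixed.
        if (row.tsn != tsn)
            throw std::runtime_error(
                "interleaving transactions are not supported");
        rows.push_back(row);
        if (row.is_commit)
            return rows;
    }
}
```

Each call returns one complete transaction, so the caller can wrap the
returned rows in a single begin/commit pair and roll back the whole
batch if any row fails to apply.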

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 218 +++++++++++++++++------
 test/replication/transaction.result   | 242 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 3 files changed, 491 insertions(+), 55 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 94c07aac7..08ad4a6a8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -426,6 +426,158 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+static struct applier_tx_row *
+applier_read_tx_row(struct applier *applier)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+
+	struct applier_tx_row *tx_row = (struct applier_tx_row *)
+		region_alloc(&fiber()->gc, sizeof(struct applier_tx_row));
+
+	if (tx_row == NULL)
+		tnt_raise(OutOfMemory, sizeof(struct applier_tx_row),
+			  "region", "struct applier_tx_row");
+
+	struct xrow_header *row = &tx_row->row;
+
+	double timeout = replication_disconnect_timeout();
+	/*
+	 * Tarantool < 1.7.7 does not send periodic heartbeat
+	 * messages so we can't assume that if we haven't heard
+	 * from the master for quite a while the connection is
+	 * broken - the master might just be idle.
+	 */
+	if (applier->version_id < version_id(1, 7, 7))
+		coio_read_xrow(coio, ibuf, row);
+	else
+		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+	applier->lag = ev_now(loop()) - row->tm;
+	applier->last_row_time = ev_monotonic_now(loop());
+	return tx_row;
+}
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+	int64_t tsn = 0;
+
+	stailq_create(rows);
+	do {
+		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+		struct xrow_header *row = &tx_row->row;
+
+		if (iproto_type_is_error(row->type))
+			xrow_decode_error_xc(row);
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+				  int2str(row->replica_id),
+				  tt_uuid_str(&REPLICASET_UUID));
+		}
+		if (tsn == 0) {
+			/*
+			 * Transaction id must be derived from the log sequence
+			 * number of the first row in the transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn)
+				tnt_raise(ClientError, ER_PROTOCOL,
+					  "Transaction id must be derived from "
+					  "the lsn of the first row in the "
+					  "transaction.");
+		}
+		if (tsn != row->tsn)
+			tnt_raise(ClientError, ER_UNSUPPORTED,
+				  "replication",
+				  "interleaving transactions");
+
+		assert(row->bodycnt <= 1);
+		if (row->bodycnt == 1 && !row->is_commit) {
+			/* Save row body to gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL)
+				tnt_raise(OutOfMemory, row->body->iov_len,
+					  "region", "xrow body");
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust row body pointers. */
+			row->body->iov_base = new_base;
+		}
+		stailq_add_tail(rows, &tx_row->next);
+
+	} while (!stailq_last_entry(rows, struct applier_tx_row,
+				    next)->row.is_commit);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+	int res = 0;
+	struct txn *txn = txn_begin(false);
+	struct applier_tx_row *item;
+	if (txn == NULL)
+		diag_raise();
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		res = apply_row(row);
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			break;
+	}
+	if (res == 0)
+		res = txn_commit(txn);
+	else
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -555,36 +707,14 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
-
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
 
-		applier->lag = ev_now(loop()) - row.tm;
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -594,33 +724,11 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+		    first_row->lsn &&
+		    applier_apply_tx(&rows) != 0) {
+			latch_unlock(latch);
+			diag_raise();
 		}
 		latch_unlock(latch);
 
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..8c2ac6ee4
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,242 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..f25a4737d
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0
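
The boundary rule that applier_read_tx() enforces can be sketched in
isolation. This is only an illustration under simplifying assumptions:
struct demo_row and demo_tx_length() are hypothetical names that do not
exist in Tarantool, and the real code reads rows from the network one
by one instead of scanning an array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct demo_row {
	int64_t lsn;
	int64_t tsn;
	bool is_commit;
};

/*
 * Scan rows[0..count) and return the number of rows that form the
 * first complete transaction. Return -1 if the first row's lsn
 * differs from its tsn or if a row with another tsn is interleaved,
 * and 0 if no commit row has been seen yet.
 */
int
demo_tx_length(const struct demo_row *rows, size_t count)
{
	assert(rows != NULL || count == 0);
	int64_t tsn = 0;
	for (size_t i = 0; i < count; i++) {
		if (tsn == 0) {
			/* The tsn is taken from the first row's lsn. */
			tsn = rows[i].tsn;
			if (rows[i].lsn != tsn)
				return -1;
		}
		/* Interleaved transactions are rejected. */
		if (rows[i].tsn != tsn)
			return -1;
		/* The commit flag closes the transaction. */
		if (rows[i].is_commit)
			return (int)(i + 1);
	}
	return 0;
}
```

A return value of 0 corresponds to the case where the applier keeps
reading: the do/while loop in the patch only exits once the last queued
row carries the commit flag.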


* [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes
  2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
  2019-03-27 12:06   ` Vladimir Davydov
  4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Disallow changes to non-local spaces while applying a replication
stream. As we do not support distributed transactions yet, we cannot
provide transactional replication for such side effects unless they
are NOPed.

Needed for: #2798
Follow up for: 27283debc327a1ef87e025badeed97d9ac264ac6
---
 src/box/applier.cc                   | 18 ++++++++++--
 src/box/txn.c                        | 15 ++++++++++
 src/box/txn.h                        |  9 ++++++
 test/replication/on_replace.result   | 44 ++++++++++++++++++++++++----
 test/replication/on_replace.test.lua | 20 +++++++++++--
 5 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 08ad4a6a8..2a528b856 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -571,9 +571,23 @@ applier_apply_tx(struct stailq *rows)
 		if (res != 0)
 			break;
 	}
-	if (res == 0)
+	if (res == 0) {
+		/*
+		 * We are going to commit, so it is high time to check if
+		 * the current transaction has non-local effects.
+		 */
+		if (txn_is_distributed(txn)) {
+			/*
+			 * A transaction mixes remote and local rows and
+			 * couldn't be replicated back because we don't
+			 * support distributed transactions yet.
+			 */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Applier", "distributed transactions");
+			return -1;
+		}
 		res = txn_commit(txn);
-	else
+	} else
 		txn_rollback();
 	return res;
 }
diff --git a/src/box/txn.c b/src/box/txn.c
index 31e19951f..97f076f22 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -224,6 +224,21 @@ fail:
 	return NULL;
 }
 
+bool
+txn_is_distributed(struct txn *txn)
+{
+	assert(txn == in_txn());
+	if (txn->n_local_rows == 0 || txn->n_remote_rows == 0)
+		return false;
+	struct txn_stmt *stmt;
+	/* Search for new non local group rows. */
+	stailq_foreach_entry(stmt, &txn->stmts, next)
+		if (stmt->row->replica_id == 0 &&
+		    stmt->space->def->opts.group_id != GROUP_LOCAL)
+			return true;
+	return false;
+}
+
 /**
  * End a statement. In autocommit mode, end
  * the current transaction as well.
diff --git a/src/box/txn.h b/src/box/txn.h
index 3572b005d..c00eb28a0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -296,6 +296,15 @@ txn_commit_ro_stmt(struct txn *txn)
 	}
 }
 
+/*
+ * Check whether transaction is distributed or not.
+ * It's essential in case of replication because we couldn't
+ * replicate a transaction with both remote and local non NOP
+ * statements.
+ */
+bool
+txn_is_distributed(struct txn *txn);
+
 /**
  * End a statement. In autocommit mode, end
  * the current transaction as well.
diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
index 2e95b90ea..a02b90f7e 100644
--- a/test/replication/on_replace.result
+++ b/test/replication/on_replace.result
@@ -104,7 +104,7 @@ box.space.test:drop()
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
 SERVERS = { 'on_replace1', 'on_replace2' }
 ---
 ...
@@ -143,7 +143,7 @@ fiber = require'fiber'
 while box.space.s2 == nil do fiber.sleep(0.00001) end
 ---
 ...
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
 ---
 ...
 test_run:cmd('switch on_replace1')
@@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
 ---
 - [1, 2, 3, 4]
 ...
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
 ---
 ...
 test_run:cmd('switch on_replace2')
 ---
 - true
 ...
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+---
+...
+box.info.replication[3 - box.info.id].upstream.message
+---
+- Applier does not support distributed transactions
+...
 box.space.s1:select()
 ---
-- - [1, 2, 3, 4]
+- []
 ...
 box.space.s2:select()
 ---
-- - [1, 2, 3, 4]
+- []
 ...
 test_run:cmd('switch on_replace1')
 ---
@@ -179,6 +186,33 @@ box.space.s1:select()
 ...
 box.space.s2:select()
 ---
+- []
+...
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+---
+- true
+...
+s3 = box.schema.space.create('s3', {is_local = true})
+---
+...
+_ = s3:create_index('pk')
+---
+...
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+---
+...
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+s3:select()
+---
 - - [1, 2, 3, 4]
 ...
 _ = test_run:cmd('switch default')
diff --git a/test/replication/on_replace.test.lua b/test/replication/on_replace.test.lua
index e34832103..779dbf768 100644
--- a/test/replication/on_replace.test.lua
+++ b/test/replication/on_replace.test.lua
@@ -44,7 +44,7 @@ box.space.test:drop()
 box.schema.user.revoke('guest', 'replication')
 
 
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
 
 SERVERS = { 'on_replace1', 'on_replace2' }
 test_run:create_cluster(SERVERS, "replication", {args="0.2"})
@@ -60,13 +60,15 @@ _ = s2:create_index('pk')
 test_run:cmd('switch on_replace2')
 fiber = require'fiber'
 while box.space.s2 == nil do fiber.sleep(0.00001) end
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
 
 test_run:cmd('switch on_replace1')
 box.space.s1:replace({1, 2, 3, 4})
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
 
 test_run:cmd('switch on_replace2')
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+box.info.replication[3 - box.info.id].upstream.message
 box.space.s1:select()
 box.space.s2:select()
 
@@ -74,6 +76,18 @@ test_run:cmd('switch on_replace1')
 box.space.s1:select()
 box.space.s2:select()
 
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+s3 = box.schema.space.create('s3', {is_local = true})
+_ = s3:create_index('pk')
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+
+s3:select()
+
 _ = test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)
 test_run:cleanup_cluster()
-- 
2.21.0
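
The check performed by txn_is_distributed() can be restated as a
standalone sketch. The names struct demo_stmt and
demo_txn_is_distributed() are hypothetical, and the sketch assumes that
a row with replica_id == 0 is one produced locally (for example by an
on_replace trigger) while the transaction is applied; it is not the
real txn code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified statement: not the real struct txn_stmt. */
struct demo_stmt {
	/* Assumed: 0 means the row was generated on this instance. */
	uint32_t replica_id;
	/* True if the target space belongs to the local group. */
	bool space_is_local;
};

/*
 * A transaction counts as distributed when it mixes remote and
 * local rows and at least one local row changes a space that is
 * itself replicated (i.e. not in the local group).
 */
bool
demo_txn_is_distributed(const struct demo_stmt *stmts, size_t count)
{
	assert(stmts != NULL || count == 0);
	size_t n_local = 0;
	size_t n_remote = 0;
	bool local_row_in_replicated_space = false;
	for (size_t i = 0; i < count; i++) {
		if (stmts[i].replica_id == 0) {
			n_local++;
			if (!stmts[i].space_is_local)
				local_row_in_replicated_space = true;
		} else {
			n_remote++;
		}
	}
	return n_local > 0 && n_remote > 0 &&
	       local_row_in_replicated_space;
}
```

This mirrors the two cases in on_replace.test.lua: a trigger writing to
the replicated space s2 makes the applied transaction distributed, while
a trigger writing to the is_local space s3 does not.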


* Re: [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
@ 2019-03-27  9:59   ` Vladimir Davydov
  0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27  9:59 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:06PM +0300, Georgy Kirichenko wrote:
> Abort a new index creation if truncate couldn't be finished because of
> rollback or an error. Without this vinyl fails because of internal
> scheduler assertion.
> 
> Needed for: 2798
> ---
>  src/box/alter.cc            | 13 +++++++--
>  test/engine/errinj.result   | 53 +++++++++++++++++++++++++++++++++++++
>  test/engine/errinj.test.lua | 15 +++++++++++
>  test/engine/suite.ini       |  1 +
>  4 files changed, 80 insertions(+), 2 deletions(-)
>  create mode 100644 test/engine/errinj.result
>  create mode 100644 test/engine/errinj.test.lua
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 080a72b9f..daaa9cd57 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -1310,13 +1310,16 @@ public:
>  		: AlterSpaceOp(alter), iid(iid) {}
>  	/** id of the index to truncate. */
>  	uint32_t iid;
> +	struct index *new_index;

new_index must be initialized with NULL in the constructor.

Other than this minor thing, the patch is fine by me. Kostja has already
pushed it to master, 2.1, and 1.10.


* Re: [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
@ 2019-03-27 10:03   ` Vladimir Davydov
  2019-03-27 10:32     ` Vladimir Davydov
  0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:03 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:07PM +0300, Georgy Kirichenko wrote:
> Update the Lua schema as soon as the space cache is replaced instead of
> doing this when the on_commit trigger executes. Otherwise,
> schema changes would not be visible until the commit has finished.
> 
> Needed for: #2798
> ---
>  src/box/alter.cc | 13 ++++++++-----
>  1 file changed, 8 insertions(+), 5 deletions(-)
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index daaa9cd57..275e39dd5 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
>  		op->commit(alter, txn->signature);
>  	}
>  
> -	trigger_run_xc(&on_alter_space, alter->new_space);
> -
>  	alter->new_space = NULL; /* for alter_space_delete(). */
>  	/*
>  	 * Delete the old version of the space, we are not
> @@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
>  	space_swap_triggers(alter->new_space, alter->old_space);
>  	space_swap_fk_constraints(alter->new_space, alter->old_space);
>  	space_cache_replace(alter->new_space, alter->old_space);
> +	trigger_run(&on_alter_space, alter->old_space);
> +
>  	alter_space_delete(alter);
>  }
>  
> @@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
>  	 * cache with it.
>  	 */
>  	space_cache_replace(alter->old_space, alter->new_space);
> +	trigger_run_xc(&on_alter_space, alter->new_space);

The comment right above says that this function must not throw an
exception here, so I guess we should use the _xc-free variant. Not that
it really matters, as the on_alter_space trigger can only fail on OOM,
which is hardly ever possible.

>  
>  	/*
>  	 * Install transaction commit/rollback triggers to either
> @@ -1388,7 +1389,6 @@ on_drop_space_commit(struct trigger *trigger, void *event)
>  {
>  	(void) event;
>  	struct space *space = (struct space *)trigger->data;
> -	trigger_run_xc(&on_alter_space, space);
>  	space_delete(space);
>  }
>  
> @@ -1403,6 +1403,7 @@ on_drop_space_rollback(struct trigger *trigger, void *event)
>  	(void) event;
>  	struct space *space = (struct space *)trigger->data;
>  	space_cache_replace(NULL, space);
> +	trigger_run(&on_alter_space, space);
>  }
>  
>  /**
> @@ -1412,8 +1413,7 @@ static void
>  on_create_space_commit(struct trigger *trigger, void *event)
>  {
>  	(void) event;
> -	struct space *space = (struct space *)trigger->data;
> -	trigger_run_xc(&on_alter_space, space);
> +	(void) trigger;
>  }

The function is not used now. I guess it's okay to leave it, just in
case we want to extend it in the future.

>  
>  /**
> @@ -1429,6 +1429,7 @@ on_create_space_rollback(struct trigger *trigger, void *event)
>  	(void) event;
>  	struct space *space = (struct space *)trigger->data;
>  	space_cache_replace(space, NULL);
> +	trigger_run(&on_alter_space, space);
>  	space_delete(space);
>  }
>  
> @@ -1672,6 +1673,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
>  		 * execution on a replica.
>  		 */
>  		space_cache_replace(NULL, space);
> +		trigger_run_xc(&on_alter_space, space);

Again, we must not throw exceptions here.

>  		/*
>  		 * Do not forget to update schema_version right after
>  		 * inserting the space to the space_cache, since no
> @@ -1764,6 +1766,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
>  		 * execution on a replica.
>  		 */
>  		space_cache_replace(old_space, NULL);
> +		trigger_run_xc(&on_alter_space, old_space);
>  		/*
>  		 * Do not forget to update schema_version right after
>  		 * deleting the space from the space_cache, since no

I think we should've fixed on_alter_sequence trigger in the scope of
this issue, too.

The patch has been pushed to master, 2.1, and 1.10 by Kostja.

I guess we can address the minor hitches I pointed out above in
a separate 'review-fix' patch.


* Re: [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
  2019-03-27 10:03   ` Vladimir Davydov
@ 2019-03-27 10:32     ` Vladimir Davydov
  2019-03-27 11:45       ` [tarantool-patches] " Konstantin Osipov
  0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:32 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Wed, Mar 27, 2019 at 01:03:51PM +0300, Vladimir Davydov wrote:
> On Fri, Mar 22, 2019 at 03:06:07PM +0300, Georgy Kirichenko wrote:
> > Update the Lua schema as soon as the space cache is replaced instead of
> > doing this when the on_commit trigger executes. Otherwise,
> > schema changes would not be visible until the commit has finished.
> > 
> > Needed for: #2798
> > ---
> >  src/box/alter.cc | 13 ++++++++-----
> >  1 file changed, 8 insertions(+), 5 deletions(-)
> > 
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index daaa9cd57..275e39dd5 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
> >  		op->commit(alter, txn->signature);
> >  	}
> >  
> > -	trigger_run_xc(&on_alter_space, alter->new_space);
> > -
> >  	alter->new_space = NULL; /* for alter_space_delete(). */
> >  	/*
> >  	 * Delete the old version of the space, we are not
> > @@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
> >  	space_swap_triggers(alter->new_space, alter->old_space);
> >  	space_swap_fk_constraints(alter->new_space, alter->old_space);
> >  	space_cache_replace(alter->new_space, alter->old_space);
> > +	trigger_run(&on_alter_space, alter->old_space);
> > +
> >  	alter_space_delete(alter);
> >  }
> >  
> > @@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
> >  	 * cache with it.
> >  	 */
> >  	space_cache_replace(alter->old_space, alter->new_space);
> > +	trigger_run_xc(&on_alter_space, alter->new_space);
> 
> The comment right above says that this function must not throw an
> exception here, so I guess we should use the _xc-free variant. Not that
> it really matters, as the on_alter_space trigger can only fail on OOM,
> which is hardly ever possible.

I guess we should simply fold trigger_run_xc into space_cache_replace
and panic on error.

BTW this is how things were before

commit 1f736583361f537c8e1fb4ea0df0b92ee8d50c6d
Author: Vladimir Davydov <vdavydov.dev@gmail.com>
Date:   Tue Sep 19 14:21:59 2017 +0300

    box: use trigger to push space data to Lua

    Currently, it is done by space_cache_replace/delete which violates
    encapsulation. Let's introduce a trigger that is fired after a change in
    a space definition is committed and use it to propagate changes to Lua.
    Patch by @kostja.
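
For illustration, the idea of folding the trigger call into the cache
replace and panicking on failure might look roughly like the sketch
below. All names here (demo_space, demo_cache, demo_cache_replace,
demo_mark_notified) are hypothetical and only model the control flow;
the real space cache and trigger API are different.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical space object with a flag set by the trigger. */
struct demo_space {
	int notified;
};

/* Hypothetical trigger type: returns 0 on success, -1 on failure. */
typedef int (*demo_trigger_f)(struct demo_space *space);

struct demo_cache {
	struct demo_space *space;
	demo_trigger_f on_alter;
};

/* Example trigger standing in for the Lua schema update. */
int
demo_mark_notified(struct demo_space *space)
{
	space->notified++;
	return 0;
}

/*
 * Replace the cached space and notify the trigger right away.
 * The caller never sees an error: a trigger failure (only OOM is
 * expected) terminates the process instead of being propagated.
 */
void
demo_cache_replace(struct demo_cache *cache, struct demo_space *new_space)
{
	assert(cache != NULL);
	cache->space = new_space;
	if (cache->on_alter != NULL && cache->on_alter(new_space) != 0) {
		fprintf(stderr, "can't update the schema view\n");
		abort();
	}
}
```

With this shape the callers in alter.cc would not need to choose
between the throwing and the non-throwing trigger_run variants at all.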


* Re: [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
@ 2019-03-27 10:49   ` Vladimir Davydov
  0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:49 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:08PM +0300, Georgy Kirichenko wrote:
> Allow single-statement transactions within begin/commit for a
> DDL operation instead of requiring autocommit. This is essential
> for a transactional applier.
> 
> Needed for: #2798
> ---
>  src/box/memtx_engine.c         | 20 ++------
>  src/box/txn.c                  |  9 +++-
>  src/box/txn.h                  |  5 ++
>  test/box/ddl.result            |  8 +--
>  test/box/ddl.test.lua          |  4 +-
>  test/box/transaction.result    | 53 +++++---------------
>  test/box/transaction.test.lua  | 27 ++++------
>  test/engine/ddl.result         | 92 ++++++++++++++++++++++++++++++++++
>  test/engine/ddl.test.lua       | 55 ++++++++++++++++++++
>  test/engine/truncate.result    |  3 +-
>  test/sql-tap/trigger1.test.lua | 12 ++---
>  test/sql/delete.result         |  8 ++-
>  test/sql/delete.test.lua       |  3 +-
>  13 files changed, 204 insertions(+), 95 deletions(-)
> 
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index d468d1cd8..924f8bbc4 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -384,15 +384,7 @@ static int
>  memtx_engine_begin(struct engine *engine, struct txn *txn)
>  {
>  	(void)engine;
> -	/*
> -	 * Register a trigger to rollback transaction on yield.
> -	 * This must be done in begin(), since it's
> -	 * the first thing txn invokes after txn->n_stmts++,
> -	 * to match with trigger_clear() in rollbackStatement().
> -	 */
> -	if (txn->is_autocommit == false) {
> -		memtx_init_txn(txn);
> -	}
> +	(void)txn;
>  	return 0;
>  }
>  
> @@ -404,15 +396,9 @@ memtx_engine_begin_statement(struct engine *engine, struct txn *txn)
>  	if (txn->engine_tx == NULL) {
>  		struct space *space = txn_last_stmt(txn)->space;
>  
> -		if (space->def->id > BOX_SYSTEM_ID_MAX &&
> -		    ! rlist_empty(&space->on_replace)) {
> -			/**
> -			 * A space on_replace trigger may initiate
> -			 * a yield.
> -			 */
> -			assert(txn->is_autocommit);
> +		if (space->def->id > BOX_SYSTEM_ID_MAX)
> +			/* Setup triggers for non-ddl transactions. */
>  			memtx_init_txn(txn);
> -		}
>  	}
>  	return 0;
>  }
> diff --git a/src/box/txn.c b/src/box/txn.c
> index deb4fac47..31e19951f 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -147,6 +147,7 @@ txn_begin(bool is_autocommit)
>  	txn->n_local_rows = 0;
>  	txn->n_remote_rows = 0;
>  	txn->is_autocommit = is_autocommit;
> +	txn->is_single_statement = false;
>  	txn->has_triggers  = false;
>  	txn->is_aborted = false;
>  	txn->in_sub_stmt = 0;
> @@ -191,6 +192,11 @@ txn_begin_stmt(struct space *space)
>  		diag_set(ClientError, ER_SUB_STMT_MAX);
>  		return NULL;
>  	}
> +	if (txn->is_single_statement && !stailq_empty(&txn->stmts)) {
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "DDL", "multi-statement transactions");
> +		return NULL;
> +	}
>  
>  	struct txn_stmt *stmt = txn_stmt_new(txn);
>  	if (stmt == NULL) {
> @@ -430,11 +436,12 @@ txn_abort(struct txn *txn)
>  int
>  txn_check_singlestatement(struct txn *txn, const char *where)
>  {
> -	if (!txn->is_autocommit || !txn_is_first_statement(txn)) {
> +	if (!txn_is_first_statement(txn)) {
>  		diag_set(ClientError, ER_UNSUPPORTED,
>  			 where, "multi-statement transactions");
>  		return -1;
>  	}
> +	txn->is_single_statement = true;
>  	return 0;
>  }

Judging by the function name and protocol, txn_check_singlestatement
isn't supposed to update txn fields. Relying on the fact that it is
called by every DDL operation (and only by DDL) is cumbersome IMO.

Maybe it would be better to mark a transaction as DDL in the
begin_statement callback and fail subsequent statements if the
transaction is already marked?
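
A minimal sketch of that idea, under stated assumptions: struct sketch_txn,
struct sketch_space, the is_ddl flag, SKETCH_SYSTEM_ID_MAX and
sketch_begin_statement() are all hypothetical stand-ins, not the real
txn/engine API. It also assumes, as the patch does, that a statement on a
system space (id not above the system maximum) counts as DDL, and that DDL
must be the first statement of its transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for struct space and struct txn. */
struct sketch_space {
	unsigned id;
};

struct sketch_txn {
	int n_stmts;	/* statements accepted so far */
	bool is_ddl;	/* set once a DDL statement has been accepted */
};

/* Assumed boundary: ids up to this value belong to system spaces. */
enum { SKETCH_SYSTEM_ID_MAX = 511 };

/*
 * Begin-statement hook: return 0 if the statement is accepted,
 * -1 if it must be rejected.
 */
int
sketch_begin_statement(struct sketch_txn *txn, const struct sketch_space *space)
{
	assert(txn != NULL && space != NULL);
	bool stmt_is_ddl = space->id <= SKETCH_SYSTEM_ID_MAX;
	/* A transaction already marked as DDL accepts nothing more. */
	if (txn->is_ddl)
		return -1;
	/* DDL is only allowed as the first statement. */
	if (stmt_is_ddl && txn->n_stmts > 0)
		return -1;
	txn->is_ddl = stmt_is_ddl;
	txn->n_stmts++;
	return 0;
}
```

With this shape the check function stays side-effect free, and the mark is
set in the one place that sees every statement.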

> diff --git a/test/engine/ddl.test.lua b/test/engine/ddl.test.lua
> index cdaf7a5bf..636f6c3b9 100644
> --- a/test/engine/ddl.test.lua
> +++ b/test/engine/ddl.test.lua
> @@ -759,6 +759,9 @@ i3:select()
>  
>  -- Check that recovery works.
>  inspector:cmd("restart server default")
> +test_run = require('test_run')
> +inspector = test_run.new()
> +engine = inspector:get_cfg('engine')

These two should remain set after restart. No need to reset them.

>  
>  s = box.space.test
>  s.index.i1:select()
> @@ -775,3 +778,55 @@ s.index.i1:select()
>  box.snapshot()
>  
>  s:drop()
> +
> +-- test ddl operation within begin/commit/rollback
> +-- acquire free space id
> +space = box.schema.space.create('ddl_test', {engine = engine})
> +id = space.id
> +space:drop()
> +
> +inspector:cmd("setopt delimiter ';'")
> +box.begin()
> +s = box.schema.space.create('ddl_test', {engine = engine, id = id})
> +box.rollback();
> +
> +box.begin()
> +s = box.schema.space.create('ddl_test', {engine = engine, id = id})
> +box.commit();
> +
> +box.begin()
> +s:create_index('pk')
> +box.rollback();
> +
> +box.begin()
> +s:create_index('pk')
> +box.commit();
> +
> +s:replace({1});
> +s:replace({2});
> +s:replace({3});
> +
> +box.begin()
> +s:truncate()
> +box.commit();
> +s:select();
> +
> +box.begin()
> +box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
> +box.rollback();
> +
> +box.begin()
> +box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
> +box.commit();
> +
> +box.begin()
> +box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
> +box.rollback();
> +
> +box.begin()
> +box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
> +box.commit();

Shouldn't we also check that mixing DDL with DML and combining several
DDL operations in one transaction still fail?

> +
> +-- index and space drop are not currently supported (because of truncate)

What do you mean by 'truncate'? Dropping indexes and grants? Please fix
the comment.

> +s.index.pk:drop();
> +s:drop();


* Re: [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
@ 2019-03-27 11:41   ` Vladimir Davydov
  2019-03-27 11:48   ` Vladimir Davydov
  1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 11:41 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:09PM +0300, Georgy Kirichenko wrote:
> The applier fetches incoming rows to form a transaction and then applies it.
> Rows are fetched and stored in the fiber gc region until the last
> transaction row, the one marked with is_commit, has been fetched. After the
> fetch, a multi-row transaction is applied within
> txn_begin/txn_commit/txn_rollback boundaries. At this time we cannot apply
> a single-row transaction within such boundaries because DDL does not
> support non-autocommit transactions.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 218 +++++++++++++++++------
>  test/replication/transaction.result   | 242 ++++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  3 files changed, 491 insertions(+), 55 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua

This patch looks good to me.
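
For reference, the batching rule from the commit message (collect rows until
one carries is_commit, then apply them as a unit) can be sketched as below.
This is only an illustration: struct sketch_row and sketch_next_tx_len() are
hypothetical names, and the real applier keeps rows in a queue on the fiber
gc region rather than in a plain array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified row: only the fields the sketch needs. */
struct sketch_row {
	long lsn;
	bool is_commit;	/* set on the last row of a transaction */
};

/*
 * Return the number of rows that form the first complete transaction
 * in rows[0..count), or 0 if no row with is_commit has arrived yet.
 */
size_t
sketch_next_tx_len(const struct sketch_row *rows, size_t count)
{
	assert(rows != NULL || count == 0);
	for (size_t i = 0; i < count; i++) {
		if (rows[i].is_commit)
			return i + 1;
	}
	/* The transaction is still incomplete: keep fetching. */
	return 0;
}
```

A single-row transaction is simply the case where the first row already has
is_commit set, so the function returns 1.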


* [tarantool-patches] Re: [PATCH v3 2/5] Synchronize lua schema update with space cache
  2019-03-27 10:32     ` Vladimir Davydov
@ 2019-03-27 11:45       ` Konstantin Osipov
  0 siblings, 0 replies; 14+ messages in thread
From: Konstantin Osipov @ 2019-03-27 11:45 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [19/03/27 14:22]:
> commit 1f736583361f537c8e1fb4ea0df0b92ee8d50c6d
> Author: Vladimir Davydov <vdavydov.dev@gmail.com>
> Date:   Tue Sep 19 14:21:59 2017 +0300
> 
>     box: use trigger to push space data to Lua
> 
>     Currently, it is done by space_cache_replace/delete which violates
>     encapsulation. Let's introduce a trigger that is fired after a change in
>     a space definition is committed and use it to propagate changes to Lua.
>     Patch by @kostja.

Let's call triggers, but call them directly from
space_cache_replace(). Please submit a follow up patch. We
discussed this with Georgy, but decided it's a bit outside of the
scope of the current patch.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* Re: [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
  2019-03-27 11:41   ` Vladimir Davydov
@ 2019-03-27 11:48   ` Vladimir Davydov
  1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 11:48 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:09PM +0300, Georgy Kirichenko wrote:
> +/**
> + * Apply all rows in the rows queue as a single transaction.
> + *
> + * Return 0 for success or -1 in case of an error.
> + */
> +static int
> +applier_apply_tx(struct stailq *rows)
> +{
> +	int res = 0;
> +	struct txn *txn = txn_begin(false);
> +	struct applier_tx_row *item;
> +	if (txn == NULL)
> +		diag_raise();

Just one thing: mixing return values and exceptions to propagate errors
doesn't look good. Besides, you won't release the latch in case of
an exception (see below). Please replace diag_raise() with 'return -1'.
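
A minimal sketch of the pattern being asked for, assuming a toy latch: every
name here (struct sketch_latch, sketch_apply_tx(), sketch_subscribe_step())
is hypothetical and only mirrors the shape of the code under review. The
point is that when the callee reports every failure through its return
value, the caller has a single place to release the latch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a latch: it only tracks whether it is held. */
struct sketch_latch {
	bool locked;
};

void
sketch_latch_lock(struct sketch_latch *l)
{
	assert(!l->locked);
	l->locked = true;
}

void
sketch_latch_unlock(struct sketch_latch *l)
{
	assert(l->locked);
	l->locked = false;
}

/*
 * Stand-in for the apply function: every failure, including a failed
 * "begin", is reported as -1 instead of being raised.
 */
int
sketch_apply_tx(bool begin_ok, bool rows_ok)
{
	if (!begin_ok)
		return -1;	/* where the reviewed code raised */
	if (!rows_ok)
		return -1;	/* a rollback would go here */
	return 0;		/* a commit would go here */
}

/* Caller: the latch is released on both the success and error paths. */
int
sketch_subscribe_step(struct sketch_latch *latch, bool begin_ok, bool rows_ok)
{
	sketch_latch_lock(latch);
	int rc = sketch_apply_tx(begin_ok, rows_ok);
	sketch_latch_unlock(latch);
	return rc;
}
```

The caller can still raise after unlocking if it wants an exception at its
own level; the callee just never unwinds past the locked region.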

> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			break;
> +	}
> +	if (res == 0)
> +		res = txn_commit(txn);
> +	else
> +		txn_rollback();
> +	return res;
> +}

> @@ -594,33 +724,11 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = apply_row(&row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/*
> -				 * In case of ER_TUPLE_FOUND error and enabled
> -				 * replication_skip_conflict configuration
> -				 * option, skip applying the foreign row and
> -				 * replace it with NOP in the local write ahead
> -				 * log.
> -				 */
> -				if (e->type == &type_ClientError &&
> -				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict) {
> -					diag_clear(diag_get());
> -					struct xrow_header nop;
> -					nop.type = IPROTO_NOP;
> -					nop.bodycnt = 0;
> -					nop.replica_id = row.replica_id;
> -					nop.lsn = row.lsn;
> -					res = apply_row(&nop);
> -				}
> -			}
> -			if (res != 0) {
> -				latch_unlock(latch);
> -				diag_raise();
> -			}
> +		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> +		    first_row->lsn &&
> +		    applier_apply_tx(&rows) != 0) {
> +			latch_unlock(latch);
> +			diag_raise();
>  		}
>  		latch_unlock(latch);
>  


* Re: [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes
  2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
@ 2019-03-27 12:06   ` Vladimir Davydov
  0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 12:06 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Fri, Mar 22, 2019 at 03:06:10PM +0300, Georgy Kirichenko wrote:
> diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
> index 2e95b90ea..a02b90f7e 100644
> --- a/test/replication/on_replace.result
> +++ b/test/replication/on_replace.result
> @@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
>  ---
>  - [1, 2, 3, 4]
>  ...
> -while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
> +while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end

Nit:

3 - box.info.id ?

Looks confusing. Why not simply use 1? It should point to the master,
shouldn't it?

Other than that, the patch looks good to me.


Thread overview: 14+ messages
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
2019-03-27  9:59   ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
2019-03-27 10:03   ` Vladimir Davydov
2019-03-27 10:32     ` Vladimir Davydov
2019-03-27 11:45       ` [tarantool-patches] " Konstantin Osipov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
2019-03-27 10:49   ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
2019-03-27 11:41   ` Vladimir Davydov
2019-03-27 11:48   ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
2019-03-27 12:06   ` Vladimir Davydov
