* [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
2019-03-27 9:59 ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
` (3 subsequent siblings)
4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Abort a new index creation if truncate couldn't be finished because of
rollback or an error. Without this vinyl fails because of internal
scheduler assertion.
Needed for: 2798
---
src/box/alter.cc | 13 +++++++--
test/engine/errinj.result | 53 +++++++++++++++++++++++++++++++++++++
test/engine/errinj.test.lua | 15 +++++++++++
test/engine/suite.ini | 1 +
4 files changed, 80 insertions(+), 2 deletions(-)
create mode 100644 test/engine/errinj.result
create mode 100644 test/engine/errinj.test.lua
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 080a72b9f..daaa9cd57 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -1310,13 +1310,16 @@ public:
: AlterSpaceOp(alter), iid(iid) {}
/** id of the index to truncate. */
uint32_t iid;
+ struct index *new_index;
virtual void prepare(struct alter_space *alter);
virtual void commit(struct alter_space *alter, int64_t signature);
+ virtual ~TruncateIndex();
};
void
TruncateIndex::prepare(struct alter_space *alter)
{
+ new_index = space_index(alter->new_space, iid);
if (iid == 0) {
/*
* Notify the engine that the primary index
@@ -1333,7 +1336,6 @@ TruncateIndex::prepare(struct alter_space *alter)
* index was recreated. For example, Vinyl uses this
* callback to load indexes during local recovery.
*/
- struct index *new_index = space_index(alter->new_space, iid);
assert(new_index != NULL);
space_build_index_xc(alter->new_space, new_index,
alter->new_space->format);
@@ -1343,10 +1345,17 @@ void
TruncateIndex::commit(struct alter_space *alter, int64_t signature)
{
struct index *old_index = space_index(alter->old_space, iid);
- struct index *new_index = space_index(alter->new_space, iid);
index_commit_drop(old_index, signature);
index_commit_create(new_index, signature);
+ new_index = NULL;
+}
+
+TruncateIndex::~TruncateIndex()
+{
+ if (new_index == NULL)
+ return;
+ index_abort_create(new_index);
}
/**
diff --git a/test/engine/errinj.result b/test/engine/errinj.result
new file mode 100644
index 000000000..d244c334a
--- /dev/null
+++ b/test/engine/errinj.result
@@ -0,0 +1,53 @@
+test_run = require('test_run')
+---
+...
+inspector = test_run.new()
+---
+...
+engine = inspector:get_cfg('engine')
+---
+...
+errinj = box.error.injection
+---
+...
+-- truncation rollback should not crash
+s = box.schema.space.create('truncate_rollback', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+_ = s:create_index('sk', {parts = {1, 'int'}})
+---
+...
+for i = 1, 10 do s:replace({i, i}) end
+---
+...
+errinj.set('ERRINJ_WAL_IO', true)
+---
+- ok
+...
+s:truncate()
+---
+- error: Failed to write to disk
+...
+errinj.set('ERRINJ_WAL_IO', false)
+---
+- ok
+...
+s:select()
+---
+- - [1, 1]
+ - [2, 2]
+ - [3, 3]
+ - [4, 4]
+ - [5, 5]
+ - [6, 6]
+ - [7, 7]
+ - [8, 8]
+ - [9, 9]
+ - [10, 10]
+...
+s:drop()
+---
+...
diff --git a/test/engine/errinj.test.lua b/test/engine/errinj.test.lua
new file mode 100644
index 000000000..57f3a962c
--- /dev/null
+++ b/test/engine/errinj.test.lua
@@ -0,0 +1,15 @@
+test_run = require('test_run')
+inspector = test_run.new()
+engine = inspector:get_cfg('engine')
+errinj = box.error.injection
+
+-- truncation rollback should not crash
+s = box.schema.space.create('truncate_rollback', {engine = engine})
+_ = s:create_index('pk')
+_ = s:create_index('sk', {parts = {1, 'int'}})
+for i = 1, 10 do s:replace({i, i}) end
+errinj.set('ERRINJ_WAL_IO', true)
+s:truncate()
+errinj.set('ERRINJ_WAL_IO', false)
+s:select()
+s:drop()
diff --git a/test/engine/suite.ini b/test/engine/suite.ini
index 3f82a1325..3db02ab6f 100644
--- a/test/engine/suite.ini
+++ b/test/engine/suite.ini
@@ -3,6 +3,7 @@ core = tarantool
description = tarantool multiengine tests
script = box.lua
use_unix_sockets = True
+release_disabled = errinj.test.lua
config = engine.cfg
#disabled = replica_join.test.lua
lua_libs = conflict.lua ../box/lua/utils.lua ../box/lua/push.lua
--
2.21.0
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
@ 2019-03-27 9:59 ` Vladimir Davydov
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 9:59 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:06PM +0300, Georgy Kirichenko wrote:
> Abort a new index creation if truncate couldn't be finished because of
> rollback or an error. Without this vinyl fails because of internal
> scheduler assertion.
>
> Needed for: 2798
> ---
> src/box/alter.cc | 13 +++++++--
> test/engine/errinj.result | 53 +++++++++++++++++++++++++++++++++++++
> test/engine/errinj.test.lua | 15 +++++++++++
> test/engine/suite.ini | 1 +
> 4 files changed, 80 insertions(+), 2 deletions(-)
> create mode 100644 test/engine/errinj.result
> create mode 100644 test/engine/errinj.test.lua
>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 080a72b9f..daaa9cd57 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -1310,13 +1310,16 @@ public:
> : AlterSpaceOp(alter), iid(iid) {}
> /** id of the index to truncate. */
> uint32_t iid;
> + struct index *new_index;
new_index must be initialized with NULL in the constructor.
Other than this minor thing, the patch is fine by me. Kostja has already
pushed it to master, 2.1, and 1.10.
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
2019-03-27 10:03 ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
` (2 subsequent siblings)
4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Update lua schema as soon as space cache replace was done instead of
doing this while on_commit trigger executes. In opposite then case
schema changes would not be visible until commit was finished.
Needed for: #2798
---
src/box/alter.cc | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index daaa9cd57..275e39dd5 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
op->commit(alter, txn->signature);
}
- trigger_run_xc(&on_alter_space, alter->new_space);
-
alter->new_space = NULL; /* for alter_space_delete(). */
/*
* Delete the old version of the space, we are not
@@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
space_swap_triggers(alter->new_space, alter->old_space);
space_swap_fk_constraints(alter->new_space, alter->old_space);
space_cache_replace(alter->new_space, alter->old_space);
+ trigger_run(&on_alter_space, alter->old_space);
+
alter_space_delete(alter);
}
@@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
* cache with it.
*/
space_cache_replace(alter->old_space, alter->new_space);
+ trigger_run_xc(&on_alter_space, alter->new_space);
/*
* Install transaction commit/rollback triggers to either
@@ -1388,7 +1389,6 @@ on_drop_space_commit(struct trigger *trigger, void *event)
{
(void) event;
struct space *space = (struct space *)trigger->data;
- trigger_run_xc(&on_alter_space, space);
space_delete(space);
}
@@ -1403,6 +1403,7 @@ on_drop_space_rollback(struct trigger *trigger, void *event)
(void) event;
struct space *space = (struct space *)trigger->data;
space_cache_replace(NULL, space);
+ trigger_run(&on_alter_space, space);
}
/**
@@ -1412,8 +1413,7 @@ static void
on_create_space_commit(struct trigger *trigger, void *event)
{
(void) event;
- struct space *space = (struct space *)trigger->data;
- trigger_run_xc(&on_alter_space, space);
+ (void) trigger;
}
/**
@@ -1429,6 +1429,7 @@ on_create_space_rollback(struct trigger *trigger, void *event)
(void) event;
struct space *space = (struct space *)trigger->data;
space_cache_replace(space, NULL);
+ trigger_run(&on_alter_space, space);
space_delete(space);
}
@@ -1672,6 +1673,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* execution on a replica.
*/
space_cache_replace(NULL, space);
+ trigger_run_xc(&on_alter_space, space);
/*
* Do not forget to update schema_version right after
* inserting the space to the space_cache, since no
@@ -1764,6 +1766,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* execution on a replica.
*/
space_cache_replace(old_space, NULL);
+ trigger_run_xc(&on_alter_space, old_space);
/*
* Do not forget to update schema_version right after
* deleting the space from the space_cache, since no
--
2.21.0
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
@ 2019-03-27 10:03 ` Vladimir Davydov
2019-03-27 10:32 ` Vladimir Davydov
0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:03 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:07PM +0300, Georgy Kirichenko wrote:
> Update lua schema as soon as space cache replace was done instead of
> doing this while on_commit trigger executes. In opposite then case
> schema changes would not be visible until commit was finished.
>
> Needed for: #2798
> ---
> src/box/alter.cc | 13 ++++++++-----
> 1 file changed, 8 insertions(+), 5 deletions(-)
>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index daaa9cd57..275e39dd5 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
> op->commit(alter, txn->signature);
> }
>
> - trigger_run_xc(&on_alter_space, alter->new_space);
> -
> alter->new_space = NULL; /* for alter_space_delete(). */
> /*
> * Delete the old version of the space, we are not
> @@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
> space_swap_triggers(alter->new_space, alter->old_space);
> space_swap_fk_constraints(alter->new_space, alter->old_space);
> space_cache_replace(alter->new_space, alter->old_space);
> + trigger_run(&on_alter_space, alter->old_space);
> +
> alter_space_delete(alter);
> }
>
> @@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
> * cache with it.
> */
> space_cache_replace(alter->old_space, alter->new_space);
> + trigger_run_xc(&on_alter_space, alter->new_space);
The comment right above says that this function must not throw an
exception here so I guess we should use _xc free variant. Not that it
really matters, as on_alter_space trigger can only fail on OOM, which
is hardly ever possible.
>
> /*
> * Install transaction commit/rollback triggers to either
> @@ -1388,7 +1389,6 @@ on_drop_space_commit(struct trigger *trigger, void *event)
> {
> (void) event;
> struct space *space = (struct space *)trigger->data;
> - trigger_run_xc(&on_alter_space, space);
> space_delete(space);
> }
>
> @@ -1403,6 +1403,7 @@ on_drop_space_rollback(struct trigger *trigger, void *event)
> (void) event;
> struct space *space = (struct space *)trigger->data;
> space_cache_replace(NULL, space);
> + trigger_run(&on_alter_space, space);
> }
>
> /**
> @@ -1412,8 +1413,7 @@ static void
> on_create_space_commit(struct trigger *trigger, void *event)
> {
> (void) event;
> - struct space *space = (struct space *)trigger->data;
> - trigger_run_xc(&on_alter_space, space);
> + (void) trigger;
> }
The function is not used now. I guess it's okay to leave it, just in
case we want to extend it in the future.
>
> /**
> @@ -1429,6 +1429,7 @@ on_create_space_rollback(struct trigger *trigger, void *event)
> (void) event;
> struct space *space = (struct space *)trigger->data;
> space_cache_replace(space, NULL);
> + trigger_run(&on_alter_space, space);
> space_delete(space);
> }
>
> @@ -1672,6 +1673,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
> * execution on a replica.
> */
> space_cache_replace(NULL, space);
> + trigger_run_xc(&on_alter_space, space);
Agagin, we must no throw exceptions here.
> /*
> * Do not forget to update schema_version right after
> * inserting the space to the space_cache, since no
> @@ -1764,6 +1766,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
> * execution on a replica.
> */
> space_cache_replace(old_space, NULL);
> + trigger_run_xc(&on_alter_space, old_space);
> /*
> * Do not forget to update schema_version right after
> * deleting the space from the space_cache, since no
I think we should've fixed on_alter_sequence trigger in the scope of
this issue, too.
The patch has been pushed to master, 2.1, and 1.10 by Kostja.
I guess we can address the minor hitches I pointed out above in
a separate 'review-fix' patch.
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache
2019-03-27 10:03 ` Vladimir Davydov
@ 2019-03-27 10:32 ` Vladimir Davydov
2019-03-27 11:45 ` [tarantool-patches] " Konstantin Osipov
0 siblings, 1 reply; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:32 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Wed, Mar 27, 2019 at 01:03:51PM +0300, Vladimir Davydov wrote:
> On Fri, Mar 22, 2019 at 03:06:07PM +0300, Georgy Kirichenko wrote:
> > Update lua schema as soon as space cache replace was done instead of
> > doing this while on_commit trigger executes. In opposite then case
> > schema changes would not be visible until commit was finished.
> >
> > Needed for: #2798
> > ---
> > src/box/alter.cc | 13 ++++++++-----
> > 1 file changed, 8 insertions(+), 5 deletions(-)
> >
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index daaa9cd57..275e39dd5 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -750,8 +750,6 @@ alter_space_commit(struct trigger *trigger, void *event)
> > op->commit(alter, txn->signature);
> > }
> >
> > - trigger_run_xc(&on_alter_space, alter->new_space);
> > -
> > alter->new_space = NULL; /* for alter_space_delete(). */
> > /*
> > * Delete the old version of the space, we are not
> > @@ -787,6 +785,8 @@ alter_space_rollback(struct trigger *trigger, void * /* event */)
> > space_swap_triggers(alter->new_space, alter->old_space);
> > space_swap_fk_constraints(alter->new_space, alter->old_space);
> > space_cache_replace(alter->new_space, alter->old_space);
> > + trigger_run(&on_alter_space, alter->old_space);
> > +
> > alter_space_delete(alter);
> > }
> >
> > @@ -888,6 +888,7 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
> > * cache with it.
> > */
> > space_cache_replace(alter->old_space, alter->new_space);
> > + trigger_run_xc(&on_alter_space, alter->new_space);
>
> The comment right above says that this function must not throw an
> exception here so I guess we should use _xc free variant. Not that it
> really matters, as on_alter_space trigger can only fail on OOM, which
> is hardly ever possible.
Guess we should simply fold trigger_run_xc in space_cache_replace and
panic on error.
BTW this is how things were before
commit 1f736583361f537c8e1fb4ea0df0b92ee8d50c6d
Author: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Tue Sep 19 14:21:59 2017 +0300
box: use trigger to push space data to Lua
Currently, it is done by space_cache_replace/delete which violates
incapsulation. Let's introduce a trigger that is fired after a change in
a space definition is committed and use it to propagate changes to Lua.
Patch by @kostja.
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH v3 2/5] Synchronize lua schema update with space cache
2019-03-27 10:32 ` Vladimir Davydov
@ 2019-03-27 11:45 ` Konstantin Osipov
0 siblings, 0 replies; 14+ messages in thread
From: Konstantin Osipov @ 2019-03-27 11:45 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/03/27 14:22]:
> commit 1f736583361f537c8e1fb4ea0df0b92ee8d50c6d
> Author: Vladimir Davydov <vdavydov.dev@gmail.com>
> Date: Tue Sep 19 14:21:59 2017 +0300
>
> box: use trigger to push space data to Lua
>
> Currently, it is done by space_cache_replace/delete which violates
> incapsulation. Let's introduce a trigger that is fired after a change in
> a space definition is committed and use it to propagate changes to Lua.
> Patch by @kostja.
Let's call triggers, but call them directly from
space_cache_replace(). Please submit a follow up patch. We
discussed this with Georgy, but decided it's a bit outside of the
scope of the current patch.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 1/5] Abort vinyl index creation in case of truncation rollback Georgy Kirichenko
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 2/5] Synchronize lua schema update with space cache Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
2019-03-27 10:49 ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Allow single statement transactions within begin/commit in case of an
ddl operation instead of auto commit requirements. This is essential
for a transactional applier.
Needed for: #2798
---
src/box/memtx_engine.c | 20 ++------
src/box/txn.c | 9 +++-
src/box/txn.h | 5 ++
test/box/ddl.result | 8 +--
test/box/ddl.test.lua | 4 +-
test/box/transaction.result | 53 +++++---------------
test/box/transaction.test.lua | 27 ++++------
test/engine/ddl.result | 92 ++++++++++++++++++++++++++++++++++
test/engine/ddl.test.lua | 55 ++++++++++++++++++++
test/engine/truncate.result | 3 +-
test/sql-tap/trigger1.test.lua | 12 ++---
test/sql/delete.result | 8 ++-
test/sql/delete.test.lua | 3 +-
13 files changed, 204 insertions(+), 95 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index d468d1cd8..924f8bbc4 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -384,15 +384,7 @@ static int
memtx_engine_begin(struct engine *engine, struct txn *txn)
{
(void)engine;
- /*
- * Register a trigger to rollback transaction on yield.
- * This must be done in begin(), since it's
- * the first thing txn invokes after txn->n_stmts++,
- * to match with trigger_clear() in rollbackStatement().
- */
- if (txn->is_autocommit == false) {
- memtx_init_txn(txn);
- }
+ (void)txn;
return 0;
}
@@ -404,15 +396,9 @@ memtx_engine_begin_statement(struct engine *engine, struct txn *txn)
if (txn->engine_tx == NULL) {
struct space *space = txn_last_stmt(txn)->space;
- if (space->def->id > BOX_SYSTEM_ID_MAX &&
- ! rlist_empty(&space->on_replace)) {
- /**
- * A space on_replace trigger may initiate
- * a yield.
- */
- assert(txn->is_autocommit);
+ if (space->def->id > BOX_SYSTEM_ID_MAX)
+ /* Setup triggers for non-ddl transactions. */
memtx_init_txn(txn);
- }
}
return 0;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index deb4fac47..31e19951f 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -147,6 +147,7 @@ txn_begin(bool is_autocommit)
txn->n_local_rows = 0;
txn->n_remote_rows = 0;
txn->is_autocommit = is_autocommit;
+ txn->is_single_statement = false;
txn->has_triggers = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
@@ -191,6 +192,11 @@ txn_begin_stmt(struct space *space)
diag_set(ClientError, ER_SUB_STMT_MAX);
return NULL;
}
+ if (txn->is_single_statement && !stailq_empty(&txn->stmts)) {
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "DDL", "multi-statement transactions");
+ return NULL;
+ }
struct txn_stmt *stmt = txn_stmt_new(txn);
if (stmt == NULL) {
@@ -430,11 +436,12 @@ txn_abort(struct txn *txn)
int
txn_check_singlestatement(struct txn *txn, const char *where)
{
- if (!txn->is_autocommit || !txn_is_first_statement(txn)) {
+ if (!txn_is_first_statement(txn)) {
diag_set(ClientError, ER_UNSUPPORTED,
where, "multi-statement transactions");
return -1;
}
+ txn->is_single_statement = true;
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index c9829da9e..3572b005d 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -149,6 +149,11 @@ struct txn {
* (statement end causes an automatic transaction commit).
*/
bool is_autocommit;
+ /**
+ * True if this transaction is allowed to have only one statement.
+ * Used for ddl operations.
+ */
+ bool is_single_statement;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
diff --git a/test/box/ddl.result b/test/box/ddl.result
index 3d6d07f43..e48284ffd 100644
--- a/test/box/ddl.result
+++ b/test/box/ddl.result
@@ -299,11 +299,7 @@ box.space._collation:replace(c)
---
- error: collation does not support alter
...
-box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU')
----
-- error: Space _collation does not support multi-statement transactions
-...
-box.rollback()
+box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU') box.rollback()
---
...
box.internal.collation.create('test', 'ICU', 'ru_RU')
@@ -645,11 +641,11 @@ _ = fiber.create(function()
end);
---
...
+-- Should be Ok for now
box.begin()
test_latch:create_index("sec2", {unique = true, parts = {2, 'unsigned'}})
box.commit();
---
-- error: DDL does not support multi-statement transactions
...
test_run:cmd("setopt delimiter ''");
---
diff --git a/test/box/ddl.test.lua b/test/box/ddl.test.lua
index 5c147cdfb..101bc6f9b 100644
--- a/test/box/ddl.test.lua
+++ b/test/box/ddl.test.lua
@@ -131,8 +131,7 @@ c = box.space._collation:get{1}:totable()
c[2] = 'unicode_test'
box.space._collation:replace(c)
-box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU')
-box.rollback()
+box.begin() box.internal.collation.create('test2', 'ICU', 'ru_RU') box.rollback()
box.internal.collation.create('test', 'ICU', 'ru_RU')
box.internal.collation.exists('test')
@@ -260,6 +259,7 @@ _ = fiber.create(function()
c:put(true)
end);
+-- Should be Ok for now
box.begin()
test_latch:create_index("sec2", {unique = true, parts = {2, 'unsigned'}})
box.commit();
diff --git a/test/box/transaction.result b/test/box/transaction.result
index 8a4d11d3b..7def44d5d 100644
--- a/test/box/transaction.result
+++ b/test/box/transaction.result
@@ -84,77 +84,50 @@ while f:status() ~= 'dead' do fiber.sleep(0) end;
---
...
-- transactions and system spaces
+-- some operation involves more than one ddl spaces, so they should fail
box.begin() box.schema.space.create('test');
---
-- error: Space _schema does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.schema.func.create('test');
----
-- error: Space _func does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
...
box.rollback();
---
...
box.begin() box.schema.user.create('test');
---
-- error: Space _user does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv');
----
-- error: Space _priv does not support multi-statement transactions
-...
-box.rollback();
----
-...
-box.begin() box.space._user:delete{box.schema.GUEST_ID};
----
-- error: Space _user does not support multi-statement transactions
+- error: DDL does not support multi-statement transactions
...
box.rollback();
---
...
-box.begin() box.space._space:delete{box.schema.CLUSTER_ID};
+-- but this is Ok now
+box.begin() box.schema.func.create('test') box.rollback();
---
-- error: DDL does not support multi-statement transactions
...
-box.rollback();
+box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv') box.rollback();
---
...
-box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false};
+space = box.schema.space.create('test');
---
-- error: Space _sequence does not support multi-statement transactions
...
-box.rollback();
+box.begin() box.space._space:delete{space.id} box.rollback();
---
...
-box.begin() box.space._schema:insert{'test'};
+box.begin() box.space._space:delete{space.id} box.commit();
---
-- error: Space _schema does not support multi-statement transactions
...
-box.rollback();
+box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false} box.rollback();
---
...
-box.begin() box.space._cluster:insert{123456789, 'abc'};
+box.begin() box.space._schema:insert{'test'} box.rollback();
---
-- error: Space _cluster does not support multi-statement transactions
...
-box.rollback();
+box.begin() box.space._cluster:insert{30, '00000000-0000-0000-0000-000000000001'} box.rollback();
---
...
s = box.schema.space.create('test');
---
...
-box.begin() index = s:create_index('primary');
----
-- error: DDL does not support multi-statement transactions
-...
-box.rollback();
+box.begin() index = s:create_index('primary') box.rollback();
---
...
index = s:create_index('primary');
diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua
index 8f7dfedec..0d212ca29 100644
--- a/test/box/transaction.test.lua
+++ b/test/box/transaction.test.lua
@@ -41,27 +41,22 @@ f = fiber.create(sloppy);
-- ensure it's rolled back automatically
while f:status() ~= 'dead' do fiber.sleep(0) end;
-- transactions and system spaces
+-- some operation involves more than one ddl spaces, so they should fail
box.begin() box.schema.space.create('test');
box.rollback();
-box.begin() box.schema.func.create('test');
-box.rollback();
box.begin() box.schema.user.create('test');
box.rollback();
-box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv');
-box.rollback();
-box.begin() box.space._user:delete{box.schema.GUEST_ID};
-box.rollback();
-box.begin() box.space._space:delete{box.schema.CLUSTER_ID};
-box.rollback();
-box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false};
-box.rollback();
-box.begin() box.space._schema:insert{'test'};
-box.rollback();
-box.begin() box.space._cluster:insert{123456789, 'abc'};
-box.rollback();
+-- but this is Ok now
+box.begin() box.schema.func.create('test') box.rollback();
+box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv') box.rollback();
+space = box.schema.space.create('test');
+box.begin() box.space._space:delete{space.id} box.rollback();
+box.begin() box.space._space:delete{space.id} box.commit();
+box.begin() box.space._sequence:insert{1, 1, 'test', 1, 1, 2, 1, 0, false} box.rollback();
+box.begin() box.space._schema:insert{'test'} box.rollback();
+box.begin() box.space._cluster:insert{30, '00000000-0000-0000-0000-000000000001'} box.rollback();
s = box.schema.space.create('test');
-box.begin() index = s:create_index('primary');
-box.rollback();
+box.begin() index = s:create_index('primary') box.rollback();
index = s:create_index('primary');
t = nil
function multi()
diff --git a/test/engine/ddl.result b/test/engine/ddl.result
index 8d34d5ef4..c493bd4ac 100644
--- a/test/engine/ddl.result
+++ b/test/engine/ddl.result
@@ -2075,6 +2075,15 @@ i3:select()
...
-- Check that recovery works.
inspector:cmd("restart server default")
+test_run = require('test_run')
+---
+...
+inspector = test_run.new()
+---
+...
+engine = inspector:get_cfg('engine')
+---
+...
s = box.space.test
---
...
@@ -2130,3 +2139,86 @@ box.snapshot()
s:drop()
---
...
+-- test ddl operation within begin/commit/rollback
+-- acquire free space id
+space = box.schema.space.create('ddl_test', {engine = engine})
+---
+...
+id = space.id
+---
+...
+space:drop()
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.rollback();
+---
+...
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.commit();
+---
+...
+box.begin()
+s:create_index('pk')
+box.rollback();
+---
+...
+box.begin()
+s:create_index('pk')
+box.commit();
+---
+...
+s:replace({1});
+---
+- [1]
+...
+s:replace({2});
+---
+- [2]
+...
+s:replace({3});
+---
+- [3]
+...
+box.begin()
+s:truncate()
+box.commit();
+---
+...
+s:select();
+---
+- []
+...
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+---
+...
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.commit();
+---
+...
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+---
+...
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.commit();
+---
+...
+-- index and space drop are not currently supported (because of truncate)
+s.index.pk:drop();
+---
+...
+s:drop();
+---
+...
diff --git a/test/engine/ddl.test.lua b/test/engine/ddl.test.lua
index cdaf7a5bf..636f6c3b9 100644
--- a/test/engine/ddl.test.lua
+++ b/test/engine/ddl.test.lua
@@ -759,6 +759,9 @@ i3:select()
-- Check that recovery works.
inspector:cmd("restart server default")
+test_run = require('test_run')
+inspector = test_run.new()
+engine = inspector:get_cfg('engine')
s = box.space.test
s.index.i1:select()
@@ -775,3 +778,55 @@ s.index.i1:select()
box.snapshot()
s:drop()
+
+-- test ddl operation within begin/commit/rollback
+-- acquire free space id
+space = box.schema.space.create('ddl_test', {engine = engine})
+id = space.id
+space:drop()
+
+inspector:cmd("setopt delimiter ';'")
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.rollback();
+
+box.begin()
+s = box.schema.space.create('ddl_test', {engine = engine, id = id})
+box.commit();
+
+box.begin()
+s:create_index('pk')
+box.rollback();
+
+box.begin()
+s:create_index('pk')
+box.commit();
+
+s:replace({1});
+s:replace({2});
+s:replace({3});
+
+box.begin()
+s:truncate()
+box.commit();
+s:select();
+
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+
+box.begin()
+box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
+box.commit();
+
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.rollback();
+
+box.begin()
+box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
+box.commit();
+
+-- index and space drop are not currently supported (because of truncate)
+s.index.pk:drop();
+s:drop();
diff --git a/test/engine/truncate.result b/test/engine/truncate.result
index b4de787fb..277e7bda6 100644
--- a/test/engine/truncate.result
+++ b/test/engine/truncate.result
@@ -24,14 +24,13 @@ box.begin()
...
s:truncate()
---
-- error: DDL does not support multi-statement transactions
...
box.commit()
---
...
s:select()
---
-- - [123]
+- []
...
s:drop()
---
diff --git a/test/sql-tap/trigger1.test.lua b/test/sql-tap/trigger1.test.lua
index 2984d4c21..924e57b58 100755
--- a/test/sql-tap/trigger1.test.lua
+++ b/test/sql-tap/trigger1.test.lua
@@ -849,28 +849,24 @@ test:do_catchsql_test(
-- </trigger1-16.7>
})
-test:do_catchsql_test(
+test:do_execsql_test(
"trigger1-16.8",
[[
START TRANSACTION;
CREATE TRIGGER tr168 INSERT ON tA BEGIN
INSERT INTO t16 values(1);
END;
+ ROLLBACK;
]], {
- 1, [[Space _trigger does not support multi-statement transactions]]
})
-test:execsql [[
- ROLLBACK;
-]]
-
-test:do_catchsql_test(
+test:do_execsql_test(
"trigger1-16.9",
[[
START TRANSACTION;
DROP TRIGGER t16err3;
+ ROLLBACK;
]], {
- 1, [[Space _trigger does not support multi-statement transactions]]
})
-- MUST_WORK_TEST
-- #-------------------------------------------------------------------------
diff --git a/test/sql/delete.result b/test/sql/delete.result
index e024dd697..dcefa8d5f 100644
--- a/test/sql/delete.result
+++ b/test/sql/delete.result
@@ -76,17 +76,21 @@ box.sql.execute("INSERT INTO t1 VALUES(1, 1, 'one');")
box.sql.execute("INSERT INTO t1 VALUES(2, 2, 'two');")
---
...
--- Can't truncate in transaction.
+-- Truncate rollback
box.sql.execute("START TRANSACTION")
---
...
box.sql.execute("TRUNCATE TABLE t1;")
---
-- error: DDL does not support multi-statement transactions
...
box.sql.execute("ROLLBACK")
---
...
+box.sql.execute("SELECT * FROM t1")
+---
+- - [1, 1, 'one']
+ - [2, 2, 'two']
+...
-- Can't truncate view.
box.sql.execute("CREATE VIEW v1 AS SELECT * FROM t1;")
---
diff --git a/test/sql/delete.test.lua b/test/sql/delete.test.lua
index 5a0813071..b61a993a8 100644
--- a/test/sql/delete.test.lua
+++ b/test/sql/delete.test.lua
@@ -50,10 +50,11 @@ box.sql.execute("CREATE TABLE t1(id INT PRIMARY KEY, a INT, b TEXT);")
box.sql.execute("INSERT INTO t1 VALUES(1, 1, 'one');")
box.sql.execute("INSERT INTO t1 VALUES(2, 2, 'two');")
--- Can't truncate in transaction.
+-- Truncate rollback
box.sql.execute("START TRANSACTION")
box.sql.execute("TRUNCATE TABLE t1;")
box.sql.execute("ROLLBACK")
+box.sql.execute("SELECT * FROM t1")
-- Can't truncate view.
box.sql.execute("CREATE VIEW v1 AS SELECT * FROM t1;")
--
2.21.0
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
@ 2019-03-27 10:49 ` Vladimir Davydov
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 10:49 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:08PM +0300, Georgy Kirichenko wrote:
> Allow single statement transactions within begin/commit in case of an
> ddl operation instead of auto commit requirements. This is essential
> for a transactional applier.
>
> Needed for: #2798
> ---
> src/box/memtx_engine.c | 20 ++------
> src/box/txn.c | 9 +++-
> src/box/txn.h | 5 ++
> test/box/ddl.result | 8 +--
> test/box/ddl.test.lua | 4 +-
> test/box/transaction.result | 53 +++++---------------
> test/box/transaction.test.lua | 27 ++++------
> test/engine/ddl.result | 92 ++++++++++++++++++++++++++++++++++
> test/engine/ddl.test.lua | 55 ++++++++++++++++++++
> test/engine/truncate.result | 3 +-
> test/sql-tap/trigger1.test.lua | 12 ++---
> test/sql/delete.result | 8 ++-
> test/sql/delete.test.lua | 3 +-
> 13 files changed, 204 insertions(+), 95 deletions(-)
>
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index d468d1cd8..924f8bbc4 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -384,15 +384,7 @@ static int
> memtx_engine_begin(struct engine *engine, struct txn *txn)
> {
> (void)engine;
> - /*
> - * Register a trigger to rollback transaction on yield.
> - * This must be done in begin(), since it's
> - * the first thing txn invokes after txn->n_stmts++,
> - * to match with trigger_clear() in rollbackStatement().
> - */
> - if (txn->is_autocommit == false) {
> - memtx_init_txn(txn);
> - }
> + (void)txn;
> return 0;
> }
>
> @@ -404,15 +396,9 @@ memtx_engine_begin_statement(struct engine *engine, struct txn *txn)
> if (txn->engine_tx == NULL) {
> struct space *space = txn_last_stmt(txn)->space;
>
> - if (space->def->id > BOX_SYSTEM_ID_MAX &&
> - ! rlist_empty(&space->on_replace)) {
> - /**
> - * A space on_replace trigger may initiate
> - * a yield.
> - */
> - assert(txn->is_autocommit);
> + if (space->def->id > BOX_SYSTEM_ID_MAX)
> + /* Setup triggers for non-ddl transactions. */
> memtx_init_txn(txn);
> - }
> }
> return 0;
> }
> diff --git a/src/box/txn.c b/src/box/txn.c
> index deb4fac47..31e19951f 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -147,6 +147,7 @@ txn_begin(bool is_autocommit)
> txn->n_local_rows = 0;
> txn->n_remote_rows = 0;
> txn->is_autocommit = is_autocommit;
> + txn->is_single_statement = false;
> txn->has_triggers = false;
> txn->is_aborted = false;
> txn->in_sub_stmt = 0;
> @@ -191,6 +192,11 @@ txn_begin_stmt(struct space *space)
> diag_set(ClientError, ER_SUB_STMT_MAX);
> return NULL;
> }
> + if (txn->is_single_statement && !stailq_empty(&txn->stmts)) {
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "DDL", "multi-statement transactions");
> + return NULL;
> + }
>
> struct txn_stmt *stmt = txn_stmt_new(txn);
> if (stmt == NULL) {
> @@ -430,11 +436,12 @@ txn_abort(struct txn *txn)
> int
> txn_check_singlestatement(struct txn *txn, const char *where)
> {
> - if (!txn->is_autocommit || !txn_is_first_statement(txn)) {
> + if (!txn_is_first_statement(txn)) {
> diag_set(ClientError, ER_UNSUPPORTED,
> where, "multi-statement transactions");
> return -1;
> }
> + txn->is_single_statement = true;
> return 0;
> }
Judging by the function name and protocol, txn_check_singlestatement
isn't supposed to update txn fields. Relying on the fact that it is
called by any DDL operation (and only by DDL) is cumbersome IMO.
May be, better mark a transaction as DDL in begin_statement callback and
fail subsequent statements if the transaction is already marked?
> diff --git a/test/engine/ddl.test.lua b/test/engine/ddl.test.lua
> index cdaf7a5bf..636f6c3b9 100644
> --- a/test/engine/ddl.test.lua
> +++ b/test/engine/ddl.test.lua
> @@ -759,6 +759,9 @@ i3:select()
>
> -- Check that recovery works.
> inspector:cmd("restart server default")
> +test_run = require('test_run')
> +inspector = test_run.new()
> +engine = inspector:get_cfg('engine')
These two should remain set after restart. No need to reset them.
>
> s = box.space.test
> s.index.i1:select()
> @@ -775,3 +778,55 @@ s.index.i1:select()
> box.snapshot()
>
> s:drop()
> +
> +-- test ddl operation within begin/commit/rollback
> +-- acquire free space id
> +space = box.schema.space.create('ddl_test', {engine = engine})
> +id = space.id
> +space:drop()
> +
> +inspector:cmd("setopt delimiter ';'")
> +box.begin()
> +s = box.schema.space.create('ddl_test', {engine = engine, id = id})
> +box.rollback();
> +
> +box.begin()
> +s = box.schema.space.create('ddl_test', {engine = engine, id = id})
> +box.commit();
> +
> +box.begin()
> +s:create_index('pk')
> +box.rollback();
> +
> +box.begin()
> +s:create_index('pk')
> +box.commit();
> +
> +s:replace({1});
> +s:replace({2});
> +s:replace({3});
> +
> +box.begin()
> +s:truncate()
> +box.commit();
> +s:select();
> +
> +box.begin()
> +box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
> +box.rollback();
> +
> +box.begin()
> +box.schema.user.grant('guest', 'write', 'space', 'ddl_test')
> +box.commit();
> +
> +box.begin()
> +box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
> +box.rollback();
> +
> +box.begin()
> +box.schema.user.revoke('guest', 'write', 'space', 'ddl_test')
> +box.commit();
Shouldn't we also check that mixing DDL with DML and combining several
DDL operations in one transaction still fail?
> +
> +-- index and space drop are not currently supported (because of truncate)
What do you mean by 'truncate'? Dropping indexes and grants? Please fix
the comment.
> +s.index.pk:drop();
> +s:drop();
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
` (2 preceding siblings ...)
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 3/5] Require for single statement not autocommit in case of ddl Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
2019-03-27 11:41 ` Vladimir Davydov
2019-03-27 11:48 ` Vladimir Davydov
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
4 siblings, 2 replies; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Applier fetch incoming rows to form a transaction and then apply it.
Rows are fetched and stored on fiber gc region until last transaction row
with is_commit was fetched. After fetch a multi row transaction is going to be
applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
we could not apply single row transaction in such boundaries because of
ddl which does not support non auto commit transactions.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 218 +++++++++++++++++------
test/replication/transaction.result | 242 ++++++++++++++++++++++++++
test/replication/transaction.test.lua | 86 +++++++++
3 files changed, 491 insertions(+), 55 deletions(-)
create mode 100644 test/replication/transaction.result
create mode 100644 test/replication/transaction.test.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 94c07aac7..08ad4a6a8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -426,6 +426,158 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct applier_tx_row {
+ /* Next transaction row. */
+ struct stailq_entry next;
+ /* xrow_header struct for the current transaction row. */
+ struct xrow_header row;
+};
+
+static struct applier_tx_row *
+applier_read_tx_row(struct applier *applier)
+{
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+
+ struct applier_tx_row *tx_row = (struct applier_tx_row *)
+ region_alloc(&fiber()->gc, sizeof(struct applier_tx_row));
+
+ if (tx_row == NULL)
+ tnt_raise(OutOfMemory, sizeof(struct applier_tx_row),
+ "region", "struct applier_tx_row");
+
+ struct xrow_header *row = &tx_row->row;
+
+ double timeout = replication_disconnect_timeout();
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ if (applier->version_id < version_id(1, 7, 7))
+ coio_read_xrow(coio, ibuf, row);
+ else
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+ return tx_row;
+}
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+ int64_t tsn = 0;
+
+ stailq_create(rows);
+ do {
+ struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+ struct xrow_header *row = &tx_row->row;
+
+ if (iproto_type_is_error(row->type))
+ xrow_decode_error_xc(row);
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ }
+ if (tsn == 0) {
+ /*
+ * Transaction id must be derived from the log sequence
+ * number of the first row in the transaction.
+ */
+ tsn = row->tsn;
+ if (row->lsn != tsn)
+ tnt_raise(ClientError, ER_PROTOCOL,
+ "Transaction id must be derived from "
+ "the lsn of the first row in the "
+ "transaction.");
+ }
+ if (tsn != row->tsn)
+ tnt_raise(ClientError, ER_UNSUPPORTED,
+ "replication",
+ "interleaving transactions");
+
+ assert(row->bodycnt <= 1);
+ if (row->bodycnt == 1 && !row->is_commit) {
+ /* Save row body to gc region. */
+ void *new_base = region_alloc(&fiber()->gc,
+ row->body->iov_len);
+ if (new_base == NULL)
+ tnt_raise(OutOfMemory, row->body->iov_len,
+ "region", "xrow body");
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ /* Adjust row body pointers. */
+ row->body->iov_base = new_base;
+ }
+ stailq_add_tail(rows, &tx_row->next);
+
+ } while (!stailq_last_entry(rows, struct applier_tx_row,
+ next)->row.is_commit);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+ int res = 0;
+ struct txn *txn = txn_begin(false);
+ struct applier_tx_row *item;
+ if (txn == NULL)
+ diag_raise();
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ res = apply_row(row);
+ if (res != 0) {
+ struct error *e = diag_last_error(diag_get());
+ /*
+ * In case of ER_TUPLE_FOUND error and enabled
+ * replication_skip_conflict configuration
+ * option, skip applying the foreign row and
+ * replace it with NOP in the local write ahead
+ * log.
+ */
+ if (e->type == &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = apply_row(row);
+ }
+ }
+ if (res != 0)
+ break;
+ }
+ if (res == 0)
+ res = txn_commit(txn);
+ else
+ txn_rollback();
+ return res;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -555,36 +707,14 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
-
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct stailq rows;
+ applier_read_tx(applier, &rows);
- applier->lag = ev_now(loop()) - row.tm;
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
@@ -594,33 +724,11 @@ applier_subscribe(struct applier *applier)
* that belong to the same server id.
*/
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = apply_row(&row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- struct xrow_header nop;
- nop.type = IPROTO_NOP;
- nop.bodycnt = 0;
- nop.replica_id = row.replica_id;
- nop.lsn = row.lsn;
- res = apply_row(&nop);
- }
- }
- if (res != 0) {
- latch_unlock(latch);
- diag_raise();
- }
+ if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+ first_row->lsn &&
+ applier_apply_tx(&rows) != 0) {
+ latch_unlock(latch);
+ diag_raise();
}
latch_unlock(latch);
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..8c2ac6ee4
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,242 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..f25a4737d
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
--
2.21.0
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
@ 2019-03-27 11:41 ` Vladimir Davydov
2019-03-27 11:48 ` Vladimir Davydov
1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 11:41 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:09PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Rows are fetched and stored on fiber gc region until last transaction row
> with is_commit was fetched. After fetch a multi row transaction is going to be
> applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
> we could not apply single row transaction in such boundaries because of
> ddl which does not support non auto commit transactions.
>
> Closes: #2798
> Needed for: #980
> ---
> src/box/applier.cc | 218 +++++++++++++++++------
> test/replication/transaction.result | 242 ++++++++++++++++++++++++++
> test/replication/transaction.test.lua | 86 +++++++++
> 3 files changed, 491 insertions(+), 55 deletions(-)
> create mode 100644 test/replication/transaction.result
> create mode 100644 test/replication/transaction.test.lua
This patch looks good to me.
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
2019-03-27 11:41 ` Vladimir Davydov
@ 2019-03-27 11:48 ` Vladimir Davydov
1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 11:48 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:09PM +0300, Georgy Kirichenko wrote:
> +/**
> + * Apply all rows in the rows queue as a single transaction.
> + *
> + * Return 0 for success or -1 in case of an error.
> + */
> +static int
> +applier_apply_tx(struct stailq *rows)
> +{
> + int res = 0;
> + struct txn *txn = txn_begin(false);
> + struct applier_tx_row *item;
> + if (txn == NULL)
> + diag_raise();
Just one thing - mixing retval and exceptions to propagate errors
doesn't look good. Besides you won't release the latch in case of
an exception (see below). Please replace with 'return -1'.
> + stailq_foreach_entry(item, rows, next) {
> + struct xrow_header *row = &item->row;
> + res = apply_row(row);
> + if (res != 0) {
> + struct error *e = diag_last_error(diag_get());
> + /*
> + * In case of ER_TUPLE_FOUND error and enabled
> + * replication_skip_conflict configuration
> + * option, skip applying the foreign row and
> + * replace it with NOP in the local write ahead
> + * log.
> + */
> + if (e->type == &type_ClientError &&
> + box_error_code(e) == ER_TUPLE_FOUND &&
> + replication_skip_conflict) {
> + diag_clear(diag_get());
> + row->type = IPROTO_NOP;
> + row->bodycnt = 0;
> + res = apply_row(row);
> + }
> + }
> + if (res != 0)
> + break;
> + }
> + if (res == 0)
> + res = txn_commit(txn);
> + else
> + txn_rollback();
> + return res;
> +}
> @@ -594,33 +724,11 @@ applier_subscribe(struct applier *applier)
> * that belong to the same server id.
> */
> latch_lock(latch);
> - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> - int res = apply_row(&row);
> - if (res != 0) {
> - struct error *e = diag_last_error(diag_get());
> - /*
> - * In case of ER_TUPLE_FOUND error and enabled
> - * replication_skip_conflict configuration
> - * option, skip applying the foreign row and
> - * replace it with NOP in the local write ahead
> - * log.
> - */
> - if (e->type == &type_ClientError &&
> - box_error_code(e) == ER_TUPLE_FOUND &&
> - replication_skip_conflict) {
> - diag_clear(diag_get());
> - struct xrow_header nop;
> - nop.type = IPROTO_NOP;
> - nop.bodycnt = 0;
> - nop.replica_id = row.replica_id;
> - nop.lsn = row.lsn;
> - res = apply_row(&nop);
> - }
> - }
> - if (res != 0) {
> - latch_unlock(latch);
> - diag_raise();
> - }
> + if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> + first_row->lsn &&
> + applier_apply_tx(&rows) != 0) {
> + latch_unlock(latch);
> + diag_raise();
> }
> latch_unlock(latch);
>
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes
2019-03-22 12:06 [tarantool-patches] [PATCH v3 0/5] Transaction in replication protocol Georgy Kirichenko
` (3 preceding siblings ...)
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 4/5] Transaction support for applier Georgy Kirichenko
@ 2019-03-22 12:06 ` Georgy Kirichenko
2019-03-27 12:06 ` Vladimir Davydov
4 siblings, 1 reply; 14+ messages in thread
From: Georgy Kirichenko @ 2019-03-22 12:06 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Disallow changes for non-local spaces during replication stream
applying. As we do not support distributed transaction yet we could not
provide a transactional replication for such side effects if there are
not NOPed.
Needed for: #2798
Follow up for: 27283debc327a1ef87e025badeed97d9ac264ac6
---
src/box/applier.cc | 18 ++++++++++--
src/box/txn.c | 15 ++++++++++
src/box/txn.h | 9 ++++++
test/replication/on_replace.result | 44 ++++++++++++++++++++++++----
test/replication/on_replace.test.lua | 20 +++++++++++--
5 files changed, 96 insertions(+), 10 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 08ad4a6a8..2a528b856 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -571,9 +571,23 @@ applier_apply_tx(struct stailq *rows)
if (res != 0)
break;
}
- if (res == 0)
+ if (res == 0) {
+ /*
+ * We are going to commit so it's a high time to check if
+ * the current transaction has non-local effects.
+ */
+ if (txn_is_distributed(txn)) {
+ /*
+ * A transaction mixes remote and local rows and
+ * countn't be replicated back because we don't
+ * support distributed transactions yet.
+ */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "Applier", "distributed transactions");
+ return -1;
+ }
res = txn_commit(txn);
- else
+ } else
txn_rollback();
return res;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 31e19951f..97f076f22 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -224,6 +224,21 @@ fail:
return NULL;
}
+bool
+txn_is_distributed(struct txn *txn)
+{
+ assert(txn == in_txn());
+ if (txn->n_local_rows == 0 || txn->n_remote_rows == 0)
+ return false;
+ struct txn_stmt *stmt;
+ /* Search for new non local group rows. */
+ stailq_foreach_entry(stmt, &txn->stmts, next)
+ if (stmt->row->replica_id == 0 &&
+ stmt->space->def->opts.group_id != GROUP_LOCAL)
+ return true;
+ return false;
+}
+
/**
* End a statement. In autocommit mode, end
* the current transaction as well.
diff --git a/src/box/txn.h b/src/box/txn.h
index 3572b005d..c00eb28a0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -296,6 +296,15 @@ txn_commit_ro_stmt(struct txn *txn)
}
}
+/*
+ * Check whether transaction is distributed or not.
+ * It's essential in case of replication because we couldn't
+ * replicate a transaction with both remote and local non NOP
+ * statements.
+ */
+bool
+txn_is_distributed(struct txn *txn);
+
/**
* End a statement. In autocommit mode, end
* the current transaction as well.
diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
index 2e95b90ea..a02b90f7e 100644
--- a/test/replication/on_replace.result
+++ b/test/replication/on_replace.result
@@ -104,7 +104,7 @@ box.space.test:drop()
box.schema.user.revoke('guest', 'replication')
---
...
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
SERVERS = { 'on_replace1', 'on_replace2' }
---
...
@@ -143,7 +143,7 @@ fiber = require'fiber'
while box.space.s2 == nil do fiber.sleep(0.00001) end
---
...
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
---
...
test_run:cmd('switch on_replace1')
@@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
---
- [1, 2, 3, 4]
...
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
---
...
test_run:cmd('switch on_replace2')
---
- true
...
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+---
+...
+box.info.replication[3 - box.info.id].upstream.message
+---
+- Applier does not support distributed transactions
+...
box.space.s1:select()
---
-- - [1, 2, 3, 4]
+- []
...
box.space.s2:select()
---
-- - [1, 2, 3, 4]
+- []
...
test_run:cmd('switch on_replace1')
---
@@ -179,6 +186,33 @@ box.space.s1:select()
...
box.space.s2:select()
---
+- []
+...
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+---
+- true
+...
+s3 = box.schema.space.create('s3', {is_local = true})
+---
+...
+_ = s3:create_index('pk')
+---
+...
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+---
+...
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+s3:select()
+---
- - [1, 2, 3, 4]
...
_ = test_run:cmd('switch default')
diff --git a/test/replication/on_replace.test.lua b/test/replication/on_replace.test.lua
index e34832103..779dbf768 100644
--- a/test/replication/on_replace.test.lua
+++ b/test/replication/on_replace.test.lua
@@ -44,7 +44,7 @@ box.space.test:drop()
box.schema.user.revoke('guest', 'replication')
--- gh-2682 on_replace on slave server with data change
+-- gh-2798 on_replace on slave server with non-local data change should fail
SERVERS = { 'on_replace1', 'on_replace2' }
test_run:create_cluster(SERVERS, "replication", {args="0.2"})
@@ -60,13 +60,15 @@ _ = s2:create_index('pk')
test_run:cmd('switch on_replace2')
fiber = require'fiber'
while box.space.s2 == nil do fiber.sleep(0.00001) end
-_ = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
+tg = box.space.s1:on_replace(function (old, new) box.space.s2:replace(new) end)
test_run:cmd('switch on_replace1')
box.space.s1:replace({1, 2, 3, 4})
-while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
+while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
test_run:cmd('switch on_replace2')
+while (box.info.replication[3 - box.info.id].upstream.status ~= 'stopped') do fiber.sleep(0.00001) end
+box.info.replication[3 - box.info.id].upstream.message
box.space.s1:select()
box.space.s2:select()
@@ -74,6 +76,18 @@ test_run:cmd('switch on_replace1')
box.space.s1:select()
box.space.s2:select()
+-- gh-2798 on_replace on slave server with local data change is allowed
+test_run:cmd('switch on_replace2')
+s3 = box.schema.space.create('s3', {is_local = true})
+_ = s3:create_index('pk')
+tg = box.space.s1:on_replace(function (old, new) box.space.s3:replace(new) end, tg)
+
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+
+s3:select()
+
_ = test_run:cmd('switch default')
test_run:drop_cluster(SERVERS)
test_run:cleanup_cluster()
--
2.21.0
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes
2019-03-22 12:06 ` [tarantool-patches] [PATCH v3 5/5] Raise an error if remote transaction produces non-local changes Georgy Kirichenko
@ 2019-03-27 12:06 ` Vladimir Davydov
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov @ 2019-03-27 12:06 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Fri, Mar 22, 2019 at 03:06:10PM +0300, Georgy Kirichenko wrote:
> diff --git a/test/replication/on_replace.result b/test/replication/on_replace.result
> index 2e95b90ea..a02b90f7e 100644
> --- a/test/replication/on_replace.result
> +++ b/test/replication/on_replace.result
> @@ -154,20 +154,27 @@ box.space.s1:replace({1, 2, 3, 4})
> ---
> - [1, 2, 3, 4]
> ...
> -while #(box.space.s2:select()) == 0 do fiber.sleep(0.00001) end
> +while (box.info.replication[3 - box.info.id].downstream.status ~= 'stopped') do fiber.sleep(0.00001) end
Nit:
3 - box.info.id ?
Looks confusing. Why not simply use 1? It should point to the master,
shouldn't it?
Other than that, the patch looks good to me.
^ permalink raw reply [flat|nested] 14+ messages in thread