* [Tarantool-patches] [PATCH 1/2] relay: fix vclock obtainment on join
2019-12-30 17:46 [Tarantool-patches] [PATCH 0/2] fix vclock obtainment on join & join_vclock test Ilya Kosarev
@ 2019-12-30 17:47 ` Ilya Kosarev
2019-12-31 9:11 ` Kirill Yukhin
2019-12-30 17:47 ` [Tarantool-patches] [PATCH 2/2] test: fix and split flaky join_vclock test Ilya Kosarev
1 sibling, 1 reply; 5+ messages in thread
From: Ilya Kosarev @ 2019-12-30 17:47 UTC (permalink / raw)
To: tarantool-patches; +Cc: v.shpilevoy
In case of high load, the vclock used to join a replica could be ahead
of the actual WAL. Therefore the replica could have missed some tuples
from the master. In order to fix this, wal_sync is updated so that it
can now return an up-to-date vclock of the flushed state.
Prerequisites #4160
---
src/box/relay.cc | 7 +++----
src/box/vinyl.c | 4 ++--
src/box/wal.c | 23 +++++++++++++++++------
src/box/wal.h | 4 +++-
4 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index e849fcf4f50..b9ed27503a3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -307,13 +307,12 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
/*
* Sync WAL to make sure that all changes visible from
- * the frozen read view are successfully committed.
+ * the frozen read view are successfully committed and
+ * obtain corresponding vclock.
*/
- if (wal_sync() != 0)
+ if (wal_sync(vclock) != 0)
diag_raise();
- vclock_copy(vclock, &replicaset.vclock);
-
/* Respond to the JOIN request with the current vclock. */
struct xrow_header row;
xrow_encode_vclock_xc(&row, vclock);
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 15a136f8109..5f169f09b25 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1087,7 +1087,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
*/
int rc;
if (need_wal_sync) {
- rc = wal_sync();
+ rc = wal_sync(NULL);
if (rc != 0)
goto out;
}
@@ -4180,7 +4180,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
*/
int rc;
if (need_wal_sync) {
- rc = wal_sync();
+ rc = wal_sync(NULL);
if (rc != 0)
goto out;
}
diff --git a/src/box/wal.c b/src/box/wal.c
index 5e2c13e0e08..6348ef4565e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -519,21 +519,27 @@ wal_free(void)
wal_writer_destroy(writer);
}
+struct wal_vclock_msg {
+ struct cbus_call_msg base;
+ struct vclock vclock;
+};
+
static int
-wal_sync_f(struct cbus_call_msg *msg)
+wal_sync_f(struct cbus_call_msg *data)
{
- (void)msg;
+ struct wal_vclock_msg *msg = (struct wal_vclock_msg *) data;
struct wal_writer *writer = &wal_writer_singleton;
if (writer->in_rollback.route != NULL) {
/* We're rolling back a failed write. */
diag_set(ClientError, ER_WAL_IO);
return -1;
}
+ vclock_copy(&msg->vclock, &writer->vclock);
return 0;
}
int
-wal_sync(void)
+wal_sync(struct vclock *vclock)
{
ERROR_INJECT(ERRINJ_WAL_SYNC, {
diag_set(ClientError, ER_INJECTION, "wal sync");
@@ -541,18 +547,23 @@ wal_sync(void)
});
struct wal_writer *writer = &wal_writer_singleton;
- if (writer->wal_mode == WAL_NONE)
+ if (writer->wal_mode == WAL_NONE) {
+ if (vclock != NULL)
+ vclock_copy(vclock, &writer->vclock);
return 0;
+ }
if (!stailq_empty(&writer->rollback)) {
/* We're rolling back a failed write. */
diag_set(ClientError, ER_WAL_IO);
return -1;
}
bool cancellable = fiber_set_cancellable(false);
- struct cbus_call_msg msg;
+ struct wal_vclock_msg msg;
int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
- &msg, wal_sync_f, NULL, TIMEOUT_INFINITY);
+ &msg.base, wal_sync_f, NULL, TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
+ if (vclock != NULL)
+ vclock_copy(vclock, &msg.vclock);
return rc;
}
diff --git a/src/box/wal.h b/src/box/wal.h
index b76b0a41f93..76b44941a7a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -182,9 +182,11 @@ wal_mode();
/**
* Wait until all submitted writes are successfully flushed
* to disk. Returns 0 on success, -1 if write failed.
+ * Corresponding vclock is returned in @a vclock unless it is
+ * NULL.
*/
int
-wal_sync(void);
+wal_sync(struct vclock *vclock);
struct wal_checkpoint {
struct cbus_call_msg base;
--
2.17.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [Tarantool-patches] [PATCH 2/2] test: fix and split flaky join_vclock test
2019-12-30 17:46 [Tarantool-patches] [PATCH 0/2] fix vclock obtainment on join & join_vclock test Ilya Kosarev
2019-12-30 17:47 ` [Tarantool-patches] [PATCH 1/2] relay: fix vclock obtainment on join Ilya Kosarev
@ 2019-12-30 17:47 ` Ilya Kosarev
2019-12-31 9:12 ` Kirill Yukhin
1 sibling, 1 reply; 5+ messages in thread
From: Ilya Kosarev @ 2019-12-30 17:47 UTC (permalink / raw)
To: tarantool-patches; +Cc: v.shpilevoy
The join_vclock test is supposed to verify that changes are not lost
on the replica. Therefore the test is changed to explicitly check
that all changes on the master are applied on the replica.
Previously this test also indirectly verified that changes are
applied in the correct order. Now there is a separate test for
this, called replica_apply_order.
Since the changed join_vclock test might fail due to #4669, we now
create the cluster out of fresh instances instead of using the default
instance. With the mentioned fixes it is no longer fragile.
Closes #4160
---
test/replication/join_vclock.result | 61 ++++++---
test/replication/join_vclock.test.lua | 45 ++++---
test/replication/master1.lua | 1 +
test/replication/replica_apply_order.result | 121 ++++++++++++++++++
test/replication/replica_apply_order.test.lua | 48 +++++++
test/replication/suite.ini | 1 -
6 files changed, 238 insertions(+), 39 deletions(-)
create mode 120000 test/replication/master1.lua
create mode 100644 test/replication/replica_apply_order.result
create mode 100644 test/replication/replica_apply_order.test.lua
diff --git a/test/replication/join_vclock.result b/test/replication/join_vclock.result
index a9781073d44..82288907c74 100644
--- a/test/replication/join_vclock.result
+++ b/test/replication/join_vclock.result
@@ -1,17 +1,17 @@
-fiber = require('fiber')
----
-...
-env = require('test_run')
+test_run = require('test_run').new()
---
...
-replica_set = require('fast_replica')
+test_run:cmd("create server master with script='replication/master1.lua'")
---
+- true
...
-test_run = env.new()
+test_run:cmd('start server master')
---
+- true
...
-engine = test_run:get_cfg('engine')
+test_run:cmd("switch master")
---
+- true
...
errinj = box.error.injection
---
@@ -20,6 +20,9 @@ errinj.set("ERRINJ_RELAY_FINAL_SLEEP", true)
---
- ok
...
+engine = test_run:get_cfg('engine')
+---
+...
box.schema.user.grant('guest', 'replication')
---
...
@@ -29,61 +32,79 @@ s = box.schema.space.create('test', {engine = engine});
index = s:create_index('primary')
---
...
+fiber = require('fiber')
+---
+...
ch = fiber.channel(1)
---
...
done = false
---
...
-function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(true) end
+function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(i) end
---
...
_ = fiber.create(repl_f)
---
...
-replica_set.join(test_run, 1)
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
---
+- true
...
-test_run:cmd("switch replica1")
+test_run:cmd("switch replica")
---
- true
...
-test_run:cmd("switch default")
+test_run:cmd("switch master")
---
- true
...
done = true
---
...
-ch:get()
+count = ch:get()
---
-- true
...
errinj.set("ERRINJ_RELAY_FINAL_SLEEP", false)
---
- ok
...
-test_run:cmd("switch replica1")
+test_run:cmd("switch replica")
---
- true
...
-cnt = box.space.test.index[0]:count()
+test_run:cmd("setopt delimiter ';'")
---
+- true
...
-box.space.test.index.primary:max()[1] == cnt - 1
+-- Wait for all tuples to be inserted on replica
+test_run:wait_cond(function()
+ return box.space.test.index.primary:max()[1] == test_run:eval('master', 'count')[1] - 1
+end);
---
- true
...
-test_run:cmd("switch default")
+test_run:cmd("setopt delimiter ''");
---
- true
...
-replica_set.drop_all(test_run)
+replica_count = box.space.test.index.primary:count() master_count = test_run:eval('master', 'count')[1]
---
...
-box.space.test:drop()
+-- Verify that there are the same amount of tuples on master and replica
+replica_count == master_count or {replica_count, master_count}
---
+- true
+...
+-- Cleanup.
+test_run:cmd('switch default')
+---
+- true
...
-box.schema.user.revoke('guest', 'replication')
+test_run:drop_cluster({'master', 'replica'})
---
...
diff --git a/test/replication/join_vclock.test.lua b/test/replication/join_vclock.test.lua
index 0b60dffc2c0..7e21a7d0924 100644
--- a/test/replication/join_vclock.test.lua
+++ b/test/replication/join_vclock.test.lua
@@ -1,35 +1,44 @@
-fiber = require('fiber')
-env = require('test_run')
-replica_set = require('fast_replica')
-test_run = env.new()
-engine = test_run:get_cfg('engine')
+test_run = require('test_run').new()
+
+test_run:cmd("create server master with script='replication/master1.lua'")
+test_run:cmd('start server master')
+test_run:cmd("switch master")
errinj = box.error.injection
errinj.set("ERRINJ_RELAY_FINAL_SLEEP", true)
+engine = test_run:get_cfg('engine')
box.schema.user.grant('guest', 'replication')
s = box.schema.space.create('test', {engine = engine});
index = s:create_index('primary')
+fiber = require('fiber')
ch = fiber.channel(1)
done = false
-function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(true) end
+function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(i) end
_ = fiber.create(repl_f)
-replica_set.join(test_run, 1)
-test_run:cmd("switch replica1")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
-test_run:cmd("switch default")
+test_run:cmd("switch master")
done = true
-ch:get()
+count = ch:get()
errinj.set("ERRINJ_RELAY_FINAL_SLEEP", false)
-test_run:cmd("switch replica1")
-cnt = box.space.test.index[0]:count()
-box.space.test.index.primary:max()[1] == cnt - 1
-test_run:cmd("switch default")
-
-replica_set.drop_all(test_run)
-box.space.test:drop()
-box.schema.user.revoke('guest', 'replication')
+test_run:cmd("switch replica")
+test_run:cmd("setopt delimiter ';'")
+-- Wait for all tuples to be inserted on replica
+test_run:wait_cond(function()
+ return box.space.test.index.primary:max()[1] == test_run:eval('master', 'count')[1] - 1
+end);
+test_run:cmd("setopt delimiter ''");
+replica_count = box.space.test.index.primary:count() master_count = test_run:eval('master', 'count')[1]
+-- Verify that there are the same amount of tuples on master and replica
+replica_count == master_count or {replica_count, master_count}
+
+-- Cleanup.
+test_run:cmd('switch default')
+test_run:drop_cluster({'master', 'replica'})
diff --git a/test/replication/master1.lua b/test/replication/master1.lua
new file mode 120000
index 00000000000..1c7debd2fc4
--- /dev/null
+++ b/test/replication/master1.lua
@@ -0,0 +1 @@
+master.lua
\ No newline at end of file
diff --git a/test/replication/replica_apply_order.result b/test/replication/replica_apply_order.result
new file mode 100644
index 00000000000..513b722a796
--- /dev/null
+++ b/test/replication/replica_apply_order.result
@@ -0,0 +1,121 @@
+-- test-run result file version 2
+fiber = require('fiber')
+ | ---
+ | ...
+env = require('test_run')
+ | ---
+ | ...
+replica_set = require('fast_replica')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+
+errinj = box.error.injection
+ | ---
+ | ...
+errinj.set("ERRINJ_RELAY_FINAL_SLEEP", true)
+ | ---
+ | - ok
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+s = box.schema.space.create('test', {engine = engine});
+ | ---
+ | ...
+index = s:create_index('primary')
+ | ---
+ | ...
+
+ch = fiber.channel(1)
+ | ---
+ | ...
+done = false
+ | ---
+ | ...
+
+function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(true) end
+ | ---
+ | ...
+_ = fiber.create(repl_f)
+ | ---
+ | ...
+
+replica_set.join(test_run, 1)
+ | ---
+ | ...
+test_run:cmd("switch replica1")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("switch default")
+ | ---
+ | - true
+ | ...
+done = true
+ | ---
+ | ...
+ch:get()
+ | ---
+ | - true
+ | ...
+
+errinj.set("ERRINJ_RELAY_FINAL_SLEEP", false)
+ | ---
+ | - ok
+ | ...
+test_run:cmd("switch replica1")
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_max_index_and_count()
+ return box.space.test.index.primary:max()[1], box.space.test.index.primary:count()
+end;
+ | ---
+ | ...
+max, count = 0, 0;
+ | ---
+ | ...
+for i = 1, 100 do
+ max, count = box.atomic(get_max_index_and_count)
+ if max ~= count - 1 then
+ break
+ end
+end;
+ | ---
+ | ...
+-- Verify that at any moment max index is corresponding to amount of tuples,
+-- which means that changes apply order is correct
+max == count - 1 or {max, count - 1};
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:cmd("switch default")
+ | ---
+ | - true
+ | ...
+
+replica_set.drop_all(test_run)
+ | ---
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/replica_apply_order.test.lua b/test/replication/replica_apply_order.test.lua
new file mode 100644
index 00000000000..ba54ef8439d
--- /dev/null
+++ b/test/replication/replica_apply_order.test.lua
@@ -0,0 +1,48 @@
+fiber = require('fiber')
+env = require('test_run')
+replica_set = require('fast_replica')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+errinj = box.error.injection
+errinj.set("ERRINJ_RELAY_FINAL_SLEEP", true)
+
+box.schema.user.grant('guest', 'replication')
+s = box.schema.space.create('test', {engine = engine});
+index = s:create_index('primary')
+
+ch = fiber.channel(1)
+done = false
+
+function repl_f() local i = 0 while not done do s:replace({i, i}) fiber.sleep(0.001) i = i + 1 end ch:put(true) end
+_ = fiber.create(repl_f)
+
+replica_set.join(test_run, 1)
+test_run:cmd("switch replica1")
+
+test_run:cmd("switch default")
+done = true
+ch:get()
+
+errinj.set("ERRINJ_RELAY_FINAL_SLEEP", false)
+test_run:cmd("switch replica1")
+test_run:cmd("setopt delimiter ';'")
+function get_max_index_and_count()
+ return box.space.test.index.primary:max()[1], box.space.test.index.primary:count()
+end;
+max, count = 0, 0;
+for i = 1, 100 do
+ max, count = box.atomic(get_max_index_and_count)
+ if max ~= count - 1 then
+ break
+ end
+end;
+-- Verify that at any moment max index is corresponding to amount of tuples,
+-- which means that changes apply order is correct
+max == count - 1 or {max, count - 1};
+test_run:cmd("setopt delimiter ''");
+test_run:cmd("switch default")
+
+replica_set.drop_all(test_run)
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index 384dac677ea..ed1de31405e 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -12,7 +12,6 @@ long_run = prune.test.lua
is_parallel = True
pretest_clean = True
fragile = errinj.test.lua ; gh-3870
- join_vclock.test.lua ; gh-4160
long_row_timeout.test.lua ; gh-4351
skip_conflict_row.test.lua ; gh-4457
sync.test.lua ; gh-3835 gh-3877
--
2.17.1
^ permalink raw reply [flat|nested] 5+ messages in thread