[PATCH v2 2/5] test: errinj for pause relay_send

Sergei Voronezhskii sergw at tarantool.org
Fri Oct 19 19:17:18 MSK 2018


Instead of using a timeout, we just need to pause `relay_send`. We
can't rely on a timeout because system load varies when tests run in
parallel mode. Add a new errinj that is checked in a loop: while its
boolean is `true`, `relay_send` sleeps and does not proceed to the
next statement.

To check the read-only mode, we need to attempt a modification of a
tuple. Calling the `replace` method is enough for that, instead of
calling `delete` and then uselessly verifying with `get` that the
tuple has not been deleted.

Also, look up the xlog files in a loop with a short sleep until the
file count is as expected.

Part of #2436, #3232
---
 src/box/relay.cc                |  7 ++++-
 src/errinj.h                    |  1 +
 test/replication/catch.result   | 48 +++++++++++++++------------------
 test/replication/catch.test.lua | 41 ++++++++++++++--------------
 test/replication/gc.result      | 46 ++++++++++++++-----------------
 test/replication/gc.test.lua    | 38 +++++++++++++-------------
 6 files changed, 86 insertions(+), 95 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0a1e95af0..81f2b821c 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -631,12 +631,17 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
+	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	while (inj->bparam) {
+		fiber_sleep(0.01);
+		inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	}
 	packet->sync = relay->sync;
 	relay->last_row_tm = ev_monotonic_now(loop());
 	coio_write_xrow(&relay->io, packet);
 	fiber_gc();
 
-	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
 }
diff --git a/src/errinj.h b/src/errinj.h
index 84a1fbb5e..bf6c15ba5 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -94,6 +94,7 @@ struct errinj {
 	_(ERRINJ_VY_GC, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_LOG_FLUSH, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_LOG_FLUSH_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \
diff --git a/test/replication/catch.result b/test/replication/catch.result
index 663bdc758..d25932d1b 100644
--- a/test/replication/catch.result
+++ b/test/replication/catch.result
@@ -35,7 +35,7 @@ test_run:cmd("switch default")
 s = box.schema.space.create('test', {engine = engine});
 ---
 ...
--- vinyl does not support hash index
+-- Vinyl does not support hash index.
 index = s:create_index('primary', {type = (engine == 'vinyl' and 'tree' or 'hash') })
 ---
 ...
@@ -57,14 +57,14 @@ test_run:cmd("stop server replica")
 ---
 - true
 ...
--- insert values on the master while replica is stopped and can't fetch them
-for i=1,100 do s:insert{i, 'this is test message12345'} end
+-- Insert values on the master while replica is stopped and can't
+-- fetch them.
+errinj.set('ERRINJ_RELAY_SEND_DELAY', true)
 ---
+- ok
 ...
--- sleep after every tuple
-errinj.set("ERRINJ_RELAY_TIMEOUT", 1000.0)
+for i = 1, 100 do s:insert{i, 'this is test message12345'} end
 ---
-- ok
 ...
 test_run:cmd("start server replica with args='0.01'")
 ---
@@ -75,28 +75,26 @@ test_run:cmd("switch replica")
 - true
 ...
 -- Check that replica doesn't enter read-write mode before
--- catching up with the master: to check that we inject sleep into
--- the master relay_send function and attempt a data modifying
--- statement in replica while it's still fetching data from the
--- master.
--- In the next two cases we try to delete a tuple while replica is
--- catching up with the master (local delete, remote delete) case
+-- catching up with the master: to check that we stop sending
+-- rows on the master in relay_send function and attempt a data
+-- modifying statement in replica while it's still fetching data
+-- from the master.
+--
+-- In the next two cases we try to replace a tuple while replica
+-- is catching up with the master (local delete, remote delete)
+-- case
 --
--- #1: delete tuple on replica
+-- Case #1: replace tuple on replica locally.
 --
 box.space.test ~= nil
 ---
 - true
 ...
-d = box.space.test:delete{1}
+box.space.test:replace{1}
 ---
 - error: Can't modify data because this instance is in read-only mode.
 ...
-box.space.test:get(1) ~= nil
----
-- true
-...
--- case #2: delete tuple by net.box
+-- Case #2: replace tuple on replica by net.box.
 test_run:cmd("switch default")
 ---
 - true
@@ -108,20 +106,16 @@ test_run:cmd("set variable r_uri to 'replica.listen'")
 c = net_box.connect(r_uri)
 ---
 ...
-d = c.space.test:delete{1}
+d = c.space.test:replace{1}
 ---
 - error: Can't modify data because this instance is in read-only mode.
 ...
-c.space.test:get(1) ~= nil
----
-- true
-...
--- check sync
-errinj.set("ERRINJ_RELAY_TIMEOUT", 0)
+-- Resume replicaton.
+errinj.set('ERRINJ_RELAY_SEND_DELAY', false)
 ---
 - ok
 ...
--- cleanup
+-- Cleanup.
 test_run:cmd("stop server replica")
 ---
 - true
diff --git a/test/replication/catch.test.lua b/test/replication/catch.test.lua
index 6773675d0..685c3e869 100644
--- a/test/replication/catch.test.lua
+++ b/test/replication/catch.test.lua
@@ -13,7 +13,7 @@ test_run:cmd("switch replica")
 
 test_run:cmd("switch default")
 s = box.schema.space.create('test', {engine = engine});
--- vinyl does not support hash index
+-- Vinyl does not support hash index.
 index = s:create_index('primary', {type = (engine == 'vinyl' and 'tree' or 'hash') })
 
 test_run:cmd("switch replica")
@@ -22,41 +22,40 @@ while box.space.test == nil do fiber.sleep(0.01) end
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
 
--- insert values on the master while replica is stopped and can't fetch them
-for i=1,100 do s:insert{i, 'this is test message12345'} end
-
--- sleep after every tuple
-errinj.set("ERRINJ_RELAY_TIMEOUT", 1000.0)
+-- Insert values on the master while replica is stopped and can't
+-- fetch them.
+errinj.set('ERRINJ_RELAY_SEND_DELAY', true)
+for i = 1, 100 do s:insert{i, 'this is test message12345'} end
 
 test_run:cmd("start server replica with args='0.01'")
 test_run:cmd("switch replica")
 
 -- Check that replica doesn't enter read-write mode before
--- catching up with the master: to check that we inject sleep into
--- the master relay_send function and attempt a data modifying
--- statement in replica while it's still fetching data from the
--- master.
--- In the next two cases we try to delete a tuple while replica is
--- catching up with the master (local delete, remote delete) case
+-- catching up with the master: to check that we stop sending
+-- rows on the master in relay_send function and attempt a data
+-- modifying statement in replica while it's still fetching data
+-- from the master.
+--
+-- In the next two cases we try to replace a tuple while replica
+-- is catching up with the master (local delete, remote delete)
+-- case
 --
--- #1: delete tuple on replica
+-- Case #1: replace tuple on replica locally.
 --
 box.space.test ~= nil
-d = box.space.test:delete{1}
-box.space.test:get(1) ~= nil
+box.space.test:replace{1}
 
--- case #2: delete tuple by net.box
+-- Case #2: replace tuple on replica by net.box.
 
 test_run:cmd("switch default")
 test_run:cmd("set variable r_uri to 'replica.listen'")
 c = net_box.connect(r_uri)
-d = c.space.test:delete{1}
-c.space.test:get(1) ~= nil
+d = c.space.test:replace{1}
 
--- check sync
-errinj.set("ERRINJ_RELAY_TIMEOUT", 0)
+-- Resume replicaton.
+errinj.set('ERRINJ_RELAY_SEND_DELAY', false)
 
--- cleanup
+-- Cleanup.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 test_run:cmd("delete server replica")
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 2895bed3b..dad5539de 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -30,6 +30,9 @@ box.cfg{checkpoint_count = 1}
 function wait_gc(n) while #box.info.gc().checkpoints > n do fiber.sleep(0.01) end end
 ---
 ...
+function wait_xlog(n, timeout) timeout = timeout or 1.0 return test_run:wait_cond(function() return #fio.glob('./master/*.xlog') == n end, timeout) end
+---
+...
 -- Grant permissions needed for replication.
 box.schema.user.grant('guest', 'replication')
 ---
@@ -66,7 +69,7 @@ for i = 1, 100 do s:auto_increment{} end
 ...
 -- Make sure replica join will take long enough for us to
 -- invoke garbage collection.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true)
 ---
 - ok
 ...
@@ -81,7 +84,7 @@ test_run:cmd("setopt delimiter ';'")
 fiber.create(function()
     fiber.sleep(0.1)
     box.snapshot()
-    box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+    box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false)
 end)
 test_run:cmd("setopt delimiter ''");
 ---
@@ -102,7 +105,7 @@ test_run:cmd("switch replica")
 fiber = require('fiber')
 ---
 ...
-while box.space.test:count() < 200 do fiber.sleep(0.01) end
+while box.space.test == nil or box.space.test:count() < 200 do fiber.sleep(0.01) end
 ---
 ...
 box.space.test:count()
@@ -122,13 +125,13 @@ wait_gc(1)
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+wait_xlog(1) or fio.listdir('./master')
 ---
 - true
 ...
--- Make sure the replica will receive data it is subscribed
--- to long enough for us to invoke garbage collection.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
+-- Make sure the replica will not receive data until
+-- we test garbage collection.
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true)
 ---
 - ok
 ...
@@ -160,13 +163,13 @@ box.snapshot()
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+wait_xlog(2) or fio.listdir('./master')
 ---
 - true
 ...
--- Remove the timeout injection so that the replica catches
+-- Resume replicaton so that the replica catches
 -- up quickly.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false)
 ---
 - ok
 ...
@@ -195,7 +198,7 @@ wait_gc(1)
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+wait_xlog(0) or fio.listdir('./master')
 ---
 - true
 ...
@@ -238,7 +241,7 @@ fiber.sleep(0.1) -- wait for master to relay data
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+wait_xlog(2) or fio.listdir('./master')
 ---
 - true
 ...
@@ -283,7 +286,7 @@ wait_gc(1)
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+wait_xlog(1) or fio.listdir('./master')
 ---
 - true
 ...
@@ -319,13 +322,10 @@ box.snapshot()
 ---
 - true
 ...
-xlog_count = #fio.glob('./master/*.xlog')
----
-...
 -- the replica may have managed to download all data
 -- from xlog #1 before it was stopped, in which case
 -- it's OK to collect xlog #1
-xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
+wait_xlog(3, 0.1) or wait_xlog(2, 0.1) or fio.listdir('./master')
 ---
 - true
 ...
@@ -338,7 +338,7 @@ test_run:cleanup_cluster()
 ---
 - true
 ...
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+wait_xlog(1) or fio.listdir('./master')
 ---
 - true
 ...
@@ -433,7 +433,7 @@ box.snapshot()
 ---
 - ok
 ...
-#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
+wait_xlog(3) or fio.listdir('./master')
 ---
 - true
 ...
@@ -446,13 +446,7 @@ box.snapshot()
 ---
 - ok
 ...
-t = fiber.time()
----
-...
-while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end
----
-...
-#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+wait_xlog(0, 10) or fio.listdir('./master')
 ---
 - true
 ...
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 5100378b3..22921289d 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -12,6 +12,7 @@ default_checkpoint_count = box.cfg.checkpoint_count
 box.cfg{checkpoint_count = 1}
 
 function wait_gc(n) while #box.info.gc().checkpoints > n do fiber.sleep(0.01) end end
+function wait_xlog(n, timeout) timeout = timeout or 1.0 return test_run:wait_cond(function() return #fio.glob('./master/*.xlog') == n end, timeout) end
 
 -- Grant permissions needed for replication.
 box.schema.user.grant('guest', 'replication')
@@ -31,7 +32,7 @@ for i = 1, 100 do s:auto_increment{} end
 
 -- Make sure replica join will take long enough for us to
 -- invoke garbage collection.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true)
 
 -- While the replica is receiving the initial data set,
 -- make a snapshot and invoke garbage collection, then
@@ -41,7 +42,7 @@ test_run:cmd("setopt delimiter ';'")
 fiber.create(function()
     fiber.sleep(0.1)
     box.snapshot()
-    box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+    box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false)
 end)
 test_run:cmd("setopt delimiter ''");
 
@@ -54,7 +55,7 @@ test_run:cmd("start server replica")
 -- data from the master. Check it.
 test_run:cmd("switch replica")
 fiber = require('fiber')
-while box.space.test:count() < 200 do fiber.sleep(0.01) end
+while box.space.test == nil or box.space.test:count() < 200 do fiber.sleep(0.01) end
 box.space.test:count()
 test_run:cmd("switch default")
 
@@ -62,10 +63,10 @@ test_run:cmd("switch default")
 -- the replica released the corresponding checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
--- Make sure the replica will receive data it is subscribed
--- to long enough for us to invoke garbage collection.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05)
+wait_xlog(1) or fio.listdir('./master')
+-- Make sure the replica will not receive data until
+-- we test garbage collection.
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true)
 
 -- Send more data to the replica.
 -- Need to do 2 snapshots here, otherwise the replica would
@@ -80,11 +81,11 @@ box.snapshot()
 -- xlogs needed by the replica.
 box.snapshot()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+wait_xlog(2) or fio.listdir('./master')
 
--- Remove the timeout injection so that the replica catches
+-- Resume replicaton so that the replica catches
 -- up quickly.
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0)
+box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false)
 
 -- Check that the replica received all data from the master.
 test_run:cmd("switch replica")
@@ -96,7 +97,7 @@ test_run:cmd("switch default")
 -- from the old checkpoint.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+wait_xlog(0) or fio.listdir('./master')
 --
 -- Check that the master doesn't delete xlog files sent to the
 -- replica until it receives a confirmation that the data has
@@ -115,7 +116,7 @@ fiber.sleep(0.1) -- wait for master to relay data
 -- because it is still needed by the replica, but remove
 -- the old snapshot.
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
+wait_xlog(2) or fio.listdir('./master')
 test_run:cmd("switch replica")
 -- Unblock the replica and break replication.
 box.error.injection.set("ERRINJ_WAL_DELAY", false)
@@ -131,7 +132,7 @@ test_run:cmd("switch default")
 -- Now it's safe to drop the old xlog.
 wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+wait_xlog(1) or fio.listdir('./master')
 -- Stop the replica.
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
@@ -146,17 +147,16 @@ box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-xlog_count = #fio.glob('./master/*.xlog')
 -- the replica may have managed to download all data
 -- from xlog #1 before it was stopped, in which case
 -- it's OK to collect xlog #1
-xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
+wait_xlog(3, 0.1) or wait_xlog(2, 0.1) or fio.listdir('./master')
 
 -- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
 #box.info.gc().checkpoints == 1 or box.info.gc()
-#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
+wait_xlog(1) or fio.listdir('./master')
 --
 -- Test that concurrent invocation of the garbage collector works fine.
 --
@@ -197,15 +197,13 @@ _ = s:auto_increment{}
 box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
-#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
+wait_xlog(3) or fio.listdir('./master')
 
 -- Delete the replica from the cluster table and check that
 -- all xlog files are removed.
 test_run:cleanup_cluster()
 box.snapshot()
-t = fiber.time()
-while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end
-#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+wait_xlog(0, 10) or fio.listdir('./master')
 
 -- Restore the config.
 box.cfg{replication = {}}
-- 
2.18.0




More information about the Tarantool-patches mailing list