From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Sergei Voronezhskii Subject: [PATCH v2 2/5] test: errinj for pause relay_send Date: Fri, 19 Oct 2018 19:17:18 +0300 Message-Id: <20181019161721.49560-3-sergw@tarantool.org> In-Reply-To: <20181019161721.49560-1-sergw@tarantool.org> References: <20181019161721.49560-1-sergw@tarantool.org> To: tarantool-patches@freelists.org Cc: Alexander Turenko , Vladimir Davydov List-ID: Instead of using a timeout, we just need to pause `relay_send`. We can't rely on a timeout because the system load varies in parallel mode. Add a new errinj whose boolean value is checked in a loop: while it is `true`, `relay_send` does not proceed to the next statement. To check the read-only mode, we need to attempt a modification of a tuple. It is enough to call the `replace` method, instead of calling `delete` and then uselessly verifying with the `get` method that nothing has been deleted. Also, look up the xlog files in a loop with a short sleep until the file count is as expected. Part of #2436, #3232 --- src/box/relay.cc | 7 ++++- src/errinj.h | 1 + test/replication/catch.result | 48 +++++++++++++++------------------ test/replication/catch.test.lua | 41 ++++++++++++++-------------- test/replication/gc.result | 46 ++++++++++++++----------------- test/replication/gc.test.lua | 38 +++++++++++++------------- 6 files changed, 86 insertions(+), 95 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index 0a1e95af0..81f2b821c 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -631,12 +631,17 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, static void relay_send(struct relay *relay, struct xrow_header *packet) { + struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL); + while (inj->bparam) { + fiber_sleep(0.01); + inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL); + } packet->sync = relay->sync; relay->last_row_tm = ev_monotonic_now(loop()); coio_write_xrow(&relay->io, packet); fiber_gc(); - struct errinj *inj = 
errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); } diff --git a/src/errinj.h b/src/errinj.h index 84a1fbb5e..bf6c15ba5 100644 --- a/src/errinj.h +++ b/src/errinj.h @@ -94,6 +94,7 @@ struct errinj { _(ERRINJ_VY_GC, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_VY_LOG_FLUSH, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_VY_LOG_FLUSH_DELAY, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \ _(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \ _(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \ diff --git a/test/replication/catch.result b/test/replication/catch.result index 663bdc758..d25932d1b 100644 --- a/test/replication/catch.result +++ b/test/replication/catch.result @@ -35,7 +35,7 @@ test_run:cmd("switch default") s = box.schema.space.create('test', {engine = engine}); --- ... --- vinyl does not support hash index +-- Vinyl does not support hash index. index = s:create_index('primary', {type = (engine == 'vinyl' and 'tree' or 'hash') }) --- ... @@ -57,14 +57,14 @@ test_run:cmd("stop server replica") --- - true ... --- insert values on the master while replica is stopped and can't fetch them -for i=1,100 do s:insert{i, 'this is test message12345'} end +-- Insert values on the master while replica is stopped and can't +-- fetch them. +errinj.set('ERRINJ_RELAY_SEND_DELAY', true) --- +- ok ... --- sleep after every tuple -errinj.set("ERRINJ_RELAY_TIMEOUT", 1000.0) +for i = 1, 100 do s:insert{i, 'this is test message12345'} end --- -- ok ... test_run:cmd("start server replica with args='0.01'") --- @@ -75,28 +75,26 @@ test_run:cmd("switch replica") - true ... 
-- Check that replica doesn't enter read-write mode before --- catching up with the master: to check that we inject sleep into --- the master relay_send function and attempt a data modifying --- statement in replica while it's still fetching data from the --- master. --- In the next two cases we try to delete a tuple while replica is --- catching up with the master (local delete, remote delete) case +-- catching up with the master: to check that we stop sending +-- rows on the master in relay_send function and attempt a data +-- modifying statement in replica while it's still fetching data +-- from the master. +-- +-- In the next two cases we try to replace a tuple while replica +-- is catching up with the master (local replace, remote replace) +-- case -- --- #1: delete tuple on replica +-- Case #1: replace tuple on replica locally. -- box.space.test ~= nil --- - true ... -d = box.space.test:delete{1} +box.space.test:replace{1} --- - error: Can't modify data because this instance is in read-only mode. ... -box.space.test:get(1) ~= nil ---- -- true -... --- case #2: delete tuple by net.box +-- Case #2: replace tuple on replica by net.box. test_run:cmd("switch default") --- - true @@ -108,20 +106,16 @@ test_run:cmd("set variable r_uri to 'replica.listen'") c = net_box.connect(r_uri) --- ... -d = c.space.test:delete{1} +d = c.space.test:replace{1} --- - error: Can't modify data because this instance is in read-only mode. ... -c.space.test:get(1) ~= nil ---- -- true -... --- check sync -errinj.set("ERRINJ_RELAY_TIMEOUT", 0) +-- Resume replication. +errinj.set('ERRINJ_RELAY_SEND_DELAY', false) --- - ok ... --- cleanup +-- Cleanup. 
test_run:cmd("stop server replica") --- - true diff --git a/test/replication/catch.test.lua b/test/replication/catch.test.lua index 6773675d0..685c3e869 100644 --- a/test/replication/catch.test.lua +++ b/test/replication/catch.test.lua @@ -13,7 +13,7 @@ test_run:cmd("switch replica") test_run:cmd("switch default") s = box.schema.space.create('test', {engine = engine}); --- vinyl does not support hash index +-- Vinyl does not support hash index. index = s:create_index('primary', {type = (engine == 'vinyl' and 'tree' or 'hash') }) test_run:cmd("switch replica") @@ -22,41 +22,40 @@ while box.space.test == nil do fiber.sleep(0.01) end test_run:cmd("switch default") test_run:cmd("stop server replica") --- insert values on the master while replica is stopped and can't fetch them -for i=1,100 do s:insert{i, 'this is test message12345'} end - --- sleep after every tuple -errinj.set("ERRINJ_RELAY_TIMEOUT", 1000.0) +-- Insert values on the master while replica is stopped and can't +-- fetch them. +errinj.set('ERRINJ_RELAY_SEND_DELAY', true) +for i = 1, 100 do s:insert{i, 'this is test message12345'} end test_run:cmd("start server replica with args='0.01'") test_run:cmd("switch replica") -- Check that replica doesn't enter read-write mode before --- catching up with the master: to check that we inject sleep into --- the master relay_send function and attempt a data modifying --- statement in replica while it's still fetching data from the --- master. --- In the next two cases we try to delete a tuple while replica is --- catching up with the master (local delete, remote delete) case +-- catching up with the master: to check that we stop sending +-- rows on the master in relay_send function and attempt a data +-- modifying statement in replica while it's still fetching data +-- from the master. 
+-- +-- In the next two cases we try to replace a tuple while replica +-- is catching up with the master (local replace, remote replace) +-- case -- --- #1: delete tuple on replica +-- Case #1: replace tuple on replica locally. -- box.space.test ~= nil -d = box.space.test:delete{1} -box.space.test:get(1) ~= nil +box.space.test:replace{1} --- case #2: delete tuple by net.box +-- Case #2: replace tuple on replica by net.box. test_run:cmd("switch default") test_run:cmd("set variable r_uri to 'replica.listen'") c = net_box.connect(r_uri) -d = c.space.test:delete{1} -c.space.test:get(1) ~= nil +d = c.space.test:replace{1} --- check sync -errinj.set("ERRINJ_RELAY_TIMEOUT", 0) +-- Resume replication. +errinj.set('ERRINJ_RELAY_SEND_DELAY', false) --- cleanup +-- Cleanup. test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") test_run:cmd("delete server replica") diff --git a/test/replication/gc.result b/test/replication/gc.result index 2895bed3b..dad5539de 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -30,6 +30,9 @@ box.cfg{checkpoint_count = 1} function wait_gc(n) while #box.info.gc().checkpoints > n do fiber.sleep(0.01) end end --- ... +function wait_xlog(n, timeout) timeout = timeout or 1.0 return test_run:wait_cond(function() return #fio.glob('./master/*.xlog') == n end, timeout) end +--- +... -- Grant permissions needed for replication. box.schema.user.grant('guest', 'replication') --- @@ -66,7 +69,7 @@ for i = 1, 100 do s:auto_increment{} end ... -- Make sure replica join will take long enough for us to -- invoke garbage collection. -box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true) --- - ok ... 
@@ -81,7 +84,7 @@ test_run:cmd("setopt delimiter ';'") fiber.create(function() fiber.sleep(0.1) box.snapshot() - box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) + box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false) end) test_run:cmd("setopt delimiter ''"); --- @@ -102,7 +105,7 @@ test_run:cmd("switch replica") fiber = require('fiber') --- ... -while box.space.test:count() < 200 do fiber.sleep(0.01) end +while box.space.test == nil or box.space.test:count() < 200 do fiber.sleep(0.01) end --- ... box.space.test:count() @@ -122,13 +125,13 @@ wait_gc(1) --- - true ... -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +wait_xlog(1) or fio.listdir('./master') --- - true ... --- Make sure the replica will receive data it is subscribed --- to long enough for us to invoke garbage collection. -box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) +-- Make sure the replica will not receive data until +-- we test garbage collection. +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true) --- - ok ... @@ -160,13 +163,13 @@ box.snapshot() --- - true ... -#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +wait_xlog(2) or fio.listdir('./master') --- - true ... --- Remove the timeout injection so that the replica catches +-- Resume replication so that the replica catches -- up quickly. -box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false) --- - ok ... @@ -195,7 +198,7 @@ wait_gc(1) --- - true ... -#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +wait_xlog(0) or fio.listdir('./master') --- - true ... @@ -238,7 +241,7 @@ fiber.sleep(0.1) -- wait for master to relay data --- - true ... -#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +wait_xlog(2) or fio.listdir('./master') --- - true ... @@ -283,7 +286,7 @@ wait_gc(1) --- - true ... -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +wait_xlog(1) or fio.listdir('./master') --- - true ... 
@@ -319,13 +322,10 @@ box.snapshot() --- - true ... -xlog_count = #fio.glob('./master/*.xlog') ---- -... -- the replica may have managed to download all data -- from xlog #1 before it was stopped, in which case -- it's OK to collect xlog #1 -xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') +wait_xlog(3, 0.1) or wait_xlog(2, 0.1) or fio.listdir('./master') --- - true ... @@ -338,7 +338,7 @@ test_run:cleanup_cluster() --- - true ... -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +wait_xlog(1) or fio.listdir('./master') --- - true ... @@ -433,7 +433,7 @@ box.snapshot() --- - ok ... -#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') +wait_xlog(3) or fio.listdir('./master') --- - true ... @@ -446,13 +446,7 @@ box.snapshot() --- - ok ... -t = fiber.time() ---- -... -while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end ---- -... -#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +wait_xlog(0, 10) or fio.listdir('./master') --- - true ... diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 5100378b3..22921289d 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -12,6 +12,7 @@ default_checkpoint_count = box.cfg.checkpoint_count box.cfg{checkpoint_count = 1} function wait_gc(n) while #box.info.gc().checkpoints > n do fiber.sleep(0.01) end end +function wait_xlog(n, timeout) timeout = timeout or 1.0 return test_run:wait_cond(function() return #fio.glob('./master/*.xlog') == n end, timeout) end -- Grant permissions needed for replication. box.schema.user.grant('guest', 'replication') @@ -31,7 +32,7 @@ for i = 1, 100 do s:auto_increment{} end -- Make sure replica join will take long enough for us to -- invoke garbage collection. 
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true) -- While the replica is receiving the initial data set, -- make a snapshot and invoke garbage collection, then @@ -41,7 +42,7 @@ test_run:cmd("setopt delimiter ';'") fiber.create(function() fiber.sleep(0.1) box.snapshot() - box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) + box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false) end) test_run:cmd("setopt delimiter ''"); @@ -54,7 +55,7 @@ test_run:cmd("start server replica") -- data from the master. Check it. test_run:cmd("switch replica") fiber = require('fiber') -while box.space.test:count() < 200 do fiber.sleep(0.01) end +while box.space.test == nil or box.space.test:count() < 200 do fiber.sleep(0.01) end box.space.test:count() test_run:cmd("switch default") @@ -62,10 +63,10 @@ test_run:cmd("switch default") -- the replica released the corresponding checkpoint. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') --- Make sure the replica will receive data it is subscribed --- to long enough for us to invoke garbage collection. -box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) +wait_xlog(1) or fio.listdir('./master') +-- Make sure the replica will not receive data until +-- we test garbage collection. +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", true) -- Send more data to the replica. -- Need to do 2 snapshots here, otherwise the replica would @@ -80,11 +81,11 @@ box.snapshot() -- xlogs needed by the replica. box.snapshot() #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +wait_xlog(2) or fio.listdir('./master') --- Remove the timeout injection so that the replica catches +-- Resume replication so that the replica catches -- up quickly. 
-box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) +box.error.injection.set("ERRINJ_RELAY_SEND_DELAY", false) -- Check that the replica received all data from the master. test_run:cmd("switch replica") @@ -96,7 +97,7 @@ test_run:cmd("switch default") -- from the old checkpoint. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +wait_xlog(0) or fio.listdir('./master') -- -- Check that the master doesn't delete xlog files sent to the -- replica until it receives a confirmation that the data has @@ -115,7 +116,7 @@ fiber.sleep(0.1) -- wait for master to relay data -- because it is still needed by the replica, but remove -- the old snapshot. #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +wait_xlog(2) or fio.listdir('./master') test_run:cmd("switch replica") -- Unblock the replica and break replication. box.error.injection.set("ERRINJ_WAL_DELAY", false) @@ -131,7 +132,7 @@ test_run:cmd("switch default") -- Now it's safe to drop the old xlog. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +wait_xlog(1) or fio.listdir('./master') -- Stop the replica. test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") @@ -146,17 +147,16 @@ box.snapshot() _ = s:auto_increment{} box.snapshot() #box.info.gc().checkpoints == 1 or box.info.gc() -xlog_count = #fio.glob('./master/*.xlog') -- the replica may have managed to download all data -- from xlog #1 before it was stopped, in which case -- it's OK to collect xlog #1 -xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') +wait_xlog(3, 0.1) or wait_xlog(2, 0.1) or fio.listdir('./master') -- The xlog should only be deleted after the replica -- is unregistered. 
test_run:cleanup_cluster() #box.info.gc().checkpoints == 1 or box.info.gc() -#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +wait_xlog(1) or fio.listdir('./master') -- -- Test that concurrent invocation of the garbage collector works fine. -- @@ -197,15 +197,13 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() -#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') +wait_xlog(3) or fio.listdir('./master') -- Delete the replica from the cluster table and check that -- all xlog files are removed. test_run:cleanup_cluster() box.snapshot() -t = fiber.time() -while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end -#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +wait_xlog(0, 10) or fio.listdir('./master') -- Restore the config. box.cfg{replication = {}} -- 2.18.0