[Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it

Serge Petrenko sergepetrenko at tarantool.org
Fri Feb 28 11:03:51 MSK 2020



> On 28 Feb 2020, at 02:22, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
> 
>> Ok. I tried to add either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
>> of wal_write_to_disk().
>> It looks like you cannot yield in wal_write_to_disk() (is it possible to yield in the WAL thread at all?).
>> Firing this injection with ERROR_INJECT_YIELD and then resetting it makes the WAL thread stop
>> processing messages. As a result, Tarantool hangs forever on shutdown: TX waits for the WAL
>> thread to exit, but WAL never receives the shutdown signal.
>>
>> Using ERROR_INJECT_SLEEP means WAL watchers are not notified until the injection is reset. This
>> probably happens because wal_notify_watchers() uses cpipe_flush_input(), which doesn't flush the input until
>> the end of the event loop iteration if there are too few messages (only one message in our case).
>> The event loop iteration never ends, because we sleep right after the wal_notify_watchers() call.
>>
>> So I see skipping the vclock assignment in tx_schedule_commit() as the only possible alternative.
>> I hope my explanation was clear enough and, more importantly, correct. If not, let's discuss.
> 
> Yeah, I stumbled into the same problems, and realized that the current test
> is not valid after all. So we need to change it anyway.
> 
> First I tried to solve them by blocking the TX thread entirely on one
> instance after it tried to commit something. Since that would block the
> test too, I tried to introduce a new thread, an errinj thread, which would
> listen on an IP/port or a Unix socket and receive requests to set
> error injections from another instance. So rebootstrap1's TX thread would
> freeze, and I could control that instance through its errinj thread from
> the rebootstrap2 instance or from the default instance.
> 
> Although the idea would surely work, it turned out to be hard to implement
> in a short time, so I postponed it and will probably open a ticket to
> implement such a thing. It could be useful for any test that needs to check
> the behaviour of other threads while TX is not scheduled for a long time. We
> could also implement that logic as part of the iproto thread.
> 
> For our case I found a simpler solution: sleep in wal_write_to_disk(), but
> deliver all watcher events immediately. With that, the test works, and it
> still crashes without your patch.

Thanks for your amendments!
The new test LGTM. Do you mean the old test is incorrect because the
injection simulates an impossible situation? If yes, then I agree.
I applied your diff with a tiny fix. See below.
I’ll send v5 with a changelog in the cover letter shortly.

> 
> Here is my diff, which I pushed on top of your branch. If you don't agree,
> let's discuss. Otherwise, squash it, and the patchset LGTM.
> 
> ================================================================================
> 
> commit 7054ed8ffc5cff690858261073cdfb1822e241b7
> Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> Date:   Fri Feb 28 00:02:10 2020 +0100
> 
>    Review fixes
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index bf127b259..1668c9348 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -278,11 +278,8 @@ tx_schedule_commit(struct cmsg *msg)
> 		/* Closes the input valve. */
> 		stailq_concat(&writer->rollback, &batch->rollback);
> 	}
> -
> -	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
> 	/* Update the tx vclock to the latest written by wal. */
> 	vclock_copy(&replicaset.vclock, &batch->vclock);
> -	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
> 	tx_schedule_queue(&batch->commit);
> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
> @@ -1117,6 +1114,7 @@ done:
> 	}
> 	fiber_gc();
> 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
> +	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
> }
> 
> /** WAL writer main loop.  */
> @@ -1328,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
> 	msg->events = events;
> 	cmsg_init(&msg->cmsg, watcher->route);
> 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
> +	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
> +		     cpipe_deliver_now(&watcher->watcher_pipe));
> }
> 
> static void
> diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
> index 16d122779..f0101cb8b 100644
> --- a/src/lib/core/cbus.h
> +++ b/src/lib/core/cbus.h
> @@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
> 	pipe->max_input = max_input;
> }
> 
> +static inline void
> +cpipe_deliver_now(struct cpipe *pipe)
> +{
> +	if (pipe->n_input > 0)
> +		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
> +}
> +
> /**
>  * Flush all staged messages into the pipe and eventually to the
>  * consumer.
> diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
> index 58fe158fd..d8cdf3f27 100644
> --- a/src/lib/core/errinj.h
> +++ b/src/lib/core/errinj.h
> @@ -137,7 +137,7 @@ struct errinj {
> 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
> 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
> 	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
> -	_(ERRINJ_REPLICASET_VCLOCK_UPDATE, ERRINJ_BOOL, {.bparam = false}) \
> +	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
> 
> ENUM0(errinj_id, ERRINJ_LIST);
> extern struct errinj errinjs[];
> diff --git a/test/box/errinj.result b/test/box/errinj.result
> index eb0905238..4ad24d0c1 100644
> --- a/test/box/errinj.result
> +++ b/test/box/errinj.result
> @@ -59,12 +59,12 @@ evals
>   - ERRINJ_PORT_DUMP: false
>   - ERRINJ_RELAY_BREAK_LSN: -1
>   - ERRINJ_RELAY_EXIT_DELAY: 0
> +  - ERRINJ_RELAY_FASTER_THAN_TX: false
>   - ERRINJ_RELAY_FINAL_JOIN: false
>   - ERRINJ_RELAY_FINAL_SLEEP: false
>   - ERRINJ_RELAY_REPORT_INTERVAL: 0
>   - ERRINJ_RELAY_SEND_DELAY: false
>   - ERRINJ_RELAY_TIMEOUT: 0
> -  - ERRINJ_REPLICASET_VCLOCK_UPDATE: false
>   - ERRINJ_REPLICA_JOIN_DELAY: false
>   - ERRINJ_SIO_READ_MAX: -1
>   - ERRINJ_SNAP_COMMIT_DELAY: false
> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
> index a612826a0..43d3f27f3 100644
> --- a/test/replication/gh-4739-vclock-assert.result
> +++ b/test/replication/gh-4739-vclock-assert.result
> @@ -26,16 +26,19 @@ fiber = require('fiber')
> -- Stop updating replicaset vclock to simulate a situation, when
> -- a row is already relayed to the remote master, but the local
> -- vclock update hasn't happened yet.
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
>  | ---
>  | - ok
>  | ...
> lsn = box.info.lsn
>  | ---
>  | ...
> -box.space._schema:replace{'something'}
> +f = fiber.create(function() box.space._schema:replace{'something'} end)
>  | ---
> - | - ['something']
> + | ...
> +test_run:wait_cond(function() return f:status() == 'suspended' end)
> + | ---
> + | - true
>  | ...

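No need to call wait_cond() here. fiber.create() starts the new fiber immediately,
and the fiber yields while waiting for the WAL write. So by the time control returns
to the console, where we call wait_cond(), the fiber is already suspended.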

diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 43d3f27f3..83896c4e1 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -36,9 +36,9 @@ lsn = box.info.lsn
 f = fiber.create(function() box.space._schema:replace{'something'} end)
  | ---
  | ...
-test_run:wait_cond(function() return f:status() == 'suspended' end)
+f:status()
  | ---
- | - true
+ | - suspended
  | ...
 -- Vclock isn't updated.
 box.info.lsn == lsn

> -- Vclock isn't updated.
> box.info.lsn == lsn
> @@ -53,7 +56,7 @@ end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> -test_run:cmd('restart server rebootstrap2')
> +test_run:cmd('restart server rebootstrap2 with wait=False')
>  | ---
>  | - true
>  | ...
> @@ -68,10 +71,14 @@ end, 10)
>  | ---
>  | - true
>  | ...
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
>  | ---
>  | - ok
>  | ...
> +box.space._schema:get{'something'}
> + | ---
> + | - ['something']
> + | ...
> test_run:cmd('switch default')
>  | ---
>  | - true
> diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
> index b6a7caf3b..f8dd86688 100644
> --- a/test/replication/gh-4739-vclock-assert.test.lua
> +++ b/test/replication/gh-4739-vclock-assert.test.lua
> @@ -10,9 +10,10 @@ fiber = require('fiber')
> -- Stop updating replicaset vclock to simulate a situation, when
> -- a row is already relayed to the remote master, but the local
> -- vclock update hasn't happened yet.
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
> lsn = box.info.lsn
> -box.space._schema:replace{'something'}
> +f = fiber.create(function() box.space._schema:replace{'something'} end)
> +test_run:wait_cond(function() return f:status() == 'suspended' end)

diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index f8dd86688..5755ad752 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -13,7 +13,7 @@ fiber = require('fiber')
 box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
 lsn = box.info.lsn
 f = fiber.create(function() box.space._schema:replace{'something'} end)
-test_run:wait_cond(function() return f:status() == 'suspended' end)
+f:status()
 -- Vclock isn't updated.
 box.info.lsn == lsn
 

> -- Vclock isn't updated.
> box.info.lsn == lsn
> 
> @@ -23,12 +24,13 @@ end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> -test_run:cmd('restart server rebootstrap2')
> +test_run:cmd('restart server rebootstrap2 with wait=False')
> test_run:cmd('switch rebootstrap1')
> -- Wait until resubscribe is sent
> test_run:wait_cond(function()\
>     return box.info.replication[2].upstream.status == 'sync'\
> end, 10)
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
> +box.space._schema:get{'something'}
> test_run:cmd('switch default')
> test_run:drop_cluster(SERVERS)


--
Serge Petrenko
sergepetrenko at tarantool.org
