[Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Feb 28 02:22:40 MSK 2020


> Ok. I tried to add either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
> of wal_write_to_disk().
> It looks like you cannot yield in wal_write_to_disk(). (is it possible to yield in WAL thread at all?)
> Firing this injection with ERROR_INJECT_YIELD and then resetting it leads to wal thread stopping
> processing messages. This leads to tarantool hanging infinitely on shutdown when tx waits for wal
> thread to exit, but wal never gets the shutdown signal.
>  
> Using ERROR_INJECT_SLEEP leads to wal watchers not being notified until the injection is reset. This
> probably happens because of wal_notify_watchers’ use of cpipe_flush_input(), which doesn’t flush the input until
> the end of event loop iteration, if there are not enough messages (only one message in our case).
> The event loop iteration never ends, because we sleep right after wal_notify_watchers() call.
>  
> So, I see skipping vclock assignment in tx_schedule_commit() as the only possible alternative.
> Hope my explanation was clear enough and, more importantly, correct. If not, let's discuss.

Yeah, I stumbled into the same problems. And realized that the current test,
after all, is not valid. So we need to change it anyway.

First I tried to solve them by blocking the TX thread completely on one
instance after it tried to commit something. Since that would block the
test too, I tried to introduce a new thread - an errinj thread, which would
listen on an ip/port or a unix socket and receive requests to set
error injections from another instance. So rebootstrap1's TX thread would
freeze, and I could control that instance via interaction with its errinj
thread from the rebootstrap2 instance or from the default instance.

Although the idea would certainly work, it turned out to be hard to
implement in a short time, so I postponed it, and will probably open a
ticket to implement such a thing. It could be useful for any test that needs
to check the behaviour of other threads when TX is not scheduled for a long
time. We could also implement that logic as part of the iproto thread.

For our case I found a simpler solution - sleep in wal_write_to_disk(), but
deliver all watcher events immediately. With that the test works, and it
still crashes without your patch.
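
To illustrate why the sleep alone hides watcher events and why forced
delivery helps, here is a minimal toy model. It is not Tarantool's cbus: the
toy_pipe struct and every toy_* function are hypothetical names made up for
this sketch, and the batch limit of 4 is an arbitrary assumption. It only
mimics the behaviour described in the quoted message, where a single staged
message stays on the producer side until the end of the event loop
iteration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy model only. All names are hypothetical, not Tarantool's API. */
struct toy_pipe {
	int n_input;   /* messages staged on the producer side */
	int max_input; /* staged count that forces an early flush */
	int delivered; /* messages already visible to the consumer */
};

/* Move every staged message to the consumer side. */
static void
toy_pipe_flush(struct toy_pipe *pipe)
{
	assert(pipe != NULL);
	pipe->delivered += pipe->n_input;
	pipe->n_input = 0;
}

/* Stage one message; flush early only when the batch is full. */
static void
toy_pipe_push(struct toy_pipe *pipe)
{
	pipe->n_input++;
	if (pipe->n_input >= pipe->max_input)
		toy_pipe_flush(pipe);
}

/* Same idea as cpipe_deliver_now() in the diff: flush if anything is staged. */
static void
toy_pipe_deliver_now(struct toy_pipe *pipe)
{
	if (pipe->n_input > 0)
		toy_pipe_flush(pipe);
}

/*
 * Push a single notification and report how many messages the consumer
 * can see at the point where the producer would start sleeping, i.e.
 * before any end-of-iteration flush has had a chance to run.
 */
static int
toy_visible_during_sleep(bool deliver_now)
{
	struct toy_pipe pipe = {0, 4, 0}; /* batch limit 4 is an assumption */
	toy_pipe_push(&pipe);
	if (deliver_now)
		toy_pipe_deliver_now(&pipe);
	return pipe.delivered;
}
```

In this model toy_visible_during_sleep(false) leaves the single message
staged, which corresponds to watchers not being notified while the WAL
thread sleeps, and toy_visible_during_sleep(true) makes it visible at once.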

Here is my diff, which I pushed on top of your branch. If you don't agree -
let's discuss. Otherwise squash and the patchset LGTM.

================================================================================

commit 7054ed8ffc5cff690858261073cdfb1822e241b7
Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date:   Fri Feb 28 00:02:10 2020 +0100

    Review fixes

diff --git a/src/box/wal.c b/src/box/wal.c
index bf127b259..1668c9348 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -278,11 +278,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
-
-	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
@@ -1117,6 +1114,7 @@ done:
 	}
 	fiber_gc();
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
+	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
 }
 
 /** WAL writer main loop.  */
@@ -1328,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	msg->events = events;
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
+	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
+		     cpipe_deliver_now(&watcher->watcher_pipe));
 }
 
 static void
diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
index 16d122779..f0101cb8b 100644
--- a/src/lib/core/cbus.h
+++ b/src/lib/core/cbus.h
@@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
 	pipe->max_input = max_input;
 }
 
+static inline void
+cpipe_deliver_now(struct cpipe *pipe)
+{
+	if (pipe->n_input > 0)
+		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
 /**
  * Flush all staged messages into the pipe and eventually to the
  * consumer.
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 58fe158fd..d8cdf3f27 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -137,7 +137,7 @@ struct errinj {
 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
-	_(ERRINJ_REPLICASET_VCLOCK_UPDATE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index eb0905238..4ad24d0c1 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,12 +59,12 @@ evals
   - ERRINJ_PORT_DUMP: false
   - ERRINJ_RELAY_BREAK_LSN: -1
   - ERRINJ_RELAY_EXIT_DELAY: 0
+  - ERRINJ_RELAY_FASTER_THAN_TX: false
   - ERRINJ_RELAY_FINAL_JOIN: false
   - ERRINJ_RELAY_FINAL_SLEEP: false
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
   - ERRINJ_RELAY_SEND_DELAY: false
   - ERRINJ_RELAY_TIMEOUT: 0
-  - ERRINJ_REPLICASET_VCLOCK_UPDATE: false
   - ERRINJ_REPLICA_JOIN_DELAY: false
   - ERRINJ_SIO_READ_MAX: -1
   - ERRINJ_SNAP_COMMIT_DELAY: false
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index a612826a0..43d3f27f3 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -26,16 +26,19 @@ fiber = require('fiber')
 -- Stop updating replicaset vclock to simulate a situation, when
 -- a row is already relayed to the remote master, but the local
 -- vclock update hasn't happened yet.
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
  | ---
  | - ok
  | ...
 lsn = box.info.lsn
  | ---
  | ...
-box.space._schema:replace{'something'}
+f = fiber.create(function() box.space._schema:replace{'something'} end)
  | ---
- | - ['something']
+ | ...
+test_run:wait_cond(function() return f:status() == 'suspended' end)
+ | ---
+ | - true
  | ...
 -- Vclock isn't updated.
 box.info.lsn == lsn
@@ -53,7 +56,7 @@ end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
-test_run:cmd('restart server rebootstrap2')
+test_run:cmd('restart server rebootstrap2 with wait=False')
  | ---
  | - true
  | ...
@@ -68,10 +71,14 @@ end, 10)
  | ---
  | - true
  | ...
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
  | ---
  | - ok
  | ...
+box.space._schema:get{'something'}
+ | ---
+ | - ['something']
+ | ...
 test_run:cmd('switch default')
  | ---
  | - true
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index b6a7caf3b..f8dd86688 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -10,9 +10,10 @@ fiber = require('fiber')
 -- Stop updating replicaset vclock to simulate a situation, when
 -- a row is already relayed to the remote master, but the local
 -- vclock update hasn't happened yet.
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
 lsn = box.info.lsn
-box.space._schema:replace{'something'}
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+test_run:wait_cond(function() return f:status() == 'suspended' end)
 -- Vclock isn't updated.
 box.info.lsn == lsn
 
@@ -23,12 +24,13 @@ end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
-test_run:cmd('restart server rebootstrap2')
+test_run:cmd('restart server rebootstrap2 with wait=False')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
 test_run:wait_cond(function()\
     return box.info.replication[2].upstream.status == 'sync'\
 end, 10)
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+box.space._schema:get{'something'}
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

