[Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it

Serge Petrenko sergepetrenko at tarantool.org
Fri Feb 28 00:17:57 MSK 2020


  
>Thursday, 27 February 2020, 17:13 +03:00 from Serge Petrenko <sergepetrenko at tarantool.org>:
> 
> 
>>On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
>>Thanks for the patch!
>> 
> 
>Hi! Thanks for the review!
> 
>Please find my comments and the new diff below.  
>>See 4 comments below.
>>  
>>>   replication: do not relay rows coming from a remote instance back to it
>>>
>>>   We have a mechanism for restoring rows originating from an instance that
>>>   suffered a sudden power loss: remote masters resend the instance's rows
>>>   received before a certain point in time, defined by remote master vclock
>>>   at the moment of subscribe.
>>>   However, this is useful only on initial replication configuration, when
>>>   an instance has just recovered, so that it can receive what it has
>>>   relayed but hasn't synced to disk.
>>>   In other cases, when an instance is operating normally and master-master
>>>   replication is configured, the mechanism described above may lead to
>>>   the instance re-applying its own rows, coming from a master it has just
>>>   subscribed to.
>>>   To fix the problem do not relay rows coming from a remote instance, if
>>>   the instance has already recovered.
>>>
>>>   Closes #4739
>>>
>>>diff --git a/src/box/applier.cc b/src/box/applier.cc
>>>index 911353425..73ffc0d68 100644
>>>--- a/src/box/applier.cc
>>>+++ b/src/box/applier.cc
>>>@@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>>>struct vclock vclock;
>>>vclock_create(&vclock);
>>>vclock_copy(&vclock, &replicaset.vclock);
>>>+ /*
>>>+ * Stop accepting local rows coming from a remote
>>>+ * instance as soon as local WAL starts accepting writes.
>>>+ */
>>>+ unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>>1. I was always wondering, what if the instance got orphaned after it
>>started accepting writes? WAL is fully functional, it syncs whatever is
>>needed, and then a resubscribe happens. Can this break anything?
>>  
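For readers following the id_filter line above, here is a minimal sketch of how such a per-instance bitmask can be built and checked. It is not the code from the patch: the sketch_* names and SKETCH_ID_MAX are made up for illustration, and the 32-id limit is an assumption. The sketch shifts an unsigned constant rather than a plain int, which keeps the shift well defined for the highest id.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: at most 32 replica ids, so one uint32_t bit per id. */
enum { SKETCH_ID_MAX = 32 };

/*
 * Subscriber side: while the instance is orphan, ask for everything
 * (empty filter); otherwise ask the remote master to skip our own rows.
 */
static uint32_t
sketch_make_id_filter(bool is_orphan, uint32_t instance_id)
{
	assert(instance_id < SKETCH_ID_MAX);
	return is_orphan ? 0 : UINT32_C(1) << instance_id;
}

/*
 * Relay side: a row is skipped when the bit of its originating
 * replica id is set in the filter received with SUBSCRIBE.
 */
static bool
sketch_row_is_filtered(uint32_t id_filter, uint32_t replica_id)
{
	assert(replica_id < SKETCH_ID_MAX);
	return (id_filter & (UINT32_C(1) << replica_id)) != 0;
}
```

With instance id 1 and a non-orphan instance, the filter has only bit 1 set, so rows that originated from id 1 are skipped and rows from any other id are still relayed.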
>>>xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>>>- &vclock, replication_anon, 0);
>>>+ &vclock, replication_anon, id_filter);
>>>coio_write_xrow(coio, &row);
>>>
>>>/* Read SUBSCRIBE response */
>>>diff --git a/src/box/wal.c b/src/box/wal.c
>>>index 27bff662a..35ba7b072 100644
>>>--- a/src/box/wal.c
>>>+++ b/src/box/wal.c
>>>@@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>>>/* Closes the input valve. */
>>>stailq_concat(&writer->rollback, &batch->rollback);
>>>}
>>>+
>>>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>>>/* Update the tx vclock to the latest written by wal. */
>>>vclock_copy(&replicaset.vclock, &batch->vclock);
>>>+#ifndef NDEBUG
>>>+skip_update:
>>>+#endif
>>2. Consider this hack which I just invented. In that way you won't
>>depend on ERRINJ and NDEBUG interconnection.
>>
>>====================
>>@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>>ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>>/* Update the tx vclock to the latest written by wal. */
>>vclock_copy(&replicaset.vclock, &batch->vclock);
>>-#ifndef NDEBUG
>>-skip_update:
>>-#endif
>>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>>tx_schedule_queue(&batch->commit);
>>mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>>}
>>====================
> 
>Good one, applied.  
>>
>>Talking of the injection itself - don't know really. Perhaps
>>it would be better to add a delay to the wal_write_to_disk()
>>function, to its very end, after wal_notify_watchers(). In
>>that case relay will wake up, send whatever it wants, and TX
>>won't update the vclock until you let wal_write_to_disk()
>>finish. Seems more natural this way.
> 
>I tried to add a sleep first. It’s impossible to sleep in tx_schedule_commit(),
>since it’s processed in tx_prio endpoint, where yielding is impossible.
>I also tried to add a sleep at the end of wal_write_to_disk(), just like you
>suggest. This didn’t work out either. I’ll give you more details in the evening,
>when I give it another try. I’ll send a follow-up if I succeed with adding a sleep.
 
OK. I tried adding either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
of wal_write_to_disk().
It looks like you cannot yield in wal_write_to_disk() (is it possible to yield in the WAL thread at all?).
Firing the injection with ERROR_INJECT_YIELD and then resetting it makes the WAL thread stop
processing messages. As a result, tarantool hangs forever on shutdown: tx waits for the WAL
thread to exit, but WAL never gets the shutdown signal.
 
Using ERROR_INJECT_SLEEP leads to WAL watchers not being notified until the injection is reset. This
probably happens because wal_notify_watchers() uses cpipe_flush_input(), which doesn’t flush the input until
the end of the event loop iteration if there are not enough messages (only one message in our case).
The event loop iteration never ends, because we sleep right after the wal_notify_watchers() call.
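
To make the suspected cause concrete, here is a toy model of a pipe that delivers staged messages immediately only when enough of them have piled up, and otherwise defers delivery to the end of the event loop iteration. This is not the real cpipe code: every sketch_* name and the threshold field are hypothetical and only illustrate the behaviour described above.

```c
#include <assert.h>

/* Toy pipe: messages are staged first and delivered on flush. */
struct sketch_pipe {
	int n_input;   /* messages staged but not yet delivered */
	int max_input; /* threshold that triggers an immediate flush */
	int delivered; /* messages the consumer has received */
};

/* Hand every staged message over to the consumer. */
static void
sketch_pipe_deliver(struct sketch_pipe *pipe)
{
	pipe->delivered += pipe->n_input;
	pipe->n_input = 0;
}

/* Stage one message for the consumer. */
static void
sketch_pipe_push(struct sketch_pipe *pipe)
{
	assert(pipe->max_input > 0);
	pipe->n_input++;
}

/*
 * Flush request: deliver right away only if a lot has piled up.
 * Otherwise do nothing and rely on the end-of-iteration hook.
 */
static void
sketch_pipe_flush_input(struct sketch_pipe *pipe)
{
	if (pipe->n_input >= pipe->max_input)
		sketch_pipe_deliver(pipe);
}

/* Runs when the current event loop iteration finishes. */
static void
sketch_loop_iteration_end(struct sketch_pipe *pipe)
{
	sketch_pipe_deliver(pipe);
}
```

In this model a single pushed message stays staged after sketch_pipe_flush_input() and reaches the consumer only in sketch_loop_iteration_end(). If the thread sleeps before the iteration ends, the consumer sees nothing, which matches the observed symptom.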
 
So, I see skipping the vclock assignment in tx_schedule_commit() as the only workable alternative.
I hope my explanation was clear enough and, more importantly, correct. If not, let's discuss.
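
For reference, the label-inside-the-macro trick applied above can be sketched in isolation. SKETCH_ERROR_INJECT is a simplified, hypothetical stand-in for the real ERROR_INJECT macro, and the integer "vclocks" are placeholders. The point is only that the goto and its label both live inside macro invocations, so they disappear together when the macro expands to nothing.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical injection flag; the real code looks it up by id. */
static bool sketch_skip_update = false;

#ifndef NDEBUG
/* Debug build: run CODE when the injection flag is set. */
#define SKETCH_ERROR_INJECT(FLAG, CODE) do { if (FLAG) CODE } while (0)
#else
/* Release build: the injection compiles to nothing. */
#define SKETCH_ERROR_INJECT(FLAG, CODE)
#endif

/*
 * Returns the tx vclock after a commit. With the injection enabled
 * the assignment is skipped and the old value is returned.
 */
static int
sketch_commit(int tx_vclock, int wal_vclock)
{
	assert(wal_vclock >= tx_vclock);
	SKETCH_ERROR_INJECT(sketch_skip_update, { goto skip_update; });
	/* Update the tx vclock to the latest written by wal. */
	tx_vclock = wal_vclock;
	/* The label exists only where the goto exists. */
	SKETCH_ERROR_INJECT(sketch_skip_update, { skip_update:; });
	return tx_vclock;
}
```

In a debug build with the flag set, sketch_commit(5, 7) returns 5, otherwise it returns 7. No separate #ifndef NDEBUG guard around the label is needed.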
>>  
>>>tx_schedule_queue(&batch->commit);
>>>mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>>>}
>>>diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>>>new file mode 100644
>>>index 000000000..7dc2f7118
>>>--- /dev/null
>>>+++ b/test/replication/gh-4739-vclock-assert.result
>>>@@ -0,0 +1,82 @@
>>>+-- test-run result file version 2
>>>+env = require('test_run')
>>>+ | ---
>>>+ | ...
>>>+test_run = env.new()
>>>+ | ---
>>>+ | ...
>>>+
>>>+SERVERS = {'rebootstrap1', 'rebootstrap2'}
>>>+ | ---
>>>+ | ...
>>>+test_run:create_cluster(SERVERS, "replication")
>>>+ | ---
>>>+ | ...
>>>+test_run:wait_fullmesh(SERVERS)
>>>+ | ---
>>>+ | ...
>>>+
>>>+test_run:cmd('switch rebootstrap1')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+fiber = require('fiber')
>>>+ | ---
>>>+ | ...
>>>+-- Stop updating replicaset vclock to simulate a situation, when
>>>+-- a row is already relayed to the remote master, but the local
>>>+-- vclock update hasn't happened yet.
>>>+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
>>>+ | ---
>>>+ | - ok
>>>+ | ...
>>>+lsn = box.info.lsn
>>>+ | ---
>>>+ | ...
>>>+box.space._schema:replace{'something'}
>>>+ | ---
>>>+ | - ['something']
>>>+ | ...
>>>+-- Vclock isn't updated.
>>>+box.info.lsn == lsn
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+
>>>+-- Wait until the remote instance gets the row.
>>>+while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>>>+    fiber.sleep(0.01)\
>>>+end
>>3. There is a cool thing which I discovered relatively recently:
>>test_run:wait_cond(). It does fiber sleep and while cycle, and
>>has a finite timeout, so such a test won't hang for 10 minutes
>>in Travis in case of a problem.
> 
>Thanks!  
>>  
>>>+ | ---
>>>+ | ...
>>>+
>>>+-- Restart the remote instance. This will make the first instance
>>>+-- resubscribe without entering orphan mode.
>>>+test_run:cmd('restart server rebootstrap2')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+test_run:cmd('switch rebootstrap1')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+-- Wait until resubscribe is sent
>>>+fiber.sleep(2 * box.cfg.replication_timeout)
>>4. Don't we collect any statistics on replication requests, just
>>like we do in box.stat()? Perhaps box.stat.net() can help? To
>>wait properly. Maybe just do test_run:wait_cond() for status 'sync'?
> 
>wait_cond for 'sync' is enough. Applied.
>>  
>>>+ | ---
>>>+ | ...
>>>+box.info.replication[2].upstream.status
>>>+ | ---
>>>+ | - sync
>>>+ | ...
>>>+
>>>+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>>>+ | ---
>>>+ | - ok
>>>+ | ...
>>>+test_run:cmd('switch default')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+test_run:drop_cluster(SERVERS)
>>>+ | ---
>>>+ | … 
>diff --git a/src/box/applier.cc b/src/box/applier.cc
>index 73ffc0d68..78f3d8a73 100644
>--- a/src/box/applier.cc
>+++ b/src/box/applier.cc
>@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
>   * Stop accepting local rows coming from a remote
>   * instance as soon as local WAL starts accepting writes.
>   */
>- unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>+ uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>  xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>   &vclock, replication_anon, id_filter);
>  coio_write_xrow(coio, &row);
>diff --git a/src/box/wal.c b/src/box/wal.c
>index 35ba7b072..bf127b259 100644
>--- a/src/box/wal.c
>+++ b/src/box/wal.c
>@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>  ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>  /* Update the tx vclock to the latest written by wal. */
>  vclock_copy(&replicaset.vclock, &batch->vclock);
>-#ifndef NDEBUG
>-skip_update:
>-#endif
>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>  tx_schedule_queue(&batch->commit);
>  mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
>diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>index 7dc2f7118..a612826a0 100644
>--- a/test/replication/gh-4739-vclock-assert.result
>+++ b/test/replication/gh-4739-vclock-assert.result
>@@ -44,10 +44,11 @@ box.info.lsn == lsn
>  | ...
> 
> -- Wait until the remote instance gets the row.
>-while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>-    fiber.sleep(0.01)\
>-end
>+test_run:wait_cond(function()\
>+    return test_run:get_vclock('rebootstrap2')[ box.info.id ] > lsn\
>+end, 10)
>  | ---
>+ | - true
>  | ...
> 
> -- Restart the remote instance. This will make the first instance
>@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
>  | - true
>  | ...
> -- Wait until resubscribe is sent
>-fiber.sleep(2 * box.cfg.replication_timeout)
>- | ---
>- | ...
>-box.info.replication[2].upstream.status
>+test_run:wait_cond(function()\
>+    return box.info.replication[2].upstream.status == 'sync'\
>+end, 10)
>  | ---
>- | - sync
>+ | - true
>  | ...
>-
> box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>  | ---
>  | - ok
>diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
>index 26dc781e2..b6a7caf3b 100644
>--- a/test/replication/gh-4739-vclock-assert.test.lua
>+++ b/test/replication/gh-4739-vclock-assert.test.lua
>@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
> box.info.lsn == lsn
> 
> -- Wait until the remote instance gets the row.
>-while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>-    fiber.sleep(0.01)\
>-end
>+test_run:wait_cond(function()\
>+    return test_run:get_vclock('rebootstrap2')[ box.info.id ] > lsn\
>+end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> test_run:cmd('restart server rebootstrap2')
> test_run:cmd('switch rebootstrap1')
> -- Wait until resubscribe is sent
>-fiber.sleep(2 * box.cfg.replication_timeout)
>-box.info.replication[2].upstream.status
>-
>+test_run:wait_cond(function()\
>+    return box.info.replication[2].upstream.status == 'sync'\
>+end, 10)
> box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> test_run:cmd('switch default')
> test_run:drop_cluster(SERVERS)
> 
>--
>Serge Petrenko
>sergepetrenko at tarantool.org
> 
 