[Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it

Serge Petrenko sergepetrenko at tarantool.org
Thu Feb 27 17:13:31 MSK 2020


> 27 февр. 2020 г., в 02:54, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> написал(а):
> 
> Thanks for the patch!
> 

Hi! Thanks for the review!

Please find my comments and the new diff below.

> See 4 comments below.
> 
>>    replication: do not relay rows coming from a remote instance back to it
>> 
>>    We have a mechanism for restoring rows originating from an instance that
>>    suffered a sudden power loss: remote masters resend the isntance's rows
>>    received before a certain point in time, defined by remote master vclock
>>    at the moment of subscribe.
>>    However, this is useful only on initial replication configuraiton, when
>>    an instance has just recovered, so that it can receive what it has
>>    relayed but haven't synced to disk.
>>    In other cases, when an instance is operating normally and master-master
>>    replication is configured, the mechanism described above may lead to
>>    instance re-applying instance's own rows, coming from a master it has just
>>    subscribed to.
>>    To fix the problem do not relay rows coming from a remote instance, if
>>    the instance has already recovered.
>> 
>>    Closes #4739
>> 
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 911353425..73ffc0d68 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>> 	struct vclock vclock;
>> 	vclock_create(&vclock);
>> 	vclock_copy(&vclock, &replicaset.vclock);
>> +	/*
>> +	 * Stop accepting local rows coming from a remote
>> +	 * instance as soon as local WAL starts accepting writes.
>> +	 */
>> +	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
> 
> 1. I was always wondering, what if the instance got orphaned after it
> started accepting writes? WAL is fully functional, it syncs whatever is
> needed, and then a resubscribe happens. Can this break anything?
> 
>> 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>> -				 &vclock, replication_anon, 0);
>> +				 &vclock, replication_anon, id_filter);
>> 	coio_write_xrow(coio, &row);
>> 
>> 	/* Read SUBSCRIBE response */
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 27bff662a..35ba7b072 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>> 		/* Closes the input valve. */
>> 		stailq_concat(&writer->rollback, &batch->rollback);
>> 	}
>> +
>> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>> 	/* Update the tx vclock to the latest written by wal. */
>> 	vclock_copy(&replicaset.vclock, &batch->vclock);
>> +#ifndef NDEBUG
>> +skip_update:
>> +#endif
> 
> 2. Consider this hack which I just invented. In that way you won't
> depend on ERRINJ and NDEBUG interconnection.
> 
> ====================
> @@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
> 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
> 	/* Update the tx vclock to the latest written by wal. */
> 	vclock_copy(&replicaset.vclock, &batch->vclock);
> -#ifndef NDEBUG
> -skip_update:
> -#endif
> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
> 	tx_schedule_queue(&batch->commit);
> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
> ====================

Good one, applied.

> 
> Talking of the injection itself - don't know really. Perhaps
> it would be better to add a delay to the wal_write_to_disk()
> function, to its very end, after wal_notify_watchers(). In
> that case relay will wake up, send whatever it wants, and TX
> won't update the vclock until you let wal_write_to_disk()
> finish. Seems more natural this way.

I tried to add a sleep first. It’s impossible to sleep in tx_schedule_commit(),
since it’s processed in tx_prio endpoint, where yielding is impossible.
I also tried to add a sleep at the end of wal_write_to_disk(), just like you
suggest. This didn’t work out either. I’ll give you more details in the evening,
when I give it another try. I’ll send a follow-up if I succeed with adding a sleep.

> 
>> 	tx_schedule_queue(&batch->commit);
>> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>> }
>> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>> new file mode 100644
>> index 000000000..7dc2f7118
>> --- /dev/null
>> +++ b/test/replication/gh-4739-vclock-assert.result
>> @@ -0,0 +1,82 @@
>> +-- test-run result file version 2
>> +env = require('test_run')
>> + | ---
>> + | ...
>> +test_run = env.new()
>> + | ---
>> + | ...
>> +
>> +SERVERS = {'rebootstrap1', 'rebootstrap2'}
>> + | ---
>> + | ...
>> +test_run:create_cluster(SERVERS, "replication")
>> + | ---
>> + | ...
>> +test_run:wait_fullmesh(SERVERS)
>> + | ---
>> + | ...
>> +
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +fiber = require('fiber')
>> + | ---
>> + | ...
>> +-- Stop updating replicaset vclock to simulate a situation, when
>> +-- a row is already relayed to the remote master, but the local
>> +-- vclock update hasn't happened yet.
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
>> + | ---
>> + | - ok
>> + | ...
>> +lsn = box.info.lsn
>> + | ---
>> + | ...
>> +box.space._schema:replace{'something'}
>> + | ---
>> + | - ['something']
>> + | ...
>> +-- Vclock isn't updated.
>> +box.info.lsn == lsn
>> + | ---
>> + | - true
>> + | ...
>> +
>> +-- Wait until the remote instance gets the row.
>> +while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
>> +    fiber.sleep(0.01)\
>> +end
> 
> 3. There is a cool thing which I discovered relatively recently:
> test_run:wait_cond(). It does fiber sleep and while cycle, and
> has a finite timeout, so such a test won't hang for 10 minutes
> in Travis in case of a problem.

Thanks!

> 
>> + | ---
>> + | ...
>> +
>> +-- Restart the remote instance. This will make the first instance
>> +-- resubscribe without entering orphan mode.
>> +test_run:cmd('restart server rebootstrap2')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +-- Wait until resubscribe is sent
>> +fiber.sleep(2 * box.cfg.replication_timeout)
> 
> 4. Don't we collect any statistics on replication requests, just
> like we do in box.stat()? Perhaps box.stat.net() can help? To
> wait properly. Maybe just do test_run:wait_cond() for status 'sync'?

wait_cond for ’sync’ is enough. Applied.

> 
>> + | ---
>> + | ...
>> +box.info.replication[2].upstream.status
>> + | ---
>> + | - sync
>> + | ...
>> +
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>> + | ---
>> + | - ok
>> + | ...
>> +test_run:cmd('switch default')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:drop_cluster(SERVERS)
>> + | ---
>> + | …

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 73ffc0d68..78f3d8a73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
 	 * Stop accepting local rows coming from a remote
 	 * instance as soon as local WAL starts accepting writes.
 	 */
-	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
+	uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock, replication_anon, id_filter);
 	coio_write_xrow(coio, &row);
diff --git a/src/box/wal.c b/src/box/wal.c
index 35ba7b072..bf127b259 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 7dc2f7118..a612826a0 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -44,10 +44,11 @@ box.info.lsn == lsn
  | ...
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
  | ---
+ | - true
  | ...
 
 -- Restart the remote instance. This will make the first instance
@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
  | - true
  | ...
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
- | ---
- | ...
-box.info.replication[2].upstream.status
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
  | ---
- | - sync
+ | - true
  | ...
-
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
  | ---
  | - ok
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index 26dc781e2..b6a7caf3b 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
 box.info.lsn == lsn
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
 test_run:cmd('restart server rebootstrap2')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
-box.info.replication[2].upstream.status
-
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

--
Serge Petrenko
sergepetrenko at tarantool.org


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200227/095ba3cc/attachment.html>


More information about the Tarantool-patches mailing list