On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> wrote:

Thanks for the patch!


Hi! Thanks for the review!

Please find my comments and the new diff below.

See 4 comments below.

   replication: do not relay rows coming from a remote instance back to it

   We have a mechanism for restoring rows originating from an instance that
   suffered a sudden power loss: remote masters resend the instance's rows
   received before a certain point in time, defined by the remote master's
   vclock at the moment of subscribe.
   However, this is only useful during initial replication configuration,
   when an instance has just recovered, so that it can receive what it has
   relayed but hasn't synced to disk.
   In other cases, when an instance is operating normally and master-master
   replication is configured, the mechanism described above may lead to the
   instance re-applying its own rows, coming from a master it has just
   subscribed to.
   To fix the problem, do not relay rows coming from a remote instance if
   the instance has already recovered.

   Closes #4739
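
To illustrate the mechanism: the subscribe request now carries a
bitmask of replica ids whose rows the master must not relay back to
the subscriber. Conceptually, the master-side check boils down to
something like this (a sketch only; relay_filter_row() is a made-up
name, the real filtering lives in the relay/xrow code):

====================
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true if a row originating from replica_id must be
 * skipped, i.e. its bit is set in the subscriber's id filter.
 */
static bool
relay_filter_row(uint32_t id_filter, uint32_t replica_id)
{
	return (id_filter & (1U << replica_id)) != 0;
}
====================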

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 911353425..73ffc0d68 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
struct vclock vclock;
vclock_create(&vclock);
vclock_copy(&vclock, &replicaset.vclock);
+ /*
+ * Stop accepting local rows coming from a remote
+ * instance as soon as local WAL starts accepting writes.
+ */
+ unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;

1. I was always wondering, what if the instance got orphaned after it
started accepting writes? WAL is fully functional, it syncs whatever is
needed, and then a resubscribe happens. Can this break anything?

xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &vclock, replication_anon, 0);
+ &vclock, replication_anon, id_filter);
coio_write_xrow(coio, &row);

/* Read SUBSCRIBE response */
diff --git a/src/box/wal.c b/src/box/wal.c
index 27bff662a..35ba7b072 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
/* Closes the input valve. */
stailq_concat(&writer->rollback, &batch->rollback);
}
+
+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
/* Update the tx vclock to the latest written by wal. */
vclock_copy(&replicaset.vclock, &batch->vclock);
+#ifndef NDEBUG
+skip_update:
+#endif

2. Consider this hack which I just invented. That way you won't
depend on the interplay between ERRINJ and NDEBUG.

====================
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
/* Update the tx vclock to the latest written by wal. */
vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
tx_schedule_queue(&batch->commit);
mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
}
====================

Good one, applied.
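
For the record, the trick works because the goto and its target label
are now guarded by the same macro, so both vanish together in release
builds: no #ifndef NDEBUG needed and no unused-label warning. Assuming
ERROR_INJECT is defined roughly like this (simplified; see errinj.h
for the real definition):

====================
#ifdef NDEBUG
#define ERROR_INJECT(ID, CODE)
#else
#define ERROR_INJECT(ID, CODE) \
	do { \
		if (errinj(ID, ERRINJ_BOOL)->bparam) \
			CODE; \
	} while (0)
#endif
====================

Jumping into the injected block with goto is legal C, and the label
with an empty statement ({skip_update:;}) compiles whether or not the
injection is enabled at runtime.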


Talking of the injection itself - don't know really. Perhaps
it would be better to add a delay to the wal_write_to_disk()
function, to its very end, after wal_notify_watchers(). In
that case relay will wake up, send whatever it wants, and TX
won't update the vclock until you let wal_write_to_disk()
finish. Seems more natural this way.

I tried to add a sleep first. Sleeping in tx_schedule_commit() is
impossible, since it's processed in the tx_prio endpoint, where
yielding is forbidden. I also tried adding a sleep at the end of
wal_write_to_disk(), just as you suggest, but that didn't work out
either. I'll give you more details in the evening, when I give it
another try, and I'll send a follow-up if I succeed.
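
For context, the delay you suggest would look roughly like this (a
sketch; ERRINJ_WAL_DELAY_AFTER_WRITE is a made-up injection name, and
I assume an ERROR_INJECT_YIELD-style helper that spins in
fiber_sleep() until the injection is reset):

====================
@@ static void wal_write_to_disk(struct cmsg *msg)
 	...
 	wal_notify_watchers(writer, /* events */ ...);
+	/* Stall the WAL thread until the test releases it. */
+	ERROR_INJECT_YIELD(ERRINJ_WAL_DELAY_AFTER_WRITE);
 }
====================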


tx_schedule_queue(&batch->commit);
mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
}
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
new file mode 100644
index 000000000..7dc2f7118
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -0,0 +1,82 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+ | ---
+ | ...
+test_run:create_cluster(SERVERS, "replication")
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+-- Stop updating replicaset vclock to simulate a situation, when
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+ | ---
+ | - ok
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+box.space._schema:replace{'something'}
+ | ---
+ | - ['something']
+ | ...
+-- Vclock isn't updated.
+box.info.lsn == lsn
+ | ---
+ | - true
+ | ...
+
+-- Wait until the remote instance gets the row.
+while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
+    fiber.sleep(0.01)\
+end

3. There is a cool thing which I discovered relatively recently:
test_run:wait_cond(). It does the fiber sleep and while loop
internally, and has a finite timeout, so such a test won't hang for
10 minutes in Travis in case of a problem.

Thanks!


+ | ---
+ | ...
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+-- Wait until resubscribe is sent
+fiber.sleep(2 * box.cfg.replication_timeout)

4. Don't we collect any statistics on replication requests, just
like we do in box.stat()? Perhaps box.stat.net() could help to wait
properly. Or maybe just do test_run:wait_cond() for status 'sync'?

wait_cond for ’sync’ is enough. Applied.


+ | ---
+ | ...
+box.info.replication[2].upstream.status
+ | ---
+ | - sync
+ | ...
+
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+ | ---
+ | - ok
+ | ...
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | …

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 73ffc0d68..78f3d8a73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
   * Stop accepting local rows coming from a remote
   * instance as soon as local WAL starts accepting writes.
   */
- unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
+ uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
  xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
   &vclock, replication_anon, id_filter);
  coio_write_xrow(coio, &row);
diff --git a/src/box/wal.c b/src/box/wal.c
index 35ba7b072..bf127b259 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
  ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
  /* Update the tx vclock to the latest written by wal. */
  vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
  tx_schedule_queue(&batch->commit);
  mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 7dc2f7118..a612826a0 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -44,10 +44,11 @@ box.info.lsn == lsn
  | ...
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
  | ---
+ | - true
  | ...
 
 -- Restart the remote instance. This will make the first instance
@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
  | - true
  | ...
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
- | ---
- | ...
-box.info.replication[2].upstream.status
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
  | ---
- | - sync
+ | - true
  | ...
-
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
  | ---
  | - ok
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index 26dc781e2..b6a7caf3b 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
 box.info.lsn == lsn
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
 test_run:cmd('restart server rebootstrap2')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
-box.info.replication[2].upstream.status
-
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

--
Serge Petrenko
sergepetrenko@tarantool.org