> On 27 Feb 2020, at 02:54, Vladislav Shpilevoy wrote:
>
> Thanks for the patch!

Hi! Thanks for the review! Please find my comments and the new diff below.

> See 4 comments below.
>
>> replication: do not relay rows coming from a remote instance back to it
>>
>> We have a mechanism for restoring rows originating from an instance that
>> suffered a sudden power loss: remote masters resend the instance's rows
>> received before a certain point in time, defined by the remote master's
>> vclock at the moment of subscribe.
>> However, this is useful only during the initial replication configuration,
>> when an instance has just recovered, so that it can receive what it has
>> relayed but hasn't synced to disk.
>> In other cases, when an instance is operating normally and master-master
>> replication is configured, the mechanism described above may lead to the
>> instance re-applying its own rows, coming from a master it has just
>> subscribed to.
>> To fix the problem, do not relay rows coming from a remote instance if
>> the instance has already recovered.
>>
>> Closes #4739
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 911353425..73ffc0d68 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>>  	struct vclock vclock;
>>  	vclock_create(&vclock);
>>  	vclock_copy(&vclock, &replicaset.vclock);
>> +	/*
>> +	 * Stop accepting local rows coming from a remote
>> +	 * instance as soon as local WAL starts accepting writes.
>> +	 */
>> +	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>
> 1. I was always wondering, what if the instance got orphaned after it
> started accepting writes? WAL is fully functional, it syncs whatever is
> needed, and then a resubscribe happens. Can this break anything?
>
>>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>> -				 &vclock, replication_anon, 0);
>> +				 &vclock, replication_anon, id_filter);
>>  	coio_write_xrow(coio, &row);
>>
>>  	/* Read SUBSCRIBE response */
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 27bff662a..35ba7b072 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>>  		/* Closes the input valve. */
>>  		stailq_concat(&writer->rollback, &batch->rollback);
>>  	}
>> +
>> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>>  	/* Update the tx vclock to the latest written by wal. */
>>  	vclock_copy(&replicaset.vclock, &batch->vclock);
>> +#ifndef NDEBUG
>> +skip_update:
>> +#endif
>
> 2. Consider this hack which I just invented. In that way you won't
> depend on ERRINJ and NDEBUG interconnection.
>
> ====================
> @@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>  	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>  	/* Update the tx vclock to the latest written by wal. */
>  	vclock_copy(&replicaset.vclock, &batch->vclock);
> -#ifndef NDEBUG
> -skip_update:
> -#endif
> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>  	tx_schedule_queue(&batch->commit);
>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>  }
> ====================

Good one, applied.

> Talking of the injection itself - don't know really. Perhaps
> it would be better to add a delay to the wal_write_to_disk()
> function, to its very end, after wal_notify_watchers(). In
> that case relay will wake up, send whatever it wants, and TX
> won't update the vclock until you let wal_write_to_disk()
> finish.

Seems more natural this way. I tried to add a sleep first: it is
impossible to sleep in tx_schedule_commit(), since it is processed in
the tx_prio endpoint, where yielding is not allowed. I also tried to
add a sleep at the end of wal_write_to_disk(), just as you suggest,
but that didn't work out either. I'll give you more details in the
evening, when I give it another try, and I'll send a follow-up if I
succeed with adding a sleep.
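Coming back to the label trick from your snippet above, the reason it
removes the dependence on NDEBUG is that whatever build condition
compiles error injection out removes the goto and the skip_update label
together, so no separate preprocessor guard is needed. A tiny standalone
illustration of the idea (everything here - WITH_INJECTION, injection_on,
schedule_commit - is made up for the example; only the label-inside-
injection pattern mirrors the patch, and the real ERROR_INJECT() in
errinj.h differs in details):

====================
#include <stdio.h>
#include <stdbool.h>

#ifdef WITH_INJECTION
static bool injection_on = true;
/* Injection compiled in: the block, label included, is emitted. */
#define ERROR_INJECT(code) do { if (injection_on) code } while (0)
#else
/* Injection compiled out: the goto and the label disappear together. */
#define ERROR_INJECT(code)
#endif

static int vclock = 0;

static void
schedule_commit(int new_vclock)
{
	ERROR_INJECT({ goto skip_update; });
	/* Skipped while the injection is enabled. */
	vclock = new_vclock;
	ERROR_INJECT({skip_update:;});
	printf("vclock is now %d\n", vclock);
}

int
main(void)
{
	/* Prints 0 when built with -DWITH_INJECTION, 42 otherwise. */
	schedule_commit(42);
	return 0;
}
====================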
>
>>  	tx_schedule_queue(&batch->commit);
>>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>>  }
>> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>> new file mode 100644
>> index 000000000..7dc2f7118
>> --- /dev/null
>> +++ b/test/replication/gh-4739-vclock-assert.result
>> @@ -0,0 +1,82 @@
>> +-- test-run result file version 2
>> +env = require('test_run')
>> + | ---
>> + | ...
>> +test_run = env.new()
>> + | ---
>> + | ...
>> +
>> +SERVERS = {'rebootstrap1', 'rebootstrap2'}
>> + | ---
>> + | ...
>> +test_run:create_cluster(SERVERS, "replication")
>> + | ---
>> + | ...
>> +test_run:wait_fullmesh(SERVERS)
>> + | ---
>> + | ...
>> +
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +fiber = require('fiber')
>> + | ---
>> + | ...
>> +-- Stop updating replicaset vclock to simulate a situation, when
>> +-- a row is already relayed to the remote master, but the local
>> +-- vclock update hasn't happened yet.
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
>> + | ---
>> + | - ok
>> + | ...
>> +lsn = box.info.lsn
>> + | ---
>> + | ...
>> +box.space._schema:replace{'something'}
>> + | ---
>> + | - ['something']
>> + | ...
>> +-- Vclock isn't updated.
>> +box.info.lsn == lsn
>> + | ---
>> + | - true
>> + | ...
>> +
>> +-- Wait until the remote instance gets the row.
>> +while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
>> +    fiber.sleep(0.01)\
>> +end
>
> 3. There is a cool thing which I discovered relatively recently:
> test_run:wait_cond(). It does fiber sleep and while cycle, and
> has a finite timeout, so such a test won't hang for 10 minutes
> in Travis in case of a problem.

Thanks!

>
>> + | ---
>> + | ...
>> +
>> +-- Restart the remote instance. This will make the first instance
>> +-- resubscribe without entering orphan mode.
>> +test_run:cmd('restart server rebootstrap2')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +-- Wait until resubscribe is sent
>> +fiber.sleep(2 * box.cfg.replication_timeout)
>
> 4. Don't we collect any statistics on replication requests, just
> like we do in box.stat()? Perhaps box.stat.net() can help? To
> wait properly. Maybe just do test_run:wait_cond() for status 'sync'?

wait_cond for 'sync' is enough. Applied.

>
>> + | ---
>> + | ...
>> +box.info.replication[2].upstream.status
>> + | ---
>> + | - sync
>> + | ...
>> +
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>> + | ---
>> + | - ok
>> + | ...
>> +test_run:cmd('switch default')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:drop_cluster(SERVERS)
>> + | ---
>> + | ...
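For context on what the new diff below relies on: the id_filter passed
to xrow_encode_subscribe_xc() is a bitmask of replica ids whose rows
the remote relay must not send back to us. A toy standalone sketch of
the mask semantics (row_is_filtered() is made up for the example and is
not the actual relay code):

====================
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Made-up helper: drop a row when its replica id bit is set in the mask. */
static bool
row_is_filtered(uint32_t row_replica_id, uint32_t id_filter)
{
	return (id_filter & (1u << row_replica_id)) != 0;
}

int
main(void)
{
	uint32_t instance_id = 2;
	/* What the applier sends once the local WAL accepts writes. */
	uint32_t id_filter = 1u << instance_id;
	assert(row_is_filtered(2, id_filter));   /* our own rows are dropped */
	assert(!row_is_filtered(1, id_filter));  /* other masters' rows pass */
	return 0;
}
====================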
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 73ffc0d68..78f3d8a73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
 	 * Stop accepting local rows coming from a remote
 	 * instance as soon as local WAL starts accepting writes.
 	 */
-	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
+	uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock, replication_anon, id_filter);
 	coio_write_xrow(coio, &row);
diff --git a/src/box/wal.c b/src/box/wal.c
index 35ba7b072..bf127b259 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 7dc2f7118..a612826a0 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -44,10 +44,11 @@ box.info.lsn == lsn
  | ...
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
  | ---
+ | - true
  | ...
 
 -- Restart the remote instance. This will make the first instance
@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
  | - true
  | ...
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
- | ---
- | ...
-box.info.replication[2].upstream.status
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
  | ---
- | - sync
+ | - true
  | ...
-
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
  | ---
  | - ok
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index 26dc781e2..b6a7caf3b 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
 box.info.lsn == lsn
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
 test_run:cmd('restart server rebootstrap2')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
-box.info.replication[2].upstream.status
-
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

--
Serge Petrenko
sergepetrenko@tarantool.org