From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Wed, 6 Feb 2019 17:45:58 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH 3/3] Promote replicaset.vclock only after wal Message-ID: <20190206144558.4rjdjpanbwdmr6ml@esperanza> References: <94d6421460682117a3f06d7dc042f0581cdfa0e5.1549441084.git.georgy@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <94d6421460682117a3f06d7dc042f0581cdfa0e5.1549441084.git.georgy@tarantool.org> To: Georgy Kirichenko Cc: tarantool-patches@freelists.org List-ID: On Wed, Feb 06, 2019 at 11:29:59AM +0300, Georgy Kirichenko wrote: > Get rid if appliers' vclock_follow and promote vclock only after > wal write. Gosh, what an ascetic commit message, really. Please instead take the commit message Serge wrote when he tried to fix issue #2283. > > Closes #2283 > Prerequisite #980 > --- > src/box/applier.cc | 14 ++------ > src/box/wal.c | 38 ++++----------------- > test/replication/skip_conflict_row.test.lua | 19 +++++++++++ > test/xlog-py/dup_key.result | 12 +++++-- > test/xlog-py/dup_key.test.py | 23 ++++++++++--- > 5 files changed, 57 insertions(+), 49 deletions(-) > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index d87b247e2..cae71ec1c 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -518,22 +518,14 @@ applier_subscribe(struct applier *applier) > /* > * In a full mesh topology, the same set > * of changes may arrive via two > - * concurrently running appliers. Thanks > - * to vclock_follow() above, the first row > - * in the set will be skipped - but the > - * remaining may execute out of order, > - * when the following xstream_write() > - * yields on WAL. Hence we need a latch to > - * strictly order all changes which belong > - * to the same server id. > + * concurrently running appliers. > + * Hence we need a latch to strictly order all > + * changes which belong to the same server id. > */ > latch_lock(latch); > if (vclock_get(&replicaset.vclock, > row.replica_id) < row.lsn) { > - vclock_follow_xrow(&replicaset.vclock, &row); > - latch_lock(latch); > int res = xstream_write(applier->subscribe_stream, &row); > - latch_unlock(latch); Yep, this should definitely be squashed in patch 2. > if (res != 0) { > struct error *e = diag_last_error(diag_get()); > /** > diff --git a/src/box/wal.c b/src/box/wal.c > index a55b544aa..4c3537672 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -171,6 +171,8 @@ struct wal_msg { > * be rolled back. > */ > struct stailq rollback; > + /** vclock after the batch processed. */ > + struct vclock vclock; > }; > > /** > @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch) > batch->approx_len = 0; > stailq_create(&batch->commit); > stailq_create(&batch->rollback); > + vclock_create(&batch->vclock); > } > > static struct wal_msg * > @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg) > * wal_msg memory disappears after the first > * iteration of tx_schedule_queue loop. > */ > + vclock_copy(&replicaset.vclock, &batch->vclock); An unfortunate place to add this. Now it looks like the comment above relates to this vclock_copy although it doesn't. Please move it below to avoid confusion. > if (! stailq_empty(&batch->rollback)) { > /* Closes the input valve. */ > stailq_concat(&writer->rollback, &batch->rollback); > @@ -1028,6 +1032,8 @@ done: > error_log(error); > diag_clear(diag_get()); > } > + /* Set resulting vclock. */ Again, what a pointless comment... Would be more useful if you elaborated a bit: Remember the vclock of the last successfully written row so that we can update replicaset.vclock once this message gets back to tx. > + vclock_copy(&wal_msg->vclock, &writer->vclock); > /* > * We need to start rollback from the first request > * following the last committed request. If > @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry) > bool cancellable = fiber_set_cancellable(false); > fiber_yield(); /* Request was inserted. */ > fiber_set_cancellable(cancellable); > - if (entry->res > 0) { > - struct xrow_header **last = entry->rows + entry->n_rows - 1; > - while (last >= entry->rows) { > - /* > - * Find last row from local instance id > - * and promote vclock. > - */ > - if ((*last)->replica_id == instance_id) { > - /* > - * In master-master configuration, during sudden > - * power-loss, if the data have not been written > - * to WAL but have already been sent to others, > - * they will send the data back. In this case > - * vclock has already been promoted by applier. > - */ > - if (vclock_get(&replicaset.vclock, > - instance_id) < (*last)->lsn) { > - vclock_follow_xrow(&replicaset.vclock, > - *last); > - } > - break; > - } > - --last; > - } > - } > return entry->res; > } > > @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal, > { > struct wal_writer *writer = (struct wal_writer *) journal; > wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); > - int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id); > - int64_t new_lsn = vclock_get(&writer->vclock, instance_id); > - if (new_lsn > old_lsn) { > - /* There were local writes, promote vclock. */ > - vclock_follow(&replicaset.vclock, instance_id, new_lsn); > - } > + vclock_copy(&replicaset.vclock, &writer->vclock); > return vclock_sum(&writer->vclock); > } > > diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua > index 4406ced95..c60999b9b 100644 > --- a/test/replication/skip_conflict_row.test.lua > +++ b/test/replication/skip_conflict_row.test.lua > @@ -1,5 +1,6 @@ > env = require('test_run') > test_run = env.new() > +test_run:cmd("restart server default with cleanup=1") Let's please try to rewrite the test without this. E.g. we could restart the replica and check that replication doesn't resume instead of checking vclocks. > engine = test_run:get_cfg('engine') > > box.schema.user.grant('guest', 'replication') > @@ -28,6 +29,24 @@ box.space.test:select() > test_run:cmd("switch default") > box.info.status > > +-- test that if replication_skip_conflict is off vclock > +-- is not advanced on errors. > +test_run:cmd("restart server replica") > +test_run:cmd("switch replica") > +box.cfg{replication_skip_conflict=false} It's already unset after restart. > +box.space.test:insert{3} > +box.info.vclock > +test_run:cmd("switch default") > +box.space.test:insert{3, 3} > +box.space.test:insert{4} > +box.info.vclock > +test_run:cmd("switch replica") > +box.info.vclock > +box.info.replication[1].upstream.message > +box.info.replication[1].upstream.status > +box.space.test:select() > +test_run:cmd("switch default") > + We also want to check that replication isn't restarted > -- cleanup > test_run:cmd("stop server replica") > test_run:cmd("cleanup server replica") > diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py > index 1c033da40..e25b1d477 100644 > --- a/test/xlog-py/dup_key.test.py > +++ b/test/xlog-py/dup_key.test.py > @@ -22,23 +22,36 @@ wal = os.path.join(vardir, filename) What happened to this test? (BTW I've already asked you that) Last time I checked it passed with and without this patch. > # Create wal#1 > server.admin("box.space.test:insert{1, 'first tuple'}") > server.admin("box.space.test:insert{2, 'second tuple'}") > +lsn2 = int(yaml.load(server.admin("box.info.lsn", silent=True))[0]) > server.stop() > > # Save wal#1 > if os.access(wal, os.F_OK): > - print ".xlog exists" > + print ".xlog#1 exists" > os.rename(wal, wal_old) > +# drop empty log created on shutdown > +filename2 = str(lsn2).zfill(20) + ".xlog" > +wal2 = os.path.join(vardir, filename2) > +os.unlink(wal2) > > -# Write wal#2 > +# Write wal#2 to bump lsn > +server.start() > +server.admin("box.space.test:insert{3, 'third tuple'}") > +server.admin("box.space.test:insert{4, 'fourth tuple'}") > +server.stop() > + > +if os.access(wal, os.F_OK): > + print ".xlog#2 exists" > + > +# Write wal#3 - confliction with wal#1 > server.start() > server.admin("box.space.test:insert{1, 'third tuple'}") > server.admin("box.space.test:insert{2, 'fourth tuple'}") > server.stop() > > # Restore wal#1 > -if not os.access(wal, os.F_OK): > - print ".xlog does not exist" > - os.rename(wal_old, wal) > +os.unlink(wal) > +os.rename(wal_old, wal) > > server.start() > line = 'Duplicate key'