On Monday, January 28, 2019 2:59:22 PM MSK Vladimir Davydov wrote: > On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote: > > Journal maintains replicaset vclock for recovery, local and replicated > > operations. Introduce replicaset.applier.vclock to prevent applier > > races. > > What kind of races? > > > Prerequisite #980 > > --- > > > > src/box/applier.cc | 68 +++++++++++++++++++++--------------- > > src/box/box.cc | 3 +- > > src/box/replication.cc | 1 + > > src/box/replication.h | 3 ++ > > src/box/vclock.c | 14 ++++++++ > > src/box/vclock.h | 3 ++ > > src/box/wal.c | 38 ++++---------------- > > test/xlog-py/dup_key.result | 12 +++++-- > > test/xlog-py/dup_key.test.py | 18 +++++++--- > > 9 files changed, 93 insertions(+), 67 deletions(-) > > > > diff --git a/src/box/applier.cc b/src/box/applier.cc > > index 21d2e6bcb..87873e970 100644 > > --- a/src/box/applier.cc > > +++ b/src/box/applier.cc > > @@ -139,7 +139,7 @@ applier_writer_f(va_list ap) > > > > continue; > > > > try { > > > > struct xrow_header xrow; > > > > - xrow_encode_vclock(&xrow, &replicaset.vclock); > > + xrow_encode_vclock(&xrow, &replicaset.applier.vclock); > > > > coio_write_xrow(&io, &xrow); > > > > } catch (SocketError *e) { > > > > /* > > > > @@ -300,7 +300,7 @@ applier_join(struct applier *applier) > > > > * Used to initialize the replica's initial > > * vclock in bootstrap_from_master() > > */ > > > > - xrow_decode_vclock_xc(&row, &replicaset.vclock); > > + xrow_decode_vclock_xc(&row, &replicaset.applier.vclock); > > > > } > > > > applier_set_state(applier, APPLIER_INITIAL_JOIN); > > > > @@ -326,7 +326,8 @@ applier_join(struct applier *applier) > > > > * vclock yet, do it now. In 1.7+ > > * this vclock is not used. 
> > */ > > > > - xrow_decode_vclock_xc(&row, &replicaset.vclock); > > + xrow_decode_vclock_xc(&row, > > + &replicaset.applier.vclock); > > > > } > > break; /* end of stream */ > > > > } else if (iproto_type_is_error(row.type)) { > > > > @@ -336,6 +337,7 @@ applier_join(struct applier *applier) > > > > (uint32_t) row.type); > > > > } > > > > } > > > > + vclock_copy(&replicaset.vclock, &replicaset.applier.vclock); > > > > say_info("initial data received"); > > > > applier_set_state(applier, APPLIER_FINAL_JOIN); > > > > @@ -355,7 +357,7 @@ applier_join(struct applier *applier) > > > > coio_read_xrow(coio, ibuf, &row); > > applier->last_row_time = ev_monotonic_now(loop()); > > if (iproto_type_is_dml(row.type)) { > > > > - vclock_follow_xrow(&replicaset.vclock, &row); > > + vclock_follow_xrow(&replicaset.applier.vclock, &row); > > > > xstream_write_xc(applier->subscribe_stream, &row); > > if (++row_count % 100000 == 0) > > > > say_info("%.1fM rows received", row_count / 1e6); > > > > @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier) > > > > { > > > > assert(applier->subscribe_stream != NULL); > > > > + if (!vclock_is_set(&replicaset.applier.vclock)) > > + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); > > + > > > > /* Send SUBSCRIBE request */ > > struct ev_io *coio = &applier->io; > > struct ibuf *ibuf = &applier->ibuf; > > > > @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier) > > > > applier_set_state(applier, APPLIER_FOLLOW); > > > > } > > > > - /* > > - * Stay 'orphan' until appliers catch up with > > - * the remote vclock at the time of SUBSCRIBE > > - * and the lag is less than configured. > > - */ > > - if (applier->state == APPLIER_SYNC && > > - applier->lag <= replication_sync_lag && > > - vclock_compare(&remote_vclock_at_subscribe, > > - &replicaset.vclock) <= 0) { > > - /* Applier is synced, switch to "follow". */ > > - applier_set_state(applier, APPLIER_FOLLOW); > > - } > > - > > Hmm, why move this chunk? 
> > > /* > > > > * Tarantool < 1.7.7 does not send periodic heartbeat > > * messages so we can't assume that if we haven't heard > > > > @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier) > > > > applier->lag = ev_now(loop()) - row.tm; > > applier->last_row_time = ev_monotonic_now(loop()); > > > > - > > - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { > > - /** > > - * Promote the replica set vclock before > > - * applying the row. If there is an > > - * exception (conflict) applying the row, > > - * the row is skipped when the replication > > - * is resumed. > > - */ > > - vclock_follow_xrow(&replicaset.vclock, &row); > > + if (vclock_get(&replicaset.applier.vclock, > > + row.replica_id) < row.lsn) { > > + if (row.replica_id == instance_id && > > + vclock_get(&replicaset.vclock, instance_id) < > > + row.lsn) { > > + /* Local row returned back. */ > > + goto done; > > How can it happen? Why should we handle it? Can you please elaborate the > comment? > > > + } > > + /* Preserve old lsn value. */ > > + int64_t old_lsn = vclock_get(&replicaset.applier.vclock, > > + row.replica_id); > > + vclock_follow_xrow(&replicaset.applier.vclock, &row); > > So, AFAICS you want to promote replicaset.vclock after applying a row > received from a remote replica, and in order to avoid applying the same > row twice, you add replicaset.applier.vclock, is that correct? > > However, in the next patch you wrap everything in a latch, rendering > this race impossible (BTW this is what Serge P. did too). So what's the > point in introducing another vclock here? If it is needed solely for > sync replication, then IMO we'd better do it in the scope of the > corresponding patch set, because currently it seems useless. You are right, but this is a preparatory step: I hope I will be able to release the latch just before commit to allow a parallel applier. I think one of patches 2 or 3 could be eliminated right now. Let's talk about it. 
> > > struct replica *replica = replica_by_id(row.replica_id); > > > > struct latch *latch = (replica ? &replica->order_latch : > > &replicaset.applier.order_latch); > > > > @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier) > > > > box_error_code(e) == ER_TUPLE_FOUND && > > replication_skip_conflict) > > > > diag_clear(diag_get()); > > > > - else > > + else { > > + /* Rollback lsn to have a chance for a retry. */ > > + vclock_set(&replicaset.applier.vclock, > > + row.replica_id, old_lsn); > > > > diag_raise(); > > > > + } > > > > } > > > > } > > > > +done: > > + /* > > + * Stay 'orphan' until appliers catch up with > > + * the remote vclock at the time of SUBSCRIBE > > + * and the lag is less than configured. > > + */ > > + if (applier->state == APPLIER_SYNC && > > + applier->lag <= replication_sync_lag && > > + vclock_compare(&remote_vclock_at_subscribe, > > + &replicaset.vclock) <= 0) { > > + /* Applier is synced, switch to "follow". */ > > + applier_set_state(applier, APPLIER_FOLLOW); > > + } > > + > > > > if (applier->state == APPLIER_SYNC || > > > > applier->state == APPLIER_FOLLOW) > > > > fiber_cond_signal(&applier->writer_cond); > > > > diff --git a/src/box/box.cc b/src/box/box.cc > > index 9f2fd6da1..0df0875dd 100644 > > --- a/src/box/box.cc > > +++ b/src/box/box.cc > > @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base, > > > > struct journal_entry * /* entry */) > > > > { > > > > struct recovery_journal *journal = (struct recovery_journal *) base; > > > > + vclock_copy(&replicaset.vclock, journal->vclock); > > > > return vclock_sum(journal->vclock); > > > > } > > > > @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master) > > > > */ > > > > engine_begin_final_recovery_xc(); > > struct recovery_journal journal; > > > > - recovery_journal_create(&journal, &replicaset.vclock); > > + recovery_journal_create(&journal, &replicaset.applier.vclock); > > I fail to understand how this works. 
If we bump applier.vclock during > remote bootstrap, then where is replicaset.vclock set? > > > journal_set(&journal.base); > > > > applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY); > > > > diff --git a/src/box/replication.cc b/src/box/replication.cc > > index 2cb4ec0f8..51e08886c 100644 > > --- a/src/box/replication.cc > > +++ b/src/box/replication.cc > > @@ -90,6 +90,7 @@ replication_init(void) > > > > fiber_cond_create(&replicaset.applier.cond); > > replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, > > sizeof(struct replica *)); > > latch_create(&replicaset.applier.order_latch); > > > > + vclock_create(&replicaset.applier.vclock); > > > > } > > > > void > > > > diff --git a/src/box/replication.h b/src/box/replication.h > > index 2ac620d86..b9aebed14 100644 > > --- a/src/box/replication.h > > +++ b/src/box/replication.h > > @@ -194,6 +194,9 @@ struct replicaset { > > > > struct vclock vclock; > > /** Applier state. */ > > struct { > > > > + /** > > + * Vclock sent to process from appliers. */ > > + struct vclock vclock; > > > > /** > > > > * Total number of replicas with attached > > * appliers. 
> > > > diff --git a/src/box/vclock.c b/src/box/vclock.c > > index b5eb2800b..c297d1ff9 100644 > > --- a/src/box/vclock.c > > +++ b/src/box/vclock.c > > @@ -36,6 +36,20 @@ > > > > #include "diag.h" > > > > +void > > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn) > > +{ > > + assert(lsn >= 0); > > + assert(replica_id < VCLOCK_MAX); > > + int64_t prev_lsn = vclock->lsn[replica_id]; > > + if (lsn > 0) > > + vclock->map |= 1 << replica_id; > > + else > > + vclock->map &= ~(1 << replica_id); > > + vclock->lsn[replica_id] = lsn; > > + vclock->signature += lsn - prev_lsn; > > +} > > + > > > > int64_t > > vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn) > > { > > > > diff --git a/src/box/vclock.h b/src/box/vclock.h > > index 111e29160..d6cb14c2a 100644 > > --- a/src/box/vclock.h > > +++ b/src/box/vclock.h > > @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t > > replica_id)> > > return vclock->lsn[replica_id]; > > > > } > > > > +void > > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn); > > + > > > > static inline int64_t > > vclock_inc(struct vclock *vclock, uint32_t replica_id) > > { > > > > diff --git a/src/box/wal.c b/src/box/wal.c > > index a55b544aa..4c3537672 100644 > > --- a/src/box/wal.c > > +++ b/src/box/wal.c > > @@ -171,6 +171,8 @@ struct wal_msg { > > > > * be rolled back. > > */ > > > > struct stailq rollback; > > > > + /** vclock after the batch processed. */ > > + struct vclock vclock; > > > > }; > > > > /** > > > > @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch) > > > > batch->approx_len = 0; > > stailq_create(&batch->commit); > > stailq_create(&batch->rollback); > > > > + vclock_create(&batch->vclock); > > > > } > > > > static struct wal_msg * > > > > @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg) > > > > * wal_msg memory disappears after the first > > * iteration of tx_schedule_queue loop. 
> > */ > > > > + vclock_copy(&replicaset.vclock, &batch->vclock); > > > > if (! stailq_empty(&batch->rollback)) { > > > > /* Closes the input valve. */ > > stailq_concat(&writer->rollback, &batch->rollback); > > > > @@ -1028,6 +1032,8 @@ done: > > error_log(error); > > diag_clear(diag_get()); > > > > } > > > > + /* Set resulting vclock. */ > > + vclock_copy(&wal_msg->vclock, &writer->vclock); > > > > /* > > > > * We need to start rollback from the first request > > * following the last committed request. If > > > > @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct > > journal_entry *entry)> > > bool cancellable = fiber_set_cancellable(false); > > fiber_yield(); /* Request was inserted. */ > > fiber_set_cancellable(cancellable); > > > > - if (entry->res > 0) { > > - struct xrow_header **last = entry->rows + entry->n_rows - 1; > > - while (last >= entry->rows) { > > - /* > > - * Find last row from local instance id > > - * and promote vclock. > > - */ > > - if ((*last)->replica_id == instance_id) { > > - /* > > - * In master-master configuration, during sudden > > - * power-loss, if the data have not been written > > - * to WAL but have already been sent to others, > > - * they will send the data back. In this case > > - * vclock has already been promoted by applier. 
> > - */ > > - if (vclock_get(&replicaset.vclock, > > - instance_id) < (*last)->lsn) { > > - vclock_follow_xrow(&replicaset.vclock, > > - *last); > > - } > > - break; > > - } > > - --last; > > - } > > - } > > > > return entry->res; > > > > } > > > > @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal, > > > > { > > > > struct wal_writer *writer = (struct wal_writer *) journal; > > wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + > > entry->n_rows); > > > > - int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id); > > - int64_t new_lsn = vclock_get(&writer->vclock, instance_id); > > - if (new_lsn > old_lsn) { > > - /* There were local writes, promote vclock. */ > > - vclock_follow(&replicaset.vclock, instance_id, new_lsn); > > - } > > + vclock_copy(&replicaset.vclock, &writer->vclock); > > > > return vclock_sum(&writer->vclock); > > > > } > > > > diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result > > index f387e8e89..966fa1f4a 100644 > > --- a/test/xlog-py/dup_key.result > > +++ b/test/xlog-py/dup_key.result > > @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'} > > > > --- > > - [2, 'second tuple'] > > ... > > > > -.xlog exists > > +.xlog#1 exists > > +box.space.test:insert{3, 'third tuple'} > > +--- > > +- [3, 'third tuple'] > > +... > > +box.space.test:insert{4, 'fourth tuple'} > > +--- > > +- [4, 'fourth tuple'] > > +... > > +.xlog#2 exists > > > > box.space.test:insert{1, 'third tuple'} > > --- > > - [1, 'third tuple'] > > What happened to this test? > > > @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'} > > > > --- > > - [2, 'fourth tuple'] > > ... 
> > > > -.xlog does not exist > > > > check log line for 'Duplicate key' > > > > 'Duplicate key' exists in server log > > > > diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py > > index 1c033da40..3dacde771 100644 > > --- a/test/xlog-py/dup_key.test.py > > +++ b/test/xlog-py/dup_key.test.py > > @@ -26,19 +26,27 @@ server.stop() > > > > # Save wal#1 > > > > if os.access(wal, os.F_OK): > > - print ".xlog exists" > > + print ".xlog#1 exists" > > > > os.rename(wal, wal_old) > > > > -# Write wal#2 > > +# Write wal#2 to bump lsn > > +server.start() > > +server.admin("box.space.test:insert{3, 'third tuple'}") > > +server.admin("box.space.test:insert{4, 'fourth tuple'}") > > +server.stop() > > + > > +if os.access(wal, os.F_OK): > > + print ".xlog#2 exists" > > + > > +# Write wal#3 - confliction with wal#1 > > > > server.start() > > server.admin("box.space.test:insert{1, 'third tuple'}") > > server.admin("box.space.test:insert{2, 'fourth tuple'}") > > server.stop() > > > > # Restore wal#1 > > > > -if not os.access(wal, os.F_OK): > > - print ".xlog does not exist" > > - os.rename(wal_old, wal) > > +os.unlink(wal) > > +os.rename(wal_old, wal)