[tarantool-patches] Re: [PATCH v2 2/5] Update replicaset vclock from wal
Георгий Кириченко
georgy at tarantool.org
Tue Jan 29 13:33:37 MSK 2019
On Monday, January 28, 2019 2:59:22 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote:
> > Journal maintains replicaset vclock for recovery, local and replicated
> > operations. Introduce replicaset.applier.vclock to prevent applier
> > races.
>
> What kind of races?
>
> > Prerequisite #980
> > ---
> >
> > src/box/applier.cc | 68 +++++++++++++++++++++---------------
> > src/box/box.cc | 3 +-
> > src/box/replication.cc | 1 +
> > src/box/replication.h | 3 ++
> > src/box/vclock.c | 14 ++++++++
> > src/box/vclock.h | 3 ++
> > src/box/wal.c | 38 ++++----------------
> > test/xlog-py/dup_key.result | 12 +++++--
> > test/xlog-py/dup_key.test.py | 18 +++++++---
> > 9 files changed, 93 insertions(+), 67 deletions(-)
> >
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 21d2e6bcb..87873e970 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
> >
> > continue;
> >
> > try {
> >
> > struct xrow_header xrow;
> >
> > - xrow_encode_vclock(&xrow, &replicaset.vclock);
> > + xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
> >
> > coio_write_xrow(&io, &xrow);
> >
> > } catch (SocketError *e) {
> >
> > /*
> >
> > @@ -300,7 +300,7 @@ applier_join(struct applier *applier)
> >
> > * Used to initialize the replica's initial
> > * vclock in bootstrap_from_master()
> > */
> >
> > - xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > + xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
> >
> > }
> >
> > applier_set_state(applier, APPLIER_INITIAL_JOIN);
> >
> > @@ -326,7 +326,8 @@ applier_join(struct applier *applier)
> >
> > * vclock yet, do it now. In 1.7+
> > * this vclock is not used.
> > */
> >
> > - xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > + xrow_decode_vclock_xc(&row,
> > + &replicaset.applier.vclock);
> >
> > }
> > break; /* end of stream */
> >
> > } else if (iproto_type_is_error(row.type)) {
> >
> > @@ -336,6 +337,7 @@ applier_join(struct applier *applier)
> >
> > (uint32_t) row.type);
> >
> > }
> >
> > }
> >
> > + vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
> >
> > say_info("initial data received");
> >
> > applier_set_state(applier, APPLIER_FINAL_JOIN);
> >
> > @@ -355,7 +357,7 @@ applier_join(struct applier *applier)
> >
> > coio_read_xrow(coio, ibuf, &row);
> > applier->last_row_time = ev_monotonic_now(loop());
> > if (iproto_type_is_dml(row.type)) {
> >
> > - vclock_follow_xrow(&replicaset.vclock, &row);
> > + vclock_follow_xrow(&replicaset.applier.vclock, &row);
> >
> > xstream_write_xc(applier->subscribe_stream, &row);
> > if (++row_count % 100000 == 0)
> >
> > 				say_info("%.1fM rows received", row_count / 1e6);
> >
> > @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
> >
> > {
> >
> > assert(applier->subscribe_stream != NULL);
> >
> > + if (!vclock_is_set(&replicaset.applier.vclock))
> > + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> > +
> >
> > /* Send SUBSCRIBE request */
> > struct ev_io *coio = &applier->io;
> > struct ibuf *ibuf = &applier->ibuf;
> >
> > @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
> >
> > applier_set_state(applier, APPLIER_FOLLOW);
> >
> > }
> >
> > - /*
> > - * Stay 'orphan' until appliers catch up with
> > - * the remote vclock at the time of SUBSCRIBE
> > - * and the lag is less than configured.
> > - */
> > - if (applier->state == APPLIER_SYNC &&
> > - applier->lag <= replication_sync_lag &&
> > - vclock_compare(&remote_vclock_at_subscribe,
> > - &replicaset.vclock) <= 0) {
> > - /* Applier is synced, switch to "follow". */
> > - applier_set_state(applier, APPLIER_FOLLOW);
> > - }
> > -
>
> Hmm, why move this chunk?
>
> > /*
> >
> > * Tarantool < 1.7.7 does not send periodic heartbeat
> > * messages so we can't assume that if we haven't heard
> >
> > @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
> >
> > applier->lag = ev_now(loop()) - row.tm;
> > applier->last_row_time = ev_monotonic_now(loop());
> >
> > -
> > - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> > - /**
> > - * Promote the replica set vclock before
> > - * applying the row. If there is an
> > - * exception (conflict) applying the row,
> > - * the row is skipped when the replication
> > - * is resumed.
> > - */
> > - vclock_follow_xrow(&replicaset.vclock, &row);
> > + if (vclock_get(&replicaset.applier.vclock,
> > + row.replica_id) < row.lsn) {
> > + if (row.replica_id == instance_id &&
> > + vclock_get(&replicaset.vclock, instance_id) <
> > + row.lsn) {
> > + /* Local row returned back. */
> > + goto done;
>
> How can it happen? Why should we handle it? Can you please elaborate the
> comment?
>
> > + }
> > + /* Preserve old lsn value. */
> > + int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> > + row.replica_id);
> > + vclock_follow_xrow(&replicaset.applier.vclock, &row);
>
> So, AFAICS you want to promote replicaset.vclock after applying a row
> received from a remote replica, and in order to avoid applying the same
> row twice, you add replicaset.applier.vclock, is that correct?
>
> However, in the next patch you wrap everything in a latch, rendering
> this race impossible (BTW this is what Serge P. did too). So what's the
> point in introducing another vclock here? If it is needed solely for
> sync replication, then IMO we'd better do it in the scope of the
> corresponding patch set, because currently it seems useless.
You are right, but this is a preparatory part: I hope to be able to release the
latch just before commit, to allow a parallel applier.
I think one of patches 2 or 3 could be eliminated right now. Let's talk about
it.
>
> > 			struct replica *replica = replica_by_id(row.replica_id);
> >
> > 			struct latch *latch = (replica ? &replica->order_latch :
> > 					       &replicaset.applier.order_latch);
> >
> > @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
> >
> > box_error_code(e) == ER_TUPLE_FOUND &&
> > replication_skip_conflict)
> >
> > diag_clear(diag_get());
> >
> > - else
> > + else {
> > +				/* Rollback lsn to have a chance for a retry. */
> > + vclock_set(&replicaset.applier.vclock,
> > + row.replica_id, old_lsn);
> >
> > diag_raise();
> >
> > + }
> >
> > }
> >
> > }
> >
> > +done:
> > + /*
> > + * Stay 'orphan' until appliers catch up with
> > + * the remote vclock at the time of SUBSCRIBE
> > + * and the lag is less than configured.
> > + */
> > + if (applier->state == APPLIER_SYNC &&
> > + applier->lag <= replication_sync_lag &&
> > + vclock_compare(&remote_vclock_at_subscribe,
> > + &replicaset.vclock) <= 0) {
> > + /* Applier is synced, switch to "follow". */
> > + applier_set_state(applier, APPLIER_FOLLOW);
> > + }
> > +
> >
> > if (applier->state == APPLIER_SYNC ||
> >
> > applier->state == APPLIER_FOLLOW)
> >
> > fiber_cond_signal(&applier->writer_cond);
> >
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 9f2fd6da1..0df0875dd 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
> >
> > struct journal_entry * /* entry */)
> >
> > {
> >
> > struct recovery_journal *journal = (struct recovery_journal *) base;
> >
> > + vclock_copy(&replicaset.vclock, journal->vclock);
> >
> > return vclock_sum(journal->vclock);
> >
> > }
> >
> > @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
> >
> > */
> >
> > engine_begin_final_recovery_xc();
> > struct recovery_journal journal;
> >
> > - recovery_journal_create(&journal, &replicaset.vclock);
> > + recovery_journal_create(&journal, &replicaset.applier.vclock);
>
> I fail to understand how this works. If we bump applier.vclock during
> remote bootstrap, then where is replicaset.vclock set?
>
> > journal_set(&journal.base);
> >
> > applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> >
> > diff --git a/src/box/replication.cc b/src/box/replication.cc
> > index 2cb4ec0f8..51e08886c 100644
> > --- a/src/box/replication.cc
> > +++ b/src/box/replication.cc
> > @@ -90,6 +90,7 @@ replication_init(void)
> >
> > fiber_cond_create(&replicaset.applier.cond);
> > replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX,
> > sizeof(struct replica *));
> > latch_create(&replicaset.applier.order_latch);
> >
> > + vclock_create(&replicaset.applier.vclock);
> >
> > }
> >
> > void
> >
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 2ac620d86..b9aebed14 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -194,6 +194,9 @@ struct replicaset {
> >
> > struct vclock vclock;
> > /** Applier state. */
> > struct {
> >
> > +		/**
> > +		 * Vclock of rows fetched by appliers and
> > +		 * submitted for processing.
> > +		 */
> > + struct vclock vclock;
> >
> > /**
> >
> > * Total number of replicas with attached
> > * appliers.
> >
> > diff --git a/src/box/vclock.c b/src/box/vclock.c
> > index b5eb2800b..c297d1ff9 100644
> > --- a/src/box/vclock.c
> > +++ b/src/box/vclock.c
> > @@ -36,6 +36,20 @@
> >
> > #include "diag.h"
> >
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> > +{
> > + assert(lsn >= 0);
> > + assert(replica_id < VCLOCK_MAX);
> > + int64_t prev_lsn = vclock->lsn[replica_id];
> > + if (lsn > 0)
> > + vclock->map |= 1 << replica_id;
> > + else
> > + vclock->map &= ~(1 << replica_id);
> > + vclock->lsn[replica_id] = lsn;
> > + vclock->signature += lsn - prev_lsn;
> > +}
> > +
> >
> > int64_t
> > vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> > {
> >
> > diff --git a/src/box/vclock.h b/src/box/vclock.h
> > index 111e29160..d6cb14c2a 100644
> > --- a/src/box/vclock.h
> > +++ b/src/box/vclock.h
> > @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t
> > replica_id)>
> > return vclock->lsn[replica_id];
> >
> > }
> >
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
> > +
> >
> > static inline int64_t
> > vclock_inc(struct vclock *vclock, uint32_t replica_id)
> > {
> >
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index a55b544aa..4c3537672 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -171,6 +171,8 @@ struct wal_msg {
> >
> > * be rolled back.
> > */
> >
> > struct stailq rollback;
> >
> > +	/** Vclock after the batch is processed. */
> > + struct vclock vclock;
> >
> > };
> >
> > /**
> >
> > @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
> >
> > batch->approx_len = 0;
> > stailq_create(&batch->commit);
> > stailq_create(&batch->rollback);
> >
> > + vclock_create(&batch->vclock);
> >
> > }
> >
> > static struct wal_msg *
> >
> > @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
> >
> > * wal_msg memory disappears after the first
> > * iteration of tx_schedule_queue loop.
> > */
> >
> > + vclock_copy(&replicaset.vclock, &batch->vclock);
> >
> > if (! stailq_empty(&batch->rollback)) {
> >
> > /* Closes the input valve. */
> > stailq_concat(&writer->rollback, &batch->rollback);
> >
> > @@ -1028,6 +1032,8 @@ done:
> > error_log(error);
> > diag_clear(diag_get());
> >
> > }
> >
> > + /* Set resulting vclock. */
> > + vclock_copy(&wal_msg->vclock, &writer->vclock);
> >
> > /*
> >
> > * We need to start rollback from the first request
> > * following the last committed request. If
> >
> > @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct
> > journal_entry *entry)>
> > bool cancellable = fiber_set_cancellable(false);
> > fiber_yield(); /* Request was inserted. */
> > fiber_set_cancellable(cancellable);
> >
> > - if (entry->res > 0) {
> > - struct xrow_header **last = entry->rows + entry->n_rows - 1;
> > - while (last >= entry->rows) {
> > - /*
> > - * Find last row from local instance id
> > - * and promote vclock.
> > - */
> > - if ((*last)->replica_id == instance_id) {
> > - /*
> > - * In master-master configuration, during sudden
> > - * power-loss, if the data have not been written
> > - * to WAL but have already been sent to others,
> > - * they will send the data back. In this case
> > - * vclock has already been promoted by applier.
> > - */
> > - if (vclock_get(&replicaset.vclock,
> > - instance_id) < (*last)->lsn) {
> > - vclock_follow_xrow(&replicaset.vclock,
> > - *last);
> > - }
> > - break;
> > - }
> > - --last;
> > - }
> > - }
> >
> > return entry->res;
> >
> > }
> >
> > @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
> >
> > {
> >
> > struct wal_writer *writer = (struct wal_writer *) journal;
> > wal_assign_lsn(&writer->vclock, entry->rows, entry->rows +
> > entry->n_rows);
> >
> > - int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> > - int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> > - if (new_lsn > old_lsn) {
> > - /* There were local writes, promote vclock. */
> > - vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> > - }
> > + vclock_copy(&replicaset.vclock, &writer->vclock);
> >
> > return vclock_sum(&writer->vclock);
> >
> > }
> >
> > diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
> > index f387e8e89..966fa1f4a 100644
> > --- a/test/xlog-py/dup_key.result
> > +++ b/test/xlog-py/dup_key.result
> > @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
> >
> > ---
> > - [2, 'second tuple']
> > ...
> >
> > -.xlog exists
> > +.xlog#1 exists
> > +box.space.test:insert{3, 'third tuple'}
> > +---
> > +- [3, 'third tuple']
> > +...
> > +box.space.test:insert{4, 'fourth tuple'}
> > +---
> > +- [4, 'fourth tuple']
> > +...
> > +.xlog#2 exists
> >
> > box.space.test:insert{1, 'third tuple'}
> > ---
> > - [1, 'third tuple']
>
> What happened to this test?
>
> > @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
> >
> > ---
> > - [2, 'fourth tuple']
> > ...
> >
> > -.xlog does not exist
> >
> > check log line for 'Duplicate key'
> >
> > 'Duplicate key' exists in server log
> >
> > diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> > index 1c033da40..3dacde771 100644
> > --- a/test/xlog-py/dup_key.test.py
> > +++ b/test/xlog-py/dup_key.test.py
> > @@ -26,19 +26,27 @@ server.stop()
> >
> > # Save wal#1
> >
> > if os.access(wal, os.F_OK):
> > - print ".xlog exists"
> > + print ".xlog#1 exists"
> >
> > os.rename(wal, wal_old)
> >
> > -# Write wal#2
> > +# Write wal#2 to bump lsn
> > +server.start()
> > +server.admin("box.space.test:insert{3, 'third tuple'}")
> > +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> > +server.stop()
> > +
> > +if os.access(wal, os.F_OK):
> > + print ".xlog#2 exists"
> > +
> > +# Write wal#3 - conflicting with wal#1
> >
> > server.start()
> > server.admin("box.space.test:insert{1, 'third tuple'}")
> > server.admin("box.space.test:insert{2, 'fourth tuple'}")
> > server.stop()
> >
> > # Restore wal#1
> >
> > -if not os.access(wal, os.F_OK):
> > - print ".xlog does not exist"
> > - os.rename(wal_old, wal)
> > +os.unlink(wal)
> > +os.rename(wal_old, wal)