From: Serge Petrenko
Message-Id: <78E192F2-AA0E-43F7-979A-341DECAD65B0@tarantool.org>
Subject: Re: [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
Date: Mon, 24 Dec 2018 13:45:22 +0300
In-Reply-To: <20181211104242.mlrocuwy3bqvdz5t@esperanza>
References: <20181210153600.79222-1-sergepetrenko@tarantool.org> <20181211104242.mlrocuwy3bqvdz5t@esperanza>
To: Vladimir Davydov <vdavydov.dev@gmail.com>
Cc: tarantool-patches@freelists.org, Konstantin Osipov

Hi! Thank you for the review!
I pushed the new version of the patch to the branch and answered your
comments below. The incremental diff is at the end of this letter.

> On 11 Dec 2018, at 13:42, Vladimir Davydov <vdavydov.dev@gmail.com> wrote:
> 
> On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
>> The applier used to promote the vclock prior to applying the row. This led
>> to a situation when a master's row would be skipped forever if there was an
>> error trying to apply it. However, some errors are transient, and we
>> might be able to successfully apply the same row later.
>> 
>> So only advance the vclock if the corresponding row is successfully written
>> to WAL. This makes sure that rows are not skipped.
>> replication_skip_conflict works as before: if it is set and a conflict
>> occurs while applying the row, it is silently ignored and the vclock is
>> advanced, so the row is never applied.
>> 
>> While we're at it, make the WAL writer the only one responsible for
>> advancing the replicaset vclock. It was already doing it for rows coming
>> from the local instance; besides, this makes the code cleaner, since now we
>> want to advance the vclock only after a successful write, and it lets us
>> get rid of unnecessary checks whether the applier or the WAL has already
>> advanced the vclock.
>> 
>> Closes #2283
>> ---
>> https://github.com/tarantool/tarantool/issues/2283
>> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
>> 
>>  src/box/applier.cc                          |  49 +++++-----
>>  src/box/wal.c                               |  59 ++++++-----
>>  src/errinj.h                                |   1 +
>>  test/box/errinj.result                      |   2 +
>>  test/replication/skip_conflict_row.result   |  64 ++++++++++++
>>  test/replication/skip_conflict_row.test.lua |  19 ++++
>>  test/xlog/errinj.result                     |  17 +++-
>>  test/xlog/errinj.test.lua                   |   4 +
>>  test/xlog/panic_on_lsn_gap.result           | 103 +++++++++++---------
>>  test/xlog/panic_on_lsn_gap.test.lua         |  46 +++++----
>>  10 files changed, 241 insertions(+), 123 deletions(-)
>> 
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index ff4af95e5..bdc303a7d 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
>>  		coio_read_xrow(coio, ibuf, &row);
>>  		applier->last_row_time = ev_monotonic_now(loop());
>>  		if (iproto_type_is_dml(row.type)) {
>> +			/*
>> +			 * Manually advance vclock, since no WAL writes
>> +			 * are done during this stage of recovery.
>> +			 */
>>  			vclock_follow_xrow(&replicaset.vclock, &row);
>>  			xstream_write_xc(applier->subscribe_stream, &row);
>>  			if (++row_count % 100000 == 0)
>> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
>>  		applier->lag = ev_now(loop()) - row.tm;
>>  		applier->last_row_time = ev_monotonic_now(loop());
>>  
>> +		/*
>> +		 * In a full mesh topology, the same set
>> +		 * of changes may arrive via two
>> +		 * concurrently running appliers.
>> +		 * Thus the rows may execute out of order,
>> +		 * when the following xstream_write()
>> +		 * yields on WAL. Hence we need a latch to
>> +		 * strictly order all changes which belong
>> +		 * to the same server id.
>> +		 */
>> +		struct replica *replica = replica_by_id(row.replica_id);
>> +		struct latch *latch = (replica ? &replica->order_latch :
>> +				       &replicaset.applier.order_latch);
>> +		latch_lock(latch);
>>  		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
>> -			/**
>> -			 * Promote the replica set vclock before
>> -			 * applying the row. If there is an
>> -			 * exception (conflict) applying the row,
>> -			 * the row is skipped when the replication
>> -			 * is resumed.
>> -			 */
>> -			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			struct replica *replica = replica_by_id(row.replica_id);
>> -			struct latch *latch = (replica ? &replica->order_latch :
>> -					       &replicaset.applier.order_latch);
>> -			/*
>> -			 * In a full mesh topology, the same set
>> -			 * of changes may arrive via two
>> -			 * concurrently running appliers. Thanks
>> -			 * to vclock_follow() above, the first row
>> -			 * in the set will be skipped - but the
>> -			 * remaining may execute out of order,
>> -			 * when the following xstream_write()
>> -			 * yields on WAL. Hence we need a latch to
>> -			 * strictly order all changes which belong
>> -			 * to the same server id.
>> -			 */
>> -			latch_lock(latch);
>>  			int res = xstream_write(applier->subscribe_stream, &row);
>>  			latch_unlock(latch);
>>  			if (res != 0) {
>> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
>>  				 */
>>  				if (e->type == &type_ClientError &&
>>  				    box_error_code(e) == ER_TUPLE_FOUND &&
>> -				    replication_skip_conflict)
>> +				    replication_skip_conflict) {
>>  					diag_clear(diag_get());
>> -				else
>> +					vclock_follow_xrow(&replicaset.vclock, &row);
> 
> Hmm, do we really need to advance the vclock here?

If xstream_write() above fails, the vclock is not advanced anywhere, so we
have to advance it manually if we want to skip the row, as directed by the
replication_skip_conflict option.

> 
>> +				} else
>>  					diag_raise();
>>  			}
>> -		}
>> +		} else latch_unlock(latch);
> 
> Coding style: latch_unlock should be on the next line.
> 
> Anyway, could you please rewrite this part so that latch_lock and
> latch_unlock stay on the same level?

Rewrote this part; the incremental diff is below.
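
For illustration, here is a standalone sketch of the ordering problem the
latch solves. It uses plain POSIX threads and a mutex in place of Tarantool
fibers and latches, and every name in it is invented for the example: two
"appliers" receive the same master's rows, and the lock plus the "already
applied" check make sure each row is applied exactly once and in LSN order,
the same way replica->order_latch and the vclock_get() check do in
applier_subscribe():

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t order_latch = PTHREAD_MUTEX_INITIALIZER;
static long applied_lsn = 0;	/* one vclock component, for one server id */

static void
apply_row(long lsn)
{
	pthread_mutex_lock(&order_latch);
	/* Skip rows the other applier has already applied. */
	if (applied_lsn < lsn) {
		printf("applying lsn %ld\n", lsn);
		applied_lsn = lsn;	/* the "successful WAL write" */
	}
	pthread_mutex_unlock(&order_latch);
}

static void *
applier_thread(void *arg)
{
	(void)arg;
	/* Both appliers see the same row stream, LSNs 1..5. */
	for (long lsn = 1; lsn <= 5; lsn++)
		apply_row(lsn);
	return NULL;
}

int
main(void)
{
	pthread_t a, b;
	pthread_create(&a, NULL, applier_thread, NULL);
	pthread_create(&b, NULL, applier_thread, NULL);
	pthread_join(a, NULL);
	pthread_join(b, NULL);
	printf("final lsn: %ld\n", applied_lsn);	/* always 5 */
	return 0;
}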
> 
>>  		if (applier->state == APPLIER_SYNC ||
>>  		    applier->state == APPLIER_FOLLOW)
>>  			fiber_cond_signal(&applier->writer_cond);
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 3b50d3629..c6c3c4ee5 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -121,6 +121,11 @@ struct wal_writer
>>  	 * with this LSN and LSN becomes "real".
>>  	 */
>>  	struct vclock vclock;
>> +	/*
>> +	 * Uncommitted vclock of the latest entry to be written,
>> +	 * which will become "real" on a successful WAL write.
>> +	 */
>> +	struct vclock req_vclock;
>>  	/**
>>  	 * VClock of the most recent successfully created checkpoint.
>>  	 * The WAL writer must not delete WAL files that are needed to
>> @@ -171,6 +176,11 @@ struct wal_msg {
>>  	 * be rolled back.
>>  	 */
>>  	struct stailq rollback;
>> +	/**
>> +	 * Vclock of the latest successfully written request in the
>> +	 * batch, used to advance the replicaset vclock.
>> +	 */
>> +	struct vclock vclock;
> 
> I don't understand this part. Why do you need to introduce these
> wal_msg::vclock and wal_writer::req_vclock. Can't you simply use
> the vclock of the last written row in wal_write()?

A batch can have rows from different instances. Previously the WAL writer
advanced the replicaset vclock only for the rows from the local instance, so
we could use the last written row from that instance to advance the vclock.
Now the WAL writer advances the replicaset vclock for the rows coming from
every cluster member, so we would have to search the batch for the last
written row from every instance, which is rather cumbersome.
So it is better to have a separate vclock, advance it on every successful
WAL write, and then copy it to the replicaset vclock.
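
To illustrate the answer above, here is a standalone sketch of why one
per-batch vclock is enough. It is a simplified model, not Tarantool code:
the vclock is a plain array and all names are invented. A batch interleaves
rows from several cluster members; following every row while the batch is
written yields a single vclock which TX can later publish in one step,
instead of scanning the batch backwards for the last row of every instance:

#include <stdio.h>

enum { VCLOCK_MAX = 3 };

struct row {
	int replica_id;
	long lsn;
};

int
main(void)
{
	/* A batch mixing rows from two cluster members. */
	struct row batch[] = { {1, 101}, {2, 55}, {1, 102}, {2, 56} };
	long batch_vclock[VCLOCK_MAX] = { 0 };

	/* Follow every successfully written row... */
	for (size_t i = 0; i < sizeof(batch) / sizeof(batch[0]); i++) {
		struct row *r = &batch[i];
		if (batch_vclock[r->replica_id] < r->lsn)
			batch_vclock[r->replica_id] = r->lsn;
	}
	/* ...and publish the result at once, the way tx_schedule_commit()
	 * copies batch->vclock into replicaset.vclock. */
	for (int id = 1; id < VCLOCK_MAX; id++)
		printf("replica %d: lsn %ld\n", id, batch_vclock[id]);
	return 0;
}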
> 
>>  };
>>  
>>  /**
>> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
>>  		/* Closes the input valve. */
>>  		stailq_concat(&writer->rollback, &batch->rollback);
>>  	}
>> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>>  	tx_schedule_queue(&batch->commit);
>>  }
>>  
>> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>>  	writer->checkpoint_triggered = false;
>>  
>>  	vclock_copy(&writer->vclock, vclock);
>> +	vclock_copy(&writer->req_vclock, vclock);
>>  	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
>>  	rlist_create(&writer->watchers);
>>  
>> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
>>  	/** Assign LSN to all local rows. */
>>  	for ( ; row < end; row++) {
>>  		if ((*row)->replica_id == 0) {
>> -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
>> +			(*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
>>  			(*row)->replica_id = instance_id;
>>  		} else {
>> -			vclock_follow_xrow(&writer->vclock, *row);
>> +			vclock_follow_xrow(&writer->req_vclock, *row);
>> +		}
>> +		struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
> 
> This error injection looks rather artificial to me. Can such a thing
> happen in the real world?
> 
> Anyway, why do you need it at all? AFAICS you don't use it in the
> replication test, which is supposed to test the feature. I don't
> want to dive into the xlog tests so could you please elaborate here
> or, even better, in the commit message?

In the test xlog/panic_on_lsn_gap.test.lua we used to create an LSN gap in a
file by simulating a failed WAL write, which nevertheless caused the writer
to advance its vclock. This way of creating an LSN gap in a WAL doesn't work
anymore, because this patch alters the WAL writer so that it only advances
its vclock on successful journal writes. The error injection was added to
simulate the old behaviour.

I guess we can drop it together with xlog/panic_on_lsn_gap.test.lua.
AFAICS the bug this test is for is not reproducible anymore.
What do you think?
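
To show what the injection simulates, here is a standalone model of the LSN
assignment with and without it, simplified to a single vclock component; the
names are invented, the real code is the wal_assign_lsn() hunk quoted above:

#include <stdio.h>

static long
assign_lsn(long *req_vclock, int gap)
{
	long lsn = ++*req_vclock;	/* like vclock_inc() */
	if (gap > 0) {			/* like ERRINJ_WAL_LSN_GAP */
		lsn += gap;
		*req_vclock = lsn;	/* like vclock_follow_xrow() */
	}
	return lsn;
}

int
main(void)
{
	long req_vclock = 10;
	/* Normal path: the next LSN in sequence. */
	printf("normal:   lsn %ld\n", assign_lsn(&req_vclock, 0));	/* 11 */
	/* Injected path: LSNs 12..14 are left unused, creating a gap. */
	printf("injected: lsn %ld\n", assign_lsn(&req_vclock, 3));	/* 15 */
	return 0;
}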
> 
>> +		if (inj != NULL && inj->iparam > 0) {
>> +			(*row)->lsn += inj->iparam;
>> +			vclock_follow_xrow(&writer->req_vclock, *row);
>>  		}
>>  	}
>>  }
>> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
>>  	struct stailq_entry *last_committed = NULL;
>>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>>  		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>> -		entry->res = vclock_sum(&writer->vclock);
>> -		rc = xlog_write_entry(l, entry);
>> +		entry->res = vclock_sum(&writer->req_vclock);
>> +		int rc = xlog_write_entry(l, entry);
>>  		if (rc < 0)
>>  			goto done;
>>  		if (rc > 0) {
>>  			writer->checkpoint_wal_size += rc;
>>  			last_committed = &entry->fifo;
>> +			vclock_copy(&writer->vclock, &writer->req_vclock);
>>  		}
>>  		/* rc == 0: the write is buffered in xlog_tx */
>>  	}
>> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>>  
>>  	writer->checkpoint_wal_size += rc;
>>  	last_committed = stailq_last(&wal_msg->commit);
>> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>>  
>>  	/*
>>  	 * Notify TX if the checkpoint threshold has been exceeded.
>> @@ -1021,6 +1040,10 @@ done:
>>  		error_log(error);
>>  		diag_clear(diag_get());
>>  	}
>> +	/* Latest successfully written row vclock. */
>> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
>> +	/* Rollback request vclock to the latest committed. */
>> +	vclock_copy(&writer->req_vclock, &writer->vclock);
>>  	/*
>>  	 * We need to start rollback from the first request
>>  	 * following the last committed request. If
>> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>>  	bool cancellable = fiber_set_cancellable(false);
>>  	fiber_yield(); /* Request was inserted. */
>>  	fiber_set_cancellable(cancellable);
>> -	if (entry->res > 0) {
>> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
>> -		while (last >= entry->rows) {
>> -			/*
>> -			 * Find last row from local instance id
>> -			 * and promote vclock.
>> -			 */
>> -			if ((*last)->replica_id == instance_id) {
>> -				/*
>> -				 * In master-master configuration, during sudden
>> -				 * power-loss, if the data have not been written
>> -				 * to WAL but have already been sent to others,
>> -				 * they will send the data back. In this case
>> -				 * vclock has already been promoted by applier.
>> -				 */
> 
> It'd be nice to leave this comment.

The applier doesn't advance the vclock anymore. We can leave the

>> -				 * In master-master configuration, during sudden
>> -				 * power-loss, if the data have not been written
>> -				 * to WAL but have already been sent to others,
>> -				 * they will send the data back.

part, but I don't know where to put it now.
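
To recap the writer-side changes quoted above, here is a standalone model of
the vclock/req_vclock pair, reduced to one vclock component with invented
names (the real code paths are wal_assign_lsn(), wal_write_to_disk() and its
done: label): req_vclock is advanced when LSNs are assigned, vclock catches
up only after a successful write, and a failed write rolls req_vclock back
so the same LSN is reused and no gap appears in the WAL:

#include <stdbool.h>
#include <stdio.h>

struct toy_writer {
	long vclock;		/* committed: last successfully written LSN */
	long req_vclock;	/* uncommitted: advanced at LSN assignment */
};

static void
toy_write(struct toy_writer *w, bool disk_ok)
{
	long lsn = ++w->req_vclock;		/* LSN assignment */
	if (disk_ok) {
		w->vclock = w->req_vclock;	/* the LSN becomes "real" */
		printf("committed lsn %ld\n", lsn);
	} else {
		w->req_vclock = w->vclock;	/* rollback to latest committed */
		printf("failed lsn %ld, rolled back\n", lsn);
	}
}

int
main(void)
{
	struct toy_writer w = { 0, 0 };
	toy_write(&w, true);	/* commits LSN 1 */
	toy_write(&w, false);	/* LSN 2 fails and is rolled back */
	toy_write(&w, true);	/* LSN 2 is reused - no gap in the WAL */
	return 0;
}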
> 
>> -				if (vclock_get(&replicaset.vclock,
>> -					       instance_id) < (*last)->lsn) {
>> -					vclock_follow_xrow(&replicaset.vclock,
>> -							   *last);
>> -				}
>> -				break;
>> -			}
>> -			--last;
>> -		}
>> -	}
>>  	return entry->res;
>>  }
>>  
>> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
>>  	struct wal_writer *writer = (struct wal_writer *) journal;
>>  	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>>  	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
>> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
>> +	int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
>>  	if (new_lsn > old_lsn) {
>>  		/* There were local writes, promote vclock. */
>>  		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
>>  	}
>> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>>  	return vclock_sum(&writer->vclock);
>>  }

Incremental diff:

diff --git a/src/box/applier.cc b/src/box/applier.cc
index bdc303a7d..6fac2561a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -530,25 +530,32 @@ applier_subscribe(struct applier *applier)
 		struct replica *replica = replica_by_id(row.replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
+		int res = 0;
+
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					vclock_follow_xrow(&replicaset.vclock, &row);
-				} else
-					diag_raise();
-			}
-		} else latch_unlock(latch);
+		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn)
+			res = xstream_write(applier->subscribe_stream, &row);
+		latch_unlock(latch);
+
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/**
+			 * Silently skip ER_TUPLE_FOUND error if such
+			 * option is set in config.
+			 * Also advance the vclock manually, since the
+			 * WAL writer only advances it on successful
+			 * writes, which is not the case here:
+			 * ER_TUPLE_FOUND occurs even before the WAL write.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				vclock_follow_xrow(&replicaset.vclock, &row);
+			} else
+				diag_raise();
+		}
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);