Date: Tue, 11 Dec 2018 13:42:42 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
Message-ID: <20181211104242.mlrocuwy3bqvdz5t@esperanza>
References: <20181210153600.79222-1-sergepetrenko@tarantool.org>
In-Reply-To: <20181210153600.79222-1-sergepetrenko@tarantool.org>
To: Serge Petrenko
Cc: kostja@tarantool.org, tarantool-patches@freelists.org

On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
> Applier used to promote vclock prior to applying the row. This led to a
> situation when master's row would be skipped forever in case there is an
> error trying to apply it. However, some errors are transient, and we
> might be able to successfully apply the same row later.
> 
> So only advance vclock if a corresponding row is successfully written to
> WAL. This makes sure that rows are not skipped.
> replication_skip_conflict works as before: if it is set and a conflict
> occurs while applying the row, it is silently ignored and the vclock is
> advanced, so the row is never applied.
> 
> While we're at it, make wal writer the only one responsible for
> advancing replicaset vclock. It was already doing it for rows coming from
> the local instance; besides, it makes the code cleaner since now we want
> to advance vclock only after a successful write, and it lets us get rid of
> unnecessary checks whether applier or wal has already advanced the
> vclock.
> 
> Closes #2283
> ---
> https://github.com/tarantool/tarantool/issues/2283
> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
> 
>  src/box/applier.cc                          |  49 +++++-----
>  src/box/wal.c                               |  59 ++++++-----
>  src/errinj.h                                |   1 +
>  test/box/errinj.result                      |   2 +
>  test/replication/skip_conflict_row.result   |  64 ++++++++++++
>  test/replication/skip_conflict_row.test.lua |  19 ++++
>  test/xlog/errinj.result                     |  17 +++-
>  test/xlog/errinj.test.lua                   |   4 +
>  test/xlog/panic_on_lsn_gap.result           | 103 +++++++++++---------
>  test/xlog/panic_on_lsn_gap.test.lua         |  46 +++++----
>  10 files changed, 241 insertions(+), 123 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index ff4af95e5..bdc303a7d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
>  		coio_read_xrow(coio, ibuf, &row);
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
> +			/*
> +			 * Manually advance vclock, since no WAL writes
> +			 * are done during this stage of recovery.
> +			 */
>  			vclock_follow_xrow(&replicaset.vclock, &row);
>  			xstream_write_xc(applier->subscribe_stream, &row);
>  			if (++row_count % 100000 == 0)
> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
>  		applier->lag = ev_now(loop()) - row.tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> 
> +		/*
> +		 * In a full mesh topology, the same set
> +		 * of changes may arrive via two
> +		 * concurrently running appliers.
> +		 * Thus the rows may execute out of order,
> +		 * when the following xstream_write()
> +		 * yields on WAL. Hence we need a latch to
> +		 * strictly order all changes which belong
> +		 * to the same server id.
> +		 */
> +		struct replica *replica = replica_by_id(row.replica_id);
> +		struct latch *latch = (replica ? &replica->order_latch :
> +				       &replicaset.applier.order_latch);
> +		latch_lock(latch);
>  		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			/**
> -			 * Promote the replica set vclock before
> -			 * applying the row. If there is an
> -			 * exception (conflict) applying the row,
> -			 * the row is skipped when the replication
> -			 * is resumed.
> -			 */
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			struct replica *replica = replica_by_id(row.replica_id);
> -			struct latch *latch = (replica ? &replica->order_latch :
> -					       &replicaset.applier.order_latch);
> -			/*
> -			 * In a full mesh topology, the same set
> -			 * of changes may arrive via two
> -			 * concurrently running appliers. Thanks
> -			 * to vclock_follow() above, the first row
> -			 * in the set will be skipped - but the
> -			 * remaining may execute out of order,
> -			 * when the following xstream_write()
> -			 * yields on WAL. Hence we need a latch to
> -			 * strictly order all changes which belong
> -			 * to the same server id.
> -			 */
> -			latch_lock(latch);
>  			int res = xstream_write(applier->subscribe_stream, &row);
>  			latch_unlock(latch);
>  			if (res != 0) {
> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
>  				 */
>  				if (e->type == &type_ClientError &&
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> +				    replication_skip_conflict) {
>  					diag_clear(diag_get());
> -				else
> +					vclock_follow_xrow(&replicaset.vclock, &row);

Hmm, do we really need to advance the vclock here?

> +				} else
>  					diag_raise();
>  			}
> -		}
> +		} else latch_unlock(latch);

Coding style: latch_unlock should be on the next line. Anyway, could you
please rewrite this part so that latch_lock and latch_unlock stay on the
same level?
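
Something like this would read better to me (an untested sketch, just to
show the shape I mean; the error handling stays as in your patch):

	latch_lock(latch);
	int res = 0;
	if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn)
		res = xstream_write(applier->subscribe_stream, &row);
	latch_unlock(latch);
	if (res != 0) {
		struct error *e = diag_last_error(diag_get());
		/* ... the skip-conflict handling from above ... */
	}
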
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
>  			fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 3b50d3629..c6c3c4ee5 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -121,6 +121,11 @@ struct wal_writer
>  	 * with this LSN and LSN becomes "real".
>  	 */
>  	struct vclock vclock;
> +	/*
> +	 * Uncommitted vclock of the latest entry to be written,
> +	 * which will become "real" on a successful WAL write.
> +	 */
> +	struct vclock req_vclock;
>  	/**
>  	 * VClock of the most recent successfully created checkpoint.
>  	 * The WAL writer must not delete WAL files that are needed to
> @@ -171,6 +176,11 @@ struct wal_msg {
>  	 * be rolled back.
>  	 */
>  	struct stailq rollback;
> +	/**
> +	 * Vclock of the latest successfully written request in the
> +	 * batch, used to advance the replicaset vclock.
> +	 */
> +	struct vclock vclock;

I don't understand this part. Why do you need to introduce these
wal_msg::vclock and wal_writer::req_vclock? Can't you simply use the
vclock of the last written row in wal_write()?
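
I mean roughly the following on the TX side (a sketch; I'm assuming here
that the last row of an entry carries the greatest LSN, and I'm ignoring
entries mixing rows from different replica ids):

	if (entry->res > 0) {
		/* Promote the replica set vclock to the last written row. */
		struct xrow_header *last = entry->rows[entry->n_rows - 1];
		if (vclock_get(&replicaset.vclock, last->replica_id) < last->lsn)
			vclock_follow_xrow(&replicaset.vclock, last);
	}
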
>  };
> 
>  /**
> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
>  	}
> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>  	tx_schedule_queue(&batch->commit);
>  }
> 
> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  	writer->checkpoint_triggered = false;
> 
>  	vclock_copy(&writer->vclock, vclock);
> +	vclock_copy(&writer->req_vclock, vclock);
>  	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
>  	rlist_create(&writer->watchers);
> 
> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> +			(*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
>  			(*row)->replica_id = instance_id;
>  		} else {
> -			vclock_follow_xrow(&writer->vclock, *row);
> +			vclock_follow_xrow(&writer->req_vclock, *row);
> +		}
> +		struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);

This error injection looks rather artificial to me. Can such a thing
happen in the real world? Anyway, why do you need it at all? AFAICS you
don't use it in the replication test, which is supposed to test the
feature. I don't want to dive into the xlog tests, so could you please
elaborate here or, even better, in the commit message?

> +		if (inj != NULL && inj->iparam > 0) {
> +			(*row)->lsn += inj->iparam;
> +			vclock_follow_xrow(&writer->req_vclock, *row);
>  		}
>  	}
>  }
> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct stailq_entry *last_committed = NULL;
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> -		entry->res = vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> +		entry->res = vclock_sum(&writer->req_vclock);
> +		int rc = xlog_write_entry(l, entry);
>  		if (rc < 0)
>  			goto done;
>  		if (rc > 0) {
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
> +			vclock_copy(&writer->vclock, &writer->req_vclock);
>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
> 
>  	writer->checkpoint_wal_size += rc;
>  	last_committed = stailq_last(&wal_msg->commit);
> +	vclock_copy(&writer->vclock, &writer->req_vclock);
> 
>  	/*
>  	 * Notify TX if the checkpoint threshold has been exceeded.
> @@ -1021,6 +1040,10 @@ done:
>  		error_log(error);
>  		diag_clear(diag_get());
>  	}
> +	/* Latest successfully written row vclock. */
> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
> +	/* Rollback request vclock to the latest committed. */
> +	vclock_copy(&writer->req_vclock, &writer->vclock);
>  	/*
>  	 * We need to start rollback from the first request
>  	 * following the last committed request. If
> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	bool cancellable = fiber_set_cancellable(false);
>  	fiber_yield(); /* Request was inserted. */
>  	fiber_set_cancellable(cancellable);
> -	if (entry->res > 0) {
> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> -		while (last >= entry->rows) {
> -			/*
> -			 * Find last row from local instance id
> -			 * and promote vclock.
> -			 */
> -			if ((*last)->replica_id == instance_id) {
> -				/*
> -				 * In master-master configuration, during sudden
> -				 * power-loss, if the data have not been written
> -				 * to WAL but have already been sent to others,
> -				 * they will send the data back. In this case
> -				 * vclock has already been promoted by applier.
> -				 */

It'd be nice to leave this comment.
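
E.g. it could move to the new place where the vclock is actually
promoted, next to the vclock_copy() you add in tx_schedule_commit():

	/*
	 * In master-master configuration, during sudden power-loss,
	 * if the data have not been written to WAL but have already
	 * been sent to others, they will send the data back. In this
	 * case vclock has already been promoted by applier.
	 */
	vclock_copy(&replicaset.vclock, &batch->vclock);
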
> -				if (vclock_get(&replicaset.vclock,
> -					       instance_id) < (*last)->lsn) {
> -					vclock_follow_xrow(&replicaset.vclock,
> -							   *last);
> -				}
> -				break;
> -			}
> -			--last;
> -		}
> -	}
>  	return entry->res;
>  }
> 
> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>  	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> +	int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
>  	if (new_lsn > old_lsn) {
>  		/* There were local writes, promote vclock. */
>  		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
>  	}
> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>  	return vclock_sum(&writer->vclock);
>  }
> 