[tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Dec 11 13:42:42 MSK 2018
On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
> Applier used to promote vclock prior to applying the row. This led to a
> situation when master's row would be skipped forever in case there is an
> error trying to apply it. However, some errors are transient, and we
> might be able to successfully apply the same row later.
>
> So only advance vclock if a corresponding row is successfully written to
> WAL. This makes sure that rows are not skipped.
> replication_skip_conflict works as before: if it is set and a conflict
> occurs while applying the row, it is silently ignored and the vclock is
> advanced, so the row is never applied.
>
> While we're at it, make wal writer the only one responsible for
> advancing replicaset vclock. It was already doing it for rows coming from
> the local instance, besides, it makes the code cleaner since now we want
> to advance vclock only after a successful write and lets us get rid of
> unnecessary checks whether applier or wal has already advanced the
> vclock.
>
> Closes #2283
> ---
> https://github.com/tarantool/tarantool/issues/2283
> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
>
> src/box/applier.cc | 49 +++++-----
> src/box/wal.c | 59 ++++++-----
> src/errinj.h | 1 +
> test/box/errinj.result | 2 +
> test/replication/skip_conflict_row.result | 64 ++++++++++++
> test/replication/skip_conflict_row.test.lua | 19 ++++
> test/xlog/errinj.result | 17 +++-
> test/xlog/errinj.test.lua | 4 +
> test/xlog/panic_on_lsn_gap.result | 103 +++++++++++---------
> test/xlog/panic_on_lsn_gap.test.lua | 46 +++++----
> 10 files changed, 241 insertions(+), 123 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index ff4af95e5..bdc303a7d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
> coio_read_xrow(coio, ibuf, &row);
> applier->last_row_time = ev_monotonic_now(loop());
> if (iproto_type_is_dml(row.type)) {
> + /*
> + * Manually advance vclock, since no WAL writes
> + * are done during this stage of recovery.
> + */
> vclock_follow_xrow(&replicaset.vclock, &row);
> xstream_write_xc(applier->subscribe_stream, &row);
> if (++row_count % 100000 == 0)
> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
> applier->lag = ev_now(loop()) - row.tm;
> applier->last_row_time = ev_monotonic_now(loop());
>
> + /*
> + * In a full mesh topology, the same set
> + * of changes may arrive via two
> + * concurrently running appliers.
> + * Thus the rows may execute out of order,
> + * when the following xstream_write()
> + * yields on WAL. Hence we need a latch to
> + * strictly order all changes which belong
> + * to the same server id.
> + */
> + struct replica *replica = replica_by_id(row.replica_id);
> + struct latch *latch = (replica ? &replica->order_latch :
> + &replicaset.applier.order_latch);
> + latch_lock(latch);
> if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> - /**
> - * Promote the replica set vclock before
> - * applying the row. If there is an
> - * exception (conflict) applying the row,
> - * the row is skipped when the replication
> - * is resumed.
> - */
> - vclock_follow_xrow(&replicaset.vclock, &row);
> - struct replica *replica = replica_by_id(row.replica_id);
> - struct latch *latch = (replica ? &replica->order_latch :
> - &replicaset.applier.order_latch);
> - /*
> - * In a full mesh topology, the same set
> - * of changes may arrive via two
> - * concurrently running appliers. Thanks
> - * to vclock_follow() above, the first row
> - * in the set will be skipped - but the
> - * remaining may execute out of order,
> - * when the following xstream_write()
> - * yields on WAL. Hence we need a latch to
> - * strictly order all changes which belong
> - * to the same server id.
> - */
> - latch_lock(latch);
> int res = xstream_write(applier->subscribe_stream, &row);
> latch_unlock(latch);
> if (res != 0) {
> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
> */
> if (e->type == &type_ClientError &&
> box_error_code(e) == ER_TUPLE_FOUND &&
> - replication_skip_conflict)
> + replication_skip_conflict) {
> diag_clear(diag_get());
> - else
> + vclock_follow_xrow(&replicaset.vclock, &row);
Hmm, do we really need to advance the vclock here?
> + } else
> diag_raise();
> }
> - }
> + } else latch_unlock(latch);
Coding style: latch_unlock should be on the next line.
Anyway, could you please rewrite this part so that latch_lock and
latch_unlock stay on the same level?
> if (applier->state == APPLIER_SYNC ||
> applier->state == APPLIER_FOLLOW)
> fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 3b50d3629..c6c3c4ee5 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -121,6 +121,11 @@ struct wal_writer
> * with this LSN and LSN becomes "real".
> */
> struct vclock vclock;
> + /*
> + * Uncommitted vclock of the latest entry to be written,
> + * which will become "real" on a successful WAL write.
> + */
> + struct vclock req_vclock;
> /**
> * VClock of the most recent successfully created checkpoint.
> * The WAL writer must not delete WAL files that are needed to
> @@ -171,6 +176,11 @@ struct wal_msg {
> * be rolled back.
> */
> struct stailq rollback;
> + /**
> + * Vclock of latest successfully written request in batch
> + * used to advance replicaset vclock.
> + */
> + struct vclock vclock;
I don't understand this part. Why do you need to introduce these
wal_msg::vclock and wal_writer::req_vclock. Can't you simply use
the vclock of the last written row in wal_write()?
> };
>
> /**
> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
> /* Closes the input valve. */
> stailq_concat(&writer->rollback, &batch->rollback);
> }
> + vclock_copy(&replicaset.vclock, &batch->vclock);
> tx_schedule_queue(&batch->commit);
> }
>
> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> writer->checkpoint_triggered = false;
>
> vclock_copy(&writer->vclock, vclock);
> + vclock_copy(&writer->req_vclock, vclock);
> vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
> rlist_create(&writer->watchers);
>
> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
> /** Assign LSN to all local rows. */
> for ( ; row < end; row++) {
> if ((*row)->replica_id == 0) {
> - (*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> + (*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
> (*row)->replica_id = instance_id;
> } else {
> - vclock_follow_xrow(&writer->vclock, *row);
> + vclock_follow_xrow(&writer->req_vclock, *row);
> + }
> + struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
This error injection looks rather artificial to me. Can such a thing
happen in the real world?
Anyway, why do you need it at all? AFAICS you don't use it in the
replication test, which is supposed to test the feature. I don't
want to dive into the xlog tests so could you please elaborate here
or, even better, in the commit message?
> + if (inj != NULL && inj->iparam > 0) {
> + (*row)->lsn += inj->iparam;
> + vclock_follow_xrow(&writer->req_vclock, *row);
> }
> }
> }
> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
> struct stailq_entry *last_committed = NULL;
> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> - entry->res = vclock_sum(&writer->vclock);
> - rc = xlog_write_entry(l, entry);
> + entry->res = vclock_sum(&writer->req_vclock);
> + int rc = xlog_write_entry(l, entry);
> if (rc < 0)
> goto done;
> if (rc > 0) {
> writer->checkpoint_wal_size += rc;
> last_committed = &entry->fifo;
> + vclock_copy(&writer->vclock, &writer->req_vclock);
> }
> /* rc == 0: the write is buffered in xlog_tx */
> }
> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>
> writer->checkpoint_wal_size += rc;
> last_committed = stailq_last(&wal_msg->commit);
> + vclock_copy(&writer->vclock, &writer->req_vclock);
>
> /*
> * Notify TX if the checkpoint threshold has been exceeded.
> @@ -1021,6 +1040,10 @@ done:
> error_log(error);
> diag_clear(diag_get());
> }
> + /* Latest successfully written row vclock. */
> + vclock_copy(&wal_msg->vclock, &writer->vclock);
> + /* Rollback request vclock to the latest committed. */
> + vclock_copy(&writer->req_vclock, &writer->vclock);
> /*
> * We need to start rollback from the first request
> * following the last committed request. If
> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> bool cancellable = fiber_set_cancellable(false);
> fiber_yield(); /* Request was inserted. */
> fiber_set_cancellable(cancellable);
> - if (entry->res > 0) {
> - struct xrow_header **last = entry->rows + entry->n_rows - 1;
> - while (last >= entry->rows) {
> - /*
> - * Find last row from local instance id
> - * and promote vclock.
> - */
> - if ((*last)->replica_id == instance_id) {
> - /*
> - * In master-master configuration, during sudden
> - * power-loss, if the data have not been written
> - * to WAL but have already been sent to others,
> - * they will send the data back. In this case
> - * vclock has already been promoted by applier.
> - */
It'd be nice to leave this comment.
> - if (vclock_get(&replicaset.vclock,
> - instance_id) < (*last)->lsn) {
> - vclock_follow_xrow(&replicaset.vclock,
> - *last);
> - }
> - break;
> - }
> - --last;
> - }
> - }
> return entry->res;
> }
>
> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
> struct wal_writer *writer = (struct wal_writer *) journal;
> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> - int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> + int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
> if (new_lsn > old_lsn) {
> /* There were local writes, promote vclock. */
> vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> }
> + vclock_copy(&writer->vclock, &writer->req_vclock);
> return vclock_sum(&writer->vclock);
> }
>
More information about the Tarantool-patches
mailing list