[tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error

Vladimir Davydov vdavydov.dev at gmail.com
Tue Dec 11 13:42:42 MSK 2018


On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
> Applier used to promote vclock prior to applying the row. This led to
> a situation where a master's row would be skipped forever if an error
> occurred while applying it. However, some errors are transient, and we
> might be able to successfully apply the same row later.
> 
> So only advance vclock once the corresponding row is successfully
> written to WAL. This makes sure that rows are not skipped.
> replication_skip_conflict works as before: if it is set and a conflict
> occurs while applying the row, it is silently ignored and the vclock is
> advanced, so the row is never applied.
> 
> While we're at it, make the WAL writer the only one responsible for
> advancing the replicaset vclock. It was already doing so for rows
> coming from the local instance. Besides, this makes the code cleaner,
> since now we want to advance the vclock only after a successful write,
> and it lets us get rid of unnecessary checks whether the applier or
> WAL has already advanced the vclock.
> 
> Closes #2283
> ---
> https://github.com/tarantool/tarantool/issues/2283
> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
> 
>  src/box/applier.cc                          |  49 +++++-----
>  src/box/wal.c                               |  59 ++++++-----
>  src/errinj.h                                |   1 +
>  test/box/errinj.result                      |   2 +
>  test/replication/skip_conflict_row.result   |  64 ++++++++++++
>  test/replication/skip_conflict_row.test.lua |  19 ++++
>  test/xlog/errinj.result                     |  17 +++-
>  test/xlog/errinj.test.lua                   |   4 +
>  test/xlog/panic_on_lsn_gap.result           | 103 +++++++++++---------
>  test/xlog/panic_on_lsn_gap.test.lua         |  46 +++++----
>  10 files changed, 241 insertions(+), 123 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index ff4af95e5..bdc303a7d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
>  		coio_read_xrow(coio, ibuf, &row);
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
> +			/*
> +			 * Manually advance vclock, since no WAL writes
> +			 * are done during this stage of recovery.
> +			 */
>  			vclock_follow_xrow(&replicaset.vclock, &row);
>  			xstream_write_xc(applier->subscribe_stream, &row);
>  			if (++row_count % 100000 == 0)
> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
>  		applier->lag = ev_now(loop()) - row.tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
>  
> +		/*
> +		 * In a full mesh topology, the same set
> +		 * of changes may arrive via two
> +		 * concurrently running appliers.
> +		 * Thus the rows may execute out of order,
> +		 * when the following xstream_write()
> +		 * yields on WAL. Hence we need a latch to
> +		 * strictly order all changes which belong
> +		 * to the same server id.
> +		 */
> +		struct replica *replica = replica_by_id(row.replica_id);
> +		struct latch *latch = (replica ? &replica->order_latch :
> +				       &replicaset.applier.order_latch);
> +		latch_lock(latch);
>  		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			/**
> -			 * Promote the replica set vclock before
> -			 * applying the row. If there is an
> -			 * exception (conflict) applying the row,
> -			 * the row is skipped when the replication
> -			 * is resumed.
> -			 */
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			struct replica *replica = replica_by_id(row.replica_id);
> -			struct latch *latch = (replica ? &replica->order_latch :
> -					       &replicaset.applier.order_latch);
> -			/*
> -			 * In a full mesh topology, the same set
> -			 * of changes may arrive via two
> -			 * concurrently running appliers. Thanks
> -			 * to vclock_follow() above, the first row
> -			 * in the set will be skipped - but the
> -			 * remaining may execute out of order,
> -			 * when the following xstream_write()
> -			 * yields on WAL. Hence we need a latch to
> -			 * strictly order all changes which belong
> -			 * to the same server id.
> -			 */
> -			latch_lock(latch);
>  			int res = xstream_write(applier->subscribe_stream, &row);
>  			latch_unlock(latch);
>  			if (res != 0) {
> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
>  				 */
>  				if (e->type == &type_ClientError &&
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> +				    replication_skip_conflict) {
>  					diag_clear(diag_get());
> -				else
> +					vclock_follow_xrow(&replicaset.vclock, &row);

Hmm, do we really need to advance the vclock here?

> +				} else
>  					diag_raise();
>  			}
> -		}
> +		} else latch_unlock(latch);

Coding style: latch_unlock should be on the next line.

Anyway, could you please rewrite this part so that latch_lock and
latch_unlock stay on the same level?
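
Something along these lines would be clearer IMO (an untested sketch
reusing the names from your patch; I kept the vclock_follow_xrow()
call modulo my question above):

	struct replica *replica = replica_by_id(row.replica_id);
	struct latch *latch = (replica ? &replica->order_latch :
			       &replicaset.applier.order_latch);
	latch_lock(latch);
	int res = 0;
	if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn)
		res = xstream_write(applier->subscribe_stream, &row);
	latch_unlock(latch);
	if (res != 0) {
		struct error *e = diag_last_error(diag_get());
		/*
		 * Silently skip the conflicting row if the
		 * user requested it.
		 */
		if (e->type == &type_ClientError &&
		    box_error_code(e) == ER_TUPLE_FOUND &&
		    replication_skip_conflict) {
			diag_clear(diag_get());
			vclock_follow_xrow(&replicaset.vclock, &row);
		} else {
			diag_raise();
		}
	}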

>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
>  			fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 3b50d3629..c6c3c4ee5 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -121,6 +121,11 @@ struct wal_writer
>  	 * with this LSN and LSN becomes "real".
>  	 */
>  	struct vclock vclock;
> +	/*
> +	 * Uncommitted vclock of the latest entry to be written,
> +	 * which will become "real" on a successful WAL write.
> +	 */
> +	struct vclock req_vclock;
>  	/**
>  	 * VClock of the most recent successfully created checkpoint.
>  	 * The WAL writer must not delete WAL files that are needed to
> @@ -171,6 +176,11 @@ struct wal_msg {
>  	 * be rolled back.
>  	 */
>  	struct stailq rollback;
> +	/**
> +	 * Vclock of latest successfully written request in batch
> +	 * used to advance replicaset vclock.
> +	 */
> +	struct vclock vclock;

I don't understand this part. Why do you need to introduce
wal_msg::vclock and wal_writer::req_vclock? Can't you simply use
the vclock of the last written row in wal_write()?
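
I mean something like this in wal_write(), right after the yield (a
rough sketch; I may be missing a reason why a per-batch vclock is
needed):

	if (entry->res > 0) {
		/*
		 * Advance replicaset.vclock up to the rows of
		 * this entry, which are now in WAL.
		 */
		for (struct xrow_header **row = entry->rows;
		     row < entry->rows + entry->n_rows; row++) {
			if (vclock_get(&replicaset.vclock,
				       (*row)->replica_id) < (*row)->lsn)
				vclock_follow_xrow(&replicaset.vclock,
						   *row);
		}
	}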

>  };
>  
>  /**
> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
>  	}
> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>  	tx_schedule_queue(&batch->commit);
>  }
>  
> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>  	writer->checkpoint_triggered = false;
>  
>  	vclock_copy(&writer->vclock, vclock);
> +	vclock_copy(&writer->req_vclock, vclock);
>  	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
>  	rlist_create(&writer->watchers);
>  
> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> +			(*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
>  			(*row)->replica_id = instance_id;
>  		} else {
> -			vclock_follow_xrow(&writer->vclock, *row);
> +			vclock_follow_xrow(&writer->req_vclock, *row);
> +		}
> +		struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);

This error injection looks rather artificial to me. Can such a thing
happen in the real world?

Anyway, why do you need it at all? AFAICS you don't use it in the
replication test, which is supposed to test the feature. I don't
want to dive into the xlog tests, so could you please elaborate here
or, even better, in the commit message?

> +		if (inj != NULL && inj->iparam > 0) {
> +			(*row)->lsn += inj->iparam;
> +			vclock_follow_xrow(&writer->req_vclock, *row);
>  		}
>  	}
>  }
> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct stailq_entry *last_committed = NULL;
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> -		entry->res = vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> +		entry->res = vclock_sum(&writer->req_vclock);
> +		int rc = xlog_write_entry(l, entry);
>  		if (rc < 0)
>  			goto done;
>  		if (rc > 0) {
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
> +			vclock_copy(&writer->vclock, &writer->req_vclock);
>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>  
>  	writer->checkpoint_wal_size += rc;
>  	last_committed = stailq_last(&wal_msg->commit);
> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>  
>  	/*
>  	 * Notify TX if the checkpoint threshold has been exceeded.
> @@ -1021,6 +1040,10 @@ done:
>  		error_log(error);
>  		diag_clear(diag_get());
>  	}
> +	/* Latest successfully written row vclock. */
> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
> +	/* Rollback request vclock to the latest committed. */
> +	vclock_copy(&writer->req_vclock, &writer->vclock);
>  	/*
>  	 * We need to start rollback from the first request
>  	 * following the last committed request. If
> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	bool cancellable = fiber_set_cancellable(false);
>  	fiber_yield(); /* Request was inserted. */
>  	fiber_set_cancellable(cancellable);
> -	if (entry->res > 0) {
> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> -		while (last >= entry->rows) {
> -			/*
> -			 * Find last row from local instance id
> -			 * and promote vclock.
> -			 */
> -			if ((*last)->replica_id == instance_id) {
> -				/*
> -				 * In master-master configuration, during sudden
> -				 * power-loss, if the data have not been written
> -				 * to WAL but have already been sent to others,
> -				 * they will send the data back. In this case
> -				 * vclock has already been promoted by applier.
> -				 */

It'd be nice to leave this comment.

> -				if (vclock_get(&replicaset.vclock,
> -					       instance_id) < (*last)->lsn) {
> -					vclock_follow_xrow(&replicaset.vclock,
> -							   *last);
> -				}
> -				break;
> -			}
> -			--last;
> -		}
> -	}
>  	return entry->res;
>  }
>  
> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>  	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> +	int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
>  	if (new_lsn > old_lsn) {
>  		/* There were local writes, promote vclock. */
>  		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
>  	}
> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>  	return vclock_sum(&writer->vclock);
>  }
>  


