[tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error

Serge Petrenko sergepetrenko at tarantool.org
Mon Dec 24 13:45:22 MSK 2018


Hi! Thank you for the review!
I pushed a new version of the patch to the branch and answered your comments below.
The incremental diff is at the end of this message.

> On 11 Dec 2018, at 13:42, Vladimir Davydov <vdavydov.dev at gmail.com> wrote:
> 
> On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
>> Applier used to promote vclock prior to applying the row. This led to a
>> situation when master's row would be skipped forever in case there is an
>> error trying to apply it. However, some errors are transient, and we
>> might be able to successfully apply the same row later.
>> 
>> So only advance vclock if a corresponding row is successfully written to
>> WAL. This makes sure that rows are not skipped.
>> replication_skip_conflict works as before: if it is set and a conflict
>> occurs while applying the row, it is silently ignored and the vclock is
>> advanced, so the row is never applied.
>> 
>> While we're at it, make wal writer the only one responsible for
>> advancing replicaset vclock. It was already doing it for rows coming from
>> the local instance, besides, it makes the code cleaner since now we want
>> to advance vclock only after a successful write and lets us get rid of
>> unnecessary checks whether applier or wal has already advanced the
>> vclock.
>> 
>> Closes #2283
>> ---
>> https://github.com/tarantool/tarantool/issues/2283
>> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
>> 
>> src/box/applier.cc                          |  49 +++++-----
>> src/box/wal.c                               |  59 ++++++-----
>> src/errinj.h                                |   1 +
>> test/box/errinj.result                      |   2 +
>> test/replication/skip_conflict_row.result   |  64 ++++++++++++
>> test/replication/skip_conflict_row.test.lua |  19 ++++
>> test/xlog/errinj.result                     |  17 +++-
>> test/xlog/errinj.test.lua                   |   4 +
>> test/xlog/panic_on_lsn_gap.result           | 103 +++++++++++---------
>> test/xlog/panic_on_lsn_gap.test.lua         |  46 +++++----
>> 10 files changed, 241 insertions(+), 123 deletions(-)
>> 
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index ff4af95e5..bdc303a7d 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
>> 		coio_read_xrow(coio, ibuf, &row);
>> 		applier->last_row_time = ev_monotonic_now(loop());
>> 		if (iproto_type_is_dml(row.type)) {
>> +			/*
>> +			 * Manually advance vclock, since no WAL writes
>> +			 * are done during this stage of recovery.
>> +			 */
>> 			vclock_follow_xrow(&replicaset.vclock, &row);
>> 			xstream_write_xc(applier->subscribe_stream, &row);
>> 			if (++row_count % 100000 == 0)
>> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
>> 		applier->lag = ev_now(loop()) - row.tm;
>> 		applier->last_row_time = ev_monotonic_now(loop());
>> 
>> +		/*
>> +		 * In a full mesh topology, the same set
>> +		 * of changes may arrive via two
>> +		 * concurrently running appliers.
>> +		 * Thus the rows may execute out of order,
>> +		 * when the following xstream_write()
>> +		 * yields on WAL. Hence we need a latch to
>> +		 * strictly order all changes which belong
>> +		 * to the same server id.
>> +		 */
>> +		struct replica *replica = replica_by_id(row.replica_id);
>> +		struct latch *latch = (replica ? &replica->order_latch :
>> +				       &replicaset.applier.order_latch);
>> +		latch_lock(latch);
>> 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
>> -			/**
>> -			 * Promote the replica set vclock before
>> -			 * applying the row. If there is an
>> -			 * exception (conflict) applying the row,
>> -			 * the row is skipped when the replication
>> -			 * is resumed.
>> -			 */
>> -			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			struct replica *replica = replica_by_id(row.replica_id);
>> -			struct latch *latch = (replica ? &replica->order_latch :
>> -					       &replicaset.applier.order_latch);
>> -			/*
>> -			 * In a full mesh topology, the same set
>> -			 * of changes may arrive via two
>> -			 * concurrently running appliers. Thanks
>> -			 * to vclock_follow() above, the first row
>> -			 * in the set will be skipped - but the
>> -			 * remaining may execute out of order,
>> -			 * when the following xstream_write()
>> -			 * yields on WAL. Hence we need a latch to
>> -			 * strictly order all changes which belong
>> -			 * to the same server id.
>> -			 */
>> -			latch_lock(latch);
>> 			int res = xstream_write(applier->subscribe_stream, &row);
>> 			latch_unlock(latch);
>> 			if (res != 0) {
>> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
>> 				 */
>> 				if (e->type == &type_ClientError &&
>> 				    box_error_code(e) == ER_TUPLE_FOUND &&
>> -				    replication_skip_conflict)
>> +				    replication_skip_conflict) {
>> 					diag_clear(diag_get());
>> -				else
>> +					vclock_follow_xrow(&replicaset.vclock, &row);
> 
> Hmm, do we really need to advance the vclock here?

If xstream_write() above fails, the vclock is not advanced anywhere, so we have to advance it manually
if we want to skip the row, as directed by the replication_skip_conflict option.
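
To make the three outcomes explicit, here is a rough standalone sketch. It is not the applier code: the toy vclock, `skip_handle_row()` and the result enum are invented for illustration, and the "success" branch only stands in for what the WAL writer does in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { SKIP_VCLOCK_MAX = 32 };

/* Toy vclock: one LSN per replica id. Not the real struct vclock. */
struct skip_vclock {
	int64_t lsn[SKIP_VCLOCK_MAX];
};

/* Hypothetical outcomes of applying a row. */
enum skip_result { SKIP_OK, SKIP_CONFLICT, SKIP_OTHER_ERROR };

/*
 * Stand-in for what happens once the write attempt has finished.
 * Returns true if the row is handled (applied or deliberately
 * skipped), false if it has to be retried later.
 */
bool
skip_handle_row(struct skip_vclock *vclock, uint32_t replica_id,
		int64_t lsn, enum skip_result res, bool skip_conflict)
{
	assert(replica_id < SKIP_VCLOCK_MAX);
	if (res == SKIP_OK) {
		/* In the patch the WAL writer does this after a good write. */
		vclock->lsn[replica_id] = lsn;
		return true;
	}
	if (res == SKIP_CONFLICT && skip_conflict) {
		/* Nothing reached the WAL, so advance by hand to skip the row. */
		vclock->lsn[replica_id] = lsn;
		return true;
	}
	/* vclock is untouched, so the same row can be applied again later. */
	return false;
}
```

With this model, a row that fails with any other error leaves the vclock where it was, which is exactly what lets the row be retried instead of being lost.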

> 
>> +				} else
>> 					diag_raise();
>> 			}
>> -		}
>> +		} else latch_unlock(latch);
> 
> Coding style: latch_unlock should be on the next line.
> 
> Anyway, could you please rewrite this part so that latch_lock and
> latch_unlock stay on the same level?

Rewrote this part; the incremental diff is below.

> 
>> 		if (applier->state == APPLIER_SYNC ||
>> 		    applier->state == APPLIER_FOLLOW)
>> 			fiber_cond_signal(&applier->writer_cond);
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 3b50d3629..c6c3c4ee5 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -121,6 +121,11 @@ struct wal_writer
>> 	 * with this LSN and LSN becomes "real".
>> 	 */
>> 	struct vclock vclock;
>> +	/*
>> +	 * Uncommitted vclock of the latest entry to be written,
>> +	 * which will become "real" on a successful WAL write.
>> +	 */
>> +	struct vclock req_vclock;
>> 	/**
>> 	 * VClock of the most recent successfully created checkpoint.
>> 	 * The WAL writer must not delete WAL files that are needed to
>> @@ -171,6 +176,11 @@ struct wal_msg {
>> 	 * be rolled back.
>> 	 */
>> 	struct stailq rollback;
>> +	/**
>> +	 * Vclock of latest successfully written request in batch
>> +	 * used to advance replicaset vclock.
>> +	 */
>> +	struct vclock vclock;
> 
> I don't understand this part. Why do you need to introduce these
> wal_msg::vclock and wal_writer::req_vclock. Can't you simply use
> the vclock of the last written row in wal_write()?

A batch can contain rows from different instances. Previously the WAL writer
only advanced the replicaset vclock for rows from the local instance, so we
could use the last written local row to advance the vclock.
Now the WAL writer advances the replicaset vclock for rows coming from
every cluster member, so we would have to search the batch for the last
written row from each instance, which is rather cumbersome.
So it is better to keep a separate vclock, advance it on every successful WAL write,
and then copy it to the replicaset vclock.
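
A simplified standalone model of the idea follows. It is only a sketch: the `batch_*` types and `batch_write()` are made up for this example, and the real code works on struct vclock and xrow headers rather than on these toy structs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { BATCH_VCLOCK_MAX = 32 };

/* Toy vclock: one LSN per replica id. Not the real struct vclock. */
struct batch_vclock {
	int64_t lsn[BATCH_VCLOCK_MAX];
};

/* Toy row: only the fields this sketch needs. */
struct batch_row {
	uint32_t replica_id;
	int64_t lsn;
};

struct batch_writer {
	/* Vclock of what has actually been written. */
	struct batch_vclock vclock;
	/* Vclock that also covers rows still being written. */
	struct batch_vclock req_vclock;
};

/*
 * Try to write a batch whose rows may come from several instances.
 * write_ok stands in for the result of the disk write.
 */
void
batch_write(struct batch_writer *w, const struct batch_row *rows,
	    size_t n_rows, bool write_ok)
{
	for (size_t i = 0; i < n_rows; i++) {
		assert(rows[i].replica_id < BATCH_VCLOCK_MAX);
		/* Track every row, whichever instance it came from. */
		w->req_vclock.lsn[rows[i].replica_id] = rows[i].lsn;
	}
	if (write_ok) {
		/* One copy commits all components, no search of the batch. */
		w->vclock = w->req_vclock;
	} else {
		/* Roll the request vclock back to the last committed state. */
		w->req_vclock = w->vclock;
	}
}
```

The point of the second vclock is that both the commit and the rollback become a single struct copy, regardless of how many instances contributed rows to the batch.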

> 
>> };
>> 
>> /**
>> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
>> 		/* Closes the input valve. */
>> 		stailq_concat(&writer->rollback, &batch->rollback);
>> 	}
>> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>> 	tx_schedule_queue(&batch->commit);
>> }
>> 
>> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>> 	writer->checkpoint_triggered = false;
>> 
>> 	vclock_copy(&writer->vclock, vclock);
>> +	vclock_copy(&writer->req_vclock, vclock);
>> 	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
>> 	rlist_create(&writer->watchers);
>> 
>> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
>> 	/** Assign LSN to all local rows. */
>> 	for ( ; row < end; row++) {
>> 		if ((*row)->replica_id == 0) {
>> -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
>> +			(*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
>> 			(*row)->replica_id = instance_id;
>> 		} else {
>> -			vclock_follow_xrow(&writer->vclock, *row);
>> +			vclock_follow_xrow(&writer->req_vclock, *row);
>> +		}
>> +		struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
> 
> This error injection looks rather artifical to me. Can such a thing
> happen in the real world?
> 
> Anyway, why do you need it at all? AFAICS you don't use it in the
> replication test, which is supposed to test the feature. I don't
> want to dive into the xlog tests so could you please elaborate here
> or, even better, in the commit message?

The test xlog/panic_on_lsn_gap.test.lua used to create an LSN gap in a file by
simulating a failed WAL write, which caused the writer to advance its vclock.
That way of creating an LSN gap in a WAL doesn’t work anymore, because this
patch makes the WAL writer advance its vclock only on successful journal writes.
The error injection was added to simulate the old behaviour.
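
For reference, a toy model of how the gap used to appear and why it no longer does. This is a sketch under simplifying assumptions (a single instance, one row per write); `struct gap_writer` and `gap_write()` are invented names, not anything from wal.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy single-instance writer; the fields are made up for this sketch. */
struct gap_writer {
	int64_t vclock_lsn;   /* LSN the writer believes is current. */
	int64_t last_in_file; /* LSN of the last row actually in the file. */
	bool has_gap;         /* Set once the file skips an LSN. */
};

/*
 * Write one local row. advance_on_failure = true models the old
 * behaviour, false models the behaviour after this patch.
 */
void
gap_write(struct gap_writer *w, bool write_ok, bool advance_on_failure)
{
	int64_t lsn = w->vclock_lsn + 1;
	if (!write_ok) {
		if (advance_on_failure)
			w->vclock_lsn = lsn; /* The LSN is used up anyway. */
		return;
	}
	if (lsn != w->last_in_file + 1)
		w->has_gap = true;
	w->last_in_file = lsn;
	w->vclock_lsn = lsn;
}
```

A failed write followed by a successful one leaves a hole in the file only when `advance_on_failure` is true, which is the situation the test relied on.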

I guess we can drop it together with xlog/panic_on_lsn_gap.test.lua.
AFAICS the bug this test is for is not reproducible anymore.
What do you think?

> 
>> +		if (inj != NULL && inj->iparam > 0) {
>> +			(*row)->lsn += inj->iparam;
>> +			vclock_follow_xrow(&writer->req_vclock, *row);
>> 		}
>> 	}
>> }
>> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
>> 	struct stailq_entry *last_committed = NULL;
>> 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>> 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>> -		entry->res = vclock_sum(&writer->vclock);
>> -		rc = xlog_write_entry(l, entry);
>> +		entry->res = vclock_sum(&writer->req_vclock);
>> +		int rc = xlog_write_entry(l, entry);
>> 		if (rc < 0)
>> 			goto done;
>> 		if (rc > 0) {
>> 			writer->checkpoint_wal_size += rc;
>> 			last_committed = &entry->fifo;
>> +			vclock_copy(&writer->vclock, &writer->req_vclock);
>> 		}
>> 		/* rc == 0: the write is buffered in xlog_tx */
>> 	}
>> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>> 
>> 	writer->checkpoint_wal_size += rc;
>> 	last_committed = stailq_last(&wal_msg->commit);
>> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>> 
>> 	/*
>> 	 * Notify TX if the checkpoint threshold has been exceeded.
>> @@ -1021,6 +1040,10 @@ done:
>> 		error_log(error);
>> 		diag_clear(diag_get());
>> 	}
>> +	/* Latest successfully written row vclock. */
>> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
>> +	/* Rollback request vclock to the latest committed. */
>> +	vclock_copy(&writer->req_vclock, &writer->vclock);
>> 	/*
>> 	 * We need to start rollback from the first request
>> 	 * following the last committed request. If
>> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>> 	bool cancellable = fiber_set_cancellable(false);
>> 	fiber_yield(); /* Request was inserted. */
>> 	fiber_set_cancellable(cancellable);
>> -	if (entry->res > 0) {
>> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
>> -		while (last >= entry->rows) {
>> -			/*
>> -			 * Find last row from local instance id
>> -			 * and promote vclock.
>> -			 */
>> -			if ((*last)->replica_id == instance_id) {
>> -				/*
>> -				 * In master-master configuration, during sudden
>> -				 * power-loss, if the data have not been written
>> -				 * to WAL but have already been sent to others,
>> -				 * they will send the data back. In this case
>> -				 * vclock has already been promoted by applier.
>> -				 */
> 
> It'd be nice to leave this comment.

The applier doesn’t advance the vclock anymore. We can keep the
>> -				 * In master-master configuration, during sudden
>> -				 * power-loss, if the data have not been written
>> -				 * to WAL but have already been sent to others,
>> -				 * they will send the data back.

part, but I don’t know where to put it now.

> 
>> -				if (vclock_get(&replicaset.vclock,
>> -					       instance_id) < (*last)->lsn) {
>> -					vclock_follow_xrow(&replicaset.vclock,
>> -							   *last);
>> -				}
>> -				break;
>> -			}
>> -			--last;
>> -		}
>> -	}
>> 	return entry->res;
>> }
>> 
>> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
>> 	struct wal_writer *writer = (struct wal_writer *) journal;
>> 	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>> 	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
>> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
>> +	int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
>> 	if (new_lsn > old_lsn) {
>> 		/* There were local writes, promote vclock. */
>> 		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
>> 	}
>> +	vclock_copy(&writer->vclock, &writer->req_vclock);
>> 	return vclock_sum(&writer->vclock);
>> }

Incremental diff:

diff --git a/src/box/applier.cc b/src/box/applier.cc
index bdc303a7d..6fac2561a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -530,25 +530,32 @@ applier_subscribe(struct applier *applier)
                struct replica *replica = replica_by_id(row.replica_id);
                struct latch *latch = (replica ? &replica->order_latch :
                                       &replicaset.applier.order_latch);
+               int res = 0;
+
                latch_lock(latch);
-               if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-                       int res = xstream_write(applier->subscribe_stream, &row);
-                       latch_unlock(latch);
-                       if (res != 0) {
-                               struct error *e = diag_last_error(diag_get());
-                               /**
-                                * Silently skip ER_TUPLE_FOUND error if such
-                                * option is set in config.
-                                */
-                               if (e->type == &type_ClientError &&
-                                   box_error_code(e) == ER_TUPLE_FOUND &&
-                                   replication_skip_conflict) {
-                                       diag_clear(diag_get());
-                                       vclock_follow_xrow(&replicaset.vclock, &row);
-                               } else
-                                       diag_raise();
-                       }
-               } else latch_unlock(latch);
+               if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn)
+                       res = xstream_write(applier->subscribe_stream, &row);
+               latch_unlock(latch);
+
+               if (res != 0) {
+                       struct error *e = diag_last_error(diag_get());
+                       /**
+                        * Silently skip ER_TUPLE_FOUND error if such
+                        * option is set in config.
+                        * Also advance the vclock manually, since
+                        * WAL writer only advances it on successful
+                        * writes, which is not the case.
+                        * ER_TUPLE_FOUND occurs even before WAL write.
+                        */
+                       if (e->type == &type_ClientError &&
+                           box_error_code(e) == ER_TUPLE_FOUND &&
+                           replication_skip_conflict) {
+                               diag_clear(diag_get());
+                               vclock_follow_xrow(&replicaset.vclock, &row);
+                       } else
+                               diag_raise();
+               }
+
                if (applier->state == APPLIER_SYNC ||
                    applier->state == APPLIER_FOLLOW)
                        fiber_cond_signal(&applier->writer_cond);
