[tarantool-patches] [PATCH 3/3] Promote replicaset.vclock only after wal

Vladimir Davydov vdavydov.dev at gmail.com
Wed Feb 6 17:45:58 MSK 2019


On Wed, Feb 06, 2019 at 11:29:59AM +0300, Georgy Kirichenko wrote:
> Get rid of appliers' vclock_follow and promote vclock only after
> wal write.

Gosh, what an ascetic commit message, really. Please instead take the
commit message Serge wrote when he tried to fix issue #2283.

> 
> Closes #2283
> Prerequisite #980
> ---
>  src/box/applier.cc                          | 14 ++------
>  src/box/wal.c                               | 38 ++++-----------------
>  test/replication/skip_conflict_row.test.lua | 19 +++++++++++
>  test/xlog-py/dup_key.result                 | 12 +++++--
>  test/xlog-py/dup_key.test.py                | 23 ++++++++++---
>  5 files changed, 57 insertions(+), 49 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index d87b247e2..cae71ec1c 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -518,22 +518,14 @@ applier_subscribe(struct applier *applier)
>  		/*
>  		 * In a full mesh topology, the same set
>  		 * of changes may arrive via two
> -		 * concurrently running appliers. Thanks
> -		 * to vclock_follow() above, the first row
> -		 * in the set will be skipped - but the
> -		 * remaining may execute out of order,
> -		 * when the following xstream_write()
> -		 * yields on WAL. Hence we need a latch to
> -		 * strictly order all changes which belong
> -		 * to the same server id.
> +		 * concurrently running appliers.
> +		 * Hence we need a latch to strictly order all
> +		 * changes which belong to the same server id.
>  		 */
>  		latch_lock(latch);
>  		if (vclock_get(&replicaset.vclock,
>  			       row.replica_id) < row.lsn) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			latch_lock(latch);
>  			int res = xstream_write(applier->subscribe_stream, &row);
> -			latch_unlock(latch);

Yep, this should definitely be squashed into patch 2.

>  			if (res != 0) {
>  				struct error *e = diag_last_error(diag_get());
>  				/**
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a55b544aa..4c3537672 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -171,6 +171,8 @@ struct wal_msg {
>  	 * be rolled back.
>  	 */
>  	struct stailq rollback;
> +	/** vclock after the batch processed. */
> +	struct vclock vclock;
>  };
>  
>  /**
> @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
>  	batch->approx_len = 0;
>  	stailq_create(&batch->commit);
>  	stailq_create(&batch->rollback);
> +	vclock_create(&batch->vclock);
>  }
>  
>  static struct wal_msg *
> @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
>  	 * wal_msg memory disappears after the first
>  	 * iteration of tx_schedule_queue loop.
>  	 */
> +	vclock_copy(&replicaset.vclock, &batch->vclock);

An unfortunate place to add this. Now it looks like the comment above
relates to this vclock_copy although it doesn't. Please move it below
to avoid confusion.

>  	if (! stailq_empty(&batch->rollback)) {
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
> @@ -1028,6 +1032,8 @@ done:
>  		error_log(error);
>  		diag_clear(diag_get());
>  	}
> +	/* Set resulting vclock. */

Again, what a pointless comment... Would be more useful if you
elaborated a bit:

	Remember the vclock of the last successfully written row so
	that we can update replicaset.vclock once this message gets
	back to tx.

> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
>  	/*
>  	 * We need to start rollback from the first request
>  	 * following the last committed request. If
> @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	bool cancellable = fiber_set_cancellable(false);
>  	fiber_yield(); /* Request was inserted. */
>  	fiber_set_cancellable(cancellable);
> -	if (entry->res > 0) {
> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> -		while (last >= entry->rows) {
> -			/*
> -			 * Find last row from local instance id
> -			 * and promote vclock.
> -			 */
> -			if ((*last)->replica_id == instance_id) {
> -				/*
> -				 * In master-master configuration, during sudden
> -				 * power-loss, if the data have not been written
> -				 * to WAL but have already been sent to others,
> -				 * they will send the data back. In this case
> -				 * vclock has already been promoted by applier.
> -				 */
> -				if (vclock_get(&replicaset.vclock,
> -					       instance_id) < (*last)->lsn) {
> -					vclock_follow_xrow(&replicaset.vclock,
> -							   *last);
> -				}
> -				break;
> -			}
> -			--last;
> -		}
> -	}
>  	return entry->res;
>  }
>  
> @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> -	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> -	if (new_lsn > old_lsn) {
> -		/* There were local writes, promote vclock. */
> -		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> -	}
> +	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	return vclock_sum(&writer->vclock);
>  }
>  
> diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
> index 4406ced95..c60999b9b 100644
> --- a/test/replication/skip_conflict_row.test.lua
> +++ b/test/replication/skip_conflict_row.test.lua
> @@ -1,5 +1,6 @@
>  env = require('test_run')
>  test_run = env.new()
> +test_run:cmd("restart server default with cleanup=1")

Let's please try to rewrite the test without this. E.g. we could
restart the replica and check that replication doesn't resume
instead of checking vclocks.

>  engine = test_run:get_cfg('engine')
>  
>  box.schema.user.grant('guest', 'replication')
> @@ -28,6 +29,24 @@ box.space.test:select()
>  test_run:cmd("switch default")
>  box.info.status
>  
> +-- test that if replication_skip_conflict is off vclock
> +-- is not advanced on errors.
> +test_run:cmd("restart server replica")
> +test_run:cmd("switch replica")

> +box.cfg{replication_skip_conflict=false}

It's already unset after restart.

> +box.space.test:insert{3}
> +box.info.vclock
> +test_run:cmd("switch default")
> +box.space.test:insert{3, 3}
> +box.space.test:insert{4}
> +box.info.vclock
> +test_run:cmd("switch replica")
> +box.info.vclock
> +box.info.replication[1].upstream.message
> +box.info.replication[1].upstream.status
> +box.space.test:select()
> +test_run:cmd("switch default")
> +

We also want to check that replication isn't restarted.

>  -- cleanup
>  test_run:cmd("stop server replica")
>  test_run:cmd("cleanup server replica")
> diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> index 1c033da40..e25b1d477 100644
> --- a/test/xlog-py/dup_key.test.py
> +++ b/test/xlog-py/dup_key.test.py
> @@ -22,23 +22,36 @@ wal = os.path.join(vardir, filename)

What happened to this test? (BTW I've already asked you that)

Last time I checked, it passed both with and without this patch.

>  # Create wal#1
>  server.admin("box.space.test:insert{1, 'first tuple'}")
>  server.admin("box.space.test:insert{2, 'second tuple'}")
> +lsn2 = int(yaml.load(server.admin("box.info.lsn", silent=True))[0])
>  server.stop()
>  
>  # Save wal#1
>  if os.access(wal, os.F_OK):
> -    print ".xlog exists"
> +    print ".xlog#1 exists"
>      os.rename(wal, wal_old)
> +# drop empty log created on shutdown
> +filename2 = str(lsn2).zfill(20) + ".xlog"
> +wal2 = os.path.join(vardir, filename2)
> +os.unlink(wal2)
>  
> -# Write wal#2
> +# Write wal#2 to bump lsn
> +server.start()
> +server.admin("box.space.test:insert{3, 'third tuple'}")
> +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> +server.stop()
> +
> +if os.access(wal, os.F_OK):
> +    print ".xlog#2 exists"
> +
> +# Write wal#3 - confliction with wal#1
>  server.start()
>  server.admin("box.space.test:insert{1, 'third tuple'}")
>  server.admin("box.space.test:insert{2, 'fourth tuple'}")
>  server.stop()
>  
>  # Restore wal#1
> -if not os.access(wal, os.F_OK):
> -    print ".xlog does not exist"
> -    os.rename(wal_old, wal)
> +os.unlink(wal)
> +os.rename(wal_old, wal)
>  
>  server.start()
>  line = 'Duplicate key'



More information about the Tarantool-patches mailing list