[tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal

Vladimir Davydov vdavydov.dev at gmail.com
Mon Jan 28 14:59:22 MSK 2019


On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote:
> Journal maintains replicaset vclock for recovery, local and replicated
> operations. Introduce replicaset.applier.vclock to prevent applier
> races.

What kind of races?

> 
> Prerequisite #980
> ---
>  src/box/applier.cc           | 68 +++++++++++++++++++++---------------
>  src/box/box.cc               |  3 +-
>  src/box/replication.cc       |  1 +
>  src/box/replication.h        |  3 ++
>  src/box/vclock.c             | 14 ++++++++
>  src/box/vclock.h             |  3 ++
>  src/box/wal.c                | 38 ++++----------------
>  test/xlog-py/dup_key.result  | 12 +++++--
>  test/xlog-py/dup_key.test.py | 18 +++++++---
>  9 files changed, 93 insertions(+), 67 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 21d2e6bcb..87873e970 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
>  			continue;
>  		try {
>  			struct xrow_header xrow;
> -			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
>  			coio_write_xrow(&io, &xrow);
>  		} catch (SocketError *e) {
>  			/*
> @@ -300,7 +300,7 @@ applier_join(struct applier *applier)
>  		 * Used to initialize the replica's initial
>  		 * vclock in bootstrap_from_master()
>  		 */
> -		xrow_decode_vclock_xc(&row, &replicaset.vclock);
> +		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
>  	}
>  
>  	applier_set_state(applier, APPLIER_INITIAL_JOIN);
> @@ -326,7 +326,8 @@ applier_join(struct applier *applier)
>  				 * vclock yet, do it now. In 1.7+
>  				 * this vclock is not used.
>  				 */
> -				xrow_decode_vclock_xc(&row, &replicaset.vclock);
> +				xrow_decode_vclock_xc(&row,
> +						      &replicaset.applier.vclock);
>  			}
>  			break; /* end of stream */
>  		} else if (iproto_type_is_error(row.type)) {
> @@ -336,6 +337,7 @@ applier_join(struct applier *applier)
>  				  (uint32_t) row.type);
>  		}
>  	}
> +	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
>  	say_info("initial data received");
>  
>  	applier_set_state(applier, APPLIER_FINAL_JOIN);
> @@ -355,7 +357,7 @@ applier_join(struct applier *applier)
>  		coio_read_xrow(coio, ibuf, &row);
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
>  			xstream_write_xc(applier->subscribe_stream, &row);
>  			if (++row_count % 100000 == 0)
>  				say_info("%.1fM rows received", row_count / 1e6);
> @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
>  {
>  	assert(applier->subscribe_stream != NULL);
>  
> +	if (!vclock_is_set(&replicaset.applier.vclock))
> +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> +
>  	/* Send SUBSCRIBE request */
>  	struct ev_io *coio = &applier->io;
>  	struct ibuf *ibuf = &applier->ibuf;
> @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Stay 'orphan' until appliers catch up with
> -		 * the remote vclock at the time of SUBSCRIBE
> -		 * and the lag is less than configured.
> -		 */
> -		if (applier->state == APPLIER_SYNC &&
> -		    applier->lag <= replication_sync_lag &&
> -		    vclock_compare(&remote_vclock_at_subscribe,
> -				   &replicaset.vclock) <= 0) {
> -			/* Applier is synced, switch to "follow". */
> -			applier_set_state(applier, APPLIER_FOLLOW);
> -		}
> -

Hmm, why move this chunk?

>  		/*
>  		 * Tarantool < 1.7.7 does not send periodic heartbeat
>  		 * messages so we can't assume that if we haven't heard
> @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
>  
>  		applier->lag = ev_now(loop()) - row.tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			/**
> -			 * Promote the replica set vclock before
> -			 * applying the row. If there is an
> -			 * exception (conflict) applying the row,
> -			 * the row is skipped when the replication
> -			 * is resumed.
> -			 */
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> +		if (vclock_get(&replicaset.applier.vclock,
> +			       row.replica_id) < row.lsn) {
> +			if (row.replica_id == instance_id &&
> +			    vclock_get(&replicaset.vclock, instance_id) <
> +			    row.lsn) {
> +				/* Local row returned back. */
> +				goto done;

How can this happen (a local row being returned back)? Why should we
handle it? Could you please elaborate on this in the comment?

> +			}
> +			/* Preserve old lsn value. */
> +			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> +						     row.replica_id);
> +			vclock_follow_xrow(&replicaset.applier.vclock, &row);

So, AFAICS you want to promote replicaset.vclock after applying a row
received from a remote replica, and in order to avoid applying the same
row twice, you add replicaset.applier.vclock, is that correct?

However, in the next patch you wrap everything in a latch, rendering
this race impossible (BTW this is what Serge P. did too). So what's the
point in introducing another vclock here? If it is needed solely for
sync replication, then IMO we'd better do it in the scope of the
corresponding patch set, because currently it seems useless.

>  			struct replica *replica = replica_by_id(row.replica_id);
>  			struct latch *latch = (replica ? &replica->order_latch :
>  					       &replicaset.applier.order_latch);
> @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
>  				    replication_skip_conflict)
>  					diag_clear(diag_get());
> -				else
> +				else {
> +					/* Rollback lsn to have a chance for a retry. */
> +					vclock_set(&replicaset.applier.vclock,
> +						   row.replica_id, old_lsn);
>  					diag_raise();
> +				}
>  			}
>  		}
> +done:
> +		/*
> +		 * Stay 'orphan' until appliers catch up with
> +		 * the remote vclock at the time of SUBSCRIBE
> +		 * and the lag is less than configured.
> +		 */
> +		if (applier->state == APPLIER_SYNC &&
> +		    applier->lag <= replication_sync_lag &&
> +		    vclock_compare(&remote_vclock_at_subscribe,
> +				   &replicaset.vclock) <= 0) {
> +			/* Applier is synced, switch to "follow". */
> +			applier_set_state(applier, APPLIER_FOLLOW);
> +		}
> +
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
>  			fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 9f2fd6da1..0df0875dd 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
>  		       struct journal_entry * /* entry */)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> +	vclock_copy(&replicaset.vclock, journal->vclock);
>  	return vclock_sum(journal->vclock);
>  }
>  
> @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
>  	 */
>  	engine_begin_final_recovery_xc();
>  	struct recovery_journal journal;
> -	recovery_journal_create(&journal, &replicaset.vclock);
> +	recovery_journal_create(&journal, &replicaset.applier.vclock);

I fail to understand how this works. If we bump applier.vclock during
remote bootstrap, then where is replicaset.vclock set?

>  	journal_set(&journal.base);
>  
>  	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 2cb4ec0f8..51e08886c 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -90,6 +90,7 @@ replication_init(void)
>  	fiber_cond_create(&replicaset.applier.cond);
>  	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
>  	latch_create(&replicaset.applier.order_latch);
> +	vclock_create(&replicaset.applier.vclock);
>  }
>  
>  void
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 2ac620d86..b9aebed14 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -194,6 +194,9 @@ struct replicaset {
>  	struct vclock vclock;
>  	/** Applier state. */
>  	struct {
> +		/**
> +		 * Vclock sent to process from appliers. */
> +		struct vclock vclock;
>  		/**
>  		 * Total number of replicas with attached
>  		 * appliers.
> diff --git a/src/box/vclock.c b/src/box/vclock.c
> index b5eb2800b..c297d1ff9 100644
> --- a/src/box/vclock.c
> +++ b/src/box/vclock.c
> @@ -36,6 +36,20 @@
>  
>  #include "diag.h"
>  
> +void
> +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> +{
> +	assert(lsn >= 0);
> +	assert(replica_id < VCLOCK_MAX);
> +	int64_t prev_lsn = vclock->lsn[replica_id];
> +	if (lsn > 0)
> +		vclock->map |= 1 << replica_id;
> +	else
> +		vclock->map &= ~(1 << replica_id);
> +	vclock->lsn[replica_id] = lsn;
> +	vclock->signature += lsn - prev_lsn;
> +}
> +
>  int64_t
>  vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
>  {
> diff --git a/src/box/vclock.h b/src/box/vclock.h
> index 111e29160..d6cb14c2a 100644
> --- a/src/box/vclock.h
> +++ b/src/box/vclock.h
> @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
>  	return vclock->lsn[replica_id];
>  }
>  
> +void
> +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
> +
>  static inline int64_t
>  vclock_inc(struct vclock *vclock, uint32_t replica_id)
>  {
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a55b544aa..4c3537672 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -171,6 +171,8 @@ struct wal_msg {
>  	 * be rolled back.
>  	 */
>  	struct stailq rollback;
> +	/** vclock after the batch processed. */
> +	struct vclock vclock;
>  };
>  
>  /**
> @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
>  	batch->approx_len = 0;
>  	stailq_create(&batch->commit);
>  	stailq_create(&batch->rollback);
> +	vclock_create(&batch->vclock);
>  }
>  
>  static struct wal_msg *
> @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
>  	 * wal_msg memory disappears after the first
>  	 * iteration of tx_schedule_queue loop.
>  	 */
> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>  	if (! stailq_empty(&batch->rollback)) {
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
> @@ -1028,6 +1032,8 @@ done:
>  		error_log(error);
>  		diag_clear(diag_get());
>  	}
> +	/* Set resulting vclock. */
> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
>  	/*
>  	 * We need to start rollback from the first request
>  	 * following the last committed request. If
> @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	bool cancellable = fiber_set_cancellable(false);
>  	fiber_yield(); /* Request was inserted. */
>  	fiber_set_cancellable(cancellable);
> -	if (entry->res > 0) {
> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> -		while (last >= entry->rows) {
> -			/*
> -			 * Find last row from local instance id
> -			 * and promote vclock.
> -			 */
> -			if ((*last)->replica_id == instance_id) {
> -				/*
> -				 * In master-master configuration, during sudden
> -				 * power-loss, if the data have not been written
> -				 * to WAL but have already been sent to others,
> -				 * they will send the data back. In this case
> -				 * vclock has already been promoted by applier.
> -				 */
> -				if (vclock_get(&replicaset.vclock,
> -					       instance_id) < (*last)->lsn) {
> -					vclock_follow_xrow(&replicaset.vclock,
> -							   *last);
> -				}
> -				break;
> -			}
> -			--last;
> -		}
> -	}
>  	return entry->res;
>  }
>  
> @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> -	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> -	if (new_lsn > old_lsn) {
> -		/* There were local writes, promote vclock. */
> -		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> -	}
> +	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	return vclock_sum(&writer->vclock);
>  }
>  
> diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
> index f387e8e89..966fa1f4a 100644
> --- a/test/xlog-py/dup_key.result
> +++ b/test/xlog-py/dup_key.result
> @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
>  ---
>  - [2, 'second tuple']
>  ...
> -.xlog exists
> +.xlog#1 exists
> +box.space.test:insert{3, 'third tuple'}
> +---
> +- [3, 'third tuple']
> +...
> +box.space.test:insert{4, 'fourth tuple'}
> +---
> +- [4, 'fourth tuple']
> +...
> +.xlog#2 exists
>  box.space.test:insert{1, 'third tuple'}
>  ---
>  - [1, 'third tuple']

What happened to this test?

> @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
>  ---
>  - [2, 'fourth tuple']
>  ...
> -.xlog does not exist
>  check log line for 'Duplicate key'
>  
>  'Duplicate key' exists in server log
> diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> index 1c033da40..3dacde771 100644
> --- a/test/xlog-py/dup_key.test.py
> +++ b/test/xlog-py/dup_key.test.py
> @@ -26,19 +26,27 @@ server.stop()
>  
>  # Save wal#1
>  if os.access(wal, os.F_OK):
> -    print ".xlog exists"
> +    print ".xlog#1 exists"
>      os.rename(wal, wal_old)
>  
> -# Write wal#2
> +# Write wal#2 to bump lsn
> +server.start()
> +server.admin("box.space.test:insert{3, 'third tuple'}")
> +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> +server.stop()
> +
> +if os.access(wal, os.F_OK):
> +    print ".xlog#2 exists"
> +
> +# Write wal#3 - confliction with wal#1
>  server.start()
>  server.admin("box.space.test:insert{1, 'third tuple'}")
>  server.admin("box.space.test:insert{2, 'fourth tuple'}")
>  server.stop()
>  
>  # Restore wal#1
> -if not os.access(wal, os.F_OK):
> -    print ".xlog does not exist"
> -    os.rename(wal_old, wal)
> +os.unlink(wal)
> +os.rename(wal_old, wal)



More information about the Tarantool-patches mailing list