[tarantool-patches] Re: [PATCH v2 2/5] Update replicaset vclock from wal

Георгий Кириченко georgy at tarantool.org
Tue Jan 29 13:33:37 MSK 2019


On Monday, January 28, 2019 2:59:22 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote:
> > Journal maintains replicaset vclock for recovery, local and replicated
> > operations. Introduce replicaset.applier.vclock to prevent applier
> > races.
> 
> What kind of races?
> 
> > Prerequisite #980
> > ---
> > 
> >  src/box/applier.cc           | 68 +++++++++++++++++++++---------------
> >  src/box/box.cc               |  3 +-
> >  src/box/replication.cc       |  1 +
> >  src/box/replication.h        |  3 ++
> >  src/box/vclock.c             | 14 ++++++++
> >  src/box/vclock.h             |  3 ++
> >  src/box/wal.c                | 38 ++++----------------
> >  test/xlog-py/dup_key.result  | 12 +++++--
> >  test/xlog-py/dup_key.test.py | 18 +++++++---
> >  9 files changed, 93 insertions(+), 67 deletions(-)
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 21d2e6bcb..87873e970 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
> > 
> >  			continue;
> >  		try {
> >  			struct xrow_header xrow;
> > -			xrow_encode_vclock(&xrow, &replicaset.vclock);
> > +			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
> >  			coio_write_xrow(&io, &xrow);
> >  		} catch (SocketError *e) {
> >  			/*
> > 
> > @@ -300,7 +300,7 @@ applier_join(struct applier *applier)
> > 
> >  		 * Used to initialize the replica's initial
> >  		 * vclock in bootstrap_from_master()
> >  		 */
> > 
> > -		xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > +		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
> > 
> >  	}
> >  	
> >  	applier_set_state(applier, APPLIER_INITIAL_JOIN);
> > 
> > @@ -326,7 +326,8 @@ applier_join(struct applier *applier)
> > 
> >  				 * vclock yet, do it now. In 1.7+
> >  				 * this vclock is not used.
> >  				 */
> > 
> > -				xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > +				xrow_decode_vclock_xc(&row,
> > +						      &replicaset.applier.vclock);
> > 
> >  			}
> >  			break; /* end of stream */
> >  		
> >  		} else if (iproto_type_is_error(row.type)) {
> > 
> > @@ -336,6 +337,7 @@ applier_join(struct applier *applier)
> > 
> >  				  (uint32_t) row.type);
> >  		
> >  		}
> >  	
> >  	}
> > 
> > +	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
> > 
> >  	say_info("initial data received");
> >  	
> >  	applier_set_state(applier, APPLIER_FINAL_JOIN);
> > 
> > @@ -355,7 +357,7 @@ applier_join(struct applier *applier)
> > 
> >  		coio_read_xrow(coio, ibuf, &row);
> >  		applier->last_row_time = ev_monotonic_now(loop());
> >  		if (iproto_type_is_dml(row.type)) {
> > 
> > -			vclock_follow_xrow(&replicaset.vclock, &row);
> > +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> > 
> >  			xstream_write_xc(applier->subscribe_stream, &row);
> >  			if (++row_count % 100000 == 0)
> >  			
> >  				say_info("%.1fM rows received", row_count / 1e6);
> > 
> > @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
> > 
> >  {
> >  
> >  	assert(applier->subscribe_stream != NULL);
> > 
> > +	if (!vclock_is_set(&replicaset.applier.vclock))
> > +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> > +
> > 
> >  	/* Send SUBSCRIBE request */
> >  	struct ev_io *coio = &applier->io;
> >  	struct ibuf *ibuf = &applier->ibuf;
> > 
> > @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
> > 
> >  			applier_set_state(applier, APPLIER_FOLLOW);
> >  		
> >  		}
> > 
> > -		/*
> > -		 * Stay 'orphan' until appliers catch up with
> > -		 * the remote vclock at the time of SUBSCRIBE
> > -		 * and the lag is less than configured.
> > -		 */
> > -		if (applier->state == APPLIER_SYNC &&
> > -		    applier->lag <= replication_sync_lag &&
> > -		    vclock_compare(&remote_vclock_at_subscribe,
> > -				   &replicaset.vclock) <= 0) {
> > -			/* Applier is synced, switch to "follow". */
> > -			applier_set_state(applier, APPLIER_FOLLOW);
> > -		}
> > -
> 
> Hmm, why move this chunk?
> 
> >  		/*
> >  		
> >  		 * Tarantool < 1.7.7 does not send periodic heartbeat
> >  		 * messages so we can't assume that if we haven't heard
> > 
> > @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
> > 
> >  		applier->lag = ev_now(loop()) - row.tm;
> >  		applier->last_row_time = ev_monotonic_now(loop());
> > 
> > -
> > -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> > -			/**
> > -			 * Promote the replica set vclock before
> > -			 * applying the row. If there is an
> > -			 * exception (conflict) applying the row,
> > -			 * the row is skipped when the replication
> > -			 * is resumed.
> > -			 */
> > -			vclock_follow_xrow(&replicaset.vclock, &row);
> > +		if (vclock_get(&replicaset.applier.vclock,
> > +			       row.replica_id) < row.lsn) {
> > +			if (row.replica_id == instance_id &&
> > +			    vclock_get(&replicaset.vclock, instance_id) <
> > +			    row.lsn) {
> > +				/* Local row returned back. */
> > +				goto done;
> 
> How can it happen? Why should we handle it? Can you please elaborate the
> comment?
> 
> > +			}
> > +			/* Preserve old lsn value. */
> > +			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> > +						     row.replica_id);
> > +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> 
> So, AFAICS you want to promote replicaset.vclock after applying a row
> received from a remote replica, and in order to avoid applying the same
> row twice, you add replicaset.applier.vclock, is that correct?
> 
> However, in the next patch you wrap everything in a latch, rendering
> this race impossible (BTW this is what Serge P. did too). So what's the
> point in introducing another vclock here? If it is needed solely for
> sync replication, then IMO we'd better do it in the scope of the
> corresponding patch set, because currently it seems useless.
You are right, but this is a preparatory step: I hope to be able to release the
latch just before commit, which would allow a parallel applier.
I think one of patches 2 or 3 could be eliminated right now; let's talk about
it.
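For the record, the two-vclock bookkeeping I have in mind can be sketched as a
toy model. Everything below is a simplified stand-in mirroring the names in the
diff, not the real applier.cc code:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for replicaset.vclock and
 * replicaset.applier.vclock (not the real definitions). */
enum { VCLOCK_MAX = 32 };
struct vclock { int64_t lsn[VCLOCK_MAX]; };

static struct vclock committed_vclock; /* replicaset.vclock: confirmed by WAL */
static struct vclock applier_vclock;   /* replicaset.applier.vclock: fetched */
static const uint32_t instance_id = 1; /* this replica's own id */

/*
 * Decide whether an incoming row should be applied. On success it
 * bumps applier_vclock, so a concurrent applier offering the same
 * {replica_id, lsn} pair sees the row as already taken.
 */
static bool
applier_claim_row(uint32_t replica_id, int64_t lsn)
{
	if (applier_vclock.lsn[replica_id] >= lsn)
		return false; /* already fetched by some applier */
	if (replica_id == instance_id &&
	    committed_vclock.lsn[instance_id] < lsn)
		return false; /* our own not-yet-committed row echoed back */
	applier_vclock.lsn[replica_id] = lsn;
	return true;
}
```

On a WAL write error the applier would move applier_vclock back to the previous
value (the vclock_set() call in the diff), so the row can be retried.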
> 
> >  			struct replica *replica = replica_by_id(row.replica_id);
> >  			struct latch *latch = (replica ? &replica->order_latch :
> >  					       &replicaset.applier.order_latch);
> > 
> > @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
> > 
> >  				    box_error_code(e) == ER_TUPLE_FOUND &&
> >  				    replication_skip_conflict)
> >  					
> >  					diag_clear(diag_get());
> > 
> > -				else
> > +				else {
> > +					/* Rollback lsn to have a chance for a retry. */
> > +					vclock_set(&replicaset.applier.vclock,
> > +						   row.replica_id, old_lsn);
> > 
> >  					diag_raise();
> > 
> > +				}
> > 
> >  			}
> >  		
> >  		}
> > 
> > +done:
> > +		/*
> > +		 * Stay 'orphan' until appliers catch up with
> > +		 * the remote vclock at the time of SUBSCRIBE
> > +		 * and the lag is less than configured.
> > +		 */
> > +		if (applier->state == APPLIER_SYNC &&
> > +		    applier->lag <= replication_sync_lag &&
> > +		    vclock_compare(&remote_vclock_at_subscribe,
> > +				   &replicaset.vclock) <= 0) {
> > +			/* Applier is synced, switch to "follow". */
> > +			applier_set_state(applier, APPLIER_FOLLOW);
> > +		}
> > +
> > 
> >  		if (applier->state == APPLIER_SYNC ||
> >  		
> >  		    applier->state == APPLIER_FOLLOW)
> >  			
> >  			fiber_cond_signal(&applier->writer_cond);
> > 
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 9f2fd6da1..0df0875dd 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
> > 
> >  		       struct journal_entry * /* entry */)
> >  
> >  {
> >  
> >  	struct recovery_journal *journal = (struct recovery_journal *) base;
> > 
> > +	vclock_copy(&replicaset.vclock, journal->vclock);
> > 
> >  	return vclock_sum(journal->vclock);
> >  
> >  }
> > 
> > @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
> > 
> >  	 */
> >  	
> >  	engine_begin_final_recovery_xc();
> >  	struct recovery_journal journal;
> > 
> > -	recovery_journal_create(&journal, &replicaset.vclock);
> > +	recovery_journal_create(&journal, &replicaset.applier.vclock);
> 
> I fail to understand how this works. If we bump applier.vclock during
> remote bootstrap, then where is replicaset.vclock set?
> 
> >  	journal_set(&journal.base);
> >  	
> >  	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> > 
> > diff --git a/src/box/replication.cc b/src/box/replication.cc
> > index 2cb4ec0f8..51e08886c 100644
> > --- a/src/box/replication.cc
> > +++ b/src/box/replication.cc
> > @@ -90,6 +90,7 @@ replication_init(void)
> > 
> >  	fiber_cond_create(&replicaset.applier.cond);
> >  	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX,
> >  		sizeof(struct replica *));
> >  	latch_create(&replicaset.applier.order_latch);
> > 
> > +	vclock_create(&replicaset.applier.vclock);
> > 
> >  }
> >  
> >  void
> > 
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 2ac620d86..b9aebed14 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -194,6 +194,9 @@ struct replicaset {
> > 
> >  	struct vclock vclock;
> >  	/** Applier state. */
> >  	struct {
> > 
> > +		/**
> > +		 * Vclock of rows fetched by appliers and sent for processing. */
> > +		struct vclock vclock;
> > 
> >  		/**
> >  		
> >  		 * Total number of replicas with attached
> >  		 * appliers.
> > 
> > diff --git a/src/box/vclock.c b/src/box/vclock.c
> > index b5eb2800b..c297d1ff9 100644
> > --- a/src/box/vclock.c
> > +++ b/src/box/vclock.c
> > @@ -36,6 +36,20 @@
> > 
> >  #include "diag.h"
> > 
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> > +{
> > +	assert(lsn >= 0);
> > +	assert(replica_id < VCLOCK_MAX);
> > +	int64_t prev_lsn = vclock->lsn[replica_id];
> > +	if (lsn > 0)
> > +		vclock->map |= 1 << replica_id;
> > +	else
> > +		vclock->map &= ~(1 << replica_id);
> > +	vclock->lsn[replica_id] = lsn;
> > +	vclock->signature += lsn - prev_lsn;
> > +}
> > +
> > 
> >  int64_t
> >  vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> >  {
> > 
> > diff --git a/src/box/vclock.h b/src/box/vclock.h
> > index 111e29160..d6cb14c2a 100644
> > --- a/src/box/vclock.h
> > +++ b/src/box/vclock.h
> > @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
> >  	return vclock->lsn[replica_id];
> >  
> >  }
> > 
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
> > +
> > 
> >  static inline int64_t
> >  vclock_inc(struct vclock *vclock, uint32_t replica_id)
> >  {
> > 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index a55b544aa..4c3537672 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -171,6 +171,8 @@ struct wal_msg {
> > 
> >  	 * be rolled back.
> >  	 */
> >  	
> >  	struct stailq rollback;
> > 
> > +	/** Vclock after the batch is processed. */
> > +	struct vclock vclock;
> > 
> >  };
> >  
> >  /**
> > 
> > @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
> > 
> >  	batch->approx_len = 0;
> >  	stailq_create(&batch->commit);
> >  	stailq_create(&batch->rollback);
> > 
> > +	vclock_create(&batch->vclock);
> > 
> >  }
> >  
> >  static struct wal_msg *
> > 
> > @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
> > 
> >  	 * wal_msg memory disappears after the first
> >  	 * iteration of tx_schedule_queue loop.
> >  	 */
> > 
> > +	vclock_copy(&replicaset.vclock, &batch->vclock);
> > 
> >  	if (! stailq_empty(&batch->rollback)) {
> >  	
> >  		/* Closes the input valve. */
> >  		stailq_concat(&writer->rollback, &batch->rollback);
> > 
> > @@ -1028,6 +1032,8 @@ done:
> >  		error_log(error);
> >  		diag_clear(diag_get());
> >  	
> >  	}
> > 
> > +	/* Set resulting vclock. */
> > +	vclock_copy(&wal_msg->vclock, &writer->vclock);
> > 
> >  	/*
> >  	
> >  	 * We need to start rollback from the first request
> >  	 * following the last committed request. If
> > 
> > @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> >  	bool cancellable = fiber_set_cancellable(false);
> >  	fiber_yield(); /* Request was inserted. */
> >  	fiber_set_cancellable(cancellable);
> > 
> > -	if (entry->res > 0) {
> > -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> > -		while (last >= entry->rows) {
> > -			/*
> > -			 * Find last row from local instance id
> > -			 * and promote vclock.
> > -			 */
> > -			if ((*last)->replica_id == instance_id) {
> > -				/*
> > -				 * In master-master configuration, during sudden
> > -				 * power-loss, if the data have not been written
> > -				 * to WAL but have already been sent to others,
> > -				 * they will send the data back. In this case
> > -				 * vclock has already been promoted by applier.
> > -				 */
> > -				if (vclock_get(&replicaset.vclock,
> > -					       instance_id) < (*last)->lsn) {
> > -					vclock_follow_xrow(&replicaset.vclock,
> > -							   *last);
> > -				}
> > -				break;
> > -			}
> > -			--last;
> > -		}
> > -	}
> > 
> >  	return entry->res;
> >  
> >  }
> > 
> > @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
> > 
> >  {
> >  
> >  	struct wal_writer *writer = (struct wal_writer *) journal;
> >  	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> > 
> > -	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> > -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> > -	if (new_lsn > old_lsn) {
> > -		/* There were local writes, promote vclock. */
> > -		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> > -	}
> > +	vclock_copy(&replicaset.vclock, &writer->vclock);
> > 
> >  	return vclock_sum(&writer->vclock);
> >  
> >  }
> > 
> > diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
> > index f387e8e89..966fa1f4a 100644
> > --- a/test/xlog-py/dup_key.result
> > +++ b/test/xlog-py/dup_key.result
> > @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
> > 
> >  ---
> >  - [2, 'second tuple']
> >  ...
> > 
> > -.xlog exists
> > +.xlog#1 exists
> > +box.space.test:insert{3, 'third tuple'}
> > +---
> > +- [3, 'third tuple']
> > +...
> > +box.space.test:insert{4, 'fourth tuple'}
> > +---
> > +- [4, 'fourth tuple']
> > +...
> > +.xlog#2 exists
> > 
> >  box.space.test:insert{1, 'third tuple'}
> >  ---
> >  - [1, 'third tuple']
> 
> What happened to this test?
> 
> > @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
> > 
> >  ---
> >  - [2, 'fourth tuple']
> >  ...
> > 
> > -.xlog does not exist
> > 
> >  check log line for 'Duplicate key'
> >  
> >  'Duplicate key' exists in server log
> > 
> > diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> > index 1c033da40..3dacde771 100644
> > --- a/test/xlog-py/dup_key.test.py
> > +++ b/test/xlog-py/dup_key.test.py
> > @@ -26,19 +26,27 @@ server.stop()
> > 
> >  # Save wal#1
> > 
> >  if os.access(wal, os.F_OK):
> > -    print ".xlog exists"
> > +    print ".xlog#1 exists"
> > 
> >      os.rename(wal, wal_old)
> > 
> > -# Write wal#2
> > +# Write wal#2 to bump lsn
> > +server.start()
> > +server.admin("box.space.test:insert{3, 'third tuple'}")
> > +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> > +server.stop()
> > +
> > +if os.access(wal, os.F_OK):
> > +    print ".xlog#2 exists"
> > +
> > +# Write wal#3 - conflicts with wal#1
> > 
> >  server.start()
> >  server.admin("box.space.test:insert{1, 'third tuple'}")
> >  server.admin("box.space.test:insert{2, 'fourth tuple'}")
> >  server.stop()
> >  
> >  # Restore wal#1
> > 
> > -if not os.access(wal, os.F_OK):
> > -    print ".xlog does not exist"
> > -    os.rename(wal_old, wal)
> > +os.unlink(wal)
> > +os.rename(wal_old, wal)
