[tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel

Георгий Кириченко georgy at tarantool.org
Thu Jun 20 23:33:50 MSK 2019


On Thursday, June 20, 2019 7:37:09 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> > I'm sorry, there is proper version of the commit:
> > 
> > Applier use asynchronous transaction to batch journal writes. All
> > appliers share the replicaset.applier.tx_vclock which means the vclock
> > applied but not necessarily written to a journal. Appliers use a trigger
> > to coordinate in case of failure - when a transaction is going to
> > be rolled back. Also an applier writer condition is shared across all
> > appliers and signaled in case of commit or hearth beat message.
> > 
> > Closes: #1254
> > ---
> > 
> >  src/box/applier.cc     | 156 +++++++++++++++++++++++++++++------------
> >  src/box/applier.h      |   9 ++-
> >  src/box/replication.cc |   7 ++
> >  src/box/replication.h  |  14 ++++
> >  4 files changed, 138 insertions(+), 48 deletions(-)
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 5a92f6109..fee49d8ca 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -50,6 +50,7 @@
> > 
> >  #include "schema.h"
> >  #include "txn.h"
> >  #include "box.h"
> > 
> > +#include "scoped_guard.h"
> > 
> >  STRS(applier_state, applier_STATE);
> > 
> > @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> > 
> >  		 * replication_timeout seconds any more.
> >  		 */
> >  		
> >  		if (applier->version_id >= version_id(1, 7, 7))
> > 
> > -			fiber_cond_wait_timeout(&applier->writer_cond,
> > +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> > 
> >  						TIMEOUT_INFINITY);
> >  		
> >  		else
> > 
> > -			fiber_cond_wait_timeout(&applier->writer_cond,
> > +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> > 
> >  						replication_timeout);
> 
> Why replace applier->writer_cond with replicaset.applier.commit_cond?
> This means that even if only one applier is active, we will wake up all
> of the writers on each commit, which looks strange.
I did it because an applier doesn't have any control of how transaction is 
finished except an on_commit/on_rollback trigger. However if an applier sends 
nothing to commit (for instance it could be behind others) it still should 
send ACK. Also I think we should update this state for any transaction 
processed (even for local ones).
> 
> > +		/*
> > +		 * Stay 'orphan' until appliers catch up with
> > +		 * the remote vclock at the time of SUBSCRIBE
> > +		 * and the lag is less than configured.
> > +		 */
> > +		if (applier->state == APPLIER_SYNC &&
> > +		    applier->lag <= replication_sync_lag &&
> > +		    vclock_compare(&applier->remote_vclock_at_subscribe,
> > +				   &replicaset.vclock) <= 0) {
> > +			/* Applier is synced, switch to "follow". */
> > +			applier_set_state(applier, APPLIER_FOLLOW);
> > +		}
> > +
> 
> A writer is supposed to send ACKs, not change the applier state.
> How did this wind up here? Can't we do this right from the on_commit
> trigger?
The same case above - if applier didn't send anything to commit (it is behind 
other applier) where is the better point to update its state.
> 
> >  		/* Send ACKs only when in FOLLOW mode ,*/
> >  		if (applier->state != APPLIER_SYNC &&
> >  		
> >  		    applier->state != APPLIER_FOLLOW)
> > 
> > @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct
> > stailq
> > *rows)
> > 
> >  				    next)->row.is_commit);
> >  
> >  }
> > 
> > +static void
> > +sequencer_rollback_cb(struct trigger *trigger, void *event)
> 
> There's no sequencer object so the names are confusing. Let's call them
> applier_on_rollback/commit?
Accepted.
> 
> > +{
> > +	(void) trigger;
> > +	(void) event;
> > +	diag_set(ClientError, ER_WAL_IO);
> > +	diag_move(&fiber()->diag, &replicaset.applier.diag);
> > +	trigger_run(&replicaset.applier.on_replication_fail, NULL);
> > +	vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> > +}
> > +
> > +static void
> > +sequencer_commit_cb(struct trigger *trigger, void *event)
> > +{
> > +	(void) trigger;
> > +	(void) event;
> > +	fiber_cond_broadcast(&replicaset.applier.commit_cond);
> > +}
> > +
> > +static void
> > +applier_on_fail(struct trigger *trigger, void *event)
> > +{
> > +	(void) event;
> > +	struct applier *applier = (struct applier *)trigger->data;
> > +	if (!diag_is_empty(&replicaset.applier.diag))
> > +		diag_add_error(&applier->diag,
> > diag_last_error(&replicaset.applier.diag));
> > +	fiber_cancel(applier->reader);
> > +
> > +}
> > +
> > 
> >  /**
> >  
> >   * Apply all rows in the rows queue as a single transaction.
> >   *
> > 
> > @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct
> > stailq
> > *rows)
> > 
> >  static int
> >  applier_apply_tx(struct stailq *rows)
> >  {
> > 
> > +	struct xrow_header *first_row =
> > +		&stailq_first_entry(rows, struct applier_tx_row,
> > +				    next)->row;
> > +	struct replica *replica = replica_by_id(first_row->replica_id);
> > +	struct latch *latch = (replica ? &replica->order_latch :
> > +			       &replicaset.applier.order_latch);
> > +	latch_lock(latch);
> > +	if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) 
>=
> > +	    first_row->lsn) {
> > +		/* Check there is a heathbeat message and wake a writers up. */
> > +		if (first_row->lsn == 0)
> > +			fiber_cond_broadcast(&replicaset.applier.commit_cond);
> 
> Would be better to check that before taking the latch. We don't need the
> latch to reply to a heartbeat message, do we?
Accepted.
> 
> > +		latch_unlock(latch);
> > +		return 0;
> > +	}
> > +
> > 
> >  	/**
> >  	
> >  	 * Explicitly begin the transaction so that we can
> >  	 * control fiber->gc life cycle and, in case of apply
> > 
> > @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
> > 
> >  	 */
> >  	
> >  	struct txn *txn = txn_begin();
> >  	struct applier_tx_row *item;
> > 
> > -	if (txn == NULL)
> > -		diag_raise();
> > +	if (txn == NULL) {
> > +		latch_unlock(latch);
> > +		return -1;
> > +	}
> > 
> >  	stailq_foreach_entry(item, rows, next) {
> >  	
> >  		struct xrow_header *row = &item->row;
> >  		int res = apply_row(row);
> > 
> > @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
> > 
> >  			 "Replication", "distributed transactions");
> >  		
> >  		goto rollback;
> >  	
> >  	}
> > 
> > -	return txn_commit(txn);
> > 
> > +	/* We are ready to submit txn to wal. */
> > +	struct trigger *on_rollback, *on_commit;
> > +	on_rollback = (struct trigger *)region_alloc(&txn->region,
> > +						     sizeof(struct trigger));
> > +	on_commit = (struct trigger *)region_alloc(&txn->region,
> > +						   sizeof(struct trigger));
> > +	if (on_rollback == NULL || on_commit == NULL)
> > +		goto rollback;
> > +
> > +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> > +	txn_on_rollback(txn, on_rollback);
> > +
> > +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> > +	txn_on_commit(txn, on_commit);
> > +
> > +	if (txn_write(txn) < 0)
> > +		goto fail;
> > +	/* Transaction was sent to journal so promote vclock. */
> > +	vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> > +		      first_row->lsn);
> > +	latch_unlock(latch);
> > +
> > +	return 0;
> > 
> >  rollback:
> >  	txn_rollback(txn);
> > 
> > +fail:
> > +	latch_unlock(latch);
> > 
> >  	fiber_gc();
> >  	return -1;
> >  
> >  }
> > 
> > @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
> > 
> >  	struct ev_io *coio = &applier->io;
> >  	struct ibuf *ibuf = &applier->ibuf;
> >  	struct xrow_header row;
> > 
> > -	struct vclock remote_vclock_at_subscribe;
> > 
> >  	struct tt_uuid cluster_id = uuid_nil;
> >  	
> >  	struct vclock vclock;
> > 
> > @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
> > 
> >  		 * the replica, and replica has to check whether
> >  		 * its and master's cluster ids match.
> >  		 */
> > 
> > -		vclock_create(&remote_vclock_at_subscribe);
> > +		vclock_create(&applier->remote_vclock_at_subscribe);
> > 
> >  		xrow_decode_subscribe_response_xc(&row,
> >  		
> >  						  &cluster_id,
> > 
> > -						  &remote_vclock_at_subscribe);
> > +						  &applier-
> > 
> > >remote_vclock_at_subscribe);
> > >
> >  		/*
> >  		
> >  		 * If master didn't send us its cluster id
> >  		 * assume that it has done all the checks.
> > 
> > @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
> > 
> >  		say_info("subscribed");
> >  		say_info("remote vclock %s local vclock %s",
> > 
> > -			 vclock_to_string(&remote_vclock_at_subscribe),
> > +			 vclock_to_string(&applier->remote_vclock_at_subscribe),
> > 
> >  			 vclock_to_string(&vclock));
> >  	
> >  	}
> >  	/*
> > 
> > @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
> > 
> >  	applier->lag = TIMEOUT_INFINITY;
> > 
> > +	/* Register a trigger to handle replication failures. */
> > +	struct trigger on_fail;
> > +	trigger_create(&on_fail, applier_on_fail, applier, NULL);
> > +	trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
> 
> Why do we need on_replication_fail trigger? AFAICS it is called from
> on_rollback callback. Can't we call applier_on_fail right from there,
> without the use of the intermediary?
Because we should cancel all appliers if anything failed (for instance an 
applier could skip a transaction and start with the next one and then should 
be cancelled if other applier failed to). We could track the applier list but 
I'm not sure it would be better.
> 
> > +	auto trigger_guard = make_scoped_guard([&] {
> > +		trigger_clear(&on_fail);
> > +	});
> > +
> > +
> > 
> >  	/*
> >  	
> >  	 * Process a stream of rows from the binary log.
> >  	 */
> > 
> > @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
> > 
> >  			applier_set_state(applier, APPLIER_FOLLOW);
> >  		
> >  		}
> > 
> > -		/*
> > -		 * Stay 'orphan' until appliers catch up with
> > -		 * the remote vclock at the time of SUBSCRIBE
> > -		 * and the lag is less than configured.
> > -		 */
> > -		if (applier->state == APPLIER_SYNC &&
> > -		    applier->lag <= replication_sync_lag &&
> > -		    vclock_compare(&remote_vclock_at_subscribe,
> > -				   &replicaset.vclock) <= 0) {
> > -			/* Applier is synced, switch to "follow". */
> > -			applier_set_state(applier, APPLIER_FOLLOW);
> > -		}
> > -
> > 
> >  		struct stailq rows;
> >  		applier_read_tx(applier, &rows);
> > 
> > -		struct xrow_header *first_row =
> > -			&stailq_first_entry(&rows, struct applier_tx_row,
> > -					    next)->row;
> > 
> >  		applier->last_row_time = ev_monotonic_now(loop());
> > 
> > -		struct replica *replica = replica_by_id(first_row->replica_id);
> > -		struct latch *latch = (replica ? &replica->order_latch :
> > -				       &replicaset.applier.order_latch);
> > -		/*
> > -		 * In a full mesh topology, the same set of changes
> > -		 * may arrive via two concurrently running appliers.
> > -		 * Hence we need a latch to strictly order all changes
> > -		 * that belong to the same server id.
> > -		 */
> 
> Why did you remove this comment? (You didn't move it, I checked).
Accepted
> 
> > -		latch_lock(latch);
> > -		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> > -		    first_row->lsn &&
> > -		    applier_apply_tx(&rows) != 0) {
> > -			latch_unlock(latch);
> > +		if (applier_apply_tx(&rows) != 0)
> > 
> >  			diag_raise();
> > 
> > -		}
> > -		latch_unlock(latch);
> > 
> > -		if (applier->state == APPLIER_SYNC ||
> > -		    applier->state == APPLIER_FOLLOW)
> > -			fiber_cond_signal(&applier->writer_cond);
> > 
> >  		if (ibuf_used(ibuf) == 0)
> >  		
> >  			ibuf_reset(ibuf);
> >  		
> >  		fiber_gc();
> > 
> > @@ -872,6 +932,11 @@ applier_f(va_list ap)
> > 
> >  				return -1;
> >  			
> >  			}
> >  		
> >  		} catch (FiberIsCancelled *e) {
> > 
> > +			if (!diag_is_empty(&applier->diag)) {
> > +				diag_move(&applier->diag, &fiber()->diag);
> 
> Hmm, AFAIK we only need this diag for box.info.replication. May be,
> better patch box.info.replication to use applier->diag instead of
> applier->reader->diag so that we don't need to move errors around?
If one applier failed then any other is going to be cancelled with the same 
error. As applier_subscribe uses an exception I should populate the diag to 
raise it.
> 
> > +				applier_disconnect(applier, APPLIER_STOPPED);
> > +				break;
> > +			}
> > 
> >  			applier_disconnect(applier, APPLIER_OFF);
> >  			break;
> >  		
> >  		} catch (SocketError *e) {
> > 
> > @@ -959,7 +1024,7 @@ applier_new(const char *uri)
> > 
> >  	applier->last_row_time = ev_monotonic_now(loop());
> >  	rlist_create(&applier->on_state);
> >  	fiber_cond_create(&applier->resume_cond);
> > 
> > -	fiber_cond_create(&applier->writer_cond);
> > +	diag_create(&applier->diag);
> > 
> >  	return applier;
> >  
> >  }
> > 
> > @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
> > 
> >  	assert(applier->io.fd == -1);
> >  	trigger_destroy(&applier->on_state);
> >  	fiber_cond_destroy(&applier->resume_cond);
> > 
> > -	fiber_cond_destroy(&applier->writer_cond);
> > 
> >  	free(applier);
> >  
> >  }
> > 
> > diff --git a/src/box/applier.h b/src/box/applier.h
> > index 5bff90031..716da32e2 100644
> > --- a/src/box/applier.h
> > +++ b/src/box/applier.h
> > @@ -74,8 +74,6 @@ struct applier {
> > 
> >  	struct fiber *reader;
> >  	/** Background fiber to reply with vclock */
> >  	struct fiber *writer;
> > 
> > -	/** Writer cond. */
> > -	struct fiber_cond writer_cond;
> > 
> >  	/** Finite-state machine */
> >  	enum applier_state state;
> >  	/** Local time of this replica when the last row has been received */
> > 
> > @@ -114,8 +112,15 @@ struct applier {
> > 
> >  	bool is_paused;
> >  	/** Condition variable signaled to resume the applier. */
> >  	struct fiber_cond resume_cond;
> > 
> > +	/* Diag to raise an error. */
> > +	struct diag diag;
> > +	/* The masters vclock while subscribe. */
> > +	struct vclock remote_vclock_at_subscribe;
> > 
> >  };
> > 
> > +void
> > +applier_init();
> > +
> 
> This function isn't defined anywhere.
> 
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 8c8a9927e..a4830f5b5 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -232,6 +232,20 @@ struct replicaset {
> > 
> >  		 * struct replica object).
> >  		 */
> >  		
> >  		struct latch order_latch;
> > 
> > +		/*
> > +		 * A vclock of the last transaction wich was read
> > +		 * from an applier connection.
> > +		 */
> > +		struct vclock net_vclock;
> 
> Please elaborate. Can it be less than replicaset.vclock? Can it be
> greater? Why?
Let discuss it f2f.
> 
> > +		/* Signaled on replicated transaction commit. */
> > +		struct fiber_cond commit_cond;
> > +		/*
> > +		 * Trigger to fire when replication stops in case
> > +		 * of an error.
> > +		 */
> > +		struct rlist on_replication_fail;
> > +		/* Diag to populate an error acros all appliers. */
> > +		struct diag diag;
> > 
> >  	} applier;
> >  	/** Map of all known replica_id's to correspponding replica's. */
> >  	struct replica **replica_by_id;

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190620/c2931ad5/attachment.sig>


More information about the Tarantool-patches mailing list