[tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel

Vladimir Davydov vdavydov.dev at gmail.com
Thu Jun 20 19:37:09 MSK 2019


On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> I'm sorry, here is the proper version of the commit:
> 
> Appliers use asynchronous transactions to batch journal writes. All
> appliers share the replicaset.applier.tx_vclock which means the vclock
> applied but not necessarily written to a journal. Appliers use a trigger
> to coordinate in case of failure - when a transaction is going to
> be rolled back. Also an applier writer condition is shared across all
> appliers and signaled in case of a commit or heartbeat message.
> 
> Closes: #1254
> ---
>  src/box/applier.cc     | 156 +++++++++++++++++++++++++++++------------
>  src/box/applier.h      |   9 ++-
>  src/box/replication.cc |   7 ++
>  src/box/replication.h  |  14 ++++
>  4 files changed, 138 insertions(+), 48 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..fee49d8ca 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -50,6 +50,7 @@
>  #include "schema.h"
>  #include "txn.h"
>  #include "box.h"
> +#include "scoped_guard.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
>  		 * replication_timeout seconds any more.
>  		 */
>  		if (applier->version_id >= version_id(1, 7, 7))
> -			fiber_cond_wait_timeout(&applier->writer_cond,
> +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
>  						TIMEOUT_INFINITY);
>  		else
> -			fiber_cond_wait_timeout(&applier->writer_cond,
> +			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
>  						replication_timeout);

Why replace applier->writer_cond with replicaset.applier.commit_cond?
This means that even if only one applier is active, we will wake up all
of the writers on each commit, which looks strange.

> +		/*
> +		 * Stay 'orphan' until appliers catch up with
> +		 * the remote vclock at the time of SUBSCRIBE
> +		 * and the lag is less than configured.
> +		 */
> +		if (applier->state == APPLIER_SYNC &&
> +		    applier->lag <= replication_sync_lag &&
> +		    vclock_compare(&applier->remote_vclock_at_subscribe,
> +				   &replicaset.vclock) <= 0) {
> +			/* Applier is synced, switch to "follow". */
> +			applier_set_state(applier, APPLIER_FOLLOW);
> +		}
> +

A writer is supposed to send ACKs, not change the applier state.
How did this wind up here? Can't we do this right from the on_commit
trigger?

>  		/* Send ACKs only when in FOLLOW mode. */
>  		if (applier->state != APPLIER_SYNC &&
>  		    applier->state != APPLIER_FOLLOW)
>  applier_read_tx(struct applier *applier, struct stailq *rows)
>  				    next)->row.is_commit);
>  }
>  
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)

There's no sequencer object so the names are confusing. Let's call them
applier_on_rollback/commit?

> +{
> +	(void) trigger;
> +	(void) event;
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &replicaset.applier.diag);
> +	trigger_run(&replicaset.applier.on_replication_fail, NULL);
> +	vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	fiber_cond_broadcast(&replicaset.applier.commit_cond);
> +}
> +
> +static void
> +applier_on_fail(struct trigger *trigger, void *event)
> +{
> +	(void) event;
> +	struct applier *applier = (struct applier *)trigger->data;
> +	if (!diag_is_empty(&replicaset.applier.diag))
> +		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
> +	fiber_cancel(applier->reader);
> +
> +}
> +
>  /**
>   * Apply all rows in the rows queue as a single transaction.
>   *
> @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>  static int
>  applier_apply_tx(struct stailq *rows)
>  {
> +	struct xrow_header *first_row =
> +		&stailq_first_entry(rows, struct applier_tx_row,
> +				    next)->row;
> +	struct replica *replica = replica_by_id(first_row->replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			       &replicaset.applier.order_latch);
> +	latch_lock(latch);
> +	if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> +	    first_row->lsn) {
> +		/* Check whether there is a heartbeat message and wake the writers up. */
> +		if (first_row->lsn == 0)
> +			fiber_cond_broadcast(&replicaset.applier.commit_cond);

Would be better to check that before taking the latch. We don't need the
latch to reply to a heartbeat message, do we?

> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
>  	/**
>  	 * Explicitly begin the transaction so that we can
>  	 * control fiber->gc life cycle and, in case of apply
> @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
>  	 */
>  	struct txn *txn = txn_begin();
>  	struct applier_tx_row *item;
> -	if (txn == NULL)
> -		diag_raise();
> +	if (txn == NULL) {
> +		latch_unlock(latch);
> +		return -1;
> +	}
>  	stailq_foreach_entry(item, rows, next) {
>  		struct xrow_header *row = &item->row;
>  		int res = apply_row(row);
> @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
>  			 "Replication", "distributed transactions");
>  		goto rollback;
>  	}
> -	return txn_commit(txn);
>  
> +	/* We are ready to submit txn to wal. */
> +	struct trigger *on_rollback, *on_commit;
> +	on_rollback = (struct trigger *)region_alloc(&txn->region,
> +						     sizeof(struct trigger));
> +	on_commit = (struct trigger *)region_alloc(&txn->region,
> +						   sizeof(struct trigger));
> +	if (on_rollback == NULL || on_commit == NULL)
> +		goto rollback;
> +
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) < 0)
> +		goto fail;
> +	/* Transaction was sent to journal so promote vclock. */
> +	vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> +		      first_row->lsn);
> +	latch_unlock(latch);
> +
> +	return 0;
>  rollback:
>  	txn_rollback(txn);
> +fail:
> +	latch_unlock(latch);
>  	fiber_gc();
>  	return -1;
>  }
> @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
>  	struct ev_io *coio = &applier->io;
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
> -	struct vclock remote_vclock_at_subscribe;
>  	struct tt_uuid cluster_id = uuid_nil;
>  
>  	struct vclock vclock;
> @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
>  		 * the replica, and replica has to check whether
>  		 * its and master's cluster ids match.
>  		 */
> -		vclock_create(&remote_vclock_at_subscribe);
> +		vclock_create(&applier->remote_vclock_at_subscribe);
>  		xrow_decode_subscribe_response_xc(&row,
>  						  &cluster_id,
> -						  &remote_vclock_at_subscribe);
> +						  &applier->remote_vclock_at_subscribe);
>  		/*
>  		 * If master didn't send us its cluster id
>  		 * assume that it has done all the checks.
> @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
>  
>  		say_info("subscribed");
>  		say_info("remote vclock %s local vclock %s",
> -			 vclock_to_string(&remote_vclock_at_subscribe),
> +			 vclock_to_string(&applier->remote_vclock_at_subscribe),
>  			 vclock_to_string(&vclock));
>  	}
>  	/*
> @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
>  
>  	applier->lag = TIMEOUT_INFINITY;
>  
> +	/* Register a trigger to handle replication failures. */
> +	struct trigger on_fail;
> +	trigger_create(&on_fail, applier_on_fail, applier, NULL);
> +	trigger_add(&replicaset.applier.on_replication_fail, &on_fail);

Why do we need on_replication_fail trigger? AFAICS it is called from
on_rollback callback. Can't we call applier_on_fail right from there,
without the use of the intermediary?

> +	auto trigger_guard = make_scoped_guard([&] {
> +		trigger_clear(&on_fail);
> +	});
> +
> +
>  	/*
>  	 * Process a stream of rows from the binary log.
>  	 */
> @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Stay 'orphan' until appliers catch up with
> -		 * the remote vclock at the time of SUBSCRIBE
> -		 * and the lag is less than configured.
> -		 */
> -		if (applier->state == APPLIER_SYNC &&
> -		    applier->lag <= replication_sync_lag &&
> -		    vclock_compare(&remote_vclock_at_subscribe,
> -				   &replicaset.vclock) <= 0) {
> -			/* Applier is synced, switch to "follow". */
> -			applier_set_state(applier, APPLIER_FOLLOW);
> -		}
> -
>  		struct stailq rows;
>  		applier_read_tx(applier, &rows);
>  
> -		struct xrow_header *first_row =
> -			&stailq_first_entry(&rows, struct applier_tx_row,
> -					    next)->row;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(first_row->replica_id);
> -		struct latch *latch = (replica ? &replica->order_latch :
> -				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set of changes
> -		 * may arrive via two concurrently running appliers.
> -		 * Hence we need a latch to strictly order all changes
> -		 * that belong to the same server id.
> -		 */

Why did you remove this comment? (You didn't move it, I checked).

> -		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> -		    first_row->lsn &&
> -		    applier_apply_tx(&rows) != 0) {
> -			latch_unlock(latch);
> +		if (applier_apply_tx(&rows) != 0)
>  			diag_raise();
> -		}
> -		latch_unlock(latch);
>  
> -		if (applier->state == APPLIER_SYNC ||
> -		    applier->state == APPLIER_FOLLOW)
> -			fiber_cond_signal(&applier->writer_cond);
>  		if (ibuf_used(ibuf) == 0)
>  			ibuf_reset(ibuf);
>  		fiber_gc();
> @@ -872,6 +932,11 @@ applier_f(va_list ap)
>  				return -1;
>  			}
>  		} catch (FiberIsCancelled *e) {
> +			if (!diag_is_empty(&applier->diag)) {
> +				diag_move(&applier->diag, &fiber()->diag);

Hmm, AFAIK we only need this diag for box.info.replication. May be,
better patch box.info.replication to use applier->diag instead of
applier->reader->diag so that we don't need to move errors around?

> +				applier_disconnect(applier, APPLIER_STOPPED);
> +				break;
> +			}
>  			applier_disconnect(applier, APPLIER_OFF);
>  			break;
>  		} catch (SocketError *e) {
> @@ -959,7 +1024,7 @@ applier_new(const char *uri)
>  	applier->last_row_time = ev_monotonic_now(loop());
>  	rlist_create(&applier->on_state);
>  	fiber_cond_create(&applier->resume_cond);
> -	fiber_cond_create(&applier->writer_cond);
> +	diag_create(&applier->diag);
>  
>  	return applier;
>  }
> @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
>  	assert(applier->io.fd == -1);
>  	trigger_destroy(&applier->on_state);
>  	fiber_cond_destroy(&applier->resume_cond);
> -	fiber_cond_destroy(&applier->writer_cond);
>  	free(applier);
>  }
>  
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 5bff90031..716da32e2 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -74,8 +74,6 @@ struct applier {
>  	struct fiber *reader;
>  	/** Background fiber to reply with vclock */
>  	struct fiber *writer;
> -	/** Writer cond. */
> -	struct fiber_cond writer_cond;
>  	/** Finite-state machine */
>  	enum applier_state state;
>  	/** Local time of this replica when the last row has been received */
> @@ -114,8 +112,15 @@ struct applier {
>  	bool is_paused;
>  	/** Condition variable signaled to resume the applier. */
>  	struct fiber_cond resume_cond;
> +	/* Diag to raise an error. */
> +	struct diag diag;
> +	/* The master's vclock at the time of subscribe. */
> +	struct vclock remote_vclock_at_subscribe;
>  };
>  
> +void
> +applier_init();
> +

This function isn't defined anywhere.

> diff --git a/src/box/replication.h b/src/box/replication.h
> index 8c8a9927e..a4830f5b5 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -232,6 +232,20 @@ struct replicaset {
>  		 * struct replica object).
>  		 */
>  		struct latch order_latch;
> +		/*
> +		 * A vclock of the last transaction which was read
> +		 * from an applier connection.
> +		 */
> +		struct vclock net_vclock;

Please elaborate. Can it be less than replicaset.vclock? Can it be
greater? Why?

> +		/* Signaled on replicated transaction commit. */
> +		struct fiber_cond commit_cond;
> +		/*
> +		 * Trigger to fire when replication stops in case
> +		 * of an error.
> +		 */
> +		struct rlist on_replication_fail;
> +		/* Diag to propagate an error across all appliers. */
> +		struct diag diag;
>  	} applier;
>  	/** Map of all known replica_id's to corresponding replicas. */
>  	struct replica **replica_by_id;



More information about the Tarantool-patches mailing list