[tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel

Vladimir Davydov vdavydov.dev at gmail.com
Thu Jun 13 18:17:52 MSK 2019


On Sun, Jun 09, 2019 at 11:44:42PM +0300, Georgy Kirichenko wrote:
> Applier use asynchronous transaction to batch journal writes. A
> sequencer orders transaction execution and check write result while
> an applier reads network.

In general, I have no objections against the way you handle this.
It does seem that we don't need to start a fiber for each transaction -
batching WAL writes should be enough in most cases, because memtx never
yields, while vinyl usually doesn't yield on write (well, there are
UPDATE/INSERT but we can replace them with REPLACE on replica; before/on
replace triggers are slow by design anyway so we don't need to optimize
them).

However, I'd expect to see the reasoning in the commit message.

Also, I find the implementation a little bit too complex and have a
suggestion for how we could possibly simplify it - see comments inline.

> 
> Closes: #1254
> ---
>  src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
>  src/box/applier.h  |   4 +
>  src/box/box.cc     |   1 +
>  3 files changed, 251 insertions(+), 126 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..9f0efda5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -53,6 +53,231 @@
>  
>  STRS(applier_state, applier_STATE);
>  
> +/**
> + * Process a no-op request.
> + *
> + * A no-op request does not affect any space, but it
> + * promotes vclock and is written to WAL.
> + */
> +static int
> +process_nop(struct request *request)
> +{
> +	assert(request->type == IPROTO_NOP);
> +	struct txn *txn = in_txn();
> +	if (txn_begin_stmt(txn, NULL) == NULL)
> +		return -1;
> +	return txn_commit_stmt(txn, request);
> +}
> +
> +static int
> +apply_row(struct xrow_header *row)
> +{
> +	struct request request;
> +	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> +		return -1;
> +	if (request.type == IPROTO_NOP)
> +		return process_nop(&request);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		return -1;
> +	if (box_process_rw(&request, space, NULL) != 0) {
> +		say_error("error applying row: %s", request_str(&request));
> +		return -1;
> +	}
> +	return 0;
> +}

Please avoid moving and adding/patching code in the same patch.
Split it if necessary. In this particular case I see no reason
for you to move the code.

> +
> +/**
> + * A helper struct to link xrow objects in a list.
> + */
> +struct applier_tx_row {
> +	/* Next transaction row. */
> +	struct stailq_entry next;
> +	/* xrow_header struct for the current transaction row. */
> +	struct xrow_header row;
> +};
> +
> +struct sequencer {
> +	struct stailq txn_queue;
> +	struct fiber_cond txn_queue_cond;
> +	struct vclock net_vclock;
> +	struct fiber_cond tx_vclock_cond;

This cond is never used...

> +	struct diag diag;
> +	struct rlist on_fail;
> +};
> +
> +static struct sequencer sequencer;
> +
> +static int
> +sequencer_collect_f(va_list ap)
> +{
> +	(void) ap;
> +	while (!fiber_is_cancelled()) {
> +		while (stailq_empty(&sequencer.txn_queue)) {
> +			if (!diag_is_empty(&sequencer.diag)) {
> +				diag_clear(&sequencer.diag);
> +				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
> +			}
> +			fiber_cond_wait(&sequencer.txn_queue_cond);
> +			continue;
> +		}
> +		struct txn *txn =
> +			stailq_shift_entry(&sequencer.txn_queue, struct txn,
> +					   in_txn_cache);
> +		if (txn_wait(txn) == 0) {
> +			continue;
> +		}
> +		if (diag_is_empty(&sequencer.diag)) {
> +			diag_move(&fiber()->diag, &sequencer.diag);
> +			trigger_run(&sequencer.on_fail, NULL);
> +		}
> +	}
> +	return 0;
> +}
> +
> +void
> +applier_init()
> +{
> +	stailq_create(&sequencer.txn_queue);
> +	fiber_cond_create(&sequencer.txn_queue_cond);
> +
> +	rlist_create(&sequencer.on_fail);
> +
> +	vclock_create(&sequencer.net_vclock);
> +	fiber_cond_create(&sequencer.tx_vclock_cond);
> +	diag_create(&sequencer.diag);
> +	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
> +	if (collector == NULL)
> +		panic("Failed to create a sequencer collector fiber");
> +	fiber_start(collector, NULL);
> +}
> +
> +static inline void
> +sequencer_on_fail(struct trigger *on_fail)
> +{
> +	trigger_add(&sequencer.on_fail, on_fail);
> +}
> +
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &sequencer.diag);
> +	trigger_run(&sequencer.on_fail, &sequencer);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
> +}
> +
> +static inline int
> +sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			      &replicaset.applier.order_latch);
> +
> +	latch_lock(latch);
> +	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
> +		/* Nothing to do. */
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct trigger *on_rollback;
> +	struct trigger *on_commit;
> +	/**
> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		goto fail;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto rollback;
> +	}
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "distributed transactions");
> +		goto rollback;
> +	}
> +
> +	/* We are ready to submit txn to wal. */
> +	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) != 0)
> +		goto fail;
> +
> +	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
> +	latch_unlock(latch);
> +	return 0;
> +
> +rollback:
> +	txn_rollback(txn);
> +
> +fail:
> +	latch_unlock(latch);
> +	if (diag_is_empty(&sequencer.diag)) {
> +		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
> +		trigger_run(&sequencer.on_fail, NULL);
> +	}
> +	return -1;
> +}

The code is quite complex and without comments it's easy to get lost.
Please next time write proper comments explaining what all the actors
do.

Just so I understand what's going on here:

 1. applier performs requests comprising a transaction.
 2. applier submits WAL write asynchronously.
 3. wal appends the journal entry to its queue.
 4. applier sets on_commit/rollback trigger to add completed tx to
    sequencer queue.
 5. applier continues to the next transaction (goes to step 1).
 6. wal writes the request and performs the txn_complete callback.
    It also wakes up the fiber waiting for the transaction (hell,
    there's no such fiber in this case!)
 7. txn_complete callback runs on_commit trigger.
 8. on_commit trigger set by the applier adds the transaction to the
    sequencer queue.
 9. sequencer calls txn_wait. Since the transaction is complete,
    it just frees the transaction.

This is a good example of over-engineering IMHO :)

In particular, I find the sequencer entity redundant, because WAL
already acts as a request sequencer so we should reuse its queue
rather than introducing a new one.

Let's try to simplify things a bit:

 - Instead of having journal_async_write/journal_async_wait pair, let's
   introduce single method journal_submit that would take a callback
   to execute upon write completion. The method doesn't need to wake up
   the initiating fiber - it should be done by the callback itself, if
   necessary. No other methods are defined for journal.

 - Then add txn_commit and txn_commit_async that would do something like
   this:

     txn_commit:
       txn->is_async = false
       txn_do_commit()
       while (!txn_is_complete())
           fiber_yield_timeout()
       txn_free()

     txn_commit_async:
       txn->is_async = true
       txn_do_commit()

     txn_do_commit:
       do preparatory engine stuff
       journal_submit(txn_complete)

     txn_complete:
       do completing engine stuff
       run on_commit/rollback triggers
       if (txn->is_async)
           txn_free()
       else
           wakeup awaiting fiber

 - Now applier just sets on_rollback trigger to stop in case of WAL
   write error, then fires txn_commit_async and forgets about the txn -
   it'll get freed by WAL callback automatically upon completion.

No sequencer with a separate queue, no new triggers, no new journal
methods. Overall, should look much simpler. Have you considered this?
Are there any problems? Please check it out.



More information about the Tarantool-patches mailing list