Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: Georgy Kirichenko <georgy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: Re: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel
Date: Thu, 13 Jun 2019 18:17:52 +0300	[thread overview]
Message-ID: <20190613151752.4jxj34zywbvtnayn@esperanza> (raw)
In-Reply-To: <58cc2b3d0c6ee62b3b45d204165f6e107976c35c.1560112747.git.georgy@tarantool.org>

On Sun, Jun 09, 2019 at 11:44:42PM +0300, Georgy Kirichenko wrote:
> Applier use asynchronous transaction to batch journal writes. A
> sequencer orders transaction execution and check write result while
> an applier reads network.

In general, I have no objections against the way you handle this.
It does seem that we don't need to start a fiber per each transaction -
batching WAL writes should be enough in most cases, because memtx never
yields while vinyl usually doesn't yield on write (well, there are
UPDATE/INSERT but we can replace them with REPLACE on replica; before/on
replace triggers are slow by design anyway so we don't need to optimize
them).

However, I'd expect to see the reasoning in the commit message.

Also, I find the implementation a little bit too complex and have a
suggestion how we could possibly simplify it - see comments inline.

> 
> Closes: #1254
> ---
>  src/box/applier.cc | 372 ++++++++++++++++++++++++++++++---------------
>  src/box/applier.h  |   4 +
>  src/box/box.cc     |   1 +
>  3 files changed, 251 insertions(+), 126 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..9f0efda5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -53,6 +53,231 @@
>  
>  STRS(applier_state, applier_STATE);
>  
> +/**
> + * Process a no-op request.
> + *
> + * A no-op request does not affect any space, but it
> + * promotes vclock and is written to WAL.
> + */
> +static int
> +process_nop(struct request *request)
> +{
> +	assert(request->type == IPROTO_NOP);
> +	struct txn *txn = in_txn();
> +	if (txn_begin_stmt(txn, NULL) == NULL)
> +		return -1;
> +	return txn_commit_stmt(txn, request);
> +}
> +
> +static int
> +apply_row(struct xrow_header *row)
> +{
> +	struct request request;
> +	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> +		return -1;
> +	if (request.type == IPROTO_NOP)
> +		return process_nop(&request);
> +	struct space *space = space_cache_find(request.space_id);
> +	if (space == NULL)
> +		return -1;
> +	if (box_process_rw(&request, space, NULL) != 0) {
> +		say_error("error applying row: %s", request_str(&request));
> +		return -1;
> +	}
> +	return 0;
> +}

Please avoid moving and adding/patching code in the same patch.
Split it if necessary. In this particular case I see no reason
for you to move the code.

> +
> +/**
> + * A helper struct to link xrow objects in a list.
> + */
> +struct applier_tx_row {
> +	/* Next transaction row. */
> +	struct stailq_entry next;
> +	/* xrow_header struct for the current transaction row. */
> +	struct xrow_header row;
> +};
> +
> +struct sequencer {
> +	struct stailq txn_queue;
> +	struct fiber_cond txn_queue_cond;
> +	struct vclock net_vclock;
> +	struct fiber_cond tx_vclock_cond;

This cond is never used...

> +	struct diag diag;
> +	struct rlist on_fail;
> +};
> +
> +static struct sequencer sequencer;
> +
> +static int
> +sequencer_collect_f(va_list ap)
> +{
> +	(void) ap;
> +	while (!fiber_is_cancelled()) {
> +		while (stailq_empty(&sequencer.txn_queue)) {
> +			if (!diag_is_empty(&sequencer.diag)) {
> +				diag_clear(&sequencer.diag);
> +				vclock_copy(&sequencer.net_vclock, &replicaset.vclock);
> +			}
> +			fiber_cond_wait(&sequencer.txn_queue_cond);
> +			continue;
> +		}
> +		struct txn *txn =
> +			stailq_shift_entry(&sequencer.txn_queue, struct txn,
> +					   in_txn_cache);
> +		if (txn_wait(txn) == 0) {
> +			continue;
> +		}
> +		if (diag_is_empty(&sequencer.diag)) {
> +			diag_move(&fiber()->diag, &sequencer.diag);
> +			trigger_run(&sequencer.on_fail, NULL);
> +		}
> +	}
> +	return 0;
> +}
> +
> +void
> +applier_init()
> +{
> +	stailq_create(&sequencer.txn_queue);
> +	fiber_cond_create(&sequencer.txn_queue_cond);
> +
> +	rlist_create(&sequencer.on_fail);
> +
> +	vclock_create(&sequencer.net_vclock);
> +	fiber_cond_create(&sequencer.tx_vclock_cond);
> +	diag_create(&sequencer.diag);
> +	struct fiber *collector = fiber_new("collector", sequencer_collect_f);
> +	if (collector == NULL)
> +		panic("Failed to create a sequencer collector fiber");
> +	fiber_start(collector, NULL);
> +}
> +
> +static inline void
> +sequencer_on_fail(struct trigger *on_fail)
> +{
> +	trigger_add(&sequencer.on_fail, on_fail);
> +}
> +
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +
> +	diag_set(ClientError, ER_WAL_IO);
> +	diag_move(&fiber()->diag, &sequencer.diag);
> +	trigger_run(&sequencer.on_fail, &sequencer);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void) trigger;
> +	(void) event;
> +	struct txn *txn = (struct txn *)event;
> +	stailq_add_tail(&sequencer.txn_queue, &txn->in_txn_cache);
> +	fiber_cond_signal(&sequencer.txn_queue_cond);
> +	fiber_cond_broadcast(&sequencer.tx_vclock_cond);
> +}
> +
> +static inline int
> +sequencer_submit(uint32_t replica_id, int64_t lsn, struct stailq *rows)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :
> +			      &replicaset.applier.order_latch);
> +
> +	latch_lock(latch);
> +	if (vclock_get(&sequencer.net_vclock, replica_id) >= lsn) {
> +		/* Nothing to do. */
> +		latch_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct trigger *on_rollback;
> +	struct trigger *on_commit;
> +	/**
> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		goto fail;
> +	struct applier_tx_row *item;
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto rollback;
> +	}
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "distributed transactions");
> +		goto rollback;
> +	}
> +
> +	/* We are ready to submit txn to wal. */
> +	on_rollback = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> +	txn_on_rollback(txn, on_rollback);
> +
> +	on_commit = (struct trigger *)txn_alloc(txn, sizeof(struct trigger));
> +	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> +	txn_on_commit(txn, on_commit);
> +
> +	if (txn_write(txn) != 0)
> +		goto fail;
> +
> +	vclock_follow(&sequencer.net_vclock, replica_id, lsn);
> +	latch_unlock(latch);
> +	return 0;
> +
> +rollback:
> +	txn_rollback(txn);
> +
> +fail:
> +	latch_unlock(latch);
> +	if (diag_is_empty(&sequencer.diag)) {
> +		diag_add_error(&sequencer.diag, diag_last_error(&fiber()->diag));
> +		trigger_run(&sequencer.on_fail, NULL);
> +	}
> +	return -1;
> +}

The code is quite complex and without comments it's easy to get lost.
Please next time write proper comments explaining what all the actors
do.

Just so I understand what's going on here:

 1. applier performs requests comprising a transaction.
 2. applier submits WAL write asynchronously.
 3. wal appends the journal entry to its queue.
 4. applier sets on_commit/rollback trigger to add completed tx to
    sequencer queue.
 5. applier continues to the next transaction (goes to step 1).
 6. wal writes the request and performs the txn_complete callback.
    It also wakes up the fiber waiting for the transaction (hell,
    there's no such fiber in this case!)
 7. txn_complete callback runs on_commit trigger.
 8. on_commit trigger set by the applier adds the transaction to the
    sequencer queue.
 9. sequencer calls txn_wait. Since the transaction is complete,
    it just frees the transaction.

This is a good example of over-engineering IMHO :)

In particular, I find the sequencer entity redundant, because WAL
already acts as a request sequencer so we should reuse its queue
rather than introducing a new one.

Let's try to simplify things a bit:

 - Instead of having journal_async_write/journal_async_wait pair, let's
   introduce single method journal_submit that would take a callback
   to execute upon write completion. The method doesn't need to wake up
   the initiating fiber - it should be done by the callback itself, if
   necessary. No other methods are defined for journal.

 - Then add txn_commit and txn_commit_async that would do something like
   this:

     txn_commit:
       txn->is_async = false
       txn_do_commit()
       while (!txn_is_complete())
           fiber_yield_timeout()
       txn_free()

     txn_commit_async:
       txn->is_async = true
       txn_do_commit()

     txn_do_commit:
       do preparatory engine stuff
       wal_submit(txn_complete)

     txn_complete:
       do completing engine stuff
       run on_commit/rollback triggers
       if (txn->is_async)
           txn_free()
       else
           wakeup awaiting fiber

 - Now applier just sets on_rollback trigger to stop in case of WAL
   write error, then fires txn_commit_async and forgets about the txn -
   it'll get freed by WAL callback automatically upon completion.

No sequencer with a separate queue, no new triggers, no new journal
methods. Overall, should look much simpler. Have you considered this?
Are there any problems? Please check it out.

  reply	other threads:[~2019-06-13 15:17 UTC|newest]

Thread overview: 42+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-06-09 20:44 [tarantool-patches] [PATCH v3 00/14] Parallel applier Georgy Kirichenko
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 01/14] txn: Fire a trigger after a transaction finalization Georgy Kirichenko
2019-06-09 21:59   ` [tarantool-patches] " Konstantin Osipov
2019-06-11 11:42   ` [tarantool-patches] " Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 02/14] ddl: synchronize privileges cache with actual data state Georgy Kirichenko
2019-06-11 13:13   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 03/14] txn: transaction memory allocation Georgy Kirichenko
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 04/14] ddl: place alter structures onto a txn memory region Georgy Kirichenko
2019-06-11 14:14   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 05/14] txn: get rid of autocommit from a txn structure Georgy Kirichenko
2019-06-13 14:11   ` Vladimir Davydov
2019-06-16 16:20     ` [tarantool-patches] " Konstantin Osipov
2019-06-16 16:14   ` Konstantin Osipov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 06/14] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-06-13 14:12   ` Vladimir Davydov
2019-06-13 19:28     ` Георгий Кириченко
2019-06-14  9:21       ` Vladimir Davydov
2019-06-16 16:38   ` [tarantool-patches] " Konstantin Osipov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 07/14] wal: remove fiber from a journal_entry structure Georgy Kirichenko
2019-06-13 14:17   ` Vladimir Davydov
2019-06-13 19:33     ` Георгий Кириченко
2019-06-14  8:05       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 08/14] wal: enable asyncronous wal writes Georgy Kirichenko
2019-06-13 14:21   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber Georgy Kirichenko
2019-06-13 14:24   ` Vladimir Davydov
2019-06-13 19:36     ` Георгий Кириченко
2019-06-14  9:20       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 10/14] core: latch_unlock_external routine Georgy Kirichenko
2019-06-13 14:27   ` Vladimir Davydov
2019-06-13 19:38     ` Георгий Кириченко
2019-06-14  8:10       ` Vladimir Davydov
2019-06-14  9:18         ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit Georgy Kirichenko
2019-06-13 14:34   ` Vladimir Davydov
2019-06-13 19:45     ` Георгий Кириченко
2019-06-14  7:58       ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 12/14] txn: handle fiber stop event at transaction level Georgy Kirichenko
2019-06-13 14:36   ` Vladimir Davydov
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel Georgy Kirichenko
2019-06-13 15:17   ` Vladimir Davydov [this message]
2019-06-09 20:44 ` [tarantool-patches] [PATCH v3 14/14] test: fix flaky test Georgy Kirichenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190613151752.4jxj34zywbvtnayn@esperanza \
    --to=vdavydov.dev@gmail.com \
    --cc=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v3 13/14] applier: apply transaction in parallel' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox