[tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
Vladimir Davydov
vdavydov.dev at gmail.com
Thu Jun 20 19:37:09 MSK 2019
On Thu, Jun 20, 2019 at 10:41:10AM +0300, Георгий Кириченко wrote:
> I'm sorry, here is the proper version of the commit:
>
> Appliers use asynchronous transactions to batch journal writes. All
> appliers share the replicaset.applier.tx_vclock which means the vclock
> applied but not necessarily written to a journal. Appliers use a trigger
> to coordinate in case of failure - when a transaction is going to
> be rolled back. Also an applier writer condition is shared across all
> appliers and signaled in case of commit or heartbeat message.
>
> Closes: #1254
> ---
> src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
> src/box/applier.h | 9 ++-
> src/box/replication.cc | 7 ++
> src/box/replication.h | 14 ++++
> 4 files changed, 138 insertions(+), 48 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a92f6109..fee49d8ca 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -50,6 +50,7 @@
> #include "schema.h"
> #include "txn.h"
> #include "box.h"
> +#include "scoped_guard.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
> * replication_timeout seconds any more.
> */
> if (applier->version_id >= version_id(1, 7, 7))
> - fiber_cond_wait_timeout(&applier->writer_cond,
> + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> TIMEOUT_INFINITY);
> else
> - fiber_cond_wait_timeout(&applier->writer_cond,
> + fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
> replication_timeout);
Why replace applier->writer_cond with replicaset.applier.commit_cond?
This means that even if only one applier is active, we will wake up all
of the writers on each commit, which looks strange.
> + /*
> + * Stay 'orphan' until appliers catch up with
> + * the remote vclock at the time of SUBSCRIBE
> + * and the lag is less than configured.
> + */
> + if (applier->state == APPLIER_SYNC &&
> + applier->lag <= replication_sync_lag &&
> + vclock_compare(&applier->remote_vclock_at_subscribe,
> + &replicaset.vclock) <= 0) {
> + /* Applier is synced, switch to "follow". */
> + applier_set_state(applier, APPLIER_FOLLOW);
> + }
> +
A writer is supposed to send ACKs, not change the applier state.
How did this wind up here? Can't we do this right from the on_commit
trigger?
> /* Send ACKs only when in FOLLOW mode ,*/
> if (applier->state != APPLIER_SYNC &&
> applier->state != APPLIER_FOLLOW)
> @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq
> *rows)
> next)->row.is_commit);
> }
>
> +static void
> +sequencer_rollback_cb(struct trigger *trigger, void *event)
There's no sequencer object so the names are confusing. Let's call them
applier_on_rollback/commit?
> +{
> + (void) trigger;
> + (void) event;
> + diag_set(ClientError, ER_WAL_IO);
> + diag_move(&fiber()->diag, &replicaset.applier.diag);
> + trigger_run(&replicaset.applier.on_replication_fail, NULL);
> + vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
> +}
> +
> +static void
> +sequencer_commit_cb(struct trigger *trigger, void *event)
> +{
> + (void) trigger;
> + (void) event;
> + fiber_cond_broadcast(&replicaset.applier.commit_cond);
> +}
> +
> +static void
> +applier_on_fail(struct trigger *trigger, void *event)
> +{
> + (void) event;
> + struct applier *applier = (struct applier *)trigger->data;
> + if (!diag_is_empty(&replicaset.applier.diag))
> + diag_add_error(&applier->diag,
> diag_last_error(&replicaset.applier.diag));
> + fiber_cancel(applier->reader);
> +
> +}
> +
> /**
> * Apply all rows in the rows queue as a single transaction.
> *
> @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq
> *rows)
> static int
> applier_apply_tx(struct stailq *rows)
> {
> + struct xrow_header *first_row =
> + &stailq_first_entry(rows, struct applier_tx_row,
> + next)->row;
> + struct replica *replica = replica_by_id(first_row->replica_id);
> + struct latch *latch = (replica ? &replica->order_latch :
> + &replicaset.applier.order_latch);
> + latch_lock(latch);
> + if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
> + first_row->lsn) {
> + /* Check whether this is a heartbeat message and wake the writers up. */
> + if (first_row->lsn == 0)
> + fiber_cond_broadcast(&replicaset.applier.commit_cond);
Would be better to check that before taking the latch. We don't need the
latch to reply to a heartbeat message, do we?
> + latch_unlock(latch);
> + return 0;
> + }
> +
> /**
> * Explicitly begin the transaction so that we can
> * control fiber->gc life cycle and, in case of apply
> @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
> */
> struct txn *txn = txn_begin();
> struct applier_tx_row *item;
> - if (txn == NULL)
> - diag_raise();
> + if (txn == NULL) {
> + latch_unlock(latch);
> + return -1;
> + }
> stailq_foreach_entry(item, rows, next) {
> struct xrow_header *row = &item->row;
> int res = apply_row(row);
> @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
> "Replication", "distributed transactions");
> goto rollback;
> }
> - return txn_commit(txn);
>
> + /* We are ready to submit txn to wal. */
> + struct trigger *on_rollback, *on_commit;
> + on_rollback = (struct trigger *)region_alloc(&txn->region,
> + sizeof(struct trigger));
> + on_commit = (struct trigger *)region_alloc(&txn->region,
> + sizeof(struct trigger));
> + if (on_rollback == NULL || on_commit == NULL)
> + goto rollback;
> +
> + trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
> + txn_on_rollback(txn, on_rollback);
> +
> + trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
> + txn_on_commit(txn, on_commit);
> +
> + if (txn_write(txn) < 0)
> + goto fail;
> + /* Transaction was sent to journal so promote vclock. */
> + vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
> + first_row->lsn);
> + latch_unlock(latch);
> +
> + return 0;
> rollback:
> txn_rollback(txn);
> +fail:
> + latch_unlock(latch);
> fiber_gc();
> return -1;
> }
> @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
> struct ev_io *coio = &applier->io;
> struct ibuf *ibuf = &applier->ibuf;
> struct xrow_header row;
> - struct vclock remote_vclock_at_subscribe;
> struct tt_uuid cluster_id = uuid_nil;
>
> struct vclock vclock;
> @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
> * the replica, and replica has to check whether
> * its and master's cluster ids match.
> */
> - vclock_create(&remote_vclock_at_subscribe);
> + vclock_create(&applier->remote_vclock_at_subscribe);
> xrow_decode_subscribe_response_xc(&row,
> &cluster_id,
> - &remote_vclock_at_subscribe);
> + &applier->remote_vclock_at_subscribe);
> /*
> * If master didn't send us its cluster id
> * assume that it has done all the checks.
> @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
>
> say_info("subscribed");
> say_info("remote vclock %s local vclock %s",
> - vclock_to_string(&remote_vclock_at_subscribe),
> + vclock_to_string(&applier->remote_vclock_at_subscribe),
> vclock_to_string(&vclock));
> }
> /*
> @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
>
> applier->lag = TIMEOUT_INFINITY;
>
> + /* Register a trigger to handle replication failures. */
> + struct trigger on_fail;
> + trigger_create(&on_fail, applier_on_fail, applier, NULL);
> + trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
Why do we need on_replication_fail trigger? AFAICS it is called from
on_rollback callback. Can't we call applier_on_fail right from there,
without the use of the intermediary?
> + auto trigger_guard = make_scoped_guard([&] {
> + trigger_clear(&on_fail);
> + });
> +
> +
> /*
> * Process a stream of rows from the binary log.
> */
> @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
> applier_set_state(applier, APPLIER_FOLLOW);
> }
>
> - /*
> - * Stay 'orphan' until appliers catch up with
> - * the remote vclock at the time of SUBSCRIBE
> - * and the lag is less than configured.
> - */
> - if (applier->state == APPLIER_SYNC &&
> - applier->lag <= replication_sync_lag &&
> - vclock_compare(&remote_vclock_at_subscribe,
> - &replicaset.vclock) <= 0) {
> - /* Applier is synced, switch to "follow". */
> - applier_set_state(applier, APPLIER_FOLLOW);
> - }
> -
> struct stailq rows;
> applier_read_tx(applier, &rows);
>
> - struct xrow_header *first_row =
> - &stailq_first_entry(&rows, struct applier_tx_row,
> - next)->row;
> applier->last_row_time = ev_monotonic_now(loop());
> - struct replica *replica = replica_by_id(first_row->replica_id);
> - struct latch *latch = (replica ? &replica->order_latch :
> - &replicaset.applier.order_latch);
> - /*
> - * In a full mesh topology, the same set of changes
> - * may arrive via two concurrently running appliers.
> - * Hence we need a latch to strictly order all changes
> - * that belong to the same server id.
> - */
Why did you remove this comment? (You didn't move it, I checked).
> - latch_lock(latch);
> - if (vclock_get(&replicaset.vclock, first_row->replica_id) <
> - first_row->lsn &&
> - applier_apply_tx(&rows) != 0) {
> - latch_unlock(latch);
> + if (applier_apply_tx(&rows) != 0)
> diag_raise();
> - }
> - latch_unlock(latch);
>
> - if (applier->state == APPLIER_SYNC ||
> - applier->state == APPLIER_FOLLOW)
> - fiber_cond_signal(&applier->writer_cond);
> if (ibuf_used(ibuf) == 0)
> ibuf_reset(ibuf);
> fiber_gc();
> @@ -872,6 +932,11 @@ applier_f(va_list ap)
> return -1;
> }
> } catch (FiberIsCancelled *e) {
> + if (!diag_is_empty(&applier->diag)) {
> + diag_move(&applier->diag, &fiber()->diag);
Hmm, AFAIK we only need this diag for box.info.replication. Maybe
better patch box.info.replication to use applier->diag instead of
applier->reader->diag so that we don't need to move errors around?
> + applier_disconnect(applier, APPLIER_STOPPED);
> + break;
> + }
> applier_disconnect(applier, APPLIER_OFF);
> break;
> } catch (SocketError *e) {
> @@ -959,7 +1024,7 @@ applier_new(const char *uri)
> applier->last_row_time = ev_monotonic_now(loop());
> rlist_create(&applier->on_state);
> fiber_cond_create(&applier->resume_cond);
> - fiber_cond_create(&applier->writer_cond);
> + diag_create(&applier->diag);
>
> return applier;
> }
> @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
> assert(applier->io.fd == -1);
> trigger_destroy(&applier->on_state);
> fiber_cond_destroy(&applier->resume_cond);
> - fiber_cond_destroy(&applier->writer_cond);
> free(applier);
> }
>
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 5bff90031..716da32e2 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -74,8 +74,6 @@ struct applier {
> struct fiber *reader;
> /** Background fiber to reply with vclock */
> struct fiber *writer;
> - /** Writer cond. */
> - struct fiber_cond writer_cond;
> /** Finite-state machine */
> enum applier_state state;
> /** Local time of this replica when the last row has been received */
> @@ -114,8 +112,15 @@ struct applier {
> bool is_paused;
> /** Condition variable signaled to resume the applier. */
> struct fiber_cond resume_cond;
> + /* Diag to raise an error. */
> + struct diag diag;
> + /* The master's vclock at the time of subscribe. */
> + struct vclock remote_vclock_at_subscribe;
> };
>
> +void
> +applier_init();
> +
This function isn't defined anywhere.
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 8c8a9927e..a4830f5b5 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -232,6 +232,20 @@ struct replicaset {
> * struct replica object).
> */
> struct latch order_latch;
> + /*
> + * A vclock of the last transaction which was read
> + * from an applier connection.
> + */
> + struct vclock net_vclock;
Please elaborate. Can it be less than replicaset.vclock? Can it be
greater? Why?
> + /* Signaled on replicated transaction commit. */
> + struct fiber_cond commit_cond;
> + /*
> + * Trigger to fire when replication stops in case
> + * of an error.
> + */
> + struct rlist on_replication_fail;
> + /* Diag to propagate an error across all appliers. */
> + struct diag diag;
> } applier;
> /** Map of all known replica_id's to corresponding replicas. */
> struct replica **replica_by_id;
More information about the Tarantool-patches
mailing list