I'm sorry, here is the proper version of the commit: Appliers use an asynchronous transaction to batch journal writes. All appliers share the replicaset.applier.tx_vclock which means the vclock is applied but not necessarily written to a journal. Appliers use a trigger to coordinate in case of failure - when a transaction is going to be rolled back. Also an applier writer condition is shared across all appliers and signaled in case of commit or heartbeat message. Closes: #1254 --- src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------ src/box/applier.h | 9 ++- src/box/replication.cc | 7 ++ src/box/replication.h | 14 ++++ 4 files changed, 138 insertions(+), 48 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 5a92f6109..fee49d8ca 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -50,6 +50,7 @@ #include "schema.h" #include "txn.h" #include "box.h" +#include "scoped_guard.h" STRS(applier_state, applier_STATE); @@ -130,11 +131,24 @@ applier_writer_f(va_list ap) * replication_timeout seconds any more. */ if (applier->version_id >= version_id(1, 7, 7)) - fiber_cond_wait_timeout(&applier->writer_cond, + fiber_cond_wait_timeout(&replicaset.applier.commit_cond, TIMEOUT_INFINITY); else - fiber_cond_wait_timeout(&applier->writer_cond, + fiber_cond_wait_timeout(&replicaset.applier.commit_cond, replication_timeout); + /* + * Stay 'orphan' until appliers catch up with + * the remote vclock at the time of SUBSCRIBE + * and the lag is less than configured. + */ + if (applier->state == APPLIER_SYNC && + applier->lag <= replication_sync_lag && + vclock_compare(&applier->remote_vclock_at_subscribe, + &replicaset.vclock) <= 0) { + /* Applier is synced, switch to "follow". 
*/ + applier_set_state(applier, APPLIER_FOLLOW); + } + /* Send ACKs only when in FOLLOW mode. */ if (applier->state != APPLIER_SYNC && applier->state != APPLIER_FOLLOW) @@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows) next)->row.is_commit); } +static void +sequencer_rollback_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + (void) event; + diag_set(ClientError, ER_WAL_IO); + diag_move(&fiber()->diag, &replicaset.applier.diag); + trigger_run(&replicaset.applier.on_replication_fail, NULL); + vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock); +} + +static void +sequencer_commit_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + (void) event; + fiber_cond_broadcast(&replicaset.applier.commit_cond); +} + +static void +applier_on_fail(struct trigger *trigger, void *event) +{ + (void) event; + struct applier *applier = (struct applier *)trigger->data; + if (!diag_is_empty(&replicaset.applier.diag)) + diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag)); + fiber_cancel(applier->reader); + +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows) static int applier_apply_tx(struct stailq *rows) { + struct xrow_header *first_row = + &stailq_first_entry(rows, struct applier_tx_row, + next)->row; + struct replica *replica = replica_by_id(first_row->replica_id); + struct latch *latch = (replica ? &replica->order_latch : + &replicaset.applier.order_latch); + latch_lock(latch); + if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >= + first_row->lsn) { + /* Check there is a heartbeat message and wake writers up. 
*/ + if (first_row->lsn == 0) + fiber_cond_broadcast(&replicaset.applier.commit_cond); + latch_unlock(latch); + return 0; + } + /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply @@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows) */ struct txn *txn = txn_begin(); struct applier_tx_row *item; - if (txn == NULL) - diag_raise(); + if (txn == NULL) { + latch_unlock(latch); + return -1; + } stailq_foreach_entry(item, rows, next) { struct xrow_header *row = &item->row; int res = apply_row(row); @@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows) "Replication", "distributed transactions"); goto rollback; } - return txn_commit(txn); + /* We are ready to submit txn to wal. */ + struct trigger *on_rollback, *on_commit; + on_rollback = (struct trigger *)region_alloc(&txn->region, + sizeof(struct trigger)); + on_commit = (struct trigger *)region_alloc(&txn->region, + sizeof(struct trigger)); + if (on_rollback == NULL || on_commit == NULL) + goto rollback; + + trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL); + txn_on_rollback(txn, on_rollback); + + trigger_create(on_commit, sequencer_commit_cb, NULL, NULL); + txn_on_commit(txn, on_commit); + + if (txn_write(txn) < 0) + goto fail; + /* Transaction was sent to journal so promote vclock. */ + vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id, + first_row->lsn); + latch_unlock(latch); + + return 0; rollback: txn_rollback(txn); +fail: + latch_unlock(latch); fiber_gc(); return -1; } @@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier) struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; - struct vclock remote_vclock_at_subscribe; struct tt_uuid cluster_id = uuid_nil; struct vclock vclock; @@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier) * the replica, and replica has to check whether * its and master's cluster ids match. 
*/ - vclock_create(&remote_vclock_at_subscribe); + vclock_create(&applier->remote_vclock_at_subscribe); xrow_decode_subscribe_response_xc(&row, &cluster_id, - &remote_vclock_at_subscribe); + &applier->remote_vclock_at_subscribe); /* * If master didn't send us its cluster id * assume that it has done all the checks. @@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier) say_info("subscribed"); say_info("remote vclock %s local vclock %s", - vclock_to_string(&remote_vclock_at_subscribe), + vclock_to_string(&applier->remote_vclock_at_subscribe), vclock_to_string(&vclock)); } /* @@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier) applier->lag = TIMEOUT_INFINITY; + /* Register a trigger to handle replication failures. */ + struct trigger on_fail; + trigger_create(&on_fail, applier_on_fail, applier, NULL); + trigger_add(&replicaset.applier.on_replication_fail, &on_fail); + auto trigger_guard = make_scoped_guard([&] { + trigger_clear(&on_fail); + }); + + /* * Process a stream of rows from the binary log. */ @@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Stay 'orphan' until appliers catch up with - * the remote vclock at the time of SUBSCRIBE - * and the lag is less than configured. - */ - if (applier->state == APPLIER_SYNC && - applier->lag <= replication_sync_lag && - vclock_compare(&remote_vclock_at_subscribe, - &replicaset.vclock) <= 0) { - /* Applier is synced, switch to "follow". */ - applier_set_state(applier, APPLIER_FOLLOW); - } - struct stailq rows; applier_read_tx(applier, &rows); - struct xrow_header *first_row = - &stailq_first_entry(&rows, struct applier_tx_row, - next)->row; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(first_row->replica_id); - struct latch *latch = (replica ? 
&replica->order_latch : - &replicaset.applier.order_latch); - /* - * In a full mesh topology, the same set of changes - * may arrive via two concurrently running appliers. - * Hence we need a latch to strictly order all changes - * that belong to the same server id. - */ - latch_lock(latch); - if (vclock_get(&replicaset.vclock, first_row->replica_id) < - first_row->lsn && - applier_apply_tx(&rows) != 0) { - latch_unlock(latch); + if (applier_apply_tx(&rows) != 0) diag_raise(); - } - latch_unlock(latch); - if (applier->state == APPLIER_SYNC || - applier->state == APPLIER_FOLLOW) - fiber_cond_signal(&applier->writer_cond); if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); fiber_gc(); @@ -872,6 +932,11 @@ applier_f(va_list ap) return -1; } } catch (FiberIsCancelled *e) { + if (!diag_is_empty(&applier->diag)) { + diag_move(&applier->diag, &fiber()->diag); + applier_disconnect(applier, APPLIER_STOPPED); + break; + } applier_disconnect(applier, APPLIER_OFF); break; } catch (SocketError *e) { @@ -959,7 +1024,7 @@ applier_new(const char *uri) applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); fiber_cond_create(&applier->resume_cond); - fiber_cond_create(&applier->writer_cond); + diag_create(&applier->diag); return applier; } @@ -972,7 +1037,6 @@ applier_delete(struct applier *applier) assert(applier->io.fd == -1); trigger_destroy(&applier->on_state); fiber_cond_destroy(&applier->resume_cond); - fiber_cond_destroy(&applier->writer_cond); free(applier); } diff --git a/src/box/applier.h b/src/box/applier.h index 5bff90031..716da32e2 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -74,8 +74,6 @@ struct applier { struct fiber *reader; /** Background fiber to reply with vclock */ struct fiber *writer; - /** Writer cond. 
*/ - struct fiber_cond writer_cond; /** Finite-state machine */ enum applier_state state; /** Local time of this replica when the last row has been received */ double last_row_time; @@ -114,8 +112,15 @@ bool is_paused; /** Condition variable signaled to resume the applier. */ struct fiber_cond resume_cond; + /* Diag to raise an error. */ + struct diag diag; + /* The master's vclock at the time of subscribe. */ + struct vclock remote_vclock_at_subscribe; }; +void +applier_init(); + /** * Start a client to a remote master using a background fiber. * diff --git a/src/box/replication.cc b/src/box/replication.cc index a1a2a9eb3..fd4d4e387 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -90,6 +90,13 @@ replication_init(void) fiber_cond_create(&replicaset.applier.cond); replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *)); latch_create(&replicaset.applier.order_latch); + + vclock_create(&replicaset.applier.net_vclock); + vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock); + rlist_create(&replicaset.applier.on_replication_fail); + + fiber_cond_create(&replicaset.applier.commit_cond); + diag_create(&replicaset.applier.diag); } void diff --git a/src/box/replication.h b/src/box/replication.h index 8c8a9927e..a4830f5b5 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -232,6 +232,20 @@ struct replicaset { * struct replica object). */ struct latch order_latch; + /* + * A vclock of the last transaction which was read + * from an applier connection. + */ + struct vclock net_vclock; + /* Signaled on replicated transaction commit. */ + struct fiber_cond commit_cond; + /* + * Trigger to fire when replication stops in case + * of an error. + */ + struct rlist on_replication_fail; + /* Diag to populate an error across all appliers. */ + struct diag diag; } applier; /** Map of all known replica_id's to correspponding replica's. */ struct replica **replica_by_id; -- 2.22.0