From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v4 8/9] applier: apply transaction in parallel
Date: Thu, 20 Jun 2019 00:23:15 +0300
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

Appliers use asynchronous transactions to batch journal writes. All
appliers share replicaset.applier.net_vclock, which tracks the vclock
of rows that have been applied but not necessarily written to the
journal. Appliers use a trigger to coordinate in case of failure, that
is, when a transaction is going to be rolled back. The applier writer
condition variable is also shared across all appliers and is signaled
on every commit or heartbeat message.

Closes: #1254
---
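A note for reviewers: below is a minimal, self-contained toy model of the
coordination scheme this patch introduces. It deliberately uses plain
pthreads and a flat LSN array instead of Tarantool's fiber, latch, trigger
and struct vclock APIs, and every name in it is invented for the sketch, so
treat it as an illustration of the idea rather than code from the patch. In
a full-mesh topology the same row can arrive through several appliers; the
shared applied-but-not-yet-acked vclock is consulted under a per-replica
lock to skip such duplicates, and a shared condition is broadcast after
every successful submission (the patch does this from the on_commit trigger
so that the ACK writer fibers wake up).

/*
 * Toy model of the applier coordination: two "applier" threads receive the
 * same rows, a shared net_vclock[] deduplicates them, and commit_cond is
 * broadcast after every submission.  Plain C + pthreads, illustration only.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define REPLICA_MAX 4

static int64_t net_vclock[REPLICA_MAX];	/* applied, not yet acked LSNs */
static pthread_mutex_t order_latch = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t commit_cond = PTHREAD_COND_INITIALIZER;

/*
 * Mimics the reworked applier_apply_tx(): take the per-replica "latch",
 * drop rows another applier has already submitted, apply the row, promote
 * the shared vclock and wake everyone waiting for a commit.
 */
static void
apply_tx(uint32_t replica_id, int64_t lsn, const char *who)
{
	pthread_mutex_lock(&order_latch);
	if (net_vclock[replica_id] >= lsn) {
		/* Duplicate that already came via another applier - skip. */
		pthread_mutex_unlock(&order_latch);
		return;
	}
	printf("%s applies {%u: %lld}\n", who, (unsigned)replica_id,
	       (long long)lsn);
	net_vclock[replica_id] = lsn;		/* vclock_follow() analogue */
	pthread_cond_broadcast(&commit_cond);	/* sequencer_commit_cb() analogue;
						 * ACK writers would wait on this */
	pthread_mutex_unlock(&order_latch);
}

static void *
applier_thread(void *arg)
{
	const char *who = arg;
	/* Both appliers receive the same stream {1: 1}, {1: 2}. */
	for (int64_t lsn = 1; lsn <= 2; lsn++)
		apply_tx(1, lsn, who);
	return NULL;
}

int
main(void)
{
	pthread_t a, b;
	pthread_create(&a, NULL, applier_thread, "applier A");
	pthread_create(&b, NULL, applier_thread, "applier B");
	pthread_join(a, NULL);
	pthread_join(b, NULL);
	printf("net_vclock[1] = %lld\n", (long long)net_vclock[1]);
	return 0;
}

It builds with "cc -pthread"; both threads race over the same stream, yet
each row is applied exactly once and the shared vclock ends up at 2.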
 src/box/applier.cc     | 123 +++++++++++++++++++++++++++++++----------
 src/box/applier.h      |   6 +-
 src/box/replication.cc |   7 +++
 src/box/replication.h  |  14 +++++
 4 files changed, 119 insertions(+), 31 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..252dd58ea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "scoped_guard.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -130,10 +131,10 @@ applier_writer_f(va_list ap)
		 * replication_timeout seconds any more.
		 */
		if (applier->version_id >= version_id(1, 7, 7))
-			fiber_cond_wait_timeout(&applier->writer_cond,
+			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
						TIMEOUT_INFINITY);
		else
-			fiber_cond_wait_timeout(&applier->writer_cond,
+			fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
						replication_timeout);
		/* Send ACKs only when in FOLLOW mode ,*/
		if (applier->state != APPLIER_SYNC &&
@@ -565,6 +566,36 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
			 next)->row.is_commit);
 }
 
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	diag_set(ClientError, ER_WAL_IO);
+	diag_move(&fiber()->diag, &replicaset.applier.diag);
+	trigger_run(&replicaset.applier.on_replication_fail, NULL);
+	vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	fiber_cond_broadcast(&replicaset.applier.commit_cond);
+}
+
+static void
+applier_on_fail(struct trigger *trigger, void *event)
+{
+	(void) event;
+	struct applier *applier = (struct applier *)trigger->data;
+	if (!diag_is_empty(&replicaset.applier.diag))
+		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+	fiber_cancel(applier->reader);
+
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -573,6 +604,22 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 static int
 applier_apply_tx(struct stailq *rows)
 {
+	struct xrow_header *first_row =
+		&stailq_first_entry(rows, struct applier_tx_row,
+				    next)->row;
+	struct replica *replica = replica_by_id(first_row->replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			       &replicaset.applier.order_latch);
+	latch_lock(latch);
+	if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
+	    first_row->lsn) {
+		/* Check if this is a heartbeat message and wake writers up. */
+		if (first_row->lsn == 0)
+			fiber_cond_broadcast(&replicaset.applier.commit_cond);
+		latch_unlock(latch);
+		return 0;
+	}
+
	/**
	 * Explicitly begin the transaction so that we can
	 * control fiber->gc life cycle and, in case of apply
@@ -581,8 +628,10 @@ applier_apply_tx(struct stailq *rows)
	 */
	struct txn *txn = txn_begin();
	struct applier_tx_row *item;
-	if (txn == NULL)
-		diag_raise();
+	if (txn == NULL) {
+		latch_unlock(latch);
+		return -1;
+	}
	stailq_foreach_entry(item, rows, next) {
		struct xrow_header *row = &item->row;
		int res = apply_row(row);
@@ -623,10 +672,34 @@
				 "Replication",
				 "distributed transactions");
		goto rollback;
	}
-	return txn_commit(txn);
+	/* We are ready to submit txn to wal. */
+	struct trigger *on_rollback, *on_commit;
+	on_rollback = (struct trigger *)region_alloc(&txn->region,
+						     sizeof(struct trigger));
+	on_commit = (struct trigger *)region_alloc(&txn->region,
+						   sizeof(struct trigger));
+	if (on_rollback == NULL || on_commit == NULL)
+		goto rollback;
+
+	trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+
+	trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+	txn_on_commit(txn, on_commit);
+
+	if (txn_write(txn) < 0)
+		goto fail;
+	/* Transaction was sent to journal so promote vclock. */
+	vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
+		      first_row->lsn);
+	latch_unlock(latch);
+
+	return 0;
 rollback:
	txn_rollback(txn);
+fail:
+	latch_unlock(latch);
	fiber_gc();
	return -1;
 }
@@ -735,6 +808,15 @@ applier_subscribe(struct applier *applier)
 
	applier->lag = TIMEOUT_INFINITY;
 
+	/* Register a trigger to handle replication failures. */
+	struct trigger on_fail;
+	trigger_create(&on_fail, applier_on_fail, applier, NULL);
+	trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
+	auto trigger_guard = make_scoped_guard([&] {
+		trigger_clear(&on_fail);
+	});
+
+
	/*
	 * Process a stream of rows from the binary log.
	 */
@@ -763,31 +845,10 @@
		struct stailq rows;
		applier_read_tx(applier, &rows);
 
-		struct xrow_header *first_row =
-			&stailq_first_entry(&rows, struct applier_tx_row,
-					    next)->row;
		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(first_row->replica_id);
-		struct latch *latch = (replica ? &replica->order_latch :
-				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set of changes
-		 * may arrive via two concurrently running appliers.
-		 * Hence we need a latch to strictly order all changes
-		 * that belong to the same server id.
-		 */
-		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
-		    first_row->lsn &&
-		    applier_apply_tx(&rows) != 0) {
-			latch_unlock(latch);
+		if (applier_apply_tx(&rows) != 0)
			diag_raise();
-		}
-		latch_unlock(latch);
-		if (applier->state == APPLIER_SYNC ||
-		    applier->state == APPLIER_FOLLOW)
-			fiber_cond_signal(&applier->writer_cond);
 
		if (ibuf_used(ibuf) == 0)
			ibuf_reset(ibuf);
		fiber_gc();
@@ -872,6 +933,11 @@ applier_f(va_list ap)
			return -1;
		}
	} catch (FiberIsCancelled *e) {
+		if (!diag_is_empty(&applier->diag)) {
+			diag_move(&applier->diag, &fiber()->diag);
+			applier_disconnect(applier, APPLIER_STOPPED);
+			break;
+		}
		applier_disconnect(applier, APPLIER_OFF);
		break;
	} catch (SocketError *e) {
@@ -959,7 +1025,7 @@ applier_new(const char *uri)
	applier->last_row_time = ev_monotonic_now(loop());
	rlist_create(&applier->on_state);
	fiber_cond_create(&applier->resume_cond);
-	fiber_cond_create(&applier->writer_cond);
+	diag_create(&applier->diag);
 
	return applier;
 }
@@ -972,7 +1038,6 @@ applier_delete(struct applier *applier)
	assert(applier->io.fd == -1);
	trigger_destroy(&applier->on_state);
	fiber_cond_destroy(&applier->resume_cond);
-	fiber_cond_destroy(&applier->writer_cond);
	free(applier);
 }
 
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..348fdacf2 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -74,8 +74,6 @@ struct applier {
	struct fiber *reader;
	/** Background fiber to reply with vclock */
	struct fiber *writer;
-	/** Writer cond. */
-	struct fiber_cond writer_cond;
	/** Finite-state machine */
	enum applier_state state;
	/** Local time of this replica when the last row has been received */
@@ -114,8 +112,12 @@ struct applier {
	bool is_paused;
	/** Condition variable signaled to resume the applier. */
	struct fiber_cond resume_cond;
+	struct diag diag;
 };
 
+void
+applier_init();
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..fd4d4e387 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
	fiber_cond_create(&replicaset.applier.cond);
	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
	latch_create(&replicaset.applier.order_latch);
+
+	vclock_create(&replicaset.applier.net_vclock);
+	vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+	rlist_create(&replicaset.applier.on_replication_fail);
+
+	fiber_cond_create(&replicaset.applier.commit_cond);
+	diag_create(&replicaset.applier.diag);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..a4830f5b5 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,20 @@ struct replicaset {
	 * struct replica object).
	 */
	struct latch order_latch;
+	/*
+	 * A vclock of the last transaction which was read
+	 * from an applier connection.
+	 */
+	struct vclock net_vclock;
+	/* Signaled on replicated transaction commit. */
+	struct fiber_cond commit_cond;
+	/*
+	 * Trigger to fire when replication stops in case
+	 * of an error.
+	 */
+	struct rlist on_replication_fail;
+	/* Diag to populate an error across all appliers. */
+	struct diag diag;
	} applier;
	/** Map of all known replica_id's to correspponding replica's. */
	struct replica **replica_by_id;
-- 
2.22.0