From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 8EB5D3166D for ; Fri, 21 Jun 2019 17:48:28 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id jUb7vqianJ6E for ; Fri, 21 Jun 2019 17:48:28 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 262B531647 for ; Fri, 21 Jun 2019 17:48:28 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel Date: Sat, 22 Jun 2019 00:48:20 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Appliers use asynchronous transactions to batch journal writes. All appliers share replicaset.applier.vclock, the vclock of transactions that have been applied but not necessarily written to the journal yet. Appliers use triggers to coordinate in case of a failure, i.e. when a transaction is rolled back: the rollback resets the shared vclock to the committed one and stops every applier with the same error.
Closes: #1254 --- src/box/applier.cc | 188 ++++++++++++++++++++++++++++++++--------- src/box/applier.h | 7 ++ src/box/replication.cc | 7 ++ src/box/replication.h | 11 +++ 4 files changed, 172 insertions(+), 41 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 6f93759a8..9465b071a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -50,6 +50,7 @@ #include "schema.h" #include "txn.h" #include "box.h" +#include "scoped_guard.h" STRS(applier_state, applier_STATE); @@ -108,6 +109,26 @@ applier_log_error(struct applier *applier, struct error *e) applier->last_logged_errcode = errcode; } +/* + * A helper function to track an applier state. + */ +static inline void +applier_check_state(struct applier *applier) +{ + /* + * Stay 'orphan' until appliers catch up with + * the remote vclock at the time of SUBSCRIBE + * and the lag is less than configured. + */ + if (applier->state == APPLIER_SYNC && + applier->lag <= replication_sync_lag && + vclock_compare(&applier->remote_vclock_at_subscribe, + &replicaset.vclock) <= 0) { + /* Applier is synced, switch to "follow". */ + applier_set_state(applier, APPLIER_FOLLOW); + } +} + /* * Fiber function to write vclock to replication master. * To track connection status, replica answers master @@ -135,6 +156,12 @@ applier_writer_f(va_list ap) else fiber_cond_wait_timeout(&applier->writer_cond, replication_timeout); + /* A writer fiber is going to be awaken after a commit or + * a heartbeat message. So this is a appropriate place to + * update an applier status because the applier state could + * yield and doesn't fit into a commit trigger. + */ + applier_check_state(applier); /* Send ACKs only when in FOLLOW mode ,*/ if (applier->state != APPLIER_SYNC && applier->state != APPLIER_FOLLOW) @@ -574,6 +601,27 @@ applier_read_tx(struct applier *applier, struct stailq *rows) next)->row.is_commit); } +/* A trigger to control a replicated transaction rollback. 
*/ +static void +applier_txn_rollback_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + /* Setup shared applier diagnostic area. */ + diag_set(ClientError, ER_WAL_IO); + diag_move(&fiber()->diag, &replicaset.applier.diag); + trigger_run(&replicaset.applier.on_rollback, event); + /* Rollback applier vclock to the commited one. */ + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); +} + +static void +applier_txn_commit_cb(struct trigger *trigger, void *event) +{ + (void) trigger; + /* Broadcast the commit event across all appliers. */ + trigger_run(&replicaset.applier.on_commit, event); +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -582,6 +630,19 @@ applier_read_tx(struct applier *applier, struct stailq *rows) static int applier_apply_tx(struct stailq *rows) { + struct xrow_header *first_row = + &stailq_first_entry(rows, struct applier_tx_row, + next)->row; + struct replica *replica = replica_by_id(first_row->replica_id); + struct latch *latch = (replica ? &replica->order_latch : + &replicaset.applier.order_latch); + latch_lock(latch); + if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >= + first_row->lsn) { + latch_unlock(latch); + return 0; + } + /** * Explicitly begin the transaction so that we can * control fiber->gc life cycle and, in case of apply @@ -590,8 +651,10 @@ applier_apply_tx(struct stailq *rows) */ struct txn *txn = txn_begin(); struct applier_tx_row *item; - if (txn == NULL) - diag_raise(); + if (txn == NULL) { + latch_unlock(latch); + return -1; + } stailq_foreach_entry(item, rows, next) { struct xrow_header *row = &item->row; int res = apply_row(row); @@ -632,14 +695,63 @@ applier_apply_tx(struct stailq *rows) "Replication", "distributed transactions"); goto rollback; } - return txn_commit(txn); + /* We are ready to submit txn to wal. 
*/ + struct trigger *on_rollback, *on_commit; + on_rollback = (struct trigger *)region_alloc(&txn->region, + sizeof(struct trigger)); + on_commit = (struct trigger *)region_alloc(&txn->region, + sizeof(struct trigger)); + if (on_rollback == NULL || on_commit == NULL) + goto rollback; + + trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL); + txn_on_rollback(txn, on_rollback); + + trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL); + txn_on_commit(txn, on_commit); + + if (txn_write(txn) < 0) + goto fail; + /* Transaction was sent to journal so promote vclock. */ + vclock_follow(&replicaset.applier.vclock, first_row->replica_id, + first_row->lsn); + latch_unlock(latch); + + return 0; rollback: txn_rollback(txn); +fail: + latch_unlock(latch); fiber_gc(); return -1; } +/* + * A trigger to update an applier state after a replication commit. + */ +static void +applier_on_commit(struct trigger *trigger, void *event) +{ + (void) event; + struct applier *applier = (struct applier *)trigger->data; + fiber_cond_signal(&applier->writer_cond); +} + +/* + * A trigger to update an applier state after a replication rollback. + */ +static void +applier_on_rollback(struct trigger *trigger, void *event) +{ + (void) event; + struct applier *applier = (struct applier *)trigger->data; + /* Setup a shared error. */ + if (!diag_is_empty(&replicaset.applier.diag)) + diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag)); + fiber_cancel(applier->reader); +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ @@ -650,7 +762,6 @@ applier_subscribe(struct applier *applier) struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; - struct vclock remote_vclock_at_subscribe; struct tt_uuid cluster_id = uuid_nil; struct vclock vclock; @@ -677,10 +788,10 @@ applier_subscribe(struct applier *applier) * the replica, and replica has to check whether * its and master's cluster ids match. 
*/ - vclock_create(&remote_vclock_at_subscribe); + vclock_create(&applier->remote_vclock_at_subscribe); xrow_decode_subscribe_response_xc(&row, &cluster_id, - &remote_vclock_at_subscribe); + &applier->remote_vclock_at_subscribe); /* * If master didn't send us its cluster id * assume that it has done all the checks. @@ -695,7 +806,7 @@ applier_subscribe(struct applier *applier) say_info("subscribed"); say_info("remote vclock %s local vclock %s", - vclock_to_string(&remote_vclock_at_subscribe), + vclock_to_string(&applier->remote_vclock_at_subscribe), vclock_to_string(&vclock)); } /* @@ -744,6 +855,21 @@ applier_subscribe(struct applier *applier) applier->lag = TIMEOUT_INFINITY; + /* Register triggers to handle replication commits and rollbacks. */ + struct trigger on_commit; + trigger_create(&on_commit, applier_on_commit, applier, NULL); + trigger_add(&replicaset.applier.on_commit, &on_commit); + + struct trigger on_rollback; + trigger_create(&on_rollback, applier_on_rollback, applier, NULL); + trigger_add(&replicaset.applier.on_rollback, &on_rollback); + + auto trigger_guard = make_scoped_guard([&] { + trigger_clear(&on_commit); + trigger_clear(&on_rollback); + }); + + /* * Process a stream of rows from the binary log. */ @@ -756,47 +882,22 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } - /* - * Stay 'orphan' until appliers catch up with - * the remote vclock at the time of SUBSCRIBE - * and the lag is less than configured. - */ - if (applier->state == APPLIER_SYNC && - applier->lag <= replication_sync_lag && - vclock_compare(&remote_vclock_at_subscribe, - &replicaset.vclock) <= 0) { - /* Applier is synced, switch to "follow". 
*/ - applier_set_state(applier, APPLIER_FOLLOW); - } - struct stailq rows; applier_read_tx(applier, &rows); - struct xrow_header *first_row = - &stailq_first_entry(&rows, struct applier_tx_row, - next)->row; applier->last_row_time = ev_monotonic_now(loop()); - struct replica *replica = replica_by_id(first_row->replica_id); - struct latch *latch = (replica ? &replica->order_latch : - &replicaset.applier.order_latch); /* - * In a full mesh topology, the same set of changes - * may arrive via two concurrently running appliers. - * Hence we need a latch to strictly order all changes - * that belong to the same server id. + * In case of a heartbeat message, wake the writer up: + * the writer fiber re-checks the applier state after + * every wakeup, so there is nothing else to do here. */ - latch_lock(latch); - if (vclock_get(&replicaset.vclock, first_row->replica_id) < - first_row->lsn && - applier_apply_tx(&rows) != 0) { - latch_unlock(latch); - diag_raise(); + if (stailq_first_entry(&rows, struct applier_tx_row, + next)->row.lsn == 0) { + fiber_cond_signal(&applier->writer_cond); } - latch_unlock(latch); + else if (applier_apply_tx(&rows) != 0) + diag_raise(); - if (applier->state == APPLIER_SYNC || - applier->state == APPLIER_FOLLOW) - fiber_cond_signal(&applier->writer_cond); if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); fiber_gc(); @@ -881,6 +982,11 @@ applier_f(va_list ap) return -1; } } catch (FiberIsCancelled *e) { + if (!diag_is_empty(&applier->diag)) { + diag_move(&applier->diag, &fiber()->diag); + applier_disconnect(applier, APPLIER_STOPPED); + break; + } applier_disconnect(applier, APPLIER_OFF); break; } catch (SocketError *e) { @@ -969,6 +1075,7 @@ applier_new(const char *uri) rlist_create(&applier->on_state); fiber_cond_create(&applier->resume_cond); fiber_cond_create(&applier->writer_cond); + diag_create(&applier->diag); return applier; } @@ -981,7 +1088,8 @@ applier_delete(struct applier *applier) assert(applier->io.fd == -1); trigger_destroy(&applier->on_state); fiber_cond_destroy(&applier->resume_cond);
fiber_cond_destroy(&applier->writer_cond); + diag_destroy(&applier->diag); free(applier); } diff --git a/src/box/applier.h b/src/box/applier.h index 5bff90031..b9bb14198 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -114,8 +114,12 @@ struct applier { bool is_paused; /** Condition variable signaled to resume the applier. */ struct fiber_cond resume_cond; + /* Error to report when the applier is stopped by a rollback. */ + struct diag diag; + /* The master's vclock at the time of SUBSCRIBE. */ + struct vclock remote_vclock_at_subscribe; }; /** * Start a client to a remote master using a background fiber. * diff --git a/src/box/replication.cc b/src/box/replication.cc index a1a2a9eb3..617b9538f 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -90,6 +90,13 @@ replication_init(void) fiber_cond_create(&replicaset.applier.cond); replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *)); latch_create(&replicaset.applier.order_latch); + + vclock_create(&replicaset.applier.vclock); + vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); + rlist_create(&replicaset.applier.on_rollback); + rlist_create(&replicaset.applier.on_commit); + + diag_create(&replicaset.applier.diag); } void diff --git a/src/box/replication.h b/src/box/replication.h index 8c8a9927e..19f283c7d 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -232,6 +232,17 @@ struct replicaset { * struct replica object). */ struct latch order_latch; + /* + * Vclock of the last transaction read by an applier + * and sent to the journal (not necessarily written yet). + */ + struct vclock vclock; + /* Trigger to fire when a replication request failed to apply. */ + struct rlist on_rollback; + /* Trigger to fire when a replication request is committed to the WAL. */ + struct rlist on_commit; + /* Shared applier diagnostic area. */ + struct diag diag; } applier; /** Map of all known replica_id's to correspponding replica's. */ struct replica **replica_by_id; -- 2.22.0