Date: Tue, 25 Jun 2019 19:08:44 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH v5 0/7] Parallel applier
Message-ID: <20190625160844.zrslwrgbc2jto5nl@esperanza>
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org

The patch set looks good to me. I pushed it to master with some very minor
changes, primarily regarding coding style (see the diff below).

Note I didn't push the patch fixing tests, because

 - replication/transaction.test.lua works even without it;
 - although replication/sync.test.lua fails occasionally, I don't believe
   that your fix is quite correct. I'd rather disable the test for now.
   BTW we have a ticket to fix it properly:
   https://github.com/tarantool/tarantool/issues/4129

diff --git a/src/box/alter.cc b/src/box/alter.cc
index cb5f2b17..e76b9e68 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3576,7 +3576,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
 	(void) trigger;
 	(void) event;
 	/*
-	 * In case of yielding journal will this trigger be processed
+	 * In case of yielding journal this trigger will be processed
 	 * in a context of tx_prio endpoint instead of a context of
 	 * a fiber which has this latch locked. So steal the latch first.
 	 */
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9465b071..cf03ea16 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -109,11 +109,12 @@ applier_log_error(struct applier *applier, struct error *e)
 	applier->last_logged_errcode = errcode;
 }
 
-/*
- * A helper function to track an applier state.
+/**
+ * A helper function which switches the applier to FOLLOW state
+ * if it has synchronized with its master.
  */
 static inline void
-applier_check_state(struct applier *applier)
+applier_check_sync(struct applier *applier)
 {
 	/*
 	 * Stay 'orphan' until appliers catch up with
@@ -156,12 +157,14 @@ applier_writer_f(va_list ap)
 		else
 			fiber_cond_wait_timeout(&applier->writer_cond,
 						replication_timeout);
-		/* A writer fiber is going to be awaken after a commit or
-		 * a heartbeat message. So this is a appropriate place to
+		/*
+		 * A writer fiber is going to be awaken after a commit or
+		 * a heartbeat message. So this is an appropriate place to
 		 * update an applier status because the applier state could
 		 * yield and doesn't fit into a commit trigger.
 		 */
-		applier_check_state(applier);
+		applier_check_sync(applier);
+
 		/* Send ACKs only when in FOLLOW mode ,*/
 		if (applier->state != APPLIER_SYNC &&
 		    applier->state != APPLIER_FOLLOW)
@@ -601,7 +604,6 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 			      next)->row.is_commit);
 }
 
-/* A trigger to control a replicated transaction rollback. */
 static void
 applier_txn_rollback_cb(struct trigger *trigger, void *event)
 {
@@ -609,8 +611,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	/* Setup shared applier diagnostic area. */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_move(&fiber()->diag, &replicaset.applier.diag);
+	/* Broadcast the rollback event across all appliers. */
 	trigger_run(&replicaset.applier.on_rollback, event);
-	/* Rollback applier vclock to the commited one. */
+	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
 }
 
@@ -630,15 +633,20 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
 static int
 applier_apply_tx(struct stailq *rows)
 {
-	struct xrow_header *first_row =
-		&stailq_first_entry(rows, struct applier_tx_row,
-				    next)->row;
+	struct xrow_header *first_row = &stailq_first_entry(rows,
+					struct applier_tx_row, next)->row;
 	struct replica *replica = replica_by_id(first_row->replica_id);
+	/*
+	 * In a full mesh topology, the same set of changes
+	 * may arrive via two concurrently running appliers.
+	 * Hence we need a latch to strictly order all changes
+	 * that belong to the same server id.
+	 */
 	struct latch *latch = (replica ? &replica->order_latch :
 			       &replicaset.applier.order_latch);
 	latch_lock(latch);
-	if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
-	    first_row->lsn) {
+	if (vclock_get(&replicaset.applier.vclock,
+		       first_row->replica_id) >= first_row->lsn) {
 		latch_unlock(latch);
 		return 0;
 	}
@@ -713,11 +721,11 @@ applier_apply_tx(struct stailq *rows)
 	if (txn_write(txn) < 0)
 		goto fail;
 
+	/* Transaction was sent to journal so promote vclock. */
-	vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
-		      first_row->lsn);
+	vclock_follow(&replicaset.applier.vclock,
+		      first_row->replica_id, first_row->lsn);
 	latch_unlock(latch);
-
 	return 0;
 rollback:
 	txn_rollback(txn);
@@ -747,8 +755,11 @@ applier_on_rollback(struct trigger *trigger, void *event)
 	(void) event;
 	struct applier *applier = (struct applier *)trigger->data;
 	/* Setup a shared error. */
-	if (!diag_is_empty(&replicaset.applier.diag))
-		diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+	if (!diag_is_empty(&replicaset.applier.diag)) {
+		diag_add_error(&applier->diag,
+			       diag_last_error(&replicaset.applier.diag));
+	}
+
 	/* Stop the applier fiber. */
 	fiber_cancel(applier->reader);
 }
@@ -789,9 +800,8 @@ applier_subscribe(struct applier *applier)
 	 * its and master's cluster ids match.
 	 */
 	vclock_create(&applier->remote_vclock_at_subscribe);
-	xrow_decode_subscribe_response_xc(&row,
-					  &cluster_id,
-					  &applier->remote_vclock_at_subscribe);
+	xrow_decode_subscribe_response_xc(&row, &cluster_id,
+					  &applier->remote_vclock_at_subscribe);
 	/*
 	 * If master didn't send us its cluster id
 	 * assume that it has done all the checks.
@@ -869,7 +879,6 @@ applier_subscribe(struct applier *applier)
 		trigger_clear(&on_rollback);
 	});
 
-
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
@@ -887,14 +896,12 @@ applier_subscribe(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 
 		/*
-		 * In case of an heathbeat message wake a writer up and
-		 * check aplier state.
+		 * In case of an heartbeat message wake a writer up
+		 * and check applier state.
 		 */
 		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0) {
+				       next)->row.lsn == 0)
 			fiber_cond_signal(&applier->writer_cond);
-			// applier_check_state(applier);
-		}
 		else if (applier_apply_tx(&rows) != 0)
 			diag_raise();
@@ -1087,7 +1094,7 @@ applier_delete(struct applier *applier)
 	ibuf_destroy(&applier->ibuf);
 	assert(applier->io.fd == -1);
 	trigger_destroy(&applier->on_state);
-	fiber_cond_destroy(&applier->resume_cond);
+	diag_destroy(&applier->diag);
 	free(applier);
 }
diff --git a/src/box/applier.h b/src/box/applier.h
index b9bb1419..b406e6aa 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -116,13 +116,10 @@ struct applier {
 	struct fiber_cond resume_cond;
 	/* Diag to raise an error. */
 	struct diag diag;
-	/* The masters vclock while subscribe. */
+	/* Master's vclock at the time of SUBSCRIBE. */
 	struct vclock remote_vclock_at_subscribe;
 };
 
-void
-applier_init();
-
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/box.cc b/src/box/box.cc
index f5bd29dd..80249919 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,7 +308,7 @@ recovery_journal_write(struct journal *base,
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
 	journal_entry_complete(entry);
-	return entry->res;
+	return 0;
 }
 
 static inline void
diff --git a/src/box/journal.h b/src/box/journal.h
index cac82c15..236058bb 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -62,7 +62,7 @@ struct journal_entry {
 	int64_t res;
 	/**
 	 * A journal entry finalization callback which is going to be called
-	 * after the entry processing was winished in both cases: succes
+	 * after the entry processing was finished in both cases: success
 	 * or fail. Entry->res is set to a result value before the callback
 	 * is fired.
 	 */
@@ -98,7 +98,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		  void *on_done_cb_data);
 
 /**
- * Finalize a signle entry.
+ * Finalize a single entry.
 */
 static inline void
 journal_entry_complete(struct journal_entry *entry)
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 617b9538..28f7aced 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -110,6 +110,7 @@ replication_free(void)
 	replicaset_foreach(replica)
 		relay_cancel(replica->relay);
 
+	diag_destroy(&replicaset.applier.diag);
 	free(replicaset.replica_by_id);
 }
diff --git a/src/box/txn.c b/src/box/txn.c
index f331642f..95076773 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -192,6 +192,7 @@ txn_begin()
 	txn->n_local_rows = 0;
 	txn->n_applier_rows = 0;
 	txn->has_triggers = false;
+	txn->is_done = false;
 	txn->is_aborted = false;
 	txn->in_sub_stmt = 0;
 	txn->id = ++tsn;
@@ -199,9 +200,7 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
-	txn->entry = NULL;
 	txn->fiber = NULL;
-	txn->done = false;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -344,22 +343,22 @@ fail:
  * A helper function to process on_commit/on_rollback triggers.
  */
 static inline void
-txn_process_trigger(struct rlist *trigger, struct txn *txn)
+txn_run_triggers(struct txn *txn, struct rlist *trigger)
 {
 	/*
-	 * Some of triggers require for in_txn variable is set so
-	 * restore it for time a trigger is in progress.
+	 * Some triggers require for in_txn variable to be set so
+	 * restore it for the time triggers are in progress.
 	 */
 	fiber_set_txn(fiber(), txn);
 	/* Rollback triggers must not throw. */
 	if (trigger_run(trigger, txn) != 0) {
 		/*
 		 * As transaction couldn't handle a trigger error so
-		 * there is no option except than panic.
+		 * there is no option except panic.
 		 */
 		diag_log();
 		unreachable();
-		panic("rollback trigger failed");
+		panic("commit/rollback trigger failed");
 	}
 	fiber_set_txn(fiber(), NULL);
 }
@@ -370,24 +369,24 @@ txn_process_trigger(struct rlist *trigger, struct txn *txn)
 static void
 txn_complete(struct txn *txn)
 {
+	/*
+	 * Note, engine can be NULL if transaction contains
+	 * IPROTO_NOP statements only.
+	 */
 	if (txn->signature < 0) {
 		/* Undo the transaction. */
 		if (txn->engine)
 			engine_rollback(txn->engine, txn);
 		if (txn->has_triggers)
-			txn_process_trigger(&txn->on_rollback, txn);
-
+			txn_run_triggers(txn, &txn->on_rollback);
 	} else {
-		/* Accept the transaction. */
-		/*
-		 * Engine can be NULL if transaction contains IPROTO_NOP
-		 * statements only.
-		 */
+		/* Commit the transaction. */
 		if (txn->engine != NULL)
 			engine_commit(txn->engine, txn);
 		if (txn->has_triggers)
-			txn_process_trigger(&txn->on_commit, txn);
-		ev_tstamp stop_tm = ev_monotonic_now(loop());
+			txn_run_triggers(txn, &txn->on_commit);
+
+		double stop_tm = ev_monotonic_now(loop());
 		if (stop_tm - txn->start_tm > too_long_threshold) {
 			int n_rows = txn->n_new_rows + txn->n_applier_rows;
 			say_warn_ratelimited("too long WAL write: %d rows at "
@@ -404,7 +403,7 @@ txn_complete(struct txn *txn)
 	if (txn->fiber == NULL)
 		txn_free(txn);
 	else {
-		txn->done = true;
+		txn->is_done = true;
 		if (txn->fiber != fiber())
 			/* Wake a waiting fiber up. */
 			fiber_wakeup(txn->fiber);
@@ -414,8 +413,7 @@
 static void
 txn_entry_done_cb(struct journal_entry *entry, void *data)
 {
-	struct txn *txn = (struct txn *)data;
-	assert(txn->entry == entry);
+	struct txn *txn = data;
 	txn->signature = entry->res;
 	txn_complete(txn);
 }
@@ -423,22 +421,22 @@ txn_entry_done_cb(struct journal_entry *entry, void *data)
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
-	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
 	/* Prepare a journal entry. */
-	txn->entry = journal_entry_new(txn->n_new_rows +
-				       txn->n_applier_rows,
-				       &txn->region,
-				       txn_entry_done_cb, txn);
-	if (txn->entry == NULL) {
+	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
+						      txn->n_applier_rows,
+						      &txn->region,
+						      txn_entry_done_cb,
+						      txn);
+	if (req == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
 
 	struct txn_stmt *stmt;
-	struct xrow_header **remote_row = txn->entry->rows;
-	struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
@@ -446,13 +444,13 @@ txn_write_to_wal(struct txn *txn)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
-		txn->entry->approx_len += xrow_approx_len(stmt->row);
+		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(remote_row == txn->entry->rows + txn->n_applier_rows);
+	assert(remote_row == req->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	/* Send entry to a journal. */
-	if (journal_write(txn->entry) < 0) {
+	/* Send the entry to the journal. */
+	if (journal_write(req) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
 		return -1;
@@ -483,17 +481,13 @@ txn_prepare(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0) {
+		if (engine_prepare(txn->engine, txn) != 0)
 			return -1;
-		}
 	}
 	trigger_clear(&txn->fiber_on_stop);
 	return 0;
 }
 
-/*
- * Send a transaction to a journal.
- */
 int
 txn_write(struct txn *txn)
 {
@@ -503,9 +497,9 @@ txn_write(struct txn *txn)
 	}
 
 	/*
-	 * After this transaction could not be used more
-	 * so reset corresponding key in a fiber storage.
-	 */
+	 * After this point the transaction must not be used
+	 * so reset the corresponding key in the fiber storage.
+	 */
 	fiber_set_txn(fiber(), NULL);
 	txn->start_tm = ev_monotonic_now(loop());
 	if (txn->n_new_rows + txn->n_applier_rows == 0) {
@@ -514,16 +508,7 @@ txn_write(struct txn *txn)
 		txn_complete(txn);
 		return 0;
 	}
-
-	if (txn_write_to_wal(txn) != 0) {
-		/*
-		 * After journal write the transaction would be finalized
-		 * with its journal entry finalization callback,
-		 * just return an error.
-		 */
-		return -1;
-	}
-	return 0;
+	return txn_write_to_wal(txn);
 }
 
 int
@@ -537,15 +522,16 @@ txn_commit(struct txn *txn)
 	 * In case of non-yielding journal the transaction could already
 	 * be done and there is nothing to wait in such cases.
 	 */
-	if (!txn->done) {
+	if (!txn->is_done) {
 		bool cancellable = fiber_set_cancellable(false);
 		fiber_yield();
 		fiber_set_cancellable(cancellable);
 	}
-	int res = txn->signature >= 0? 0: -1;
+	int res = txn->signature >= 0 ? 0 : -1;
 	if (res != 0)
 		diag_set(ClientError, ER_WAL_IO);
-	/* As the current fiber is waiting for the transaction so free it. */
+
+	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
 	return res;
 }
diff --git a/src/box/txn.h b/src/box/txn.h
index ddcac3bb..a19becce 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,6 +162,8 @@ struct txn {
 	 * already assigned LSN.
 	 */
 	int n_applier_rows;
+	/* True when transaction is processed. */
+	bool is_done;
 	/**
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
@@ -182,6 +184,10 @@ struct txn {
 	struct engine *engine;
 	/** Engine-specific transaction data */
 	void *engine_tx;
+	/* A fiber to wake up when transaction is finished. */
+	struct fiber *fiber;
+	/** Timestampt of entry write start. */
+	double start_tm;
 	/**
 	 * Triggers on fiber yield to abort transaction for
 	 * for in-memory engine.
@@ -195,16 +201,6 @@ struct txn {
 	/** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
-	/** Journal entry to control txn write. */
-	struct journal_entry *entry;
-	/** Transaction completion trigger. */
-	struct trigger entry_done;
-	/** Timestampt of entry write start. */
-	ev_tstamp start_tm;
-	/* A fiber to wake up when transaction is finished. */
-	struct fiber *fiber;
-	/* True when transaction is processed. */
-	bool done;
 };
 
 /* Pointer to the current transaction (if any) */
@@ -238,12 +234,21 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+/**
+ * Submit a transaction to the journal.
+ * @pre txn == in_txn()
+ *
+ * On success 0 is returned, and the transaction will be freed upon
+ * journal write completion. Note, the journal write may still fail.
+ * To track transaction status, one is supposed to use on_commit and
+ * on_rollback triggers.
+ *
+ * On failure -1 is returned and the transaction is rolled back and
+ * freed.
+ */
 int
 txn_write(struct txn *txn);
 
-int
-txn_wait(struct txn *txn);
-
 /**
  * Roll back the transaction but keep the object around.
  * A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index dce5fee6..6f5d0a58 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,17 +245,19 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	return xlog_tx_commit(l);
 }
 
+/**
+ * Invoke completion callbacks of journal entries to be
+ * completed. Callbacks are invoked in strict fifo order:
+ * this ensures that, in case of rollback, requests are
+ * rolled back in strict reverse order, producing
+ * a consistent database state.
+ */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	/*
-	 * fiber_wakeup() is faster than fiber_call() when there
-	 * are many ready fibers.
-	 */
 	struct journal_entry *req, *tmp;
-	stailq_foreach_entry_safe(req, tmp, queue, fifo) {
+	stailq_foreach_entry_safe(req, tmp, queue, fifo)
 		journal_entry_complete(req);
-	}
 }
 
 /**
@@ -1189,7 +1191,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
 	journal_entry_complete(entry);
-	return entry->res;
+	return 0;
 }
 
 void