[tarantool-patches] [PATCH v5 0/7] Parallel applier
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Jun 25 19:08:44 MSK 2019
The patch set looks good to me. I pushed it to master with some very
minor changes, primarily regarding coding style (see the diff below).
Note that I didn't push the patch fixing tests, because:
- replication/transaction.test.lua works even without it;
- although replication/sync.test.lua fails occasionally, I don't
believe that your fix is quite correct. I'd rather disable the
test for now instead. BTW, we have a ticket to fix it properly:
https://github.com/tarantool/tarantool/issues/4129
diff --git a/src/box/alter.cc b/src/box/alter.cc
index cb5f2b17..e76b9e68 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3576,7 +3576,7 @@ unlock_after_dd(struct trigger *trigger, void *event)
(void) trigger;
(void) event;
/*
- * In case of yielding journal will this trigger be processed
+ * In case of yielding journal this trigger will be processed
* in a context of tx_prio endpoint instead of a context of
* a fiber which has this latch locked. So steal the latch first.
*/
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9465b071..cf03ea16 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -109,11 +109,12 @@ applier_log_error(struct applier *applier, struct error *e)
applier->last_logged_errcode = errcode;
}
-/*
- * A helper function to track an applier state.
+/**
+ * A helper function which switches the applier to FOLLOW state
+ * if it has synchronized with its master.
*/
static inline void
-applier_check_state(struct applier *applier)
+applier_check_sync(struct applier *applier)
{
/*
* Stay 'orphan' until appliers catch up with
@@ -156,12 +157,14 @@ applier_writer_f(va_list ap)
else
fiber_cond_wait_timeout(&applier->writer_cond,
replication_timeout);
- /* A writer fiber is going to be awaken after a commit or
- * a heartbeat message. So this is a appropriate place to
+ /*
+ * A writer fiber is going to be awaken after a commit or
+ * a heartbeat message. So this is an appropriate place to
* update an applier status because the applier state could
* yield and doesn't fit into a commit trigger.
*/
- applier_check_state(applier);
+ applier_check_sync(applier);
+
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
@@ -601,7 +604,6 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
-/* A trigger to control a replicated transaction rollback. */
static void
applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
@@ -609,8 +611,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
/* Setup shared applier diagnostic area. */
diag_set(ClientError, ER_WAL_IO);
diag_move(&fiber()->diag, &replicaset.applier.diag);
+ /* Broadcast the rollback event across all appliers. */
trigger_run(&replicaset.applier.on_rollback, event);
- /* Rollback applier vclock to the commited one. */
+ /* Rollback applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
}
@@ -630,15 +633,20 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
static int
applier_apply_tx(struct stailq *rows)
{
- struct xrow_header *first_row =
- &stailq_first_entry(rows, struct applier_tx_row,
- next)->row;
+ struct xrow_header *first_row = &stailq_first_entry(rows,
+ struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
+ /*
+ * In a full mesh topology, the same set of changes
+ * may arrive via two concurrently running appliers.
+ * Hence we need a latch to strictly order all changes
+ * that belong to the same server id.
+ */
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
latch_lock(latch);
- if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >=
- first_row->lsn) {
+ if (vclock_get(&replicaset.applier.vclock,
+ first_row->replica_id) >= first_row->lsn) {
latch_unlock(latch);
return 0;
}
@@ -713,11 +721,11 @@ applier_apply_tx(struct stailq *rows)
if (txn_write(txn) < 0)
goto fail;
+
/* Transaction was sent to journal so promote vclock. */
- vclock_follow(&replicaset.applier.vclock, first_row->replica_id,
- first_row->lsn);
+ vclock_follow(&replicaset.applier.vclock,
+ first_row->replica_id, first_row->lsn);
latch_unlock(latch);
-
return 0;
rollback:
txn_rollback(txn);
@@ -747,8 +755,11 @@ applier_on_rollback(struct trigger *trigger, void *event)
(void) event;
struct applier *applier = (struct applier *)trigger->data;
/* Setup a shared error. */
- if (!diag_is_empty(&replicaset.applier.diag))
- diag_add_error(&applier->diag, diag_last_error(&replicaset.applier.diag));
+ if (!diag_is_empty(&replicaset.applier.diag)) {
+ diag_add_error(&applier->diag,
+ diag_last_error(&replicaset.applier.diag));
+ }
+ /* Stop the applier fiber. */
fiber_cancel(applier->reader);
}
@@ -789,9 +800,8 @@ applier_subscribe(struct applier *applier)
* its and master's cluster ids match.
*/
vclock_create(&applier->remote_vclock_at_subscribe);
- xrow_decode_subscribe_response_xc(&row,
- &cluster_id,
- &applier->remote_vclock_at_subscribe);
+ xrow_decode_subscribe_response_xc(&row, &cluster_id,
+ &applier->remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -869,7 +879,6 @@ applier_subscribe(struct applier *applier)
trigger_clear(&on_rollback);
});
-
/*
* Process a stream of rows from the binary log.
*/
@@ -887,14 +896,12 @@ applier_subscribe(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
/*
- * In case of an heathbeat message wake a writer up and
- * check aplier state.
+ * In case of an heartbeat message wake a writer up
+ * and check applier state.
*/
if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0) {
+ next)->row.lsn == 0)
fiber_cond_signal(&applier->writer_cond);
- // applier_check_state(applier);
- }
else if (applier_apply_tx(&rows) != 0)
diag_raise();
@@ -1087,7 +1094,7 @@ applier_delete(struct applier *applier)
ibuf_destroy(&applier->ibuf);
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
- fiber_cond_destroy(&applier->resume_cond);
+ diag_destroy(&applier->diag);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index b9bb1419..b406e6aa 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -116,13 +116,10 @@ struct applier {
struct fiber_cond resume_cond;
/* Diag to raise an error. */
struct diag diag;
- /* The masters vclock while subscribe. */
+ /* Master's vclock at the time of SUBSCRIBE. */
struct vclock remote_vclock_at_subscribe;
};
-void
-applier_init();
-
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/box.cc b/src/box/box.cc
index f5bd29dd..80249919 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,7 +308,7 @@ recovery_journal_write(struct journal *base,
struct recovery_journal *journal = (struct recovery_journal *) base;
entry->res = vclock_sum(journal->vclock);
journal_entry_complete(entry);
- return entry->res;
+ return 0;
}
static inline void
diff --git a/src/box/journal.h b/src/box/journal.h
index cac82c15..236058bb 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -62,7 +62,7 @@ struct journal_entry {
int64_t res;
/**
* A journal entry finalization callback which is going to be called
- * after the entry processing was winished in both cases: succes
+ * after the entry processing was finished in both cases: success
* or fail. Entry->res is set to a result value before the callback
* is fired.
*/
@@ -98,7 +98,7 @@ journal_entry_new(size_t n_rows, struct region *region,
void *on_done_cb_data);
/**
- * Finalize a signle entry.
+ * Finalize a single entry.
*/
static inline void
journal_entry_complete(struct journal_entry *entry)
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 617b9538..28f7aced 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -110,6 +110,7 @@ replication_free(void)
replicaset_foreach(replica)
relay_cancel(replica->relay);
+ diag_destroy(&replicaset.applier.diag);
free(replicaset.replica_by_id);
}
diff --git a/src/box/txn.c b/src/box/txn.c
index f331642f..95076773 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -192,6 +192,7 @@ txn_begin()
txn->n_local_rows = 0;
txn->n_applier_rows = 0;
txn->has_triggers = false;
+ txn->is_done = false;
txn->is_aborted = false;
txn->in_sub_stmt = 0;
txn->id = ++tsn;
@@ -199,9 +200,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
- txn->entry = NULL;
txn->fiber = NULL;
- txn->done = false;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -344,22 +343,22 @@ fail:
* A helper function to process on_commit/on_rollback triggers.
*/
static inline void
-txn_process_trigger(struct rlist *trigger, struct txn *txn)
+txn_run_triggers(struct txn *txn, struct rlist *trigger)
{
/*
- * Some of triggers require for in_txn variable is set so
- * restore it for time a trigger is in progress.
+ * Some triggers require for in_txn variable to be set so
+ * restore it for the time triggers are in progress.
*/
fiber_set_txn(fiber(), txn);
/* Rollback triggers must not throw. */
if (trigger_run(trigger, txn) != 0) {
/*
* As transaction couldn't handle a trigger error so
- * there is no option except than panic.
+ * there is no option except panic.
*/
diag_log();
unreachable();
- panic("rollback trigger failed");
+ panic("commit/rollback trigger failed");
}
fiber_set_txn(fiber(), NULL);
}
@@ -370,24 +369,24 @@ txn_process_trigger(struct rlist *trigger, struct txn *txn)
static void
txn_complete(struct txn *txn)
{
+ /*
+ * Note, engine can be NULL if transaction contains
+ * IPROTO_NOP statements only.
+ */
if (txn->signature < 0) {
/* Undo the transaction. */
if (txn->engine)
engine_rollback(txn->engine, txn);
if (txn->has_triggers)
- txn_process_trigger(&txn->on_rollback, txn);
-
+ txn_run_triggers(txn, &txn->on_rollback);
} else {
- /* Accept the transaction. */
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
+ /* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
if (txn->has_triggers)
- txn_process_trigger(&txn->on_commit, txn);
- ev_tstamp stop_tm = ev_monotonic_now(loop());
+ txn_run_triggers(txn, &txn->on_commit);
+
+ double stop_tm = ev_monotonic_now(loop());
if (stop_tm - txn->start_tm > too_long_threshold) {
int n_rows = txn->n_new_rows + txn->n_applier_rows;
say_warn_ratelimited("too long WAL write: %d rows at "
@@ -404,7 +403,7 @@ txn_complete(struct txn *txn)
if (txn->fiber == NULL)
txn_free(txn);
else {
- txn->done = true;
+ txn->is_done = true;
if (txn->fiber != fiber())
/* Wake a waiting fiber up. */
fiber_wakeup(txn->fiber);
@@ -414,8 +413,7 @@ txn_complete(struct txn *txn)
static void
txn_entry_done_cb(struct journal_entry *entry, void *data)
{
- struct txn *txn = (struct txn *)data;
- assert(txn->entry == entry);
+ struct txn *txn = data;
txn->signature = entry->res;
txn_complete(txn);
}
@@ -423,22 +421,22 @@ txn_entry_done_cb(struct journal_entry *entry, void *data)
static int64_t
txn_write_to_wal(struct txn *txn)
{
- assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
/* Prepare a journal entry. */
- txn->entry = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region,
- txn_entry_done_cb, txn);
- if (txn->entry == NULL) {
+ struct journal_entry *req = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb,
+ txn);
+ if (req == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = txn->entry->rows;
- struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = req->rows;
+ struct xrow_header **local_row = req->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -446,13 +444,13 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- txn->entry->approx_len += xrow_approx_len(stmt->row);
+ req->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == txn->entry->rows + txn->n_applier_rows);
+ assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- /* Send entry to a journal. */
- if (journal_write(txn->entry) < 0) {
+ /* Send the entry to the journal. */
+ if (journal_write(req) < 0) {
diag_set(ClientError, ER_WAL_IO);
diag_log();
return -1;
@@ -483,17 +481,13 @@ txn_prepare(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0) {
+ if (engine_prepare(txn->engine, txn) != 0)
return -1;
- }
}
trigger_clear(&txn->fiber_on_stop);
return 0;
}
-/*
- * Send a transaction to a journal.
- */
int
txn_write(struct txn *txn)
{
@@ -503,9 +497,9 @@ txn_write(struct txn *txn)
}
/*
- * After this transaction could not be used more
- * so reset corresponding key in a fiber storage.
- */
+ * After this point the transaction must not be used
+ * so reset the corresponding key in the fiber storage.
+ */
fiber_set_txn(fiber(), NULL);
txn->start_tm = ev_monotonic_now(loop());
if (txn->n_new_rows + txn->n_applier_rows == 0) {
@@ -514,16 +508,7 @@ txn_write(struct txn *txn)
txn_complete(txn);
return 0;
}
-
- if (txn_write_to_wal(txn) != 0) {
- /*
- * After journal write the transaction would be finalized
- * with its journal entry finalization callback,
- * just return an error.
- */
- return -1;
- }
- return 0;
+ return txn_write_to_wal(txn);
}
int
@@ -537,15 +522,16 @@ txn_commit(struct txn *txn)
* In case of non-yielding journal the transaction could already
* be done and there is nothing to wait in such cases.
*/
- if (!txn->done) {
+ if (!txn->is_done) {
bool cancellable = fiber_set_cancellable(false);
fiber_yield();
fiber_set_cancellable(cancellable);
}
- int res = txn->signature >= 0? 0: -1;
+ int res = txn->signature >= 0 ? 0 : -1;
if (res != 0)
diag_set(ClientError, ER_WAL_IO);
- /* As the current fiber is waiting for the transaction so free it. */
+
+ /* Synchronous transactions are freed by the calling fiber. */
txn_free(txn);
return res;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index ddcac3bb..a19becce 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -162,6 +162,8 @@ struct txn {
* already assigned LSN.
*/
int n_applier_rows;
+ /* True when transaction is processed. */
+ bool is_done;
/**
* True if the transaction was aborted so should be
* rolled back at commit.
@@ -182,6 +184,10 @@ struct txn {
struct engine *engine;
/** Engine-specific transaction data */
void *engine_tx;
+ /* A fiber to wake up when transaction is finished. */
+ struct fiber *fiber;
+ /** Timestampt of entry write start. */
+ double start_tm;
/**
* Triggers on fiber yield to abort transaction for
* for in-memory engine.
@@ -195,16 +201,6 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
- /** Journal entry to control txn write. */
- struct journal_entry *entry;
- /** Transaction completion trigger. */
- struct trigger entry_done;
- /** Timestampt of entry write start. */
- ev_tstamp start_tm;
- /* A fiber to wake up when transaction is finished. */
- struct fiber *fiber;
- /* True when transaction is processed. */
- bool done;
};
/* Pointer to the current transaction (if any) */
@@ -238,12 +234,21 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+/**
+ * Submit a transaction to the journal.
+ * @pre txn == in_txn()
+ *
+ * On success 0 is returned, and the transaction will be freed upon
+ * journal write completion. Note, the journal write may still fail.
+ * To track transaction status, one is supposed to use on_commit and
+ * on_rollback triggers.
+ *
+ * On failure -1 is returned and the transaction is rolled back and
+ * freed.
+ */
int
txn_write(struct txn *txn);
-int
-txn_wait(struct txn *txn);
-
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index dce5fee6..6f5d0a58 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,17 +245,19 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
+/**
+ * Invoke completion callbacks of journal entries to be
+ * completed. Callbacks are invoked in strict fifo order:
+ * this ensures that, in case of rollback, requests are
+ * rolled back in strict reverse order, producing
+ * a consistent database state.
+ */
static void
tx_schedule_queue(struct stailq *queue)
{
- /*
- * fiber_wakeup() is faster than fiber_call() when there
- * are many ready fibers.
- */
struct journal_entry *req, *tmp;
- stailq_foreach_entry_safe(req, tmp, queue, fifo) {
+ stailq_foreach_entry_safe(req, tmp, queue, fifo)
journal_entry_complete(req);
- }
}
/**
@@ -1189,7 +1191,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
journal_entry_complete(entry);
- return entry->res;
+ return 0;
}
void
More information about the Tarantool-patches
mailing list