[tarantool-patches] Re: [PATCH v4 8/9] applier: apply transaction in parallel
Георгий Кириченко
georgy at tarantool.org
Thu Jun 20 10:41:10 MSK 2019
I'm sorry, here is the proper version of the commit:
Appliers use asynchronous transactions to batch journal writes. All
appliers share replicaset.applier.tx_vclock, which is the vclock of
transactions applied but not necessarily written to the journal. Appliers
use a trigger to coordinate in case of failure, i.e. when a transaction
is going to be rolled back. Also, the applier writer condition is shared
across all appliers and is signaled on commit or on a heartbeat message.
Closes: #1254
---
src/box/applier.cc | 156 +++++++++++++++++++++++++++++------------
src/box/applier.h | 9 ++-
src/box/replication.cc | 7 ++
src/box/replication.h | 14 ++++
4 files changed, 138 insertions(+), 48 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a92f6109..fee49d8ca 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -50,6 +50,7 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "scoped_guard.h"
STRS(applier_state, applier_STATE);
@@ -130,11 +131,24 @@ applier_writer_f(va_list ap)
* replication_timeout seconds any more.
*/
if (applier->version_id >= version_id(1, 7, 7))
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
TIMEOUT_INFINITY);
else
- fiber_cond_wait_timeout(&applier->writer_cond,
+ fiber_cond_wait_timeout(&replicaset.applier.commit_cond,
replication_timeout);
+ /*
+ * Stay 'orphan' until appliers catch up with
+ * the remote vclock at the time of SUBSCRIBE
+ * and the lag is less than configured.
+ */
+ if (applier->state == APPLIER_SYNC &&
+ applier->lag <= replication_sync_lag &&
+ vclock_compare(&applier->remote_vclock_at_subscribe,
+ &replicaset.vclock) <= 0) {
+ /* Applier is synced, switch to "follow". */
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
/* Send ACKs only when in FOLLOW mode ,*/
if (applier->state != APPLIER_SYNC &&
applier->state != APPLIER_FOLLOW)
@@ -565,6 +579,36 @@ applier_read_tx(struct applier *applier, struct stailq
*rows)
next)->row.is_commit);
}
+static void
+sequencer_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ diag_set(ClientError, ER_WAL_IO);
+ diag_move(&fiber()->diag, &replicaset.applier.diag);
+ trigger_run(&replicaset.applier.on_replication_fail, NULL);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+}
+
+static void
+sequencer_commit_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ (void) event;
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+}
+
+static void
+applier_on_fail(struct trigger *trigger, void *event)
+{
+ (void) event;
+ struct applier *applier = (struct applier *)trigger->data;
+ if (!diag_is_empty(&replicaset.applier.diag))
+ diag_add_error(&applier->diag,
diag_last_error(&replicaset.applier.diag));
+ fiber_cancel(applier->reader);
+
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -573,6 +617,22 @@ applier_read_tx(struct applier *applier, struct stailq
*rows)
static int
applier_apply_tx(struct stailq *rows)
{
+ struct xrow_header *first_row =
+ &stailq_first_entry(rows, struct applier_tx_row,
+ next)->row;
+ struct replica *replica = replica_by_id(first_row->replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ latch_lock(latch);
+ if (vclock_get(&replicaset.applier.net_vclock, first_row->replica_id) >=
+ first_row->lsn) {
+ /* Check whether this is a heartbeat message and wake the writers up. */
+ if (first_row->lsn == 0)
+ fiber_cond_broadcast(&replicaset.applier.commit_cond);
+ latch_unlock(latch);
+ return 0;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
@@ -581,8 +641,10 @@ applier_apply_tx(struct stailq *rows)
*/
struct txn *txn = txn_begin();
struct applier_tx_row *item;
- if (txn == NULL)
- diag_raise();
+ if (txn == NULL) {
+ latch_unlock(latch);
+ return -1;
+ }
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
int res = apply_row(row);
@@ -623,10 +685,34 @@ applier_apply_tx(struct stailq *rows)
"Replication", "distributed transactions");
goto rollback;
}
- return txn_commit(txn);
+ /* We are ready to submit txn to wal. */
+ struct trigger *on_rollback, *on_commit;
+ on_rollback = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ on_commit = (struct trigger *)region_alloc(&txn->region,
+ sizeof(struct trigger));
+ if (on_rollback == NULL || on_commit == NULL)
+ goto rollback;
+
+ trigger_create(on_rollback, sequencer_rollback_cb, NULL, NULL);
+ txn_on_rollback(txn, on_rollback);
+
+ trigger_create(on_commit, sequencer_commit_cb, NULL, NULL);
+ txn_on_commit(txn, on_commit);
+
+ if (txn_write(txn) < 0)
+ goto fail;
+ /* Transaction was sent to journal so promote vclock. */
+ vclock_follow(&replicaset.applier.net_vclock, first_row->replica_id,
+ first_row->lsn);
+ latch_unlock(latch);
+
+ return 0;
rollback:
txn_rollback(txn);
+fail:
+ latch_unlock(latch);
fiber_gc();
return -1;
}
@@ -641,7 +727,6 @@ applier_subscribe(struct applier *applier)
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
- struct vclock remote_vclock_at_subscribe;
struct tt_uuid cluster_id = uuid_nil;
struct vclock vclock;
@@ -668,10 +753,10 @@ applier_subscribe(struct applier *applier)
* the replica, and replica has to check whether
* its and master's cluster ids match.
*/
- vclock_create(&remote_vclock_at_subscribe);
+ vclock_create(&applier->remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row,
&cluster_id,
- &remote_vclock_at_subscribe);
+ &applier-
>remote_vclock_at_subscribe);
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
@@ -686,7 +771,7 @@ applier_subscribe(struct applier *applier)
say_info("subscribed");
say_info("remote vclock %s local vclock %s",
- vclock_to_string(&remote_vclock_at_subscribe),
+ vclock_to_string(&applier->remote_vclock_at_subscribe),
vclock_to_string(&vclock));
}
/*
@@ -735,6 +820,15 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
+ /* Register a trigger to handle replication failures. */
+ struct trigger on_fail;
+ trigger_create(&on_fail, applier_on_fail, applier, NULL);
+ trigger_add(&replicaset.applier.on_replication_fail, &on_fail);
+ auto trigger_guard = make_scoped_guard([&] {
+ trigger_clear(&on_fail);
+ });
+
+
/*
* Process a stream of rows from the binary log.
*/
@@ -747,47 +841,13 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Stay 'orphan' until appliers catch up with
- * the remote vclock at the time of SUBSCRIBE
- * and the lag is less than configured.
- */
- if (applier->state == APPLIER_SYNC &&
- applier->lag <= replication_sync_lag &&
- vclock_compare(&remote_vclock_at_subscribe,
- &replicaset.vclock) <= 0) {
- /* Applier is synced, switch to "follow". */
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
struct stailq rows;
applier_read_tx(applier, &rows);
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(first_row->replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set of changes
- * may arrive via two concurrently running appliers.
- * Hence we need a latch to strictly order all changes
- * that belong to the same server id.
- */
- latch_lock(latch);
- if (vclock_get(&replicaset.vclock, first_row->replica_id) <
- first_row->lsn &&
- applier_apply_tx(&rows) != 0) {
- latch_unlock(latch);
+ if (applier_apply_tx(&rows) != 0)
diag_raise();
- }
- latch_unlock(latch);
- if (applier->state == APPLIER_SYNC ||
- applier->state == APPLIER_FOLLOW)
- fiber_cond_signal(&applier->writer_cond);
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
fiber_gc();
@@ -872,6 +932,11 @@ applier_f(va_list ap)
return -1;
}
} catch (FiberIsCancelled *e) {
+ if (!diag_is_empty(&applier->diag)) {
+ diag_move(&applier->diag, &fiber()->diag);
+ applier_disconnect(applier, APPLIER_STOPPED);
+ break;
+ }
applier_disconnect(applier, APPLIER_OFF);
break;
} catch (SocketError *e) {
@@ -959,7 +1024,7 @@ applier_new(const char *uri)
applier->last_row_time = ev_monotonic_now(loop());
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
- fiber_cond_create(&applier->writer_cond);
+ diag_create(&applier->diag);
return applier;
}
@@ -972,7 +1037,6 @@ applier_delete(struct applier *applier)
assert(applier->io.fd == -1);
trigger_destroy(&applier->on_state);
fiber_cond_destroy(&applier->resume_cond);
- fiber_cond_destroy(&applier->writer_cond);
free(applier);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 5bff90031..716da32e2 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -74,8 +74,6 @@ struct applier {
struct fiber *reader;
/** Background fiber to reply with vclock */
struct fiber *writer;
- /** Writer cond. */
- struct fiber_cond writer_cond;
/** Finite-state machine */
enum applier_state state;
/** Local time of this replica when the last row has been received */
@@ -114,8 +112,15 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
+ /* Diag to raise an error. */
+ struct diag diag;
+ /* The master's vclock at the time of subscribe. */
+ struct vclock remote_vclock_at_subscribe;
};
+void
+applier_init();
+
/**
* Start a client to a remote master using a background fiber.
*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index a1a2a9eb3..fd4d4e387 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,13 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX,
sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);
+
+ vclock_create(&replicaset.applier.net_vclock);
+ vclock_copy(&replicaset.applier.net_vclock, &replicaset.vclock);
+ rlist_create(&replicaset.applier.on_replication_fail);
+
+ fiber_cond_create(&replicaset.applier.commit_cond);
+ diag_create(&replicaset.applier.diag);
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 8c8a9927e..a4830f5b5 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -232,6 +232,20 @@ struct replicaset {
* struct replica object).
*/
struct latch order_latch;
+ /*
+ * A vclock of the last transaction which was read
+ * from an applier connection.
+ */
+ struct vclock net_vclock;
+ /* Signaled on replicated transaction commit. */
+ struct fiber_cond commit_cond;
+ /*
+ * Trigger to fire when replication stops in case
+ * of an error.
+ */
+ struct rlist on_replication_fail;
+ /* Diag to propagate an error across all appliers. */
+ struct diag diag;
} applier;
/** Map of all known replica_id's to correspponding replica's. */
struct replica **replica_by_id;
--
2.22.0
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190620/be5a4a20/attachment.sig>
More information about the Tarantool-patches
mailing list