[tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection
Georgy Kirichenko
georgy at tarantool.org
Tue Jan 22 13:31:11 MSK 2019
Do not skip a row until other appliers have finished processing it.
Prerequisite #980
---
src/box/applier.cc | 35 ++++++++++++++++++-----------------
1 file changed, 18 insertions(+), 17 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 87873e970..148c8ce5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -504,6 +504,22 @@ applier_subscribe(struct applier *applier)
applier->lag = ev_now(loop()) - row.tm;
applier->last_row_time = ev_monotonic_now(loop());
+ struct replica *replica = replica_by_id(row.replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ /*
+ * In a full mesh topology, the same set
+ * of changes may arrive via two
+ * concurrently running appliers. Thanks
+ * to vclock_follow() above, the first row
+ * in the set will be skipped - but the
+ * remaining may execute out of order,
+ * when the following xstream_write()
+ * yields on WAL. Hence we need a latch to
+ * strictly order all changes which belong
+ * to the same server id.
+ */
+ latch_lock(latch);
if (vclock_get(&replicaset.applier.vclock,
row.replica_id) < row.lsn) {
if (row.replica_id == instance_id &&
@@ -516,24 +532,7 @@ applier_subscribe(struct applier *applier)
int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
row.replica_id);
vclock_follow_xrow(&replicaset.applier.vclock, &row);
- struct replica *replica = replica_by_id(row.replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set
- * of changes may arrive via two
- * concurrently running appliers. Thanks
- * to vclock_follow() above, the first row
- * in the set will be skipped - but the
- * remaining may execute out of order,
- * when the following xstream_write()
- * yields on WAL. Hence we need a latch to
- * strictly order all changes which belong
- * to the same server id.
- */
- latch_lock(latch);
int res = xstream_write(applier->subscribe_stream, &row);
- latch_unlock(latch);
if (res != 0) {
struct error *e = diag_last_error(diag_get());
/**
@@ -548,11 +547,13 @@ applier_subscribe(struct applier *applier)
/* Rollback lsn to have a chance for a retry. */
vclock_set(&replicaset.applier.vclock,
row.replica_id, old_lsn);
+ latch_unlock(latch);
diag_raise();
}
}
}
done:
+ latch_unlock(latch);
/*
* Stay 'orphan' until appliers catch up with
* the remote vclock at the time of SUBSCRIBE
--
2.20.1
More information about the Tarantool-patches
mailing list