[tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal
Georgy Kirichenko
georgy at tarantool.org
Fri Jan 4 13:34:12 MSK 2019
The journal maintains the replicaset vclock for all recovery, local and
replication operations. Introduce replicaset.applier.vclock to
prevent races between appliers.
Needed for: #980
---
src/box/applier.cc | 61 +++++++++++++++++++-----------------
src/box/box.cc | 3 +-
src/box/replication.cc | 1 +
src/box/replication.h | 3 ++
src/box/vclock.c | 14 +++++++++
src/box/vclock.h | 3 ++
src/box/wal.c | 38 +++++-----------------
test/xlog-py/dup_key.result | 12 +++++--
test/xlog-py/dup_key.test.py | 18 ++++++++---
9 files changed, 86 insertions(+), 67 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff4af95e5..1c6ed878d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
continue;
try {
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset.vclock);
+ xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
coio_write_xrow(&io, &xrow);
} catch (SocketError *e) {
/*
@@ -300,7 +300,7 @@ applier_join(struct applier *applier)
* Used to initialize the replica's initial
* vclock in bootstrap_from_master()
*/
- xrow_decode_vclock_xc(&row, &replicaset.vclock);
+ xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
}
applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -326,7 +326,8 @@ applier_join(struct applier *applier)
* vclock yet, do it now. In 1.7+
* this vclock is not used.
*/
- xrow_decode_vclock_xc(&row, &replicaset.vclock);
+ xrow_decode_vclock_xc(&row,
+ &replicaset.applier.vclock);
}
break; /* end of stream */
} else if (iproto_type_is_error(row.type)) {
@@ -336,6 +337,7 @@ applier_join(struct applier *applier)
(uint32_t) row.type);
}
}
+ vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
say_info("initial data received");
applier_set_state(applier, APPLIER_FINAL_JOIN);
@@ -355,7 +357,7 @@ applier_join(struct applier *applier)
coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
- vclock_follow_xrow(&replicaset.vclock, &row);
+ vclock_follow_xrow(&replicaset.applier.vclock, &row);
xstream_write_xc(applier->subscribe_stream, &row);
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
@@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
{
assert(applier->subscribe_stream != NULL);
+ if (!vclock_is_set(&replicaset.applier.vclock))
+ vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+
/* Send SUBSCRIBE request */
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
@@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Stay 'orphan' until appliers catch up with
- * the remote vclock at the time of SUBSCRIBE
- * and the lag is less than configured.
- */
- if (applier->state == APPLIER_SYNC &&
- applier->lag <= replication_sync_lag &&
- vclock_compare(&remote_vclock_at_subscribe,
- &replicaset.vclock) <= 0) {
- /* Applier is synced, switch to "follow". */
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
/*
* Tarantool < 1.7.7 does not send periodic heartbeat
* messages so we can't assume that if we haven't heard
@@ -512,16 +504,12 @@ applier_subscribe(struct applier *applier)
applier->lag = ev_now(loop()) - row.tm;
applier->last_row_time = ev_monotonic_now(loop());
-
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- /**
- * Promote the replica set vclock before
- * applying the row. If there is an
- * exception (conflict) applying the row,
- * the row is skipped when the replication
- * is resumed.
- */
- vclock_follow_xrow(&replicaset.vclock, &row);
+ if (vclock_get(&replicaset.applier.vclock,
+ row.replica_id) < row.lsn) {
+ /* Preserve old lsn value. */
+ int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
+ row.replica_id);
+ vclock_follow_xrow(&replicaset.applier.vclock, &row);
struct replica *replica = replica_by_id(row.replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
@@ -550,10 +538,27 @@ applier_subscribe(struct applier *applier)
box_error_code(e) == ER_TUPLE_FOUND &&
replication_skip_conflict)
diag_clear(diag_get());
- else
+ else {
+ /* Rollback lsn to have a chance for a retry. */
+ vclock_set(&replicaset.applier.vclock,
+ row.replica_id, old_lsn);
diag_raise();
+ }
}
}
+ /*
+ * Stay 'orphan' until appliers catch up with
+ * the remote vclock at the time of SUBSCRIBE
+ * and the lag is less than configured.
+ */
+ if (applier->state == APPLIER_SYNC &&
+ applier->lag <= replication_sync_lag &&
+ vclock_compare(&remote_vclock_at_subscribe,
+ &replicaset.vclock) <= 0) {
+ /* Applier is synced, switch to "follow". */
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
if (applier->state == APPLIER_SYNC ||
applier->state == APPLIER_FOLLOW)
fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/box.cc b/src/box/box.cc
index 9f2fd6da1..0df0875dd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
struct journal_entry * /* entry */)
{
struct recovery_journal *journal = (struct recovery_journal *) base;
+ vclock_copy(&replicaset.vclock, journal->vclock);
return vclock_sum(journal->vclock);
}
@@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
*/
engine_begin_final_recovery_xc();
struct recovery_journal journal;
- recovery_journal_create(&journal, &replicaset.vclock);
+ recovery_journal_create(&journal, &replicaset.applier.vclock);
journal_set(&journal.base);
applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 2cb4ec0f8..51e08886c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,7 @@ replication_init(void)
fiber_cond_create(&replicaset.applier.cond);
replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
latch_create(&replicaset.applier.order_latch);
+ vclock_create(&replicaset.applier.vclock);
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ac620d86..b9aebed14 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -194,6 +194,9 @@ struct replicaset {
struct vclock vclock;
/** Applier state. */
struct {
+ /**
+ * Vclock of the rows received from appliers. */
+ struct vclock vclock;
/**
* Total number of replicas with attached
* appliers.
diff --git a/src/box/vclock.c b/src/box/vclock.c
index b5eb2800b..c297d1ff9 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -36,6 +36,20 @@
#include "diag.h"
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+ assert(lsn >= 0);
+ assert(replica_id < VCLOCK_MAX);
+ int64_t prev_lsn = vclock->lsn[replica_id];
+ if (lsn > 0)
+ vclock->map |= 1 << replica_id;
+ else
+ vclock->map &= ~(1 << replica_id);
+ vclock->lsn[replica_id] = lsn;
+ vclock->signature += lsn - prev_lsn;
+}
+
int64_t
vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
{
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 111e29160..d6cb14c2a 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
return vclock->lsn[replica_id];
}
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
static inline int64_t
vclock_inc(struct vclock *vclock, uint32_t replica_id)
{
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
* be rolled back.
*/
struct stailq rollback;
+ /** Vclock after the batch is processed. */
+ struct vclock vclock;
};
/**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
batch->approx_len = 0;
stailq_create(&batch->commit);
stailq_create(&batch->rollback);
+ vclock_create(&batch->vclock);
}
static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
* wal_msg memory disappears after the first
* iteration of tx_schedule_queue loop.
*/
+ vclock_copy(&replicaset.vclock, &batch->vclock);
if (! stailq_empty(&batch->rollback)) {
/* Closes the input valve. */
stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
error_log(error);
diag_clear(diag_get());
}
+ /* Set resulting vclock. */
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
/*
* We need to start rollback from the first request
* following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
bool cancellable = fiber_set_cancellable(false);
fiber_yield(); /* Request was inserted. */
fiber_set_cancellable(cancellable);
- if (entry->res > 0) {
- struct xrow_header **last = entry->rows + entry->n_rows - 1;
- while (last >= entry->rows) {
- /*
- * Find last row from local instance id
- * and promote vclock.
- */
- if ((*last)->replica_id == instance_id) {
- /*
- * In master-master configuration, during sudden
- * power-loss, if the data have not been written
- * to WAL but have already been sent to others,
- * they will send the data back. In this case
- * vclock has already been promoted by applier.
- */
- if (vclock_get(&replicaset.vclock,
- instance_id) < (*last)->lsn) {
- vclock_follow_xrow(&replicaset.vclock,
- *last);
- }
- break;
- }
- --last;
- }
- }
return entry->res;
}
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
{
struct wal_writer *writer = (struct wal_writer *) journal;
wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
- int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
- int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
- if (new_lsn > old_lsn) {
- /* There were local writes, promote vclock. */
- vclock_follow(&replicaset.vclock, instance_id, new_lsn);
- }
+ vclock_copy(&replicaset.vclock, &writer->vclock);
return vclock_sum(&writer->vclock);
}
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
---
- [2, 'second tuple']
...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
box.space.test:insert{1, 'third tuple'}
---
- [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
---
- [2, 'fourth tuple']
...
-.xlog does not exist
check log line for 'Duplicate key'
'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..3dacde771 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -26,19 +26,27 @@ server.stop()
# Save wal#1
if os.access(wal, os.F_OK):
- print ".xlog exists"
+ print ".xlog#1 exists"
os.rename(wal, wal_old)
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+ print ".xlog#2 exists"
+
+# Write wal#3 - conflicts with wal#1
server.start()
server.admin("box.space.test:insert{1, 'third tuple'}")
server.admin("box.space.test:insert{2, 'fourth tuple'}")
server.stop()
# Restore wal#1
-if not os.access(wal, os.F_OK):
- print ".xlog does not exist"
- os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
server.start()
line = 'Duplicate key'
--
2.20.1
More information about the Tarantool-patches
mailing list