[tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal

Georgy Kirichenko georgy at tarantool.org
Fri Jan 4 13:34:12 MSK 2019


Journal maintains replicaset vclock for all recovery, local and
replication operations. Introduce replicaset.applier.vclock to
prevent appliers from races.

Needed for: #980
---
 src/box/applier.cc           | 61 +++++++++++++++++++-----------------
 src/box/box.cc               |  3 +-
 src/box/replication.cc       |  1 +
 src/box/replication.h        |  3 ++
 src/box/vclock.c             | 14 +++++++++
 src/box/vclock.h             |  3 ++
 src/box/wal.c                | 38 +++++-----------------
 test/xlog-py/dup_key.result  | 12 +++++--
 test/xlog-py/dup_key.test.py | 18 ++++++++---
 9 files changed, 86 insertions(+), 67 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff4af95e5..1c6ed878d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
 			coio_write_xrow(&io, &xrow);
 		} catch (SocketError *e) {
 			/*
@@ -300,7 +300,7 @@ applier_join(struct applier *applier)
 		 * Used to initialize the replica's initial
 		 * vclock in bootstrap_from_master()
 		 */
-		xrow_decode_vclock_xc(&row, &replicaset.vclock);
+		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
 	}
 
 	applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -326,7 +326,8 @@ applier_join(struct applier *applier)
 				 * vclock yet, do it now. In 1.7+
 				 * this vclock is not used.
 				 */
-				xrow_decode_vclock_xc(&row, &replicaset.vclock);
+				xrow_decode_vclock_xc(&row,
+						      &replicaset.applier.vclock);
 			}
 			break; /* end of stream */
 		} else if (iproto_type_is_error(row.type)) {
@@ -336,6 +337,7 @@ applier_join(struct applier *applier)
 				  (uint32_t) row.type);
 		}
 	}
+	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
 	say_info("initial data received");
 
 	applier_set_state(applier, APPLIER_FINAL_JOIN);
@@ -355,7 +357,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			xstream_write_xc(applier->subscribe_stream, &row);
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
@@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
 {
 	assert(applier->subscribe_stream != NULL);
 
+	if (!vclock_is_set(&replicaset.applier.vclock))
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Stay 'orphan' until appliers catch up with
-		 * the remote vclock at the time of SUBSCRIBE
-		 * and the lag is less than configured.
-		 */
-		if (applier->state == APPLIER_SYNC &&
-		    applier->lag <= replication_sync_lag &&
-		    vclock_compare(&remote_vclock_at_subscribe,
-				   &replicaset.vclock) <= 0) {
-			/* Applier is synced, switch to "follow". */
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
 		/*
 		 * Tarantool < 1.7.7 does not send periodic heartbeat
 		 * messages so we can't assume that if we haven't heard
@@ -512,16 +504,12 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
+		if (vclock_get(&replicaset.applier.vclock,
+			       row.replica_id) < row.lsn) {
+			/* Preserve old lsn value. */
+			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
+						     row.replica_id);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			struct replica *replica = replica_by_id(row.replica_id);
 			struct latch *latch = (replica ? &replica->order_latch :
 					       &replicaset.applier.order_latch);
@@ -550,10 +538,27 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					/* Rollback lsn to have a chance for a retry. */
+					vclock_set(&replicaset.applier.vclock,
+						   row.replica_id, old_lsn);
 					diag_raise();
+				}
 			}
 		}
+		/*
+		 * Stay 'orphan' until appliers catch up with
+		 * the remote vclock at the time of SUBSCRIBE
+		 * and the lag is less than configured.
+		 */
+		if (applier->state == APPLIER_SYNC &&
+		    applier->lag <= replication_sync_lag &&
+		    vclock_compare(&remote_vclock_at_subscribe,
+				   &replicaset.vclock) <= 0) {
+			/* Applier is synced, switch to "follow". */
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/box.cc b/src/box/box.cc
index 9f2fd6da1..0df0875dd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry * /* entry */)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	vclock_copy(&replicaset.vclock, journal->vclock);
 	return vclock_sum(journal->vclock);
 }
 
@@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
 	 */
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&journal, &replicaset.applier.vclock);
 	journal_set(&journal.base);
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 2cb4ec0f8..51e08886c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,7 @@ replication_init(void)
 	fiber_cond_create(&replicaset.applier.cond);
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
+	vclock_create(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ac620d86..b9aebed14 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -194,6 +194,9 @@ struct replicaset {
 	struct vclock vclock;
 	/** Applier state. */
 	struct {
+		/**
+		 * Vclock sent to process from appliers. */
+		struct vclock vclock;
 		/**
 		 * Total number of replicas with attached
 		 * appliers.
diff --git a/src/box/vclock.c b/src/box/vclock.c
index b5eb2800b..c297d1ff9 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -36,6 +36,20 @@
 
 #include "diag.h"
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+	assert(lsn >= 0);
+	assert(replica_id < VCLOCK_MAX);
+	int64_t prev_lsn = vclock->lsn[replica_id];
+	if (lsn > 0)
+		vclock->map |= 1 << replica_id;
+	else
+		vclock->map &= ~(1 << replica_id);
+	vclock->lsn[replica_id] = lsn;
+	vclock->signature += lsn - prev_lsn;
+}
+
 int64_t
 vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 {
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 111e29160..d6cb14c2a 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
 	return vclock->lsn[replica_id];
 }
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
 static inline int64_t
 vclock_inc(struct vclock *vclock, uint32_t replica_id)
 {
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
 	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..3dacde771 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -26,19 +26,27 @@ server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
 
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3 - confliction with wal#1
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1





More information about the Tarantool-patches mailing list