[tarantool-patches] [PATCH 3/3] Promote replicaset.vclock only after wal

Georgy Kirichenko georgy at tarantool.org
Wed Feb 6 11:29:59 MSK 2019


Get rid of appliers' vclock_follow and promote vclock only after
wal write.

Closes #2283
Prerequisite #980
---
 src/box/applier.cc                          | 14 ++------
 src/box/wal.c                               | 38 ++++-----------------
 test/replication/skip_conflict_row.test.lua | 19 +++++++++++
 test/xlog-py/dup_key.result                 | 12 +++++--
 test/xlog-py/dup_key.test.py                | 23 ++++++++++---
 5 files changed, 57 insertions(+), 49 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index d87b247e2..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -518,22 +518,14 @@ applier_subscribe(struct applier *applier)
 		/*
 		 * In a full mesh topology, the same set
 		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
+		 * concurrently running appliers.
+		 * Hence we need a latch to strictly order all
+		 * changes which belong to the same server id.
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock,
 			       row.replica_id) < row.lsn) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			latch_lock(latch);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch is processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
 	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..c60999b9b 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -1,5 +1,6 @@
 env = require('test_run')
 test_run = env.new()
+test_run:cmd("restart server default with cleanup=1")
 engine = test_run:get_cfg('engine')
 
 box.schema.user.grant('guest', 'replication')
@@ -28,6 +29,24 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.cfg{replication_skip_conflict=false}
+box.space.test:insert{3}
+box.info.vclock
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+box.info.vclock
+test_run:cmd("switch replica")
+box.info.vclock
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select()
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..e25b1d477 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -22,23 +22,36 @@ wal = os.path.join(vardir, filename)
 # Create wal#1
 server.admin("box.space.test:insert{1, 'first tuple'}")
 server.admin("box.space.test:insert{2, 'second tuple'}")
+lsn2 = int(yaml.load(server.admin("box.info.lsn", silent=True))[0])
 server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
+# drop empty log created on shutdown
+filename2 = str(lsn2).zfill(20) + ".xlog"
+wal2 = os.path.join(vardir, filename2)
+os.unlink(wal2)
 
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3 - conflicts with wal#1
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1





More information about the Tarantool-patches mailing list