[tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal

Georgy Kirichenko georgy at tarantool.org
Thu Feb 7 20:27:31 MSK 2019


Applier used to promote vclock prior to applying the row. This lead to
a situation when master's row would be skipped forever in case there is
an error trying to apply it. However, some errors are transient, and we
might be able to successfully apply the same row later.

While we're at it, make wal writer the only one responsible for
advancing replicaset vclock. It was already doing it for rows coming
from the local instance, besides, it makes the code cleaner since now we
want to advance vclock direct from wal batch reply and lets us get rid of
unnecessary checks whether applier or wal has already advanced the
vclock.

Closes #2283
Prerequisite #980
---
 src/box/applier.cc                          | 46 ++++++---------
 src/box/wal.c                               | 43 ++++----------
 test/box/errinj.result                      | 56 ++++++++++++++----
 test/replication/skip_conflict_row.result   | 63 +++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua | 20 +++++++
 5 files changed, 159 insertions(+), 69 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -512,34 +512,20 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set
+		 * of changes may arrive via two
+		 * concurrently running appliers.
+		 * Hence we need a latch to strictly order all
+		 * changes which belong to the same server id.
+		 */
+		latch_lock(latch);
+		if (vclock_get(&replicaset.vclock,
+			       row.replica_id) < row.lsn) {
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -550,10 +536,14 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					latch_unlock(latch);
 					diag_raise();
+				}
 			}
 		}
+		latch_unlock(latch);
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 966f3bfb9..6d6dda390 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
+	/* Update the tx vclock to the latest written by wal. */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
 }
 
@@ -1033,6 +1038,12 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/*
+	 * Remember the vclock of the last successfully written row so
+	 * that we can update replicaset.vclock once this message gets
+	 * back to tx.
+	 */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1164,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1198,12 +1184,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 1d9a16d8d..9a797916c 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -141,31 +141,70 @@ errinj.set("ERRINJ_TESTING", false)
 ---
 - ok
 ...
-env = require('test_run')
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_IO", true)
 ---
+- ok
 ...
-test_run = env.new()
+space:insert{1}
 ---
+- error: Failed to write to disk
 ...
-lsn1 = box.info.vclock[box.info.id]
+space:get{1}
 ---
 ...
--- Check how well we handle a failed log write
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
 space:insert{1}
 ---
+- [1]
+...
+-- Check vclock was promoted only one time
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+space:update(1, {{'=', 2, 2}})
+---
 - error: Failed to write to disk
 ...
 space:get{1}
 ---
+- [1]
+...
+space:get{2}
+---
 ...
 errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
+space:update(1, {{'=', 2, 2}})
+---
+- [1, 2]
+...
+-- Check vclock was promoted only two times
+space:truncate()
+---
+...
+lsn1 = box.info.vclock[box.info.id]
+---
+...
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+---
+- ok
+...
+space:insert{1}
+---
+- error: Failed to write to disk
+...
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+---
+- ok
+...
 space:insert{1}
 ---
 - [1]
@@ -175,7 +214,7 @@ box.info.vclock[box.info.id] == lsn1 + 1
 ---
 - true
 ...
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
 ---
 - ok
 ...
@@ -187,10 +226,7 @@ space:get{1}
 ---
 - [1]
 ...
-space:get{2}
----
-...
-errinj.set("ERRINJ_WAL_IO", false)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
 ---
 - ok
 ...
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b472..0c45e15e2 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -82,6 +82,69 @@ box.info.status
 ---
 - running
 ...
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+lsn1 = box.info.vclock[1]
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+test_run:cmd("switch default")
+---
+- true
+...
 -- cleanup
 test_run:cmd("stop server replica")
 ---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..7eed4073c 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -28,6 +28,26 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.space.test:insert{3}
+lsn1 = box.info.vclock[1]
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+test_run:cmd("switch replica")
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
-- 
2.20.1





More information about the Tarantool-patches mailing list