Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal
Date: Thu,  7 Feb 2019 20:27:31 +0300	[thread overview]
Message-ID: <9a66ddf15aea56b09b6821b40f46b546c253d9da.1549556742.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1549556742.git.georgy@tarantool.org>

Applier used to promote vclock prior to applying the row. This lead to
a situation when master's row would be skipped forever in case there is
an error trying to apply it. However, some errors are transient, and we
might be able to successfully apply the same row later.

While we're at it, make wal writer the only one responsible for
advancing replicaset vclock. It was already doing it for rows coming
from the local instance, besides, it makes the code cleaner since now we
want to advance vclock direct from wal batch reply and lets us get rid of
unnecessary checks whether applier or wal has already advanced the
vclock.

Closes #2283
Prerequisite #980
---
 src/box/applier.cc                          | 46 ++++++---------
 src/box/wal.c                               | 43 ++++----------
 test/box/errinj.result                      | 56 ++++++++++++++----
 test/replication/skip_conflict_row.result   | 63 +++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua | 20 +++++++
 5 files changed, 159 insertions(+), 69 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -512,34 +512,20 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set
+		 * of changes may arrive via two
+		 * concurrently running appliers.
+		 * Hence we need a latch to strictly order all
+		 * changes which belong to the same server id.
+		 */
+		latch_lock(latch);
+		if (vclock_get(&replicaset.vclock,
+			       row.replica_id) < row.lsn) {
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -550,10 +536,14 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					latch_unlock(latch);
 					diag_raise();
+				}
 			}
 		}
+		latch_unlock(latch);
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 966f3bfb9..6d6dda390 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
+	/* Update the tx vclock to the latest written by wal. */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
 }
 
@@ -1033,6 +1038,12 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/*
+	 * Remember the vclock of the last successfully written row so
+	 * that we can update replicaset.vclock once this message gets
+	 * back to tx.
+	 */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1164,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1198,12 +1184,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 1d9a16d8d..9a797916c 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -141,31 +141,70 @@ errinj.set("ERRINJ_TESTING", false)
 ---
 - ok
 ...
-env = require('test_run')
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_IO", true)
 ---
+- ok
 ...
-test_run = env.new()
+space:insert{1}
 ---
+- error: Failed to write to disk
 ...
-lsn1 = box.info.vclock[box.info.id]
+space:get{1}
 ---
 ...
--- Check how well we handle a failed log write
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
 space:insert{1}
 ---
+- [1]
+...
+-- Check vclock was promoted only one time
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+space:update(1, {{'=', 2, 2}})
+---
 - error: Failed to write to disk
 ...
 space:get{1}
 ---
+- [1]
+...
+space:get{2}
+---
 ...
 errinj.set("ERRINJ_WAL_IO", false)
 ---
 - ok
 ...
+space:update(1, {{'=', 2, 2}})
+---
+- [1, 2]
+...
+-- Check vclock was promoted only two times
+space:truncate()
+---
+...
+lsn1 = box.info.vclock[box.info.id]
+---
+...
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+---
+- ok
+...
+space:insert{1}
+---
+- error: Failed to write to disk
+...
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+---
+- ok
+...
 space:insert{1}
 ---
 - [1]
@@ -175,7 +214,7 @@ box.info.vclock[box.info.id] == lsn1 + 1
 ---
 - true
 ...
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
 ---
 - ok
 ...
@@ -187,10 +226,7 @@ space:get{1}
 ---
 - [1]
 ...
-space:get{2}
----
-...
-errinj.set("ERRINJ_WAL_IO", false)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
 ---
 - ok
 ...
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b472..0c45e15e2 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -82,6 +82,69 @@ box.info.status
 ---
 - running
 ...
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+lsn1 = box.info.vclock[1]
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+test_run:cmd("switch default")
+---
+- true
+...
 -- cleanup
 test_run:cmd("stop server replica")
 ---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..7eed4073c 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -28,6 +28,26 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.space.test:insert{3}
+lsn1 = box.info.vclock[1]
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+test_run:cmd("switch replica")
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
-- 
2.20.1

  parent reply	other threads:[~2019-02-07 17:25 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-02-07 17:27 [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Georgy Kirichenko
2019-02-07 17:27 ` [tarantool-patches] [PATCH v2 1/2] Do not promote wal vclock for failed writes Georgy Kirichenko
2019-02-08  9:57   ` Vladimir Davydov
2019-02-07 17:27 ` Georgy Kirichenko [this message]
2019-02-08 10:09 ` [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=9a66ddf15aea56b09b6821b40f46b546c253d9da.1549556742.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox