[PATCH] replication: keep header when request is modified by before_replace

Vladimir Davydov vdavydov.dev at gmail.com
Mon Oct 29 19:05:18 MSK 2018


When a space:before_replace trigger modifies the result of a remote
operation, we clear the request header so that it gets rebuilt on
commit. This is incorrect: as a result, we don't bump the master's
component of the replica's vclock, which leads to the request being
applied again when the replica reconnects. The issue manifests itself
in sporadic replication/before_replace test failures.

Fix this by updating the request header in place rather than clearing
it, so that the original replica id and lsn are preserved.

Closes #3722
---
https://github.com/tarantool/tarantool/issues/3722
https://github.com/tarantool/tarantool/commits/dv/gh-3722-fix-before-replace-replication

 src/box/request.c                        | 39 ++++++++++-----
 test/replication/before_replace.result   | 83 ++++++++++++++++++++++++++++++++
 test/replication/before_replace.test.lua | 42 +++++++++++++++-
 3 files changed, 151 insertions(+), 13 deletions(-)

diff --git a/src/box/request.c b/src/box/request.c
index 8690519c..9c684af7 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -45,10 +45,34 @@
 #include "xrow.h"
 #include "iproto_constants.h"
 
+/**
+ * Whenever a request is modified, its header must be updated too:
+ * set row->type, re-encode row->body from @a request and make @a row
+ * the request's header again. NULL @a row is a no-op. Returns 0 on
+ * success, -1 if xrow_encode_dml() fails.
+ * Keeping the original header (instead of rebuilding it on commit)
+ * preserves the original replica id and lsn, so for a row from a
+ * remote master we bump the master's component of the replica's
+ * vclock and won't try to apply the same row again on reconnect.
+ */
+static int
+request_update_header(struct request *request, struct xrow_header *row)
+{
+	if (row == NULL)
+		return 0;
+	row->type = request->type;
+	row->bodycnt = xrow_encode_dml(request, row->body);
+	if (row->bodycnt < 0)
+		return -1;
+	request->header = row;
+	return 0;
+}
+
 int
 request_create_from_tuple(struct request *request, struct space *space,
 			  struct tuple *old_tuple, struct tuple *new_tuple)
 {
+	struct xrow_header *row = request->header;
 	memset(request, 0, sizeof(*request));
 
 	if (old_tuple == new_tuple) {
@@ -57,7 +81,7 @@ request_create_from_tuple(struct request *request, struct space *space,
 		 * turn this request into no-op.
 		 */
 		request->type = IPROTO_NOP;
-		return 0;
+		return request_update_header(request, row);
 	}
 	/*
 	 * Space pointer may be zero in case of NOP, in which case
@@ -91,7 +115,7 @@ request_create_from_tuple(struct request *request, struct space *space,
 		request->tuple_end = buf + size;
 		request->type = IPROTO_REPLACE;
 	}
-	return 0;
+	return request_update_header(request, row);
 }
 
 void
@@ -198,14 +222,5 @@ request_handle_sequence(struct request *request, struct space *space)
 		if (likely(mp_read_int64(&key, &value) == 0))
 			return sequence_update(seq, value);
 	}
-	/*
-	 * As the request body was changed, we have to update body in header.
-	 */
-	struct xrow_header *row = request->header;
-	if (row != NULL) {
-		row->bodycnt = xrow_encode_dml(request, row->body);
-		if (row->bodycnt < 0)
-			return -1;
-	}
-	return 0;
+	return request_update_header(request, request->header);
 }
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index 858a52de..abfbc4bb 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -7,6 +7,9 @@ env = require('test_run')
 test_run = env.new()
 ---
 ...
+engine = test_run:get_cfg('engine')
+---
+...
 SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
 ---
 ...
@@ -223,3 +226,83 @@ test_run:cmd("switch default")
 test_run:drop_cluster(SERVERS)
 ---
 ...
+--
+-- gh-3722: Check that when space:before_replace trigger modifies
+-- the result of a replicated operation, it writes it to the WAL
+-- with the original replica id and lsn.
+--
+_ = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = box.space.test:create_index('primary')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+_ = box.space.test:before_replace(function(old, new) return new:update{{'+', 2, 1}} end)
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:replace{1, 1}
+---
+- [1, 1]
+...
+_ = test_run:wait_vclock('replica', test_run:get_vclock('default'))
+---
+...
+-- Check that replace{1, 2} coming from the master was suppressed
+-- by the before_replace trigger on the replica.
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select() -- [1, 2]
+---
+- - [1, 2]
+...
+-- Check that master's component of replica's vclock was bumped
+-- so that the replica doesn't apply replace{1, 2} after restart
+-- while syncing with the master.
+test_run:cmd("restart server replica")
+box.space.test:select() -- [1, 2]
+---
+- - [1, 2]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cmd("delete server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+box.space.test:drop()
+---
+...
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index f1e59070..fc444cd3 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -3,6 +3,7 @@
 --
 env = require('test_run')
 test_run = env.new()
+engine = test_run:get_cfg('engine')
 
 SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
 
@@ -76,7 +77,46 @@ push_err
 test_run:cmd('restart server autobootstrap3 with args="0.1 0.5"')
 box.space.test:select()
 
-
 -- Cleanup.
 test_run:cmd("switch default")
 test_run:drop_cluster(SERVERS)
+
+--
+-- gh-3722: Check that when space:before_replace trigger modifies
+-- the result of a replicated operation, it writes it to the WAL
+-- with the original replica id and lsn.
+--
+_ = box.schema.space.create('test', {engine = engine})
+_ = box.space.test:create_index('primary')
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+
+test_run:cmd("switch replica")
+_ = box.space.test:before_replace(function(old, new) return new:update{{'+', 2, 1}} end)
+
+test_run:cmd("switch default")
+box.space.test:replace{1, 1}
+
+_ = test_run:wait_vclock('replica', test_run:get_vclock('default'))
+
+-- Check that replace{1, 2} coming from the master was suppressed
+-- by the before_replace trigger on the replica.
+test_run:cmd("switch replica")
+box.space.test:select() -- [1, 2]
+
+-- Check that master's component of replica's vclock was bumped
+-- so that the replica doesn't apply replace{1, 2} after restart
+-- while syncing with the master.
+test_run:cmd("restart server replica")
+box.space.test:select() -- [1, 2]
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
-- 
2.11.0




More information about the Tarantool-patches mailing list