[patches] [PATCH] [replication] [recovery] recover missing data

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Tue Mar 13 17:59:04 MSK 2018


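Recover missing local data from replicas.
If a master was restarted after losing some of its own rows from its
xlog files, the old behaviour never recovered them, because a relay
never sends a replica's own rows back to it. Fix it: on subscribe, the
relay remembers the subscriber's LSN in the local replicaset vclock and
also sends the subscriber's own rows up to that LSN; WAL promotes the
local vclock component only when a row's LSN exceeds it.
Based on a proposal by @GeorgyKirichenko.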

Closes #3210
---
 branch: gh-3210-recover-missing-local-data-master-master
 src/box/relay.cc                          |  6 ++-
 src/box/wal.cc                            |  7 ++-
 test/replication/master1.lua              |  1 +
 test/replication/master2.lua              |  1 +
 test/replication/master_master.lua        | 30 ++++++++++++
 test/replication/recover_missing.result   | 78 +++++++++++++++++++++++++++++++
 test/replication/recover_missing.test.lua | 30 ++++++++++++
 7 files changed, 151 insertions(+), 2 deletions(-)
 create mode 120000 test/replication/master1.lua
 create mode 120000 test/replication/master2.lua
 create mode 100644 test/replication/master_master.lua
 create mode 100644 test/replication/recover_missing.result
 create mode 100644 test/replication/recover_missing.test.lua

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2bd05ad5f..fa9c9e205 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -110,6 +110,8 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
+	/** LSN of this replica in replicaset.vclock at subscribe (#3210). */
+	int64_t lsn_on_subscr;
 
 	/** Relay endpoint */
 	struct cbus_endpoint endpoint;
@@ -541,6 +543,7 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
 	relay.version_id = replica_version_id;
 	relay.replica = replica;
 	replica_set_relay(replica, &relay);
+	relay.lsn_on_subscr = replicaset.vclock.lsn[replica->id];
 
 	int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay),
 			      relay_subscribe_f, &relay);
@@ -586,7 +589,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	 * (i.e. don't send replica's own rows back).
 	 */
 	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id) {
+	    packet->replica_id != relay->replica->id ||
+	    packet->lsn <= relay->lsn_on_subscr) { /* replica may lack these */
 		relay_send(relay, packet);
 	}
 }
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 4576cfe09..a28759151 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -768,8 +768,13 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 			/*
 			 * Find last row from local instance id
 			 * and promote vclock.
+			 * Peers may send back our own rows lost
+			 * locally (#3210); never move the local
+			 * component of replicaset.vclock back,
+			 * promote it only when the LSN grows.
 			 */
-			if ((*last)->replica_id == instance_id) {
+			if ((*last)->replica_id == instance_id &&
+			    replicaset.vclock.lsn[instance_id] < (*last)->lsn) {
 				vclock_follow(&replicaset.vclock, instance_id,
 					      (*last)->lsn);
 				break;
diff --git a/test/replication/master1.lua b/test/replication/master1.lua
new file mode 120000
index 000000000..16399ae47
--- /dev/null
+++ b/test/replication/master1.lua
@@ -0,0 +1 @@
+master_master.lua
\ No newline at end of file
diff --git a/test/replication/master2.lua b/test/replication/master2.lua
new file mode 120000
index 000000000..16399ae47
--- /dev/null
+++ b/test/replication/master2.lua
@@ -0,0 +1 @@
+master_master.lua
\ No newline at end of file
diff --git a/test/replication/master_master.lua b/test/replication/master_master.lua
new file mode 100644
index 000000000..ff165c367
--- /dev/null
+++ b/test/replication/master_master.lua
@@ -0,0 +1,30 @@
+#!/usr/bin/env tarantool
+
+-- instance id = first digit found in the script path (master1.lua => '1')
+local INSTANCE_ID = string.match(arg[0], "%d")
+local USER = 'cluster'
+local PASSWORD = 'somepassword'
+local SOCKET_DIR = require('fio').cwd()
+local function instance_uri(instance_id)
+    -- TCP alternative: return 'localhost:'..(3310 + instance_id)
+    return SOCKET_DIR..'/master'..instance_id..'.sock';
+end
+
+-- start console first
+require('console').listen(os.getenv('ADMIN'))
+
+box.cfg({
+    listen = instance_uri(INSTANCE_ID);
+    replication = {
+        USER..':'..PASSWORD..'@'..instance_uri(1);
+        USER..':'..PASSWORD..'@'..instance_uri(2);
+    };
+})
+
+box.once("bootstrap", function()
+    local test_run = require('test_run').new()
+    box.schema.user.create(USER, { password = PASSWORD })
+    box.schema.user.grant(USER, 'replication')
+    box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+    box.space.test:create_index('primary')
+end)
diff --git a/test/replication/recover_missing.result b/test/replication/recover_missing.result
new file mode 100644
index 000000000..605232be4
--- /dev/null
+++ b/test/replication/recover_missing.result
@@ -0,0 +1,78 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+SERVERS = { 'master1', 'master2' }
+---
+...
+-- Start master1 and master2
+test_run:create_cluster(SERVERS)
+---
+...
+-- Wait for full mesh
+test_run:wait_fullmesh(SERVERS)
+---
+...
+test_run:cmd("switch master1")
+---
+- true
+...
+box.space._schema:insert({'1'})
+---
+- ['1']
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+fiber = require('fiber')
+---
+...
+fiber.sleep(0.1)
+---
+...
+test_run:cmd("switch master2")
+---
+- true
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+test_run:cmd("stop server master1")
+---
+- true
+...
+fio = require('fio')
+---
+...
+fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
+---
+- true
+...
+test_run:cmd("start server master1")
+---
+- true
+...
+test_run:cmd("switch master1")
+---
+- true
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master1")
+---
+- true
+...
+test_run:cmd("stop server master2")
+---
+- true
+...
diff --git a/test/replication/recover_missing.test.lua b/test/replication/recover_missing.test.lua
new file mode 100644
index 000000000..2f36fc329
--- /dev/null
+++ b/test/replication/recover_missing.test.lua
@@ -0,0 +1,30 @@
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = { 'master1', 'master2' }
+-- Start master1 and master2
+test_run:create_cluster(SERVERS)
+-- Wait for full mesh
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd("switch master1")
+box.space._schema:insert({'1'})
+box.space._schema:select('1')
+
+fiber = require('fiber')
+fiber.sleep(0.1)
+
+test_run:cmd("switch master2")
+box.space._schema:select('1')
+test_run:cmd("stop server master1")
+fio = require('fio')
+fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
+test_run:cmd("start server master1")
+
+test_run:cmd("switch master1")
+box.space._schema:select('1')
+
+test_run:cmd("switch default")
+test_run:cmd("stop server master1")
+test_run:cmd("stop server master2")
+
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list