[patches] [PATCH] [replication] [recovery] recover missing data
Konstantin Belyavskiy
k.belyavskiy at tarantool.org
Tue Mar 13 17:59:04 MSK 2018
Recover missing local data from a replica.
If a master was restarted with some of its data missing from the xlog,
the old behaviour never recovered that data. Fix it.
Based on a proposal by @GeorgyKirichenko.
Closes #3210
---
branch: gh-3210-recover-missing-local-data-master-master
src/box/relay.cc | 6 ++-
src/box/wal.cc | 7 ++-
test/replication/master1.lua | 1 +
test/replication/master2.lua | 1 +
test/replication/master_master.lua | 30 ++++++++++++
test/replication/recover_missing.result | 78 +++++++++++++++++++++++++++++++
test/replication/recover_missing.test.lua | 30 ++++++++++++
7 files changed, 151 insertions(+), 2 deletions(-)
create mode 120000 test/replication/master1.lua
create mode 120000 test/replication/master2.lua
create mode 100644 test/replication/master_master.lua
create mode 100644 test/replication/recover_missing.result
create mode 100644 test/replication/recover_missing.test.lua
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2bd05ad5f..fa9c9e205 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -110,6 +110,8 @@ struct relay {
struct vclock recv_vclock;
/** Replicatoin slave version. */
uint32_t version_id;
+ /** Replica's LSN in the local vclock at subscribe time, see #3210. */
+ int64_t lsn_on_subscr;
/** Relay endpoint */
struct cbus_endpoint endpoint;
@@ -541,6 +543,7 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
relay.version_id = replica_version_id;
relay.replica = replica;
replica_set_relay(replica, &relay);
+ relay.lsn_on_subscr = replicaset.vclock.lsn[replica->id];
int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay),
relay_subscribe_f, &relay);
@@ -586,7 +589,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
* (i.e. don't send replica's own rows back).
*/
if (relay->replica == NULL ||
- packet->replica_id != relay->replica->id) {
+ packet->replica_id != relay->replica->id ||
+ packet->lsn <= relay->lsn_on_subscr) {
relay_send(relay, packet);
}
}
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 4576cfe09..a28759151 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -768,8 +768,13 @@ wal_write(struct journal *journal, struct journal_entry *entry)
/*
* Find last row from local instance id
* and promote vclock.
+ * If some data are missing on local master,
+ * others will send them back. In this case
+ * we should not update vclock on replicaset.
+ * See #3210 for details.
*/
- if ((*last)->replica_id == instance_id) {
+ if ((*last)->replica_id == instance_id &&
+ replicaset.vclock.lsn[instance_id] < (*last)->lsn) {
vclock_follow(&replicaset.vclock, instance_id,
(*last)->lsn);
break;
diff --git a/test/replication/master1.lua b/test/replication/master1.lua
new file mode 120000
index 000000000..16399ae47
--- /dev/null
+++ b/test/replication/master1.lua
@@ -0,0 +1 @@
+master_master.lua
\ No newline at end of file
diff --git a/test/replication/master2.lua b/test/replication/master2.lua
new file mode 120000
index 000000000..16399ae47
--- /dev/null
+++ b/test/replication/master2.lua
@@ -0,0 +1 @@
+master_master.lua
\ No newline at end of file
diff --git a/test/replication/master_master.lua b/test/replication/master_master.lua
new file mode 100644
index 000000000..ff165c367
--- /dev/null
+++ b/test/replication/master_master.lua
@@ -0,0 +1,30 @@
+#!/usr/bin/env tarantool
+
+-- get instance id from filename (master1.lua => 1)
+local INSTANCE_ID = string.match(arg[0], "%d")
+local USER = 'cluster'
+local PASSWORD = 'somepassword'
+local SOCKET_DIR = require('fio').cwd()
+local function instance_uri(instance_id)
+ --return 'localhost:'..(3310 + instance_id)
+ return SOCKET_DIR..'/master'..instance_id..'.sock';
+end
+
+-- start console first
+require('console').listen(os.getenv('ADMIN'))
+
+box.cfg({
+ listen = instance_uri(INSTANCE_ID);
+ replication = {
+ USER..':'..PASSWORD..'@'..instance_uri(1);
+ USER..':'..PASSWORD..'@'..instance_uri(2);
+ };
+})
+
+box.once("bootstrap", function()
+ local test_run = require('test_run').new()
+ box.schema.user.create(USER, { password = PASSWORD })
+ box.schema.user.grant(USER, 'replication')
+ box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+ box.space.test:create_index('primary')
+end)
diff --git a/test/replication/recover_missing.result b/test/replication/recover_missing.result
new file mode 100644
index 000000000..605232be4
--- /dev/null
+++ b/test/replication/recover_missing.result
@@ -0,0 +1,78 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+SERVERS = { 'master1', 'master2' }
+---
+...
+-- Start servers
+test_run:create_cluster(SERVERS)
+---
+...
+-- Wait for full mesh
+test_run:wait_fullmesh(SERVERS)
+---
+...
+test_run:cmd("switch master1")
+---
+- true
+...
+box.space._schema:insert({'1'})
+---
+- ['1']
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+fiber = require('fiber')
+---
+...
+fiber.sleep(0.1)
+---
+...
+test_run:cmd("switch master2")
+---
+- true
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+test_run:cmd("stop server master1")
+---
+- true
+...
+fio = require('fio')
+---
+...
+fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
+---
+- true
+...
+test_run:cmd("start server master1")
+---
+- true
+...
+test_run:cmd("switch master1")
+---
+- true
+...
+box.space._schema:select('1')
+---
+- - ['1']
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master1")
+---
+- true
+...
+test_run:cmd("stop server master2")
+---
+- true
+...
diff --git a/test/replication/recover_missing.test.lua b/test/replication/recover_missing.test.lua
new file mode 100644
index 000000000..2f36fc329
--- /dev/null
+++ b/test/replication/recover_missing.test.lua
@@ -0,0 +1,30 @@
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = { 'master1', 'master2' }
+-- Start servers
+test_run:create_cluster(SERVERS)
+-- Wait for full mesh
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd("switch master1")
+box.space._schema:insert({'1'})
+box.space._schema:select('1')
+
+fiber = require('fiber')
+fiber.sleep(0.1)
+
+test_run:cmd("switch master2")
+box.space._schema:select('1')
+test_run:cmd("stop server master1")
+fio = require('fio')
+fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
+test_run:cmd("start server master1")
+
+test_run:cmd("switch master1")
+box.space._schema:select('1')
+
+test_run:cmd("switch default")
+test_run:cmd("stop server master1")
+test_run:cmd("stop server master2")
+
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list