[patches] [PATCH] [replication] [recovery] recover missing data
Konstantin Osipov
kostja at tarantool.org
Thu Mar 15 17:10:34 MSK 2018
* Konstantin Belyavskiy <k.belyavskiy at tarantool.org> [18/03/15 11:39]:
The patch has to go to 1.6, since that's where the customer
complained about it.
Is it feasible to backport?
A few minor comments below.
> @@ -110,6 +110,8 @@ struct relay {
> struct vclock recv_vclock;
> /** Replicatoin slave version. */
> uint32_t version_id;
> + /** lsn on subscribe, see #3210 */
> + int64_t lsn_on_subscribe;
I think it's *at* subscribe, not *on*. I would also identify
which instance's LSN it is: it's the LSN of this instance at
the moment it issued the 'SUBSCRIBE' request.
How about calling the variable 'local_lsn_before_subscribe' and
describing both the nature of the variable and the problem it
solves in the comment?
> /** Relay endpoint */
> struct cbus_endpoint endpoint;
> @@ -541,6 +543,7 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
> relay.version_id = replica_version_id;
> relay.replica = replica;
> replica_set_relay(replica, &relay);
> + relay.lsn_on_subscribe = vclock_get(&replicaset.vclock, replica->id);
>
> int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay),
> relay_subscribe_f, &relay);
> @@ -586,7 +589,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
> * (i.e. don't send replica's own rows back).
> */
> if (relay->replica == NULL ||
> - packet->replica_id != relay->replica->id) {
> + packet->replica_id != relay->replica->id ||
> + packet->lsn <= relay->lsn_on_subscribe) {
I think this change needs a comment.
> relay_send(relay, packet);
> }
> }
> diff --git a/src/box/wal.cc b/src/box/wal.cc
> index 4576cfe09..5c87f08df 100644
> --- a/src/box/wal.cc
> +++ b/src/box/wal.cc
> @@ -768,8 +768,14 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> /*
> * Find last row from local instance id
> * and promote vclock.
> + * In master-master configuration, if some data
> + * are missing on local master, others will send
> + * them back. In this case replicaset vclock is
> + * already set, so we should not update it.
> + * See #3210 for details.
I don't understand this comment, especially why the replicaset
vclock is already set in this case. Could we extend it?
Other than the comments and variable names, the patch
looks good to me. Let's discuss the comments on Telegram, or
face to face with @locker.
> */
> - if ((*last)->replica_id == instance_id) {
> + if ((*last)->replica_id == instance_id &&
> + replicaset.vclock.lsn[instance_id] < (*last)->lsn) {
> vclock_follow(&replicaset.vclock, instance_id,
> (*last)->lsn);
> break;
> diff --git a/test/replication/master1.lua b/test/replication/master1.lua
> new file mode 120000
> index 000000000..16399ae47
> --- /dev/null
> +++ b/test/replication/master1.lua
> @@ -0,0 +1 @@
> +master_master.lua
> \ No newline at end of file
> diff --git a/test/replication/master2.lua b/test/replication/master2.lua
> new file mode 120000
> index 000000000..16399ae47
> --- /dev/null
> +++ b/test/replication/master2.lua
> @@ -0,0 +1 @@
> +master_master.lua
> \ No newline at end of file
> diff --git a/test/replication/master_master.lua b/test/replication/master_master.lua
> new file mode 100644
> index 000000000..ff165c367
> --- /dev/null
> +++ b/test/replication/master_master.lua
> @@ -0,0 +1,30 @@
> +#!/usr/bin/env tarantool
> +
> +-- get instance id from filename (master1.lua => 1)
> +local INSTANCE_ID = string.match(arg[0], "%d")
> +local USER = 'cluster'
> +local PASSWORD = 'somepassword'
> +local SOCKET_DIR = require('fio').cwd()
> +local function instance_uri(instance_id)
> + --return 'localhost:'..(3310 + instance_id)
> + return SOCKET_DIR..'/master'..instance_id..'.sock';
> +end
> +
> +-- start console first
> +require('console').listen(os.getenv('ADMIN'))
> +
> +box.cfg({
> + listen = instance_uri(INSTANCE_ID);
> + replication = {
> + USER..':'..PASSWORD..'@'..instance_uri(1);
> + USER..':'..PASSWORD..'@'..instance_uri(2);
> + };
> +})
> +
> +box.once("bootstrap", function()
> + local test_run = require('test_run').new()
> + box.schema.user.create(USER, { password = PASSWORD })
> + box.schema.user.grant(USER, 'replication')
> + box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> + box.space.test:create_index('primary')
> +end)
> diff --git a/test/replication/recover_missing.result b/test/replication/recover_missing.result
> new file mode 100644
> index 000000000..605232be4
> --- /dev/null
> +++ b/test/replication/recover_missing.result
> @@ -0,0 +1,78 @@
> +env = require('test_run')
> +---
> +...
> +test_run = env.new()
> +---
> +...
> +SERVERS = { 'master1', 'master2' }
> +---
> +...
> +-- Start servers
> +test_run:create_cluster(SERVERS)
> +---
> +...
> +-- Wait for full mesh
> +test_run:wait_fullmesh(SERVERS)
> +---
> +...
> +test_run:cmd("switch master1")
> +---
> +- true
> +...
> +box.space._schema:insert({'1'})
> +---
> +- ['1']
> +...
> +box.space._schema:select('1')
> +---
> +- - ['1']
> +...
> +fiber = require('fiber')
> +---
> +...
> +fiber.sleep(0.1)
> +---
> +...
> +test_run:cmd("switch master2")
> +---
> +- true
> +...
> +box.space._schema:select('1')
> +---
> +- - ['1']
> +...
> +test_run:cmd("stop server master1")
> +---
> +- true
> +...
> +fio = require('fio')
> +---
> +...
> +fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
> +---
> +- true
> +...
> +test_run:cmd("start server master1")
> +---
> +- true
> +...
> +test_run:cmd("switch master1")
> +---
> +- true
> +...
> +box.space._schema:select('1')
> +---
> +- - ['1']
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +test_run:cmd("stop server master1")
> +---
> +- true
> +...
> +test_run:cmd("stop server master2")
> +---
> +- true
> +...
> diff --git a/test/replication/recover_missing.test.lua b/test/replication/recover_missing.test.lua
> new file mode 100644
> index 000000000..2f36fc329
> --- /dev/null
> +++ b/test/replication/recover_missing.test.lua
> @@ -0,0 +1,30 @@
> +env = require('test_run')
> +test_run = env.new()
> +
> +SERVERS = { 'master1', 'master2' }
> +-- Start servers
> +test_run:create_cluster(SERVERS)
> +-- Wait for full mesh
> +test_run:wait_fullmesh(SERVERS)
> +
> +test_run:cmd("switch master1")
> +box.space._schema:insert({'1'})
> +box.space._schema:select('1')
> +
> +fiber = require('fiber')
> +fiber.sleep(0.1)
> +
> +test_run:cmd("switch master2")
> +box.space._schema:select('1')
> +test_run:cmd("stop server master1")
> +fio = require('fio')
> +fio.unlink(fio.pathjoin(fio.abspath("."), string.format('master1/%020d.xlog', 8)))
> +test_run:cmd("start server master1")
> +
> +test_run:cmd("switch master1")
> +box.space._schema:select('1')
> +
> +test_run:cmd("switch default")
> +test_run:cmd("stop server master1")
> +test_run:cmd("stop server master2")
> +
> --
> 2.14.3 (Apple Git-98)
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.org - www.twitter.com/kostja_osipov