From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
Date: Fri, 31 Aug 2018 16:01:50 +0300
From: Vladimir Davydov 
Subject: Re: [tarantool-patches] [PATCH v7 3/3] box: adds replication sync after cfg. update
Message-ID: <20180831130150.peiv4sd7vnhxtpe6@esperanza>
References: <20180830233804.20727-1-krishtal.olja@gmail.com>
 <20180830233804.20727-3-krishtal.olja@gmail.com>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <20180830233804.20727-3-krishtal.olja@gmail.com>
To: Konstantin Osipov 
Cc: tarantool-patches@freelists.org, Olga Arkhangelskaia 
List-ID: 

I reworked this patch, added a proper test, and force-pushed it to the
branch:

https://github.com/tarantool/tarantool/commits/OKriw/gh-3427-replication-no-sync-1.9

Kostja, please take a look. The new patch is below:

>From 43839be2feb3aad444b44b84076d5f0f6b374b55 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov 
Date: Fri, 31 Aug 2018 13:11:58 +0300
Subject: [PATCH] box: sync on replication configuration update

Now box.cfg() doesn't return until 'quorum' appliers are in sync not
only on initial configuration, but also on replication configuration
update. If it fails to synchronize within replication_sync_timeout,
box.cfg() returns without an error, but the instance enters 'orphan'
state, which is basically read-only mode. In the meantime, appliers
will keep trying to synchronize in the background, and the instance
will leave 'orphan' state as soon as enough appliers are in sync.

Note: this patch also changes logging a bit:
- 'ready to accept requests' is now printed on startup before syncing
  with the replica set, because although the instance is read-only at
  that time, it can indeed accept all sorts of ro requests.
- The 'connecting', 'connected', and 'synchronizing' messages are now
  logged at the 'info' level, not 'verbose' as they used to be,
  because those messages are important as they give the admin an idea
  of what's going on with the instance, and they can't flood the logs.
- The 'sync complete' message is also printed as 'info', not 'crit',
  because there's nothing critical about it (it's not an error).

Also note that we only enter 'orphan' state if we failed to
synchronize. In particular, if the instance manages to synchronize
with all replicas within the timeout, it jumps from 'loading' straight
into 'running', bypassing the 'orphan' state. This is done for the
sake of consistency between initial configuration and reconfiguration.

Closes #3427

@TarantoolBot document
Title: Sync on replication configuration update
The behavior of box.cfg() on replication configuration update is now
consistent with initial configuration, that is, box.cfg() will not
return until it synchronizes with as many masters as specified by the
replication_connect_quorum configuration option, or until the timeout
specified by replication_sync_timeout expires. On timeout, it returns
without an error, but the instance enters 'orphan' state. It will
leave 'orphan' state as soon as enough appliers have synced.

diff --git a/src/box/box.cc b/src/box/box.cc
index dcedfd00..63b2974f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -110,7 +110,7 @@ static fiber_cond ro_cond;
  * synchronize to a sufficient number of replicas to form
  * a quorum and so was forced to switch to read-only mode.
*/ -static bool is_orphan = true; +static bool is_orphan; /* Use the shared instance of xstream for all appliers */ static struct xstream join_stream; @@ -219,16 +219,22 @@ box_wait_ro(bool ro, double timeout) } void -box_clear_orphan(void) +box_set_orphan(bool orphan) { - if (!is_orphan) + if (is_orphan == orphan) return; /* nothing to do */ - is_orphan = false; + is_orphan = orphan; fiber_cond_broadcast(&ro_cond); /* Update the title to reflect the new status. */ - title("running"); + if (is_orphan) { + say_crit("entering orphan mode"); + title("orphan"); + } else { + say_crit("leaving orphan mode"); + title("running"); + } } struct wal_stream { @@ -646,6 +652,8 @@ box_set_replication(void) box_sync_replication(true); /* Follow replica */ replicaset_follow(); + /* Wait until appliers are in sync */ + replicaset_sync(); } void @@ -1893,8 +1901,6 @@ box_cfg_xc(void) /** Begin listening only when the local recovery is complete. */ box_listen(); - title("orphan"); - /* * In case of recovering from a checkpoint we * don't need to wait for 'quorum' masters, since @@ -1913,8 +1919,6 @@ box_cfg_xc(void) */ box_listen(); - title("orphan"); - /* * Wait for the cluster to start up. * @@ -1951,25 +1955,17 @@ box_cfg_xc(void) rmean_cleanup(rmean_box); - /* - * If this instance is a leader of a newly bootstrapped - * cluster, it is uptodate by definition so leave the - * 'orphan' mode right away to let it initialize cluster - * schema. - */ - if (is_bootstrap_leader) - box_clear_orphan(); - /* Follow replica */ replicaset_follow(); fiber_gc(); is_box_configured = true; + title("running"); + say_info("ready to accept requests"); + if (!is_bootstrap_leader) replicaset_sync(); - - say_info("ready to accept requests"); } void diff --git a/src/box/box.h b/src/box/box.h index 6e1c13f5..5e5ec7d3 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -100,12 +100,16 @@ int box_wait_ro(bool ro, double timeout); /** - * Switch this instance from 'orphan' to 'running' state. - * Called on initial configuration as soon as this instance - * synchronizes with enough replicas to form a quorum. + * Switch this instance from 'orphan' to 'running' state or + * vice versa depending on the value of the function argument. + * + * An instance enters 'orphan' state on returning from box.cfg() + * if it failed to synchornize with 'quorum' replicas within a + * specified timeout. It will keep trying to synchronize in the + * background and leave 'orphan' state once it's done. */ void -box_clear_orphan(void); +box_set_orphan(bool orphan); /** True if snapshot is in progress. */ extern bool box_checkpoint_is_in_progress; diff --git a/src/box/replication.cc b/src/box/replication.cc index ff56f442..b0952740 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -548,7 +548,8 @@ replicaset_connect(struct applier **appliers, int count, replicaset_update(appliers, count); return; } - say_verbose("connecting to %d replicas", count); + + say_info("connecting to %d replicas", count); /* * Simultaneously connect to remote peers to receive their UUIDs @@ -602,7 +603,7 @@ replicaset_connect(struct applier **appliers, int count, if (connect_quorum && state.connected < quorum) goto error; } else { - say_verbose("connected to %d replicas", state.connected); + say_info("connected to %d replicas", state.connected); } for (int i = 0; i < count; i++) { @@ -636,13 +637,6 @@ error: void replicaset_follow(void) { - if (replicaset.applier.total == 0) { - /* - * Replication is not configured. 
- */ - box_clear_orphan(); - return; - } struct replica *replica; replicaset_foreach(replica) { /* Resume connected appliers. */ @@ -653,13 +647,6 @@ replicaset_follow(void) /* Restart appliers that failed to connect. */ applier_start(replica->applier); } - if (replicaset_quorum() == 0) { - /* - * Leaving orphan mode immediately since - * replication_connect_quorum is set to 0. - */ - box_clear_orphan(); - } } void @@ -667,10 +654,16 @@ replicaset_sync(void) { int quorum = replicaset_quorum(); - if (quorum == 0) + if (quorum == 0) { + /* + * Quorum is 0 or replication is not configured. + * Leaving 'orphan' state immediately. + */ + box_set_orphan(false); return; + } - say_verbose("synchronizing with %d replicas", quorum); + say_info("synchronizing with %d replicas", quorum); /* * Wait until all connected replicas synchronize up to @@ -691,22 +684,21 @@ replicaset_sync(void) * Do not stall configuration, leave the instance * in 'orphan' state. */ - say_crit("entering orphan mode"); - return; + say_crit("failed to synchronize with %d out of %d replicas", + replicaset.applier.total - replicaset.applier.synced, + replicaset.applier.total); + box_set_orphan(true); + } else { + say_info("replica set sync complete"); + box_set_orphan(false); } - - say_crit("replica set sync complete, quorum of %d " - "replicas formed", quorum); } void replicaset_check_quorum(void) { - if (replicaset.applier.synced >= replicaset_quorum()) { - if (replicaset_quorum() > 0) - say_crit("leaving orphan mode"); - box_clear_orphan(); - } + if (replicaset.applier.synced >= replicaset_quorum()) + box_set_orphan(false); } void diff --git a/test/replication/sync.result b/test/replication/sync.result new file mode 100644 index 00000000..8994aa3e --- /dev/null +++ b/test/replication/sync.result @@ -0,0 +1,236 @@ +fiber = require('fiber') +--- +... +test_run = require('test_run').new() +--- +... +engine = test_run:get_cfg('engine') +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +_ = box.schema.space.create('test', {engine = engine}) +--- +... +_ = box.space.test:create_index('pk') +--- +... +-- Slow down replication a little to test replication_sync_lag. +box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001) +--- +- ok +... +-- Helper that adds some records to the space and then starts +-- a fiber to add more records in the background. +test_run:cmd("setopt delimiter ';'") +--- +- true +... +count = 0; +--- +... +function fill() + for i = count + 1, count + 100 do + box.space.test:replace{i} + end + fiber.create(function() + for i = count + 101, count + 200 do + fiber.sleep(0.0001) + box.space.test:replace{i} + end + end) + count = count + 200 +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Deploy a replica. +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +fiber = require('fiber') +--- +... +-- Stop replication. +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +-- Fill the space. +test_run:cmd("switch default") +--- +- true +... +fill() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- Resume replication. +-- +-- Since max allowed lag is small, all recoreds should arrive +-- by the time box.cfg() returns. +-- +box.cfg{replication_sync_lag = 0.001} +--- +... +box.cfg{replication = replication} +--- +... +box.space.test:count() == 200 +--- +- true +... 
+box.info.status -- running +--- +- running +... +box.info.ro -- false +--- +- false +... +-- Stop replication. +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +-- Fill the space. +test_run:cmd("switch default") +--- +- true +... +fill() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- Resume replication +-- +-- Since max allowed lag is big, not all records will arrive +-- upon returning from box.cfg() but the instance won't enter +-- orphan state. +-- +box.cfg{replication_sync_lag = 1} +--- +... +box.cfg{replication = replication} +--- +... +box.space.test:count() < 400 +--- +- true +... +box.info.status -- running +--- +- running +... +box.info.ro -- false +--- +- false +... +-- Wait for remaining rows to arrive. +repeat fiber.sleep(0.01) until box.space.test:count() == 400 +--- +... +-- Stop replication. +replication = box.cfg.replication +--- +... +box.cfg{replication = {}} +--- +... +-- Fill the space. +test_run:cmd("switch default") +--- +- true +... +fill() +--- +... +test_run:cmd("switch replica") +--- +- true +... +-- Resume replication +-- +-- Since max allowed lag is big, not all records will arrive +-- upon returning from box.cfg() but the instance won't enter +-- orphan state. +-- +box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001} +--- +... +box.cfg{replication = replication} +--- +... +box.space.test:count() < 600 +--- +- true +... +box.info.status -- orphan +--- +- orphan +... +box.info.ro -- true +--- +- true +... +-- Wait for remaining rows to arrive. +repeat fiber.sleep(0.01) until box.space.test:count() == 600 +--- +... +-- Make sure replica leaves oprhan state. +repeat fiber.sleep(0.01) until box.info.status ~= 'orphan' +--- +... +box.info.status -- running +--- +- running +... +box.info.ro -- false +--- +- false +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) +--- +- ok +... +box.space.test:drop() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... diff --git a/test/replication/sync.test.lua b/test/replication/sync.test.lua new file mode 100644 index 00000000..3cef825d --- /dev/null +++ b/test/replication/sync.test.lua @@ -0,0 +1,116 @@ +fiber = require('fiber') +test_run = require('test_run').new() +engine = test_run:get_cfg('engine') + +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test', {engine = engine}) +_ = box.space.test:create_index('pk') + +-- Slow down replication a little to test replication_sync_lag. +box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.001) + +-- Helper that adds some records to the space and then starts +-- a fiber to add more records in the background. +test_run:cmd("setopt delimiter ';'") +count = 0; +function fill() + for i = count + 1, count + 100 do + box.space.test:replace{i} + end + fiber.create(function() + for i = count + 101, count + 200 do + fiber.sleep(0.0001) + box.space.test:replace{i} + end + end) + count = count + 200 +end; +test_run:cmd("setopt delimiter ''"); + +-- Deploy a replica. +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +test_run:cmd("switch replica") + +fiber = require('fiber') + +-- Stop replication. +replication = box.cfg.replication +box.cfg{replication = {}} + +-- Fill the space. 
+test_run:cmd("switch default") +fill() +test_run:cmd("switch replica") + +-- Resume replication. +-- +-- Since max allowed lag is small, all recoreds should arrive +-- by the time box.cfg() returns. +-- +box.cfg{replication_sync_lag = 0.001} +box.cfg{replication = replication} +box.space.test:count() == 200 +box.info.status -- running +box.info.ro -- false + +-- Stop replication. +replication = box.cfg.replication +box.cfg{replication = {}} + +-- Fill the space. +test_run:cmd("switch default") +fill() +test_run:cmd("switch replica") + +-- Resume replication +-- +-- Since max allowed lag is big, not all records will arrive +-- upon returning from box.cfg() but the instance won't enter +-- orphan state. +-- +box.cfg{replication_sync_lag = 1} +box.cfg{replication = replication} +box.space.test:count() < 400 +box.info.status -- running +box.info.ro -- false + +-- Wait for remaining rows to arrive. +repeat fiber.sleep(0.01) until box.space.test:count() == 400 + +-- Stop replication. +replication = box.cfg.replication +box.cfg{replication = {}} + +-- Fill the space. +test_run:cmd("switch default") +fill() +test_run:cmd("switch replica") + +-- Resume replication +-- +-- Since max allowed lag is big, not all records will arrive +-- upon returning from box.cfg() but the instance won't enter +-- orphan state. +-- +box.cfg{replication_sync_lag = 0.001, replication_sync_timeout = 0.001} +box.cfg{replication = replication} +box.space.test:count() < 600 +box.info.status -- orphan +box.info.ro -- true + +-- Wait for remaining rows to arrive. +repeat fiber.sleep(0.01) until box.space.test:count() == 600 + +-- Make sure replica leaves oprhan state. +repeat fiber.sleep(0.01) until box.info.status ~= 'orphan' +box.info.status -- running +box.info.ro -- false + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") + +box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0) +box.space.test:drop() +box.schema.user.revoke('guest', 'replication')
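
For reference, here is a minimal usage sketch of the behavior described
above (not part of the patch; the URIs, credentials, and numbers below
are made up for illustration):

    -- Reconfigure replication on an already running instance. With
    -- this patch, box.cfg() blocks until the instance has synchronized
    -- with replication_connect_quorum masters or until
    -- replication_sync_timeout has passed.
    box.cfg{
        replication = {
            'replicator:secret@10.0.0.1:3301', -- hypothetical masters
            'replicator:secret@10.0.0.2:3301',
        },
        replication_connect_quorum = 2,
        replication_sync_lag = 0.1,
        replication_sync_timeout = 10,
    }

    -- If the sync timed out, box.cfg() still returns without an error,
    -- but the instance stays read-only in 'orphan' state and keeps
    -- catching up in the background.
    box.info.status -- 'orphan' until enough appliers are in sync,
                    -- then 'running'
    box.info.ro     -- true while the instance is in 'orphan' state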