[Tarantool-patches] [PATCH vshard 1/2] storage: introduce upgrade strategy
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Mar 25 02:44:50 MSK 2020
Hi!
I reworked the patch. The diff is big, so I will send a second version.
Below I also provide the diff.
On 24/03/2020 16:21, Yaroslav Dynnikov wrote:
> Hi, Vlad.
>
> I've run my cartridge upgrade tests on your branch (6e50e26c), and it's ok on tarantool 2.2, but fails for 1.10:
> ```
> replica | ApplyConfigError: Space _schema does not support multi-statement transactions
> replica | stack traceback:
> replica | ...cartridge/.rocks/share/tarantool/vshard/storage/init.lua:419: in function 'schema_upgrade'
> replica | ...cartridge/.rocks/share/tarantool/vshard/storage/init.lua:2336: in function 'cfg'
> ```
>
> Here are results from Gitlab CI: https://gitlab.com/tarantool/cartridge/pipelines/129256300
>
> And here is one more remark about the patch itself.
>
> On Sat, 21 Mar 2020 at 21:59, Vladislav Shpilevoy <v.shpilevoy at tarantool.org <mailto:v.shpilevoy at tarantool.org>> wrote:
>
>
> local function this_is_master()
> @@ -2169,8 +2276,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
> error(err)
> end
> log.info("Box has been configured")
> - local uri = luri.parse(this_replica.uri)
> - box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
> + end
> +
> + local uri = luri.parse(this_replica.uri)
> + schema_upgrade(is_master, uri.login, uri.password)
> +
> + if not is_reload then
>
>
> It seems like this `if/else` statement isn't necessary. The `else` branch is enough for both cases.
> Even if it's not a hot reload, it would result in `local old = nil; box.space._bucket:on_replace(new_trigger, nil)`, which is essentially the same.
Agree, added to the patch.
====================
diff --git a/test/unit/box2.lua b/test/unit/box2.lua
new file mode 120000
index 0000000..1347390
--- /dev/null
+++ b/test/unit/box2.lua
@@ -0,0 +1 @@
+box.lua
\ No newline at end of file
diff --git a/test/unit/upgrade.result b/test/unit/upgrade.result
new file mode 100644
index 0000000..5530746
--- /dev/null
+++ b/test/unit/upgrade.result
@@ -0,0 +1,218 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+ | ---
+ | ...
+_ = test_run:cmd("start server newdefault")
+ | ---
+ | ...
+_ = test_run:switch("newdefault")
+ | ---
+ | ...
+
+util = require('util')
+ | ---
+ | ...
+
+vshard = require('vshard')
+ | ---
+ | ...
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+ | ---
+ | ...
+handlers = vshard.storage.internal.schema_upgrade_handlers
+ | ---
+ | ...
+version_make = vshard.storage.internal.schema_version_make
+ | ---
+ | ...
+curr_version = vshard.storage.internal.schema_current_version
+ | ---
+ | ...
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+ | ---
+ | ...
+
+user = 'storage'
+ | ---
+ | ...
+password = 'storage'
+ | ---
+ | ...
+
+schema_bootstrap(user, password)
+ | ---
+ | ...
+
+version_make({0, 1, 16, 0})
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+
+versions = {}
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ';'")
+ | ---
+ | ...
+for v1 = 1, 3 do
+ for v2 = 1, 3 do
+ for v3 = 1, 3 do
+ for v4 = 1, 3 do
+ table.insert(versions, version_make({v1, v2, v3, v4}))
+ end
+ end
+ end
+end;
+ | ---
+ | ...
+
+err = nil;
+ | ---
+ | ...
+count = 0;
+ | ---
+ | ...
+for i, v1 in pairs(versions) do
+ for j, v2 in pairs(versions) do
+ local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+ local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+ count = count + 1
+ if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+ ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+ ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+ err = {i, j}
+ break
+ end
+ end
+ if err then
+ break
+ end
+end;
+ | ---
+ | ...
+err;
+ | ---
+ | - null
+ | ...
+count;
+ | ---
+ | - 6561
+ | ...
+
+function stat_collect()
+ return {
+ func_count = box.space._func:count(),
+ space_count = box.space._space:count(),
+ user_count = box.space._user:count(),
+ schema_count = box.space._schema:count(),
+ version = curr_version(),
+ }
+end;
+ | ---
+ | ...
+
+function stat_diff(s1, s2)
+ local res = {}
+ if s2.func_count ~= s1.func_count then
+ res.func_count = s2.func_count - s1.func_count
+ end
+ if s2.space_count ~= s1.space_count then
+ res.space_count = s2.space_count - s1.space_count
+ end
+ if s2.user_count ~= s1.user_count then
+ res.user_count = s2.user_count - s1.user_count
+ end
+ if s2.schema_count ~= s1.schema_count then
+ res.schema_count = s2.schema_count - s1.schema_count
+ end
+ if s1.version ~= s2.version then
+ res.old_version = s1.version
+ res.new_version = s2.version
+ end
+ return res
+end;
+ | ---
+ | ...
+
+stat = stat_collect();
+ | ---
+ | ...
+
+function stat_update()
+ local new_stat = stat_collect()
+ local diff = stat_diff(stat, new_stat)
+ stat = new_stat
+ return diff
+end;
+ | ---
+ | ...
+
+upgrade_trace = {};
+ | ---
+ | ...
+errinj = vshard.storage.internal.errinj;
+ | ---
+ | ...
+
+for _, handler in pairs(handlers) do
+ table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+ table.insert(upgrade_trace, 'Errinj in begin')
+ errinj.ERRINJ_UPGRADE = 'begin'
+ table.insert(upgrade_trace,
+ {util.check_error(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+
+ table.insert(upgrade_trace, 'Errinj in end')
+ errinj.ERRINJ_UPGRADE = 'end'
+ table.insert(upgrade_trace,
+ {util.check_error(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+
+ table.insert(upgrade_trace, 'No errinj')
+ errinj.ERRINJ_UPGRADE = nil
+ table.insert(upgrade_trace,
+ {pcall(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+end;
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ''");
+ | ---
+ | ...
+
+upgrade_trace
+ | ---
+ | - - Upgrade to {0.1.16.0}
+ | - Errinj in begin
+ | - - Errinj in begin
+ | - diff: []
+ | - Errinj in end
+ | - - Errinj in end
+ | - diff: []
+ | - No errinj
+ | - - true
+ | - diff:
+ | new_version: '{0.1.16.0}'
+ | old_version: '{0.1.15.0}'
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+_ = test_run:cmd("stop server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("cleanup server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("delete server newdefault")
+ | ---
+ | ...
diff --git a/test/unit/upgrade.test.lua b/test/unit/upgrade.test.lua
new file mode 100644
index 0000000..9bbfd84
--- /dev/null
+++ b/test/unit/upgrade.test.lua
@@ -0,0 +1,133 @@
+test_run = require('test_run').new()
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+_ = test_run:cmd("start server newdefault")
+_ = test_run:switch("newdefault")
+
+util = require('util')
+
+vshard = require('vshard')
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+handlers = vshard.storage.internal.schema_upgrade_handlers
+version_make = vshard.storage.internal.schema_version_make
+curr_version = vshard.storage.internal.schema_current_version
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+
+user = 'storage'
+password = 'storage'
+
+schema_bootstrap(user, password)
+
+version_make({0, 1, 16, 0})
+
+versions = {}
+
+_ = test_run:cmd("setopt delimiter ';'")
+for v1 = 1, 3 do
+ for v2 = 1, 3 do
+ for v3 = 1, 3 do
+ for v4 = 1, 3 do
+ table.insert(versions, version_make({v1, v2, v3, v4}))
+ end
+ end
+ end
+end;
+
+err = nil;
+count = 0;
+for i, v1 in pairs(versions) do
+ for j, v2 in pairs(versions) do
+ local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+ local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+ count = count + 1
+ if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+ ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+ ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+ err = {i, j}
+ break
+ end
+ end
+ if err then
+ break
+ end
+end;
+err;
+count;
+
+function stat_collect()
+ return {
+ func_count = box.space._func:count(),
+ space_count = box.space._space:count(),
+ user_count = box.space._user:count(),
+ schema_count = box.space._schema:count(),
+ version = curr_version(),
+ }
+end;
+
+function stat_diff(s1, s2)
+ local res = {}
+ if s2.func_count ~= s1.func_count then
+ res.func_count = s2.func_count - s1.func_count
+ end
+ if s2.space_count ~= s1.space_count then
+ res.space_count = s2.space_count - s1.space_count
+ end
+ if s2.user_count ~= s1.user_count then
+ res.user_count = s2.user_count - s1.user_count
+ end
+ if s2.schema_count ~= s1.schema_count then
+ res.schema_count = s2.schema_count - s1.schema_count
+ end
+ if s1.version ~= s2.version then
+ res.old_version = s1.version
+ res.new_version = s2.version
+ end
+ return res
+end;
+
+stat = stat_collect();
+
+function stat_update()
+ local new_stat = stat_collect()
+ local diff = stat_diff(stat, new_stat)
+ stat = new_stat
+ return diff
+end;
+
+upgrade_trace = {};
+errinj = vshard.storage.internal.errinj;
+
+for _, handler in pairs(handlers) do
+ table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+ table.insert(upgrade_trace, 'Errinj in begin')
+ errinj.ERRINJ_UPGRADE = 'begin'
+ table.insert(upgrade_trace,
+ {util.check_error(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+
+ table.insert(upgrade_trace, 'Errinj in end')
+ errinj.ERRINJ_UPGRADE = 'end'
+ table.insert(upgrade_trace,
+ {util.check_error(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+
+ table.insert(upgrade_trace, 'No errinj')
+ errinj.ERRINJ_UPGRADE = nil
+ table.insert(upgrade_trace,
+ {pcall(upgrade, handler.version, user, password)})
+ table.insert(upgrade_trace, {diff = stat_update()})
+end;
+
+_ = test_run:cmd("setopt delimiter ''");
+
+upgrade_trace
+
+_ = test_run:switch("default")
+_ = test_run:cmd("stop server newdefault")
+_ = test_run:cmd("cleanup server newdefault")
+_ = test_run:cmd("delete server newdefault")
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
index e21d353..4157b08 100644
--- a/test/upgrade/upgrade.result
+++ b/test/upgrade/upgrade.result
@@ -145,14 +145,11 @@ box.space._schema:get({'vshard_version'})
| ...
vshard.storage.internal.schema_current_version()
| ---
- | - [0, 1, 16, 0]
+ | - '{0.1.16.0}'
| ...
vshard.storage.internal.schema_latest_version
| ---
- | - - 0
- | - 1
- | - 16
- | - 0
+ | - '{0.1.16.0}'
| ...
test_run:switch('storage_1_b')
@@ -165,14 +162,11 @@ box.space._schema:get({'vshard_version'})
| ...
vshard.storage.internal.schema_current_version()
| ---
- | - [0, 1, 16, 0]
+ | - '{0.1.16.0}'
| ...
vshard.storage.internal.schema_latest_version
| ---
- | - - 0
- | - 1
- | - 16
- | - 0
+ | - '{0.1.16.0}'
| ...
test_run:switch('default')
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 630d1ff..ebc6af1 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -74,6 +74,7 @@ if not M then
ERRINJ_LAST_RECEIVE_DELAY = false,
ERRINJ_RECEIVE_PARTIALLY = false,
ERRINJ_NO_RECOVERY = false,
+ ERRINJ_UPGRADE = false,
},
-- This counter is used to restart background fibers with
-- new reloaded code.
@@ -258,6 +259,33 @@ end
-- Schema
--------------------------------------------------------------------------------
+local schema_version_mt = {
+ __tostring = function(self)
+ return string.format('{%s}', table.concat(self, '.'))
+ end,
+ __serialize = function(self)
+ return tostring(self)
+ end,
+ __eq = function(l, r)
+ return l[1] == r[1] and l[2] == r[2] and l[3] == r[3] and l[4] == r[4]
+ end,
+ __lt = function(l, r)
+ for i = 1, 4 do
+ local diff = l[i] - r[i]
+ if diff < 0 then
+ return true
+ elseif diff > 0 then
+ return false
+ end
+ end
+ return false;
+ end,
+}
+
+local function schema_version_make(ver)
+ return setmetatable(ver, schema_version_mt)
+end
+
-- VShard versioning works in 4 numbers: major, minor, patch, and
-- a last helper number incremented on every schema change, if
-- first 3 numbers stay not changed. That happens when users take
@@ -267,7 +295,7 @@ end
-- The schema first time appeared with 0.1.16. So this function
-- describes schema before that - 0.1.15.
local function schema_init_0_1_15_0(username, password)
- log.info("Initializing schema")
+ log.info("Initializing schema %s", schema_version_make({0, 1, 15, 0}))
box.schema.user.create(username, {
password = password,
if_not_exists = true,
@@ -309,36 +337,31 @@ local function schema_init_0_1_15_0(username, password)
box.space._schema:replace({'vshard_version', 0, 1, 15, 0})
end
-local function schema_compare_versions(v1, v2)
- for i = 1, 4 do
- if v1[i] ~= v2[i] then
- return v1[i] - v2[i]
- end
- end
- return 0
-end
-
local function schema_upgrade_to_0_1_16_0(username)
-- Since 0.1.16.0 the old versioning by
-- 'oncevshard:storage:<number>' is dropped because it is not
-- really extendible nor understandable.
- log.info('Insert vshard_version into _schema')
- box.begin()
+ log.info("Insert 'vshard_version' into _schema")
box.space._schema:replace({'vshard_version', 0, 1, 16, 0})
box.space._schema:delete({'oncevshard:storage:1'})
- box.commit()
+end
+
+local function schema_downgrade_from_0_1_16_0()
+ log.info("Remove 'vshard_version' from _schema")
+ box.space._schema:replace({'oncevshard:storage:1'})
+ box.space._schema:delete({'vshard_version'})
end
local function schema_current_version()
local version = box.space._schema:get({'vshard_version'})
if version == nil then
- return {0, 1, 15, 0}
+ return schema_version_make({0, 1, 15, 0})
else
- return version:totable(2)
+ return schema_version_make(version:totable(2))
end
end
-local schema_latest_version = {0, 1, 16, 0}
+local schema_latest_version = schema_version_make({0, 1, 16, 0})
local function schema_upgrade_replica()
local version = schema_current_version()
@@ -354,11 +377,10 @@ local function schema_upgrade_replica()
-- For example, from 1.2.3.4 to 1.7.8.9, when it is assumed
-- that a safe upgrade should go 1.2.3.4 -> 1.2.4.1 ->
-- 1.3.1.1 and so on step by step.
- if schema_compare_versions(version, schema_latest_version) ~= 0 then
+ if version ~= schema_latest_version then
log.info('Replica\' vshard schema version is not latest - current '..
- '%s vs latest %s, but the replica still can work',
- table.concat(version, '.'),
- table.concat(schema_latest_version, '.'))
+ '%s vs latest %s, but the replica still can work', version,
+ schema_latest_version)
end
-- In future for hard changes the replica may be suspended
-- until its schema is synced with master. Or it may
@@ -374,10 +396,14 @@ end
-- prohibit yields, in case the upgrade, for example, affects huge
-- number of tuples (_bucket records, maybe).
local schema_upgrade_handlers = {
- {version = {0, 1, 16, 0}, func = schema_upgrade_to_0_1_16_0},
+ {
+ version = schema_version_make({0, 1, 16, 0}),
+ upgrade = schema_upgrade_to_0_1_16_0,
+ downgrade = schema_downgrade_from_0_1_16_0
+ },
}
-local function schema_upgrade_master(username, password)
+local function schema_upgrade_master(target_version, username, password)
local _schema = box.space._schema
local is_old_versioning = _schema:get({'oncevshard:storage:1'}) ~= nil
local version = schema_current_version()
@@ -386,39 +412,60 @@ local function schema_upgrade_master(username, password)
if is_bootstrap then
schema_init_0_1_15_0(username, password)
elseif is_old_versioning then
- log.info('The instance does not have vshard_version record. '..
- 'It is 0.1.15.0.')
- end
- local handler_latest_version =
- schema_upgrade_handlers[#schema_upgrade_handlers].version
- assert(schema_compare_versions(handler_latest_version,
- schema_latest_version) == 0)
+ log.info("The instance does not have 'vshard_version' record. "..
+ "It is 0.1.15.0.")
+ end
+ assert(schema_upgrade_handlers[#schema_upgrade_handlers].version ==
+ schema_latest_version)
+ local prev_version = version
+ local ok, err1, err2
+ local errinj = M.errinj.ERRINJ_UPGRADE
for _, handler in ipairs(schema_upgrade_handlers) do
local next_version = handler.version
- if schema_compare_versions(next_version, version) > 0 then
- local next_version_str = table.concat(next_version, '.')
- log.info("Upgrade vshard to {%s}", next_version_str)
- local ok, err = pcall(handler.func, username, password)
+ if next_version > target_version then
+ break
+ end
+ if next_version > version then
+ log.info("Upgrade vshard schema to %s", next_version)
+ if errinj == 'begin' then
+ ok, err1 = false, 'Errinj in begin'
+ else
+ ok, err1 = pcall(handler.upgrade, username)
+ if ok and errinj == 'end' then
+ ok, err1 = false, 'Errinj in end'
+ end
+ end
if not ok then
+ -- Rollback in case the handler started a
+ -- transaction before the exception.
box.rollback()
- log.info("Couldn't upgrade to {%s}: %s", next_version_str, err)
- error(err)
+ log.info("Couldn't upgrade schema to %s: '%s'. Revert to %s",
+ next_version, err1, prev_version)
+ ok, err2 = pcall(handler.downgrade)
+ if not ok then
+ log.info("Couldn't downgrade schema to %s - fatal error: "..
+ "'%s'", prev_version, err2)
+ os.exit(-1)
+ end
+ error(err1)
end
- log.info("Successful vshard upgrade to {%s}", next_version_str)
- ok, err = pcall(_schema.replace, _schema,
- {'vshard_version', unpack(next_version)})
+ ok, err1 = pcall(_schema.replace, _schema,
+ {'vshard_version', unpack(next_version)})
if not ok then
- log.info("Upgraded to {%s} but couldn't update _schema "..
- "'vshard_version' - fatal error: %s", err)
+ log.info("Upgraded schema to %s but couldn't update _schema "..
+ "'vshard_version' - fatal error: '%s'", next_version,
+ err1)
os.exit(-1)
end
+ log.info("Successful vshard schema upgrade to %s", next_version)
end
+ prev_version = next_version
end
end
local function schema_upgrade(is_master, username, password)
if is_master then
- return schema_upgrade_master(username, password)
+ return schema_upgrade_master(schema_latest_version, username, password)
else
return schema_upgrade_replica()
end
@@ -2298,12 +2345,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
local uri = luri.parse(this_replica.uri)
schema_upgrade(is_master, uri.login, uri.password)
- if not is_reload then
- box.space._bucket:on_replace(bucket_generation_increment)
- else
- local old = box.space._bucket:on_replace()[1]
- box.space._bucket:on_replace(bucket_generation_increment, old)
- end
+ local old_trigger = box.space._bucket:on_replace()[1]
+ box.space._bucket:on_replace(bucket_generation_increment, old_trigger)
lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
lreplicaset.outdate_replicasets(M.replicasets)
@@ -2599,6 +2642,10 @@ M.rlist = {
}
M.schema_latest_version = schema_latest_version
M.schema_current_version = schema_current_version
+M.schema_upgrade_master = schema_upgrade_master
+M.schema_upgrade_handlers = schema_upgrade_handlers
+M.schema_version_make = schema_version_make
+M.schema_bootstrap = schema_init_0_1_15_0
return {
sync = sync,
More information about the Tarantool-patches
mailing list