[Tarantool-patches] [PATCH vshard 1/2] storage: introduce upgrade strategy

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 25 02:44:50 MSK 2020


Hi!

I reworked the patch. The diff is big, so I will send a second version.
Below I also provide the diff.

On 24/03/2020 16:21, Yaroslav Dynnikov wrote:
> Hi, Vlad.
> 
> I've run my cartridge upgrade tests on your branch (6e50e26c), and it's ok on tarantool 2.2, but fails for 1.10:
> ```
> replica | ApplyConfigError: Space _schema does not support multi-statement transactions
> replica | stack traceback:
> replica | ...cartridge/.rocks/share/tarantool/vshard/storage/init.lua:419: in function 'schema_upgrade'
> replica | ...cartridge/.rocks/share/tarantool/vshard/storage/init.lua:2336: in function 'cfg'
> ```
> 
> Here are results from Gitlab CI: https://gitlab.com/tarantool/cartridge/pipelines/129256300
> 
> And here is one more remark about the patch itself.
> 
> On Sat, 21 Mar 2020 at 21:59, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
> 
> 
>      local function this_is_master()
>     @@ -2169,8 +2276,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
>                  error(err)
>              end
>              log.info("Box has been configured")
>     -        local uri = luri.parse(this_replica.uri)
>     -        box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
>     +    end
>     +
>     +    local uri = luri.parse(this_replica.uri)
>     +    schema_upgrade(is_master, uri.login, uri.password)
>     +
>     +    if not is_reload then
> 
> 
> It seems like this `if/else` statement isn't necessary. The `else` branch is enough for both cases.
> Even if it's not a hot reload, it would result in `local old = nil; box.space._bucket:on_replace(new_trigger, nil)`, which is essentially the same.

Agreed, added to the patch.

====================

diff --git a/test/unit/box2.lua b/test/unit/box2.lua
new file mode 120000
index 0000000..1347390
--- /dev/null
+++ b/test/unit/box2.lua
@@ -0,0 +1 @@
+box.lua
\ No newline at end of file
diff --git a/test/unit/upgrade.result b/test/unit/upgrade.result
new file mode 100644
index 0000000..5530746
--- /dev/null
+++ b/test/unit/upgrade.result
@@ -0,0 +1,218 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+ | ---
+ | ...
+_ = test_run:cmd("start server newdefault")
+ | ---
+ | ...
+_ = test_run:switch("newdefault")
+ | ---
+ | ...
+
+util = require('util')
+ | ---
+ | ...
+
+vshard = require('vshard')
+ | ---
+ | ...
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+ | ---
+ | ...
+handlers = vshard.storage.internal.schema_upgrade_handlers
+ | ---
+ | ...
+version_make = vshard.storage.internal.schema_version_make
+ | ---
+ | ...
+curr_version = vshard.storage.internal.schema_current_version
+ | ---
+ | ...
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+ | ---
+ | ...
+
+user = 'storage'
+ | ---
+ | ...
+password = 'storage'
+ | ---
+ | ...
+
+schema_bootstrap(user, password)
+ | ---
+ | ...
+
+version_make({0, 1, 16, 0})
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+
+versions = {}
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ';'")
+ | ---
+ | ...
+for v1 = 1, 3 do
+    for v2 = 1, 3 do
+        for v3 = 1, 3 do
+            for v4 = 1, 3 do
+                table.insert(versions, version_make({v1, v2, v3, v4}))
+            end
+        end
+    end
+end;
+ | ---
+ | ...
+
+err = nil;
+ | ---
+ | ...
+count = 0;
+ | ---
+ | ...
+for i, v1 in pairs(versions) do
+    for j, v2 in pairs(versions) do
+        local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+        local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+        count = count + 1
+        if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+            ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+            ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+            err = {i, j}
+            break
+        end
+    end
+    if err then
+        break
+    end
+end;
+ | ---
+ | ...
+err;
+ | ---
+ | - null
+ | ...
+count;
+ | ---
+ | - 6561
+ | ...
+
+function stat_collect()
+    return {
+        func_count = box.space._func:count(),
+        space_count = box.space._space:count(),
+        user_count = box.space._user:count(),
+        schema_count = box.space._schema:count(),
+        version = curr_version(),
+    }
+end;
+ | ---
+ | ...
+
+function stat_diff(s1, s2)
+    local res = {}
+    if s2.func_count ~= s1.func_count then
+        res.func_count = s2.func_count - s1.func_count
+    end
+    if s2.space_count ~= s1.space_count then
+        res.space_count = s2.space_count - s1.space_count
+    end
+    if s2.user_count ~= s1.user_count then
+        res.user_count = s2.user_count - s1.user_count
+    end
+    if s2.schema_count ~= s1.schema_count then
+        res.schema_count = s2.schema_count - s1.schema_count
+    end
+    if s1.version ~= s2.version then
+        res.old_version = s1.version
+        res.new_version = s2.version
+    end
+    return res
+end;
+ | ---
+ | ...
+
+stat = stat_collect();
+ | ---
+ | ...
+
+function stat_update()
+    local new_stat = stat_collect()
+    local diff = stat_diff(stat, new_stat)
+    stat = new_stat
+    return diff
+end;
+ | ---
+ | ...
+
+upgrade_trace = {};
+ | ---
+ | ...
+errinj = vshard.storage.internal.errinj;
+ | ---
+ | ...
+
+for _, handler in pairs(handlers) do
+    table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+    table.insert(upgrade_trace, 'Errinj in begin')
+    errinj.ERRINJ_UPGRADE = 'begin'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'Errinj in end')
+    errinj.ERRINJ_UPGRADE = 'end'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'No errinj')
+    errinj.ERRINJ_UPGRADE = nil
+    table.insert(upgrade_trace,
+                 {pcall(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+end;
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ''");
+ | ---
+ | ...
+
+upgrade_trace
+ | ---
+ | - - Upgrade to {0.1.16.0}
+ |   - Errinj in begin
+ |   - - Errinj in begin
+ |   - diff: []
+ |   - Errinj in end
+ |   - - Errinj in end
+ |   - diff: []
+ |   - No errinj
+ |   - - true
+ |   - diff:
+ |       new_version: '{0.1.16.0}'
+ |       old_version: '{0.1.15.0}'
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+_ = test_run:cmd("stop server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("cleanup server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("delete server newdefault")
+ | ---
+ | ...
diff --git a/test/unit/upgrade.test.lua b/test/unit/upgrade.test.lua
new file mode 100644
index 0000000..9bbfd84
--- /dev/null
+++ b/test/unit/upgrade.test.lua
@@ -0,0 +1,133 @@
+test_run = require('test_run').new()
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+_ = test_run:cmd("start server newdefault")
+_ = test_run:switch("newdefault")
+
+util = require('util')
+
+vshard = require('vshard')
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+handlers = vshard.storage.internal.schema_upgrade_handlers
+version_make = vshard.storage.internal.schema_version_make
+curr_version = vshard.storage.internal.schema_current_version
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+
+user = 'storage'
+password = 'storage'
+
+schema_bootstrap(user, password)
+
+version_make({0, 1, 16, 0})
+
+versions = {}
+
+_ = test_run:cmd("setopt delimiter ';'")
+for v1 = 1, 3 do
+    for v2 = 1, 3 do
+        for v3 = 1, 3 do
+            for v4 = 1, 3 do
+                table.insert(versions, version_make({v1, v2, v3, v4}))
+            end
+        end
+    end
+end;
+
+err = nil;
+count = 0;
+for i, v1 in pairs(versions) do
+    for j, v2 in pairs(versions) do
+        local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+        local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+        count = count + 1
+        if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+            ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+            ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+            err = {i, j}
+            break
+        end
+    end
+    if err then
+        break
+    end
+end;
+err;
+count;
+
+function stat_collect()
+    return {
+        func_count = box.space._func:count(),
+        space_count = box.space._space:count(),
+        user_count = box.space._user:count(),
+        schema_count = box.space._schema:count(),
+        version = curr_version(),
+    }
+end;
+
+function stat_diff(s1, s2)
+    local res = {}
+    if s2.func_count ~= s1.func_count then
+        res.func_count = s2.func_count - s1.func_count
+    end
+    if s2.space_count ~= s1.space_count then
+        res.space_count = s2.space_count - s1.space_count
+    end
+    if s2.user_count ~= s1.user_count then
+        res.user_count = s2.user_count - s1.user_count
+    end
+    if s2.schema_count ~= s1.schema_count then
+        res.schema_count = s2.schema_count - s1.schema_count
+    end
+    if s1.version ~= s2.version then
+        res.old_version = s1.version
+        res.new_version = s2.version
+    end
+    return res
+end;
+
+stat = stat_collect();
+
+function stat_update()
+    local new_stat = stat_collect()
+    local diff = stat_diff(stat, new_stat)
+    stat = new_stat
+    return diff
+end;
+
+upgrade_trace = {};
+errinj = vshard.storage.internal.errinj;
+
+for _, handler in pairs(handlers) do
+    table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+    table.insert(upgrade_trace, 'Errinj in begin')
+    errinj.ERRINJ_UPGRADE = 'begin'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'Errinj in end')
+    errinj.ERRINJ_UPGRADE = 'end'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'No errinj')
+    errinj.ERRINJ_UPGRADE = nil
+    table.insert(upgrade_trace,
+                 {pcall(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+end;
+
+_ = test_run:cmd("setopt delimiter ''");
+
+upgrade_trace
+
+_ = test_run:switch("default")
+_ = test_run:cmd("stop server newdefault")
+_ = test_run:cmd("cleanup server newdefault")
+_ = test_run:cmd("delete server newdefault")
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
index e21d353..4157b08 100644
--- a/test/upgrade/upgrade.result
+++ b/test/upgrade/upgrade.result
@@ -145,14 +145,11 @@ box.space._schema:get({'vshard_version'})
  | ...
 vshard.storage.internal.schema_current_version()
  | ---
- | - [0, 1, 16, 0]
+ | - '{0.1.16.0}'
  | ...
 vshard.storage.internal.schema_latest_version
  | ---
- | - - 0
- |   - 1
- |   - 16
- |   - 0
+ | - '{0.1.16.0}'
  | ...
 
 test_run:switch('storage_1_b')
@@ -165,14 +162,11 @@ box.space._schema:get({'vshard_version'})
  | ...
 vshard.storage.internal.schema_current_version()
  | ---
- | - [0, 1, 16, 0]
+ | - '{0.1.16.0}'
  | ...
 vshard.storage.internal.schema_latest_version
  | ---
- | - - 0
- |   - 1
- |   - 16
- |   - 0
+ | - '{0.1.16.0}'
  | ...
 
 test_run:switch('default')
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 630d1ff..ebc6af1 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -74,6 +74,7 @@ if not M then
             ERRINJ_LAST_RECEIVE_DELAY = false,
             ERRINJ_RECEIVE_PARTIALLY = false,
             ERRINJ_NO_RECOVERY = false,
+            ERRINJ_UPGRADE = false,
         },
         -- This counter is used to restart background fibers with
         -- new reloaded code.
@@ -258,6 +259,33 @@ end
 -- Schema
 --------------------------------------------------------------------------------
 
+local schema_version_mt = {
+    __tostring = function(self)
+        return string.format('{%s}', table.concat(self, '.'))
+    end,
+    __serialize = function(self)
+        return tostring(self)
+    end,
+    __eq = function(l, r)
+        return l[1] == r[1] and l[2] == r[2] and l[3] == r[3] and l[4] == r[4]
+    end,
+    __lt = function(l, r)
+        for i = 1, 4 do
+            local diff = l[i] - r[i]
+            if diff < 0 then
+                return true
+            elseif diff > 0 then
+                return false
+            end
+        end
+        return false;
+    end,
+}
+
+local function schema_version_make(ver)
+    return setmetatable(ver, schema_version_mt)
+end
+
 -- VShard versioning works in 4 numbers: major, minor, patch, and
 -- a last helper number incremented on every schema change, if
 -- first 3 numbers stay not changed. That happens when users take
@@ -267,7 +295,7 @@ end
 -- The schema first time appeared with 0.1.16. So this function
 -- describes schema before that - 0.1.15.
 local function schema_init_0_1_15_0(username, password)
-    log.info("Initializing schema")
+    log.info("Initializing schema %s", schema_version_make({0, 1, 15, 0}))
     box.schema.user.create(username, {
         password = password,
         if_not_exists = true,
@@ -309,36 +337,31 @@ local function schema_init_0_1_15_0(username, password)
     box.space._schema:replace({'vshard_version', 0, 1, 15, 0})
 end
 
-local function schema_compare_versions(v1, v2)
-    for i = 1, 4 do
-        if v1[i] ~= v2[i] then
-            return v1[i] - v2[i]
-        end
-    end
-    return 0
-end
-
 local function schema_upgrade_to_0_1_16_0(username)
     -- Since 0.1.16.0 the old versioning by
     -- 'oncevshard:storage:<number>' is dropped because it is not
     -- really extendible nor understandable.
-    log.info('Insert vshard_version into _schema')
-    box.begin()
+    log.info("Insert 'vshard_version' into _schema")
     box.space._schema:replace({'vshard_version', 0, 1, 16, 0})
     box.space._schema:delete({'oncevshard:storage:1'})
-    box.commit()
+end
+
+local function schema_downgrade_from_0_1_16_0()
+    log.info("Remove 'vshard_version' from _schema")
+    box.space._schema:replace({'oncevshard:storage:1'})
+    box.space._schema:delete({'vshard_version'})
 end
 
 local function schema_current_version()
     local version = box.space._schema:get({'vshard_version'})
     if version == nil then
-        return {0, 1, 15, 0}
+        return schema_version_make({0, 1, 15, 0})
     else
-        return version:totable(2)
+        return schema_version_make(version:totable(2))
     end
 end
 
-local schema_latest_version = {0, 1, 16, 0}
+local schema_latest_version = schema_version_make({0, 1, 16, 0})
 
 local function schema_upgrade_replica()
     local version = schema_current_version()
@@ -354,11 +377,10 @@ local function schema_upgrade_replica()
     -- For example, from 1.2.3.4 to 1.7.8.9, when it is assumed
     -- that a safe upgrade should go 1.2.3.4 -> 1.2.4.1 ->
     -- 1.3.1.1 and so on step by step.
-    if schema_compare_versions(version, schema_latest_version) ~= 0 then
+    if version ~= schema_latest_version then
         log.info('Replica\' vshard schema version is not latest - current '..
-                 '%s vs latest %s, but the replica still can work',
-                 table.concat(version, '.'),
-                 table.concat(schema_latest_version, '.'))
+                 '%s vs latest %s, but the replica still can work', version,
+                 schema_latest_version)
     end
     -- In future for hard changes the replica may be suspended
     -- until its schema is synced with master. Or it may
@@ -374,10 +396,14 @@ end
 -- prohibit yields, in case the upgrade, for example, affects huge
 -- number of tuples (_bucket records, maybe).
 local schema_upgrade_handlers = {
-    {version = {0, 1, 16, 0}, func = schema_upgrade_to_0_1_16_0},
+    {
+        version = schema_version_make({0, 1, 16, 0}),
+        upgrade = schema_upgrade_to_0_1_16_0,
+        downgrade = schema_downgrade_from_0_1_16_0
+    },
 }
 
-local function schema_upgrade_master(username, password)
+local function schema_upgrade_master(target_version, username, password)
     local _schema = box.space._schema
     local is_old_versioning = _schema:get({'oncevshard:storage:1'}) ~= nil
     local version = schema_current_version()
@@ -386,39 +412,60 @@ local function schema_upgrade_master(username, password)
     if is_bootstrap then
         schema_init_0_1_15_0(username, password)
     elseif is_old_versioning then
-        log.info('The instance does not have vshard_version record. '..
-                 'It is 0.1.15.0.')
-    end
-    local handler_latest_version =
-        schema_upgrade_handlers[#schema_upgrade_handlers].version
-    assert(schema_compare_versions(handler_latest_version,
-                                   schema_latest_version) == 0)
+        log.info("The instance does not have 'vshard_version' record. "..
+                 "It is 0.1.15.0.")
+    end
+    assert(schema_upgrade_handlers[#schema_upgrade_handlers].version ==
+           schema_latest_version)
+    local prev_version = version
+    local ok, err1, err2
+    local errinj = M.errinj.ERRINJ_UPGRADE
     for _, handler in ipairs(schema_upgrade_handlers) do
         local next_version = handler.version
-        if schema_compare_versions(next_version, version) > 0 then
-            local next_version_str = table.concat(next_version, '.')
-            log.info("Upgrade vshard to {%s}", next_version_str)
-            local ok, err = pcall(handler.func, username, password)
+        if next_version > target_version then
+            break
+        end
+        if next_version > version then
+            log.info("Upgrade vshard schema to %s", next_version)
+            if errinj == 'begin' then
+                ok, err1 = false, 'Errinj in begin'
+            else
+                ok, err1 = pcall(handler.upgrade, username)
+                if ok and errinj == 'end' then
+                    ok, err1 = false, 'Errinj in end'
+                end
+            end
             if not ok then
+                -- Rollback in case the handler started a
+                -- transaction before the exception.
                 box.rollback()
-                log.info("Couldn't upgrade to {%s}: %s", next_version_str, err)
-                error(err)
+                log.info("Couldn't upgrade schema to %s: '%s'. Revert to %s",
+                         next_version, err1, prev_version)
+                ok, err2 = pcall(handler.downgrade)
+                if not ok then
+                    log.info("Couldn't downgrade schema to %s - fatal error: "..
+                             "'%s'", prev_version, err2)
+                    os.exit(-1)
+                end
+                error(err1)
             end
-            log.info("Successful vshard upgrade to {%s}", next_version_str)
-            ok, err = pcall(_schema.replace, _schema,
-                            {'vshard_version', unpack(next_version)})
+            ok, err1 = pcall(_schema.replace, _schema,
+                             {'vshard_version', unpack(next_version)})
             if not ok then
-                log.info("Upgraded to {%s} but couldn't update _schema "..
-                         "'vshard_version' - fatal error: %s", err)
+                log.info("Upgraded schema to %s but couldn't update _schema "..
+                         "'vshard_version' - fatal error: '%s'", next_version,
+                         err1)
                 os.exit(-1)
             end
+            log.info("Successful vshard schema upgrade to %s", next_version)
         end
+        prev_version = next_version
     end
 end
 
 local function schema_upgrade(is_master, username, password)
     if is_master then
-        return schema_upgrade_master(username, password)
+        return schema_upgrade_master(schema_latest_version, username, password)
     else
         return schema_upgrade_replica()
     end
@@ -2298,12 +2345,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
     local uri = luri.parse(this_replica.uri)
     schema_upgrade(is_master, uri.login, uri.password)
 
-    if not is_reload then
-        box.space._bucket:on_replace(bucket_generation_increment)
-    else
-        local old = box.space._bucket:on_replace()[1]
-        box.space._bucket:on_replace(bucket_generation_increment, old)
-    end
+    local old_trigger = box.space._bucket:on_replace()[1]
+    box.space._bucket:on_replace(bucket_generation_increment, old_trigger)
 
     lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
     lreplicaset.outdate_replicasets(M.replicasets)
@@ -2599,6 +2642,10 @@ M.rlist = {
 }
 M.schema_latest_version = schema_latest_version
 M.schema_current_version = schema_current_version
+M.schema_upgrade_master = schema_upgrade_master
+M.schema_upgrade_handlers = schema_upgrade_handlers
+M.schema_version_make = schema_version_make
+M.schema_bootstrap = schema_init_0_1_15_0
 
 return {
     sync = sync,


More information about the Tarantool-patches mailing list