[Tarantool-patches] [PATCH v2 vshard 1/2] storage: introduce upgrade strategy

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 25 02:44:46 MSK 2020


Since vshard creation it never needed any schema changes. Version
of vshard schema was always '1', stored in _schema as
'oncevshard:storage:1', because it was created by
box.once('vshard:storage:1').

Now it is time to make some changes to the schema. The first reason
is the need to simplify introduction and removal of internal
functions. There are 3 of them at the moment: bucket_recv()/
rebalancer_apply_routes()/rebalancer_request_state(). Users see
them in vshard.storage.* namespace, and in _func space. They are
going to be replaced by one vshard.storage._call(). In that way
it will be easy to update them without schema changes, to add new
functions, remove them. In _call() it will be possible to
validate versions, return proper errors in case a requested
function does not exist.

Secondly, _call() is going to be used by new discovery on
routers. It is not possible to modify the existing
vshard.storage.buckets_discovery() because it is a public function.
But a new discovery can be added to _call().

Thirdly, there is a bug in rebalancing related to possible big TCP
delays, which requires a change of _bucket space format in future.

Fourthly, _call() would allow to introduce new functions on
read-only replica, using code reload only. This may be needed
for the bug about bucket_ref() not preventing bucket move.

The patch introduces versioning using 4 numbers: x.x.x.x. The
first 3 numbers are major, minor, patch, the same as in the tags.
The last value is a number increased when first 3 numbers can't be
changed, but the schema is modified. That happens when users take
master branch between 2 tags, and then the schema is changed again
before the new tag is published.

Upgrade between two adjacent tags is supposed to be always safe
and automatic when reload or reconfiguration happens. However,
currently there are too few versions for any upgrade to be
unsafe.

Needed for #227
Needed for #210
---
 test/unit/box2.lua            |   1 +
 test/unit/upgrade.result      | 222 ++++++++++++++++++++++++++++++
 test/unit/upgrade.test.lua    | 134 ++++++++++++++++++
 test/upgrade/box.lua          |   9 ++
 test/upgrade/storage_1_a.lua  |  11 ++
 test/upgrade/storage_1_b.lua  |   1 +
 test/upgrade/storage_2_a.lua  |   1 +
 test/upgrade/storage_2_b.lua  |   1 +
 test/upgrade/suite.ini        |   7 +
 test/upgrade/upgrade.result   | 247 ++++++++++++++++++++++++++++++++++
 test/upgrade/upgrade.test.lua |  94 +++++++++++++
 vshard/storage/init.lua       | 195 +++++++++++++++++++++++++--
 12 files changed, 914 insertions(+), 9 deletions(-)
 create mode 120000 test/unit/box2.lua
 create mode 100644 test/unit/upgrade.result
 create mode 100644 test/unit/upgrade.test.lua
 create mode 100644 test/upgrade/box.lua
 create mode 100644 test/upgrade/storage_1_a.lua
 create mode 120000 test/upgrade/storage_1_b.lua
 create mode 120000 test/upgrade/storage_2_a.lua
 create mode 120000 test/upgrade/storage_2_b.lua
 create mode 100644 test/upgrade/suite.ini
 create mode 100644 test/upgrade/upgrade.result
 create mode 100644 test/upgrade/upgrade.test.lua

diff --git a/test/unit/box2.lua b/test/unit/box2.lua
new file mode 120000
index 0000000..1347390
--- /dev/null
+++ b/test/unit/box2.lua
@@ -0,0 +1 @@
+box.lua
\ No newline at end of file
diff --git a/test/unit/upgrade.result b/test/unit/upgrade.result
new file mode 100644
index 0000000..a801862
--- /dev/null
+++ b/test/unit/upgrade.result
@@ -0,0 +1,222 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+ | ---
+ | ...
+_ = test_run:cmd("start server newdefault")
+ | ---
+ | ...
+_ = test_run:switch("newdefault")
+ | ---
+ | ...
+
+util = require('util')
+ | ---
+ | ...
+
+vshard = require('vshard')
+ | ---
+ | ...
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+ | ---
+ | ...
+handlers = vshard.storage.internal.schema_upgrade_handlers
+ | ---
+ | ...
+version_make = vshard.storage.internal.schema_version_make
+ | ---
+ | ...
+curr_version = vshard.storage.internal.schema_current_version
+ | ---
+ | ...
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+ | ---
+ | ...
+
+user = 'storage'
+ | ---
+ | ...
+password = 'storage'
+ | ---
+ | ...
+
+schema_bootstrap(user, password)
+ | ---
+ | ...
+
+version_make({0, 1, 16, 0})
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+
+versions = {}
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ';'")
+ | ---
+ | ...
+for v1 = 1, 3 do
+    for v2 = 1, 3 do
+        for v3 = 1, 3 do
+            for v4 = 1, 3 do
+                table.insert(versions, version_make({v1, v2, v3, v4}))
+            end
+        end
+    end
+end;
+ | ---
+ | ...
+
+err = nil;
+ | ---
+ | ...
+count = 0;
+ | ---
+ | ...
+for i, v1 in pairs(versions) do
+    for j, v2 in pairs(versions) do
+        local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+        local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+        count = count + 1
+        if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+            ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+            ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+            err = {i, j}
+            break
+        end
+    end
+    if err then
+        break
+    end
+end;
+ | ---
+ | ...
+err;
+ | ---
+ | - null
+ | ...
+count;
+ | ---
+ | - 6561
+ | ...
+
+function stat_collect()
+    return {
+        func_count = box.space._func:count(),
+        space_count = box.space._space:count(),
+        user_count = box.space._user:count(),
+        schema_count = box.space._schema:count(),
+        version = curr_version(),
+    }
+end;
+ | ---
+ | ...
+
+function stat_diff(s1, s2)
+    local res = {}
+    if s2.func_count ~= s1.func_count then
+        res.func_count = s2.func_count - s1.func_count
+    end
+    if s2.space_count ~= s1.space_count then
+        res.space_count = s2.space_count - s1.space_count
+    end
+    if s2.user_count ~= s1.user_count then
+        res.user_count = s2.user_count - s1.user_count
+    end
+    if s2.schema_count ~= s1.schema_count then
+        res.schema_count = s2.schema_count - s1.schema_count
+    end
+    if s1.version ~= s2.version then
+        res.old_version = s1.version
+        res.new_version = s2.version
+    end
+    return res
+end;
+ | ---
+ | ...
+
+stat = stat_collect();
+ | ---
+ | ...
+
+function stat_update()
+    local new_stat = stat_collect()
+    local diff = stat_diff(stat, new_stat)
+    stat = new_stat
+    return diff
+end;
+ | ---
+ | ...
+
+upgrade_trace = {};
+ | ---
+ | ...
+errinj = vshard.storage.internal.errinj;
+ | ---
+ | ...
+
+for _, handler in pairs(handlers) do
+    table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+    table.insert(upgrade_trace, 'Errinj in begin')
+    errinj.ERRINJ_UPGRADE = 'begin'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'Errinj in end')
+    errinj.ERRINJ_UPGRADE = 'end'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'No errinj')
+    errinj.ERRINJ_UPGRADE = nil
+    table.insert(upgrade_trace,
+                 {pcall(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+end;
+ | ---
+ | ...
+
+_ = test_run:cmd("setopt delimiter ''");
+ | ---
+ | ...
+
+upgrade_trace
+ | ---
+ | - - Upgrade to {0.1.16.0}
+ |   - Errinj in begin
+ |   - - Errinj in begin
+ |   - diff: []
+ |   - Errinj in end
+ |   - - Errinj in end
+ |   - diff: []
+ |   - No errinj
+ |   - - true
+ |   - diff:
+ |       new_version: '{0.1.16.0}'
+ |       old_version: '{0.1.15.0}'
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+_ = test_run:cmd("stop server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("cleanup server newdefault")
+ | ---
+ | ...
+_ = test_run:cmd("delete server newdefault")
+ | ---
+ | ...
diff --git a/test/unit/upgrade.test.lua b/test/unit/upgrade.test.lua
new file mode 100644
index 0000000..1fcfb63
--- /dev/null
+++ b/test/unit/upgrade.test.lua
@@ -0,0 +1,134 @@
+test_run = require('test_run').new()
+
+-- There is no way to revert a bootstrapped storage manually. The
+-- test is done in a new instance which is killed with all its
+-- data afterwards.
+
+_ = test_run:cmd("create server newdefault with script='unit/box2.lua'")
+_ = test_run:cmd("start server newdefault")
+_ = test_run:switch("newdefault")
+
+util = require('util')
+
+vshard = require('vshard')
+
+upgrade = vshard.storage.internal.schema_upgrade_master
+handlers = vshard.storage.internal.schema_upgrade_handlers
+version_make = vshard.storage.internal.schema_version_make
+curr_version = vshard.storage.internal.schema_current_version
+schema_bootstrap = vshard.storage.internal.schema_bootstrap
+
+user = 'storage'
+password = 'storage'
+
+schema_bootstrap(user, password)
+
+version_make({0, 1, 16, 0})
+
+versions = {}
+
+_ = test_run:cmd("setopt delimiter ';'")
+for v1 = 1, 3 do
+    for v2 = 1, 3 do
+        for v3 = 1, 3 do
+            for v4 = 1, 3 do
+                table.insert(versions, version_make({v1, v2, v3, v4}))
+            end
+        end
+    end
+end;
+
+err = nil;
+count = 0;
+for i, v1 in pairs(versions) do
+    for j, v2 in pairs(versions) do
+        local v1n = string.format("%s.%s.%s.%s", v1[1], v1[2], v1[3], v1[4])
+        local v2n = string.format("%s.%s.%s.%s", v2[1], v2[2], v2[3], v2[4])
+        count = count + 1
+        if ((v1 == v2) ~= (v1n == v2n)) or ((v1 ~= v2) ~= (v1n ~= v2n)) or
+            ((v1 <= v2) ~= (v1n <= v2n)) or ((v1 < v2) ~= (v1n < v2n)) or
+            ((v1 >= v2) ~= (v1n >= v2n)) or ((v1 > v2) ~= (v1n > v2n)) then
+            err = {i, j}
+            break
+        end
+    end
+    if err then
+        break
+    end
+end;
+err;
+count;
+
+function stat_collect()
+    return {
+        func_count = box.space._func:count(),
+        space_count = box.space._space:count(),
+        user_count = box.space._user:count(),
+        schema_count = box.space._schema:count(),
+        version = curr_version(),
+    }
+end;
+
+function stat_diff(s1, s2)
+    local res = {}
+    if s2.func_count ~= s1.func_count then
+        res.func_count = s2.func_count - s1.func_count
+    end
+    if s2.space_count ~= s1.space_count then
+        res.space_count = s2.space_count - s1.space_count
+    end
+    if s2.user_count ~= s1.user_count then
+        res.user_count = s2.user_count - s1.user_count
+    end
+    if s2.schema_count ~= s1.schema_count then
+        res.schema_count = s2.schema_count - s1.schema_count
+    end
+    if s1.version ~= s2.version then
+        res.old_version = s1.version
+        res.new_version = s2.version
+    end
+    return res
+end;
+
+stat = stat_collect();
+
+function stat_update()
+    local new_stat = stat_collect()
+    local diff = stat_diff(stat, new_stat)
+    stat = new_stat
+    return diff
+end;
+
+upgrade_trace = {};
+errinj = vshard.storage.internal.errinj;
+
+for _, handler in pairs(handlers) do
+    table.insert(upgrade_trace, string.format('Upgrade to %s', handler.version))
+
+    table.insert(upgrade_trace, 'Errinj in begin')
+    errinj.ERRINJ_UPGRADE = 'begin'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'Errinj in end')
+    errinj.ERRINJ_UPGRADE = 'end'
+    table.insert(upgrade_trace,
+                 {util.check_error(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+
+    table.insert(upgrade_trace, 'No errinj')
+    errinj.ERRINJ_UPGRADE = nil
+    table.insert(upgrade_trace,
+                 {pcall(upgrade, handler.version, user, password)})
+    table.insert(upgrade_trace, {diff = stat_update()})
+end;
+
+_ = test_run:cmd("setopt delimiter ''");
+
+upgrade_trace
+
+_ = test_run:switch("default")
+_ = test_run:cmd("stop server newdefault")
+_ = test_run:cmd("cleanup server newdefault")
+_ = test_run:cmd("delete server newdefault")
diff --git a/test/upgrade/box.lua b/test/upgrade/box.lua
new file mode 100644
index 0000000..ad0543a
--- /dev/null
+++ b/test/upgrade/box.lua
@@ -0,0 +1,9 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+
+box.cfg{
+    listen = os.getenv("LISTEN"),
+}
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/upgrade/storage_1_a.lua b/test/upgrade/storage_1_a.lua
new file mode 100644
index 0000000..a521dbc
--- /dev/null
+++ b/test/upgrade/storage_1_a.lua
@@ -0,0 +1,11 @@
+#!/usr/bin/env tarantool
+util = require('util')
+NAME = require('fio').basename(arg[0], '.lua')
+local source_path = arg[1]
+if source_path then
+    -- Run one storage on a different vshard
+    -- version.
+    package.path = string.format('%s/?.lua;%s/?/init.lua;%s', source_path,
+                                 source_path, package.path)
+end
+require('storage_template')
diff --git a/test/upgrade/storage_1_b.lua b/test/upgrade/storage_1_b.lua
new file mode 120000
index 0000000..02572da
--- /dev/null
+++ b/test/upgrade/storage_1_b.lua
@@ -0,0 +1 @@
+storage_1_a.lua
\ No newline at end of file
diff --git a/test/upgrade/storage_2_a.lua b/test/upgrade/storage_2_a.lua
new file mode 120000
index 0000000..02572da
--- /dev/null
+++ b/test/upgrade/storage_2_a.lua
@@ -0,0 +1 @@
+storage_1_a.lua
\ No newline at end of file
diff --git a/test/upgrade/storage_2_b.lua b/test/upgrade/storage_2_b.lua
new file mode 120000
index 0000000..02572da
--- /dev/null
+++ b/test/upgrade/storage_2_b.lua
@@ -0,0 +1 @@
+storage_1_a.lua
\ No newline at end of file
diff --git a/test/upgrade/suite.ini b/test/upgrade/suite.ini
new file mode 100644
index 0000000..78efefe
--- /dev/null
+++ b/test/upgrade/suite.ini
@@ -0,0 +1,7 @@
+[default]
+core = tarantool
+description = Upgrade tests
+script = box.lua
+is_parallel = False
+lua_libs = ../lua_libs/storage_template.lua ../lua_libs/util.lua
+           ../lua_libs/git_util.lua ../../example/localcfg.lua
diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result
new file mode 100644
index 0000000..4157b08
--- /dev/null
+++ b/test/upgrade/upgrade.result
@@ -0,0 +1,247 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+git_util = require('git_util')
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+
+-- Commit "Improve compatibility with 1.9".
+version_0_1_15_0 = '79a4dbfc4229e922cbfe4be259193a7b18dc089d'
+ | ---
+ | ...
+vshard_copy_path = util.git_checkout('vshard_git_tree_copy_0_1_15_0',           \
+                                     version_0_1_15_0)
+ | ---
+ | ...
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+ | ---
+ | ...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_1, 'upgrade', {args = vshard_copy_path})
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_2, 'upgrade', {args = vshard_copy_path})
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+ | ---
+ | ...
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
+ | ---
+ | ...
+
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select()
+ | ---
+ | - ['oncevshard:storage:1']
+ | ...
+vshard.storage.internal.schema_current_version
+ | ---
+ | - null
+ | ...
+vshard.storage.internal.schema_latest_version
+ | ---
+ | - null
+ | ...
+bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2
+ | ---
+ | ...
+vshard.storage.bucket_force_create(1, bucket_count)
+ | ---
+ | - true
+ | ...
+box.begin()                                                                     \
+for i = 1, bucket_count do box.space.test:replace{i, i} end                     \
+box.commit()
+ | ---
+ | ...
+box.space.test:count()
+ | ---
+ | - 1500
+ | ...
+
+test_run:switch('storage_2_a')
+ | ---
+ | - true
+ | ...
+box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select()
+ | ---
+ | - ['oncevshard:storage:1']
+ | ...
+vshard.storage.internal.schema_current_version
+ | ---
+ | - null
+ | ...
+vshard.storage.internal.schema_latest_version
+ | ---
+ | - null
+ | ...
+bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2
+ | ---
+ | ...
+first_bucket = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1
+ | ---
+ | ...
+vshard.storage.bucket_force_create(first_bucket, bucket_count)
+ | ---
+ | - true
+ | ...
+box.begin()                                                                     \
+for i = first_bucket, first_bucket + bucket_count - 1 do                        \
+    box.space.test:replace{i, i}                                                \
+end                                                                             \
+box.commit()
+ | ---
+ | ...
+box.space.test:count()
+ | ---
+ | - 1500
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server storage_1_a')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server storage_1_a')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server storage_1_b')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server storage_1_b')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+box.space._schema:get({'vshard_version'})
+ | ---
+ | - ['vshard_version', 0, 1, 16, 0]
+ | ...
+vshard.storage.internal.schema_current_version()
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+vshard.storage.internal.schema_latest_version
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+
+test_run:switch('storage_1_b')
+ | ---
+ | - true
+ | ...
+box.space._schema:get({'vshard_version'})
+ | ---
+ | - ['vshard_version', 0, 1, 16, 0]
+ | ...
+vshard.storage.internal.schema_current_version()
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+vshard.storage.internal.schema_latest_version
+ | ---
+ | - '{0.1.16.0}'
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Main purpose of the test - ensure that data can be safely moved
+-- from an old instance to a newer one. Weight difference makes
+-- rebalancer move the buckets from old storage_2 to new upgraded
+-- storage_1.
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, [[                       \
+    cfg.sharding[ util.replicasets[2] ].weight = 1                              \
+    cfg.sharding[ util.replicasets[1] ].weight = 2                              \
+    cfg.rebalancer_max_sending = 5                                              \
+    vshard.storage.cfg(cfg, util.name_to_uuid[NAME])                            \
+]])
+ | ---
+ | ...
+
+test_run:switch('storage_2_a')
+ | ---
+ | - true
+ | ...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+ | ---
+ | ...
+active_count = 0
+ | ---
+ | ...
+index = box.space._bucket.index.status
+ | ---
+ | ...
+for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do                       \
+    active_count = active_count + 1                                             \
+    assert(box.space.test:get({t.id}) ~= nil)                                   \
+end
+ | ---
+ | ...
+active_count
+ | ---
+ | - 1000
+ | ...
+
+test_run:switch('storage_1_a')
+ | ---
+ | - true
+ | ...
+active_count = 0
+ | ---
+ | ...
+index = box.space._bucket.index.status
+ | ---
+ | ...
+for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do                       \
+    active_count = active_count + 1                                             \
+    assert(box.space.test:get({t.id}) ~= nil)                                   \
+end
+ | ---
+ | ...
+active_count
+ | ---
+ | - 2000
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(REPLICASET_2)
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_1)
+ | ---
+ | ...
+test_run:cmd('clear filter')
+ | ---
+ | - true
+ | ...
diff --git a/test/upgrade/upgrade.test.lua b/test/upgrade/upgrade.test.lua
new file mode 100644
index 0000000..3a4d113
--- /dev/null
+++ b/test/upgrade/upgrade.test.lua
@@ -0,0 +1,94 @@
+test_run = require('test_run').new()
+git_util = require('git_util')
+util = require('util')
+
+-- Commit "Improve compatibility with 1.9".
+version_0_1_15_0 = '79a4dbfc4229e922cbfe4be259193a7b18dc089d'
+vshard_copy_path = util.git_checkout('vshard_git_tree_copy_0_1_15_0',           \
+                                     version_0_1_15_0)
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'upgrade', {args = vshard_copy_path})
+test_run:create_cluster(REPLICASET_2, 'upgrade', {args = vshard_copy_path})
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
+
+test_run:switch('storage_1_a')
+box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select()
+vshard.storage.internal.schema_current_version
+vshard.storage.internal.schema_latest_version
+bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2
+vshard.storage.bucket_force_create(1, bucket_count)
+box.begin()                                                                     \
+for i = 1, bucket_count do box.space.test:replace{i, i} end                     \
+box.commit()
+box.space.test:count()
+
+test_run:switch('storage_2_a')
+box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select()
+vshard.storage.internal.schema_current_version
+vshard.storage.internal.schema_latest_version
+bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2
+first_bucket = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1
+vshard.storage.bucket_force_create(first_bucket, bucket_count)
+box.begin()                                                                     \
+for i = first_bucket, first_bucket + bucket_count - 1 do                        \
+    box.space.test:replace{i, i}                                                \
+end                                                                             \
+box.commit()
+box.space.test:count()
+
+test_run:switch('default')
+test_run:cmd('stop server storage_1_a')
+test_run:cmd('start server storage_1_a')
+test_run:cmd('stop server storage_1_b')
+test_run:cmd('start server storage_1_b')
+
+test_run:switch('storage_1_a')
+box.space._schema:get({'vshard_version'})
+vshard.storage.internal.schema_current_version()
+vshard.storage.internal.schema_latest_version
+
+test_run:switch('storage_1_b')
+box.space._schema:get({'vshard_version'})
+vshard.storage.internal.schema_current_version()
+vshard.storage.internal.schema_latest_version
+
+test_run:switch('default')
+-- Main purpose of the test - ensure that data can be safely moved
+-- from an old instance to a newer one. Weight difference makes
+-- rebalancer move the buckets from old storage_2 to new upgraded
+-- storage_1.
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, [[                       \
+    cfg.sharding[ util.replicasets[2] ].weight = 1                              \
+    cfg.sharding[ util.replicasets[1] ].weight = 2                              \
+    cfg.rebalancer_max_sending = 5                                              \
+    vshard.storage.cfg(cfg, util.name_to_uuid[NAME])                            \
+]])
+
+test_run:switch('storage_2_a')
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+active_count = 0
+index = box.space._bucket.index.status
+for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do                       \
+    active_count = active_count + 1                                             \
+    assert(box.space.test:get({t.id}) ~= nil)                                   \
+end
+active_count
+
+test_run:switch('storage_1_a')
+active_count = 0
+index = box.space._bucket.index.status
+for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do                       \
+    active_count = active_count + 1                                             \
+    assert(box.space.test:get({t.id}) ~= nil)                                   \
+end
+active_count
+
+test_run:switch('default')
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
+test_run:cmd('clear filter')
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index a86fcaf..ebc6af1 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -74,6 +74,7 @@ if not M then
             ERRINJ_LAST_RECEIVE_DELAY = false,
             ERRINJ_RECEIVE_PARTIALLY = false,
             ERRINJ_NO_RECOVERY = false,
+            ERRINJ_UPGRADE = false,
         },
         -- This counter is used to restart background fibers with
         -- new reloaded code.
@@ -257,8 +258,44 @@ end
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
-local function storage_schema_v1(username, password)
-    log.info("Initializing schema")
+
+local schema_version_mt = {
+    __tostring = function(self)
+        return string.format('{%s}', table.concat(self, '.'))
+    end,
+    __serialize = function(self)
+        return tostring(self)
+    end,
+    __eq = function(l, r)
+        return l[1] == r[1] and l[2] == r[2] and l[3] == r[3] and l[4] == r[4]
+    end,
+    __lt = function(l, r)
+        for i = 1, 4 do
+            local diff = l[i] - r[i]
+            if diff < 0 then
+                return true
+            elseif diff > 0 then
+                return false
+            end
+        end
+        return false;
+    end,
+}
+
+local function schema_version_make(ver)
+    return setmetatable(ver, schema_version_mt)
+end
+
+-- VShard versioning works in 4 numbers: major, minor, patch, and
+-- a last helper number incremented on every schema change while
+-- the first 3 numbers stay unchanged. That happens when users take
+-- the latest master version not having a tag yet. Without the 4th
+-- number they couldn't upgrade the schema within one tag.
+
+-- The schema first time appeared with 0.1.16. So this function
+-- describes schema before that - 0.1.15.
+local function schema_init_0_1_15_0(username, password)
+    log.info("Initializing schema %s", schema_version_make({0, 1, 15, 0}))
     box.schema.user.create(username, {
         password = password,
         if_not_exists = true,
@@ -297,7 +334,141 @@ local function storage_schema_v1(username, password)
         box.schema.user.grant(username, 'execute', 'function', name)
     end
 
-    box.snapshot()
+    box.space._schema:replace({'vshard_version', 0, 1, 15, 0})
+end
+
+local function schema_upgrade_to_0_1_16_0(username)
+    -- Since 0.1.16.0 the old versioning by
+    -- 'oncevshard:storage:<number>' is dropped because it is not
+    -- really extendible nor understandable.
+    log.info("Insert 'vshard_version' into _schema")
+    box.space._schema:replace({'vshard_version', 0, 1, 16, 0})
+    box.space._schema:delete({'oncevshard:storage:1'})
+end
+
+local function schema_downgrade_from_0_1_16_0()
+    log.info("Remove 'vshard_version' from _schema")
+    box.space._schema:replace({'oncevshard:storage:1'})
+    box.space._schema:delete({'vshard_version'})
+end
+
+local function schema_current_version()
+    local version = box.space._schema:get({'vshard_version'})
+    if version == nil then
+        return schema_version_make({0, 1, 15, 0})
+    else
+        return schema_version_make(version:totable(2))
+    end
+end
+
+local schema_latest_version = schema_version_make({0, 1, 16, 0})
+
+local function schema_upgrade_replica()
+    local version = schema_current_version()
+    -- Replica can't do upgrade - it is read-only. And it
+    -- shouldn't anyway - that would conflict with master doing
+    -- the same. So the upgrade is either non-critical, and the
+    -- replica can work with the new code but old schema. Or
+    -- it is critical, and the replica needs to wait for the
+    -- schema upgrade from the master.
+    -- Or it may happen, that the upgrade just is not possible.
+    -- For example, when an auto-upgrade tries to change a too old
+    -- schema to the newest, skipping some intermediate versions.
+    -- For example, from 1.2.3.4 to 1.7.8.9, when it is assumed
+    -- that a safe upgrade should go 1.2.3.4 -> 1.2.4.1 ->
+    -- 1.3.1.1 and so on step by step.
+    if version ~= schema_latest_version then
+        log.info('Replica\' vshard schema version is not latest - current '..
+                 '%s vs latest %s, but the replica still can work', version,
+                 schema_latest_version)
+    end
+    -- In future for hard changes the replica may be suspended
+    -- until its schema is synced with master. Or it may
+    -- refuse to upgrade in case of incompatible changes. Now
+    -- there are too few versions for such problems to
+    -- appear.
+end
+
+-- Every handler should be atomic. It is either applied whole, or
+-- not applied at all. Atomic upgrade helps to downgrade in case
+-- something goes wrong. At least by doing restart with the latest
+-- successfully applied version. However, atomicity does not
+-- prohibit yields, in case the upgrade, for example, affects huge
+-- number of tuples (_bucket records, maybe).
+local schema_upgrade_handlers = {
+    {
+        version = schema_version_make({0, 1, 16, 0}),
+        upgrade = schema_upgrade_to_0_1_16_0,
+        downgrade = schema_downgrade_from_0_1_16_0
+    },
+}
+
+local function schema_upgrade_master(target_version, username, password)
+    local _schema = box.space._schema
+    local is_old_versioning = _schema:get({'oncevshard:storage:1'}) ~= nil
+    local version = schema_current_version()
+    local is_bootstrap = not box.space._bucket
+
+    if is_bootstrap then
+        schema_init_0_1_15_0(username, password)
+    elseif is_old_versioning then
+        log.info("The instance does not have 'vshard_version' record. "..
+                 "It is 0.1.15.0.")
+    end
+    assert(schema_upgrade_handlers[#schema_upgrade_handlers].version ==
+           schema_latest_version)
+    local prev_version = version
+    local ok, err1, err2
+    local errinj = M.errinj.ERRINJ_UPGRADE
+    for _, handler in ipairs(schema_upgrade_handlers) do
+        local next_version = handler.version
+        if next_version > target_version then
+            break
+        end
+        if next_version > version then
+            log.info("Upgrade vshard schema to %s", next_version)
+            if errinj == 'begin' then
+                ok, err1 = false, 'Errinj in begin'
+            else
+                ok, err1 = pcall(handler.upgrade, username)
+                if ok and errinj == 'end' then
+                    ok, err1 = false, 'Errinj in end'
+                end
+            end
+            if not ok then
+                -- Rollback in case the handler started a
+                -- transaction before the exception.
+                box.rollback()
+                log.info("Couldn't upgrade schema to %s: '%s'. Revert to %s",
+                         next_version, err1, prev_version)
+                ok, err2 = pcall(handler.downgrade)
+                if not ok then
+                    log.info("Couldn't downgrade schema to %s - fatal error: "..
+                             "'%s'", prev_version, err2)
+                    os.exit(-1)
+                end
+                error(err1)
+            end
+            ok, err1 = pcall(_schema.replace, _schema,
+                             {'vshard_version', unpack(next_version)})
+            if not ok then
+                log.info("Upgraded schema to %s but couldn't update _schema "..
+                         "'vshard_version' - fatal error: '%s'", next_version,
+                         err1)
+                os.exit(-1)
+            end
+            log.info("Successful vshard schema upgrade to %s", next_version)
+        end
+        prev_version = next_version
+    end
+end
+
+local function schema_upgrade(is_master, username, password)
+    if is_master then
+        return schema_upgrade_master(schema_latest_version, username, password)
+    else
+        return schema_upgrade_replica()
+    end
 end
 
 local function this_is_master()
@@ -2169,14 +2340,14 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
             error(err)
         end
         log.info("Box has been configured")
-        local uri = luri.parse(this_replica.uri)
-        box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
-        box.space._bucket:on_replace(bucket_generation_increment)
-    else
-        local old = box.space._bucket:on_replace()[1]
-        box.space._bucket:on_replace(bucket_generation_increment, old)
     end
 
+    local uri = luri.parse(this_replica.uri)
+    schema_upgrade(is_master, uri.login, uri.password)
+
+    local old_trigger = box.space._bucket:on_replace()[1]
+    box.space._bucket:on_replace(bucket_generation_increment, old_trigger)
+
     lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
     lreplicaset.outdate_replicasets(M.replicasets)
     M.replicasets = new_replicasets
@@ -2469,6 +2640,12 @@ M.rlist = {
     add_tail = rlist_add_tail,
     remove = rlist_remove,
 }
+M.schema_latest_version = schema_latest_version
+M.schema_current_version = schema_current_version
+M.schema_upgrade_master = schema_upgrade_master
+M.schema_upgrade_handlers = schema_upgrade_handlers
+M.schema_version_make = schema_version_make
+M.schema_bootstrap = schema_init_0_1_15_0
 
 return {
     sync = sync,
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list