From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp43.i.mail.ru (smtp43.i.mail.ru [94.100.177.103]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 7F37A446429 for ; Sat, 21 Mar 2020 21:59:29 +0300 (MSK) From: Vladislav Shpilevoy Date: Sat, 21 Mar 2020 19:59:25 +0100 Message-Id: <364ea64d7bf6beef8cea83ff2fbb432ba967cd80.1584817081.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH vshard 1/2] storage: introduce upgrade strategy List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org, yaroslav.dynnikov@tarantool.org Since its creation, vshard has never needed any schema changes. The vshard schema version was always '1', stored in _schema as 'oncevshard:storage:1', because it was created by box.once('vshard:storage:1'). Now it is time to make some changes to the schema. The first is the need to simplify the introduction and removal of internal functions. There are 3 of them at the moment: bucket_recv()/ rebalancer_apply_routes()/rebalancer_request_state(). Users see them in the vshard.storage.* namespace and in the _func space. They are going to be replaced by a single vshard.storage._service_call(). That way it will be easy to update them without schema changes, and to add or remove functions. In _service_call() it will be possible to validate versions and return proper errors in case a requested function does not exist. Secondly, _service_call() is going to be used by the new discovery on routers. It is not possible to modify the existing vshard.storage.buckets_discovery() because it is a public function. But a new discovery can be added to _service_call(). 
Thirdly, there is a bug in rebalancing related to possible big TCP delays, which will require a change of the _bucket space format in the future. Fourthly, _service_call() would allow introducing new functions on a read-only replica using only code reload. This may be needed for the bug about bucket_ref() not preventing a bucket move. The patch introduces versioning using 4 numbers: x.x.x.x. The first 3 numbers are major, minor, patch, the same as in the tags. The last value is a number incremented when the schema is modified but the first 3 numbers can't be changed. That happens when users take the master branch between two tags, and then the schema is changed again before the new tag is published. An upgrade between two adjacent tags is supposed to always be safe and automatic when a reload or reconfiguration happens. However, currently there are too few versions for any upgrade to be unsafe. Needed for #227 Needed for #210 --- test/upgrade/box.lua | 9 ++ test/upgrade/storage_1_a.lua | 11 ++ test/upgrade/storage_1_b.lua | 1 + test/upgrade/storage_2_a.lua | 1 + test/upgrade/storage_2_b.lua | 1 + test/upgrade/suite.ini | 7 + test/upgrade/upgrade.result | 259 ++++++++++++++++++++++++++++++++++ test/upgrade/upgrade.test.lua | 94 ++++++++++++ vshard/storage/init.lua | 121 +++++++++++++++- 9 files changed, 500 insertions(+), 4 deletions(-) create mode 100644 test/upgrade/box.lua create mode 100644 test/upgrade/storage_1_a.lua create mode 120000 test/upgrade/storage_1_b.lua create mode 120000 test/upgrade/storage_2_a.lua create mode 120000 test/upgrade/storage_2_b.lua create mode 100644 test/upgrade/suite.ini create mode 100644 test/upgrade/upgrade.result create mode 100644 test/upgrade/upgrade.test.lua diff --git a/test/upgrade/box.lua b/test/upgrade/box.lua new file mode 100644 index 0000000..ad0543a --- /dev/null +++ b/test/upgrade/box.lua @@ -0,0 +1,9 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +box.cfg{ + listen = os.getenv("LISTEN"), +} + 
+require('console').listen(os.getenv('ADMIN')) diff --git a/test/upgrade/storage_1_a.lua b/test/upgrade/storage_1_a.lua new file mode 100644 index 0000000..a521dbc --- /dev/null +++ b/test/upgrade/storage_1_a.lua @@ -0,0 +1,11 @@ +#!/usr/bin/env tarantool +util = require('util') +NAME = require('fio').basename(arg[0], '.lua') +local source_path = arg[1] +if source_path then + -- Run one storage on a different vshard + -- version. + package.path = string.format('%s/?.lua;%s/?/init.lua;%s', source_path, + source_path, package.path) +end +require('storage_template') diff --git a/test/upgrade/storage_1_b.lua b/test/upgrade/storage_1_b.lua new file mode 120000 index 0000000..02572da --- /dev/null +++ b/test/upgrade/storage_1_b.lua @@ -0,0 +1 @@ +storage_1_a.lua \ No newline at end of file diff --git a/test/upgrade/storage_2_a.lua b/test/upgrade/storage_2_a.lua new file mode 120000 index 0000000..02572da --- /dev/null +++ b/test/upgrade/storage_2_a.lua @@ -0,0 +1 @@ +storage_1_a.lua \ No newline at end of file diff --git a/test/upgrade/storage_2_b.lua b/test/upgrade/storage_2_b.lua new file mode 120000 index 0000000..02572da --- /dev/null +++ b/test/upgrade/storage_2_b.lua @@ -0,0 +1 @@ +storage_1_a.lua \ No newline at end of file diff --git a/test/upgrade/suite.ini b/test/upgrade/suite.ini new file mode 100644 index 0000000..78efefe --- /dev/null +++ b/test/upgrade/suite.ini @@ -0,0 +1,7 @@ +[default] +core = tarantool +description = Upgrade tests +script = box.lua +is_parallel = False +lua_libs = ../lua_libs/storage_template.lua ../lua_libs/util.lua + ../lua_libs/git_util.lua ../../example/localcfg.lua diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result new file mode 100644 index 0000000..0fb68c7 --- /dev/null +++ b/test/upgrade/upgrade.result @@ -0,0 +1,259 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... +git_util = require('git_util') + | --- + | ... +util = require('util') + | --- + | ... 
+ +-- Commit "Improve compatibility with 1.9". +version_0_1_15_0 = '79a4dbfc4229e922cbfe4be259193a7b18dc089d' + | --- + | ... +vshard_copy_path = util.git_checkout('vshard_git_tree_copy_0_1_15_0', \ + version_0_1_15_0) + | --- + | ... + +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } + | --- + | ... +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } + | --- + | ... +test_run:create_cluster(REPLICASET_1, 'upgrade', {args = vshard_copy_path}) + | --- + | ... +test_run:create_cluster(REPLICASET_2, 'upgrade', {args = vshard_copy_path}) + | --- + | ... +util = require('util') + | --- + | ... +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') + | --- + | ... +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') + | --- + | ... +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')') + | --- + | ... + +test_run:switch('storage_1_a') + | --- + | - true + | ... +box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select() + | --- + | - ['oncevshard:storage:1'] + | ... +vshard.storage.internal.schema_current_version + | --- + | - null + | ... +vshard.storage.internal.schema_latest_version + | --- + | - null + | ... +bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + | --- + | ... +vshard.storage.bucket_force_create(1, bucket_count) + | --- + | - true + | ... +box.begin() \ +for i = 1, bucket_count do box.space.test:replace{i, i} end \ +box.commit() + | --- + | ... +box.space.test:count() + | --- + | - 1500 + | ... + +test_run:switch('storage_2_a') + | --- + | - true + | ... +box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select() + | --- + | - ['oncevshard:storage:1'] + | ... +vshard.storage.internal.schema_current_version + | --- + | - null + | ... +vshard.storage.internal.schema_latest_version + | --- + | - null + | ... +bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + | --- + | ... +first_bucket = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1 + | --- + | ... 
+vshard.storage.bucket_force_create(first_bucket, bucket_count) + | --- + | - true + | ... +box.begin() \ +for i = first_bucket, first_bucket + bucket_count - 1 do \ + box.space.test:replace{i, i} \ +end \ +box.commit() + | --- + | ... +box.space.test:count() + | --- + | - 1500 + | ... + +test_run:switch('default') + | --- + | - true + | ... +test_run:cmd('stop server storage_1_a') + | --- + | - true + | ... +test_run:cmd('start server storage_1_a') + | --- + | - true + | ... +test_run:cmd('stop server storage_1_b') + | --- + | - true + | ... +test_run:cmd('start server storage_1_b') + | --- + | - true + | ... + +test_run:switch('storage_1_a') + | --- + | - true + | ... +box.space._schema:get({'vshard_version'}) + | --- + | - ['vshard_version', 0, 1, 16, 0] + | ... +vshard.storage.internal.schema_current_version() + | --- + | - - 0 + | - 1 + | - 16 + | - 0 + | ... +vshard.storage.internal.schema_latest_version + | --- + | - - 0 + | - 1 + | - 16 + | - 0 + | ... + +test_run:switch('storage_1_b') + | --- + | - true + | ... +box.space._schema:get({'vshard_version'}) + | --- + | - ['vshard_version', 0, 1, 16, 0] + | ... +vshard.storage.internal.schema_current_version() + | --- + | - - 0 + | - 1 + | - 16 + | - 0 + | ... +vshard.storage.internal.schema_latest_version + | --- + | - - 0 + | - 1 + | - 16 + | - 0 + | ... + +test_run:switch('default') + | --- + | - true + | ... +-- Main purpose of the test - ensure that data can be safely moved +-- from an old instance to a newer one. Weight difference makes +-- rebalancer move the buckets from old storage_2 to new upgraded +-- storage_1. +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, [[ \ + cfg.sharding[ util.replicasets[2] ].weight = 1 \ + cfg.sharding[ util.replicasets[1] ].weight = 2 \ + cfg.rebalancer_max_sending = 5 \ + vshard.storage.cfg(cfg, util.name_to_uuid[NAME]) \ +]]) + | --- + | ... + +test_run:switch('storage_2_a') + | --- + | - true + | ... 
+wait_rebalancer_state('The cluster is balanced ok', test_run) + | --- + | ... +active_count = 0 + | --- + | ... +index = box.space._bucket.index.status + | --- + | ... +for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do \ + active_count = active_count + 1 \ + assert(box.space.test:get({t.id}) ~= nil) \ +end + | --- + | ... +active_count + | --- + | - 1000 + | ... + +test_run:switch('storage_1_a') + | --- + | - true + | ... +active_count = 0 + | --- + | ... +index = box.space._bucket.index.status + | --- + | ... +for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do \ + active_count = active_count + 1 \ + assert(box.space.test:get({t.id}) ~= nil) \ +end + | --- + | ... +active_count + | --- + | - 2000 + | ... + +test_run:switch('default') + | --- + | - true + | ... +test_run:drop_cluster(REPLICASET_2) + | --- + | ... +test_run:drop_cluster(REPLICASET_1) + | --- + | ... +test_run:cmd('clear filter') + | --- + | - true + | ... diff --git a/test/upgrade/upgrade.test.lua b/test/upgrade/upgrade.test.lua new file mode 100644 index 0000000..3a4d113 --- /dev/null +++ b/test/upgrade/upgrade.test.lua @@ -0,0 +1,94 @@ +test_run = require('test_run').new() +git_util = require('git_util') +util = require('util') + +-- Commit "Improve compatibility with 1.9". 
+version_0_1_15_0 = '79a4dbfc4229e922cbfe4be259193a7b18dc089d' +vshard_copy_path = util.git_checkout('vshard_git_tree_copy_0_1_15_0', \ + version_0_1_15_0) + +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } +test_run:create_cluster(REPLICASET_1, 'upgrade', {args = vshard_copy_path}) +test_run:create_cluster(REPLICASET_2, 'upgrade', {args = vshard_copy_path}) +util = require('util') +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')') + +test_run:switch('storage_1_a') +box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select() +vshard.storage.internal.schema_current_version +vshard.storage.internal.schema_latest_version +bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2 +vshard.storage.bucket_force_create(1, bucket_count) +box.begin() \ +for i = 1, bucket_count do box.space.test:replace{i, i} end \ +box.commit() +box.space.test:count() + +test_run:switch('storage_2_a') +box.space._schema:get({'oncevshard:storage:1'}) or box.space._schema:select() +vshard.storage.internal.schema_current_version +vshard.storage.internal.schema_latest_version +bucket_count = vshard.consts.DEFAULT_BUCKET_COUNT / 2 +first_bucket = vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1 +vshard.storage.bucket_force_create(first_bucket, bucket_count) +box.begin() \ +for i = first_bucket, first_bucket + bucket_count - 1 do \ + box.space.test:replace{i, i} \ +end \ +box.commit() +box.space.test:count() + +test_run:switch('default') +test_run:cmd('stop server storage_1_a') +test_run:cmd('start server storage_1_a') +test_run:cmd('stop server storage_1_b') +test_run:cmd('start server storage_1_b') + +test_run:switch('storage_1_a') +box.space._schema:get({'vshard_version'}) +vshard.storage.internal.schema_current_version() +vshard.storage.internal.schema_latest_version + 
+test_run:switch('storage_1_b') +box.space._schema:get({'vshard_version'}) +vshard.storage.internal.schema_current_version() +vshard.storage.internal.schema_latest_version + +test_run:switch('default') +-- Main purpose of the test - ensure that data can be safely moved +-- from an old instance to a newer one. Weight difference makes +-- rebalancer move the buckets from old storage_2 to new upgraded +-- storage_1. +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, [[ \ + cfg.sharding[ util.replicasets[2] ].weight = 1 \ + cfg.sharding[ util.replicasets[1] ].weight = 2 \ + cfg.rebalancer_max_sending = 5 \ + vshard.storage.cfg(cfg, util.name_to_uuid[NAME]) \ +]]) + +test_run:switch('storage_2_a') +wait_rebalancer_state('The cluster is balanced ok', test_run) +active_count = 0 +index = box.space._bucket.index.status +for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do \ + active_count = active_count + 1 \ + assert(box.space.test:get({t.id}) ~= nil) \ +end +active_count + +test_run:switch('storage_1_a') +active_count = 0 +index = box.space._bucket.index.status +for _, t in index:pairs({vshard.consts.BUCKET.ACTIVE}) do \ + active_count = active_count + 1 \ + assert(box.space.test:get({t.id}) ~= nil) \ +end +active_count + +test_run:switch('default') +test_run:drop_cluster(REPLICASET_2) +test_run:drop_cluster(REPLICASET_1) +test_run:cmd('clear filter') diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index a86fcaf..a87a50b 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -257,7 +257,16 @@ end -------------------------------------------------------------------------------- -- Schema -------------------------------------------------------------------------------- -local function storage_schema_v1(username, password) + +-- VShard versioning works in 4 numbers: major, minor, patch, and +-- a last helper number incremented on every schema change, if +-- first 3 numbers stay not changed. 
That happens when users take +-- the latest master version not having a tag yet. They couldn't +-- upgrade if the 4th number didn't change inside one tag. + +-- Schema versioning first appeared in 0.1.16. So this function +-- describes the schema before that - 0.1.15. +local function schema_init_0_1_15_0(username, password) log.info("Initializing schema") box.schema.user.create(username, { password = password, @@ -297,7 +306,105 @@ local function storage_schema_v1(username, password) box.schema.user.grant(username, 'execute', 'function', name) end - box.snapshot() + box.space._schema:replace({'vshard_version', 0, 1, 15, 0}) +end + +local function schema_compare_versions(v1, v2) + for i = 1, 4 do + if v1[i] ~= v2[i] then + return v1[i] - v2[i] + end + end + return 0 +end + +local function schema_upgrade_to_0_1_16_0(username) + -- Since 0.1.16.0 the old versioning by + -- 'oncevshard:storage:' is dropped because it is neither + -- really extensible nor understandable. + log.info('Insert vshard_version into _schema') + box.space._schema:replace({'vshard_version', 0, 1, 16, 0}) + box.space._schema:delete({'oncevshard:storage:1'}) +end + +local function schema_current_version() + local version = box.space._schema:get({'vshard_version'}) + if version == nil then + version = {0, 1, 15, 0} + else + version = version:totable() + version = {version[2], version[3], version[4], version[5]} + end + return version +end + +local schema_latest_version = {0, 1, 16, 0} + +local function schema_upgrade_replica() + local version = schema_current_version() + -- Replica can't do upgrade - it is read-only. And it + -- shouldn't anyway - that would conflict with master doing + -- the same. So the upgrade is either non-critical, and the + -- replica can work with the new code but old schema. Or + -- it is critical, and needs to wait for the schema upgrade from + -- the master. + -- Or it may happen that the upgrade is just not possible.
+ -- For example, when an auto-upgrade tries to change a too old + -- schema to the newest, skipping some intermediate versions. + -- For example, from 1.2.3.4 to 1.7.8.9, when it is assumed + -- that a safe upgrade should go 1.2.3.4 -> 1.2.4.1 -> + -- 1.3.1.1 and so on step by step. + if schema_compare_versions(version, schema_latest_version) == 0 then + log.info('Replica\'s vshard schema version is up to date - %s', + table.concat(version, '.')) + else + log.info('Replica\'s vshard schema version is outdated - current '.. + '%s vs latest %s, but the replica still can work', + table.concat(version, '.'), + table.concat(schema_latest_version, '.')) + end + -- In future for hard changes the replica may be suspended + -- until its schema is synced with master. Or it may + -- refuse to upgrade in case of incompatible changes. Now + -- there are too few versions for such problems to + -- appear. +end + +local function schema_upgrade_master(username, password) + local _schema = box.space._schema + local is_old_versioning = _schema:get({'oncevshard:storage:1'}) ~= nil + local version = schema_current_version() + local is_bootstrap = not box.space._bucket + + if is_bootstrap then + schema_init_0_1_15_0(username, password) + elseif is_old_versioning then + log.info('The instance does not have vshard_version record. '..
+ 'It is 0.1.15.0.') + end + local handlers = { + {version = {0, 1, 16, 0}, func = schema_upgrade_to_0_1_16_0}, + } + assert(schema_compare_versions(handlers[#handlers].version, + schema_latest_version) == 0) + for _, handler in ipairs(handlers) do + local next_version = handler.version + if schema_compare_versions(next_version, version) > 0 then + local next_version_str = table.concat(next_version, '.') + log.info("Upgrade vshard to {%s}", next_version_str) + handler.func(username, password) + log.info("Successful vshard upgrade to {%s}", next_version_str) + _schema:replace({'vshard_version', unpack(next_version)}) + end + end +end + +local function schema_upgrade(is_master, username, password) + if is_master then + return schema_upgrade_master(username, password) + else + return schema_upgrade_replica() + end end local function this_is_master() @@ -2169,8 +2276,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload) error(err) end log.info("Box has been configured") - local uri = luri.parse(this_replica.uri) - box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) + end + + local uri = luri.parse(this_replica.uri) + schema_upgrade(is_master, uri.login, uri.password) + + if not is_reload then box.space._bucket:on_replace(bucket_generation_increment) else local old = box.space._bucket:on_replace()[1] @@ -2469,6 +2580,8 @@ M.rlist = { add_tail = rlist_add_tail, remove = rlist_remove, } +M.schema_latest_version = schema_latest_version +M.schema_current_version = schema_current_version return { sync = sync, -- 2.21.1 (Apple Git-122.3)