From mboxrd@z Thu Jan 1 00:00:00 1970
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org,
 yaroslav.dynnikov@tarantool.org
Message-ID: <7a39f441-b33d-0aaf-2135-fb09c3130b3c@tarantool.org>
Date: Wed, 24 Feb 2021 13:28:02 +0300
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8; format=flowed
Content-Language: en-GB
Subject: Re: [Tarantool-patches] [PATCH vshard 09/11] ref: introduce vshard.storage.ref module
List-Id: Tarantool development patches
From: Oleg Babin via Tarantool-patches
Reply-To: Oleg Babin

Thanks for your patch. This is a brief review - I hope to take another
look at this patch later. Please consider the question below.

On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> The 'vshard.storage.ref' module helps to ensure that all buckets on
> the storage stay writable while there is at least one ref on the
> storage. Having the storage referenced allows executing any kind of
> request on all the visible data in all spaces in locally stored
> buckets.
>
> This is useful when tons of buckets need to be accessed at once,
> especially when the exact bucket IDs are not known.
>
> Refs have deadlines, so that the storage does not freeze, unable to
> move buckets until restart, in case a ref is not deleted due to an
> error in the user's code or a disconnect.
>
> The disconnects and restarts mean the refs can't be global. Otherwise
> any kind of global counter, UUID and so on, even paired with an id
> from a client, could clash between clients on their reconnects or
> storage restarts. Unless they established a TCP-like session, which
> would be too complicated.
>
> Instead, the refs are spread over the existing box sessions. This
> allows binding the refs of each client to its TCP connection without
> caring about how to make them unique, how not to mix the refs up on
> restart, and how to drop the refs when a client disconnects.
>
> Vshard.storage.ref does not depend on internals of the main file
> (storage/init.lua), so it is implemented as a separate module to keep
> it simple and isolated. It uses the storage via the registry only to
> get a couple of functions from its API.
>
> In addition, having it in a module simplifies the tests.
>
> The API is not public so far, and is going to be used privately by
> the future map-reduce API.
>
> Part of #147
> ---
>  test/reload_evolution/storage.result   |  66 ++++
>  test/reload_evolution/storage.test.lua |  28 ++
>  test/storage/ref.result                | 399 +++++++++++++++++++++++++
>  test/storage/ref.test.lua              | 166 ++++++++++
>  test/unit-tap/ref.test.lua             | 202 +++++++++++++
>  vshard/consts.lua                      |   1 +
>  vshard/error.lua                       |  19 ++
>  vshard/storage/CMakeLists.txt          |   2 +-
>  vshard/storage/init.lua                |   9 +
>  vshard/storage/ref.lua                 | 371 +++++++++++++++++++++++
>  10 files changed, 1262 insertions(+), 1 deletion(-)
>  create mode 100644 test/storage/ref.result
>  create mode 100644 test/storage/ref.test.lua
>  create mode 100755 test/unit-tap/ref.test.lua
>  create mode 100644 vshard/storage/ref.lua
>
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index 9d30a04..c4a0cdd 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -227,6 +227,72 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
>  ---
>  - 1500
>  ...
> +--
> +-- Ensure storage refs are enabled and work from the scratch via reload.
> +--
> +lref = require('vshard.storage.ref')
> +---
> +...
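
A side note before diving into the tests, not a review issue: it took
me a moment to see how a consumer is supposed to drive this API. If I
read the patch right, a storage-side map handler would look roughly
like this (a sketch; only lref.add()/use()/del() are from this patch,
execute_map() and its arguments are hypothetical):

    local lref = require('vshard.storage.ref')

    -- Hypothetical storage-side handler. 'rid' is generated by the
    -- router, the session id binds the ref to this client's
    -- connection.
    local function execute_map(rid, timeout, func, args)
        local sid = box.session.id()
        -- Take a ref: from now on no bucket can leave this storage.
        local ok, err = lref.add(rid, sid, timeout)
        if not ok then
            return nil, err
        end
        -- Pin the ref: it can no longer expire by deadline. Only an
        -- explicit del() or the client's disconnect releases it.
        ok, err = lref.use(rid, sid)
        if not ok then
            return nil, err
        end
        local status, res = pcall(func, args)
        -- Always release, otherwise the storage stays pinned.
        lref.del(rid, sid)
        if not status then
            return nil, res
        end
        return res
    end

Is that the intended flow for the future map-reduce API?
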
> +vshard.storage.rebalancer_disable() > +--- > +... > +big_timeout = 1000000 > +--- > +... > +timeout = 0.01 > +--- > +... > +lref.add(0, 0, big_timeout) > +--- > +- true > +... > +status_index = box.space._bucket.index.status > +--- > +... > +bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id > +--- > +... > +ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \ > + {timeout = timeout}) > +--- > +... > +assert(not ok and err.message) > +--- > +- Storage is referenced > +... > +lref.del(0, 0) > +--- > +- true > +... > +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \ > + {timeout = big_timeout}) > +--- > +- true > +... > +wait_bucket_is_collected(bucket_id_to_move) > +--- > +... > +test_run:switch('storage_2_a') > +--- > +- true > +... > +vshard.storage.rebalancer_disable() > +--- > +... > +big_timeout = 1000000 > +--- > +... > +bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1] > +--- > +... > +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \ > + {timeout = big_timeout}) > +--- > +- true > +... > +wait_bucket_is_collected(bucket_id_to_move) > +--- > +... > test_run:switch('default') > --- > - true > diff --git a/test/reload_evolution/storage.test.lua b/test/reload_evolution/storage.test.lua > index 639553e..c351ada 100644 > --- a/test/reload_evolution/storage.test.lua > +++ b/test/reload_evolution/storage.test.lua > @@ -83,6 +83,34 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) > test_run:switch('storage_1_a') > box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) > > +-- > +-- Ensure storage refs are enabled and work from the scratch via reload. > +-- > +lref = require('vshard.storage.ref') > +vshard.storage.rebalancer_disable() > + > +big_timeout = 1000000 > +timeout = 0.01 > +lref.add(0, 0, big_timeout) > +status_index = box.space._bucket.index.status > +bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id > +ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \ > + {timeout = timeout}) > +assert(not ok and err.message) > +lref.del(0, 0) > +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \ > + {timeout = big_timeout}) > +wait_bucket_is_collected(bucket_id_to_move) > + > +test_run:switch('storage_2_a') > +vshard.storage.rebalancer_disable() > + > +big_timeout = 1000000 > +bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1] > +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \ > + {timeout = big_timeout}) > +wait_bucket_is_collected(bucket_id_to_move) > + > test_run:switch('default') > test_run:drop_cluster(REPLICASET_2) > test_run:drop_cluster(REPLICASET_1) > diff --git a/test/storage/ref.result b/test/storage/ref.result > new file mode 100644 > index 0000000..d5f4166 > --- /dev/null > +++ b/test/storage/ref.result > @@ -0,0 +1,399 @@ > +-- test-run result file version 2 > +test_run = require('test_run').new() > + | --- > + | ... > +netbox = require('net.box') > + | --- > + | ... > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > + | --- > + | ... > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > + | --- > + | ... > + > +test_run:create_cluster(REPLICASET_1, 'storage') > + | --- > + | ... > +test_run:create_cluster(REPLICASET_2, 'storage') > + | --- > + | ... > +util = require('util') > + | --- > + | ... > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > + | --- > + | ... 
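
On the reload test above: since bucket_send() can now fail with the
new code, people who automate rebalancing around map-reduce will want
to tell this error apart from the fatal ones. Assuming the error
object exposes 'name' the same way as the other codes in error.lua do,
that would be:

    local ok, err = vshard.storage.bucket_send(bucket_id, dst_uuid,
                                               {timeout = timeout})
    if not ok and err.name == 'STORAGE_IS_REFERENCED' then
        -- The storage is pinned by refs - a transient condition,
        -- worth retrying later rather than alerting.
    end

Maybe worth a note in the docs once the API becomes public.
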
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > + | --- > + | ... > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > + | --- > + | ... > + > +-- > +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented > +-- for map-reduce functionality to pin all buckets on all storages in the > +-- cluster to execute consistent map-reduce calls on all cluster data. > +-- > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +vshard.storage.rebalancer_disable() > + | --- > + | ... > +vshard.storage.bucket_force_create(1, 1500) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +vshard.storage.rebalancer_disable() > + | --- > + | ... > +vshard.storage.bucket_force_create(1501, 1500) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > + > +-- > +-- Bucket moves are not allowed under a ref. > +-- > +util = require('util') > + | --- > + | ... > +sid = 0 > + | --- > + | ... > +rid = 0 > + | --- > + | ... > +big_timeout = 1000000 > + | --- > + | ... > +small_timeout = 0.001 > + | --- > + | ... > +lref.add(rid, sid, big_timeout) > + | --- > + | - true > + | ... > +-- Send fails. > +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > + {timeout = big_timeout}) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Storage is referenced > + | ... > +lref.use(rid, sid) > + | --- > + | - true > + | ... > +-- Still fails - use only makes ref undead until it is deleted explicitly. > +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > + {timeout = big_timeout}) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Storage is referenced > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +-- Receive (from another replicaset) also fails. > +big_timeout = 1000000 > + | --- > + | ... > +ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \ > + {timeout = big_timeout}) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Storage is referenced > + | ... > + > +-- > +-- After unref all the bucket moves are allowed again. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +lref.del(rid, sid) > + | --- > + | - true > + | ... > + > +vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout}) > + | --- > + | - true > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout}) > + | --- > + | - true > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > + > +-- > +-- While bucket move is in progress, ref won't work. > +-- > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +fiber = require('fiber') > + | --- > + | ... > +_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \ > + {timeout = big_timeout}) > + | --- > + | ... > +ok, err = lref.add(rid, sid, small_timeout) > + | --- > + | ... > +assert(not ok and err.message) > + | --- > + | - Timeout exceeded > + | ... > +-- Ref will wait if timeout is big enough. > +ok, err = nil > + | --- > + | ... > +_ = fiber.create(function() \ > + ok, err = lref.add(rid, sid, big_timeout) \ > +end) > + | --- > + | ... 
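
About the waiting behaviour checked right above: nice that add() is
not fail-fast. As far as I understand ref.lua, it loops on the bucket
generation until every bucket is read-write or the timeout expires,
conceptually like this (a condensed sketch of ref_add() from this
patch, not new code):

    local deadline = fiber.clock() + timeout
    while not storage.bucket_are_all_rw() do
        -- Sleeps until the set of buckets changes or timeout expires.
        local ok, err = storage.bucket_generation_wait(timeout)
        if not ok then
            return nil, err   -- 'Timeout exceeded' in the test above.
        end
        timeout = deadline - fiber.clock()
    end
    -- Only now the ref is registered in the session.

That also explains why the small-timeout add() above fails while the
move is still in flight.
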
> + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > +test_run:wait_cond(function() return ok or err end) > + | --- > + | - true > + | ... > +lref.use(rid, sid) > + | --- > + | - true > + | ... > +lref.del(rid, sid) > + | --- > + | - true > + | ... > +assert(ok and not err) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout}) > + | --- > + | - true > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > + > +-- > +-- Refs are bound to sessions. > +-- > +box.schema.user.grant('storage', 'super') > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +small_timeout = 0.001 > + | --- > + | ... > +function make_ref(rid, timeout) \ > + return lref.add(rid, box.session.id(), timeout) \ > +end > + | --- > + | ... > +function use_ref(rid) \ > + return lref.use(rid, box.session.id()) \ > +end > + | --- > + | ... > +function del_ref(rid) \ > + return lref.del(rid, box.session.id()) \ > +end > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +netbox = require('net.box') > + | --- > + | ... > +remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1] > + | --- > + | ... > +c = netbox.connect(remote_uri) > + | --- > + | ... > + > +-- Ref is added and does not disappear anywhere on its own. > +c:call('make_ref', {1, small_timeout}) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +-- Use works. > +c:call('use_ref', {1}) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +-- Del works. > +c:call('del_ref', {1}) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +-- Expiration works. Try to add a second ref when the first one is expired - the > +-- first is collected and a subsequent use and del won't work. > +c:call('make_ref', {1, small_timeout}) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +fiber.sleep(small_timeout) > + | --- > + | ... > +c:call('make_ref', {2, small_timeout}) > + | --- > + | - true > + | ... > +ok, err = c:call('use_ref', {1}) > + | --- > + | ... > +assert(ok == nil and err.message) > + | --- > + | - 'Can not use a storage ref: no ref' > + | ... > +ok, err = c:call('del_ref', {1}) > + | --- > + | ... > +assert(ok == nil and err.message) > + | --- > + | - 'Can not delete a storage ref: no ref' > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +assert(lref.count == 1) > + | --- > + | - true > + | ... > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +-- > +-- Session disconnect removes its refs. > +-- > +c:call('make_ref', {3, big_timeout}) > + | --- > + | - true > + | ... 
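
Good that the disconnect case is covered below. For the record, what
makes it work: cfg() installs a box.session.on_disconnect trigger, so
when the net.box connection drops, the whole session with all its refs
is killed. Conceptually it boils down to this (this is what ref_cfg()
wires up itself, not an extra step a user must do):

    box.session.on_disconnect(function()
        -- Drops the session object and discounts all its refs.
        lref.kill(box.session.id())
    end)

So a crashed client cannot leak refs past the lifetime of its TCP
connection.
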
> +c:close() > + | --- > + | ... > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return lref.count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch("default") > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_2) > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_1) > + | --- > + | ... > diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua > new file mode 100644 > index 0000000..b34a294 > --- /dev/null > +++ b/test/storage/ref.test.lua > @@ -0,0 +1,166 @@ > +test_run = require('test_run').new() > +netbox = require('net.box') > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > + > +test_run:create_cluster(REPLICASET_1, 'storage') > +test_run:create_cluster(REPLICASET_2, 'storage') > +util = require('util') > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > + > +-- > +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented > +-- for map-reduce functionality to pin all buckets on all storages in the > +-- cluster to execute consistent map-reduce calls on all cluster data. > +-- > + > +_ = test_run:switch('storage_1_a') > +vshard.storage.rebalancer_disable() > +vshard.storage.bucket_force_create(1, 1500) > + > +_ = test_run:switch('storage_2_a') > +vshard.storage.rebalancer_disable() > +vshard.storage.bucket_force_create(1501, 1500) > + > +_ = test_run:switch('storage_1_a') > +lref = require('vshard.storage.ref') > + > +-- > +-- Bucket moves are not allowed under a ref. > +-- > +util = require('util') > +sid = 0 > +rid = 0 > +big_timeout = 1000000 > +small_timeout = 0.001 > +lref.add(rid, sid, big_timeout) > +-- Send fails. > +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > + {timeout = big_timeout}) > +assert(not ok and err.message) > +lref.use(rid, sid) > +-- Still fails - use only makes ref undead until it is deleted explicitly. > +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > + {timeout = big_timeout}) > +assert(not ok and err.message) > + > +_ = test_run:switch('storage_2_a') > +-- Receive (from another replicaset) also fails. > +big_timeout = 1000000 > +ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \ > + {timeout = big_timeout}) > +assert(not ok and err.message) > + > +-- > +-- After unref all the bucket moves are allowed again. > +-- > +_ = test_run:switch('storage_1_a') > +lref.del(rid, sid) > + > +vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout}) > +wait_bucket_is_collected(1) > + > +_ = test_run:switch('storage_2_a') > +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout}) > +wait_bucket_is_collected(1) > + > +-- > +-- While bucket move is in progress, ref won't work. > +-- > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true > + > +_ = test_run:switch('storage_1_a') > +fiber = require('fiber') > +_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \ > + {timeout = big_timeout}) > +ok, err = lref.add(rid, sid, small_timeout) > +assert(not ok and err.message) > +-- Ref will wait if timeout is big enough. 
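
Stepping back for a second: the add/use/del lifecycle exercised
throughout this file was not obvious to me on the first read, so let
me spell out my understanding (same rid/sid as in the test above):

    -- add(): the ref exists, blocks bucket moves and auto-expires at
    -- the deadline if the client never comes back.
    lref.add(rid, sid, timeout)
    -- use(): the ref leaves the deadline heap - it cannot expire
    -- anymore, but it still blocks bucket moves.
    lref.use(rid, sid)
    -- del(): the only way to release a used ref, besides the death
    -- of the whole session.
    lref.del(rid, sid)

If that is right, it deserves a doc comment on the module API.
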
> +ok, err = nil > +_ = fiber.create(function() \ > + ok, err = lref.add(rid, sid, big_timeout) \ > +end) > + > +_ = test_run:switch('storage_2_a') > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false > + > +_ = test_run:switch('storage_1_a') > +wait_bucket_is_collected(1) > +test_run:wait_cond(function() return ok or err end) > +lref.use(rid, sid) > +lref.del(rid, sid) > +assert(ok and not err) > + > +_ = test_run:switch('storage_2_a') > +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout}) > +wait_bucket_is_collected(1) > + > +-- > +-- Refs are bound to sessions. > +-- > +box.schema.user.grant('storage', 'super') > +lref = require('vshard.storage.ref') > +small_timeout = 0.001 > +function make_ref(rid, timeout) \ > + return lref.add(rid, box.session.id(), timeout) \ > +end > +function use_ref(rid) \ > + return lref.use(rid, box.session.id()) \ > +end > +function del_ref(rid) \ > + return lref.del(rid, box.session.id()) \ > +end > + > +_ = test_run:switch('storage_1_a') > +netbox = require('net.box') > +remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1] > +c = netbox.connect(remote_uri) > + > +-- Ref is added and does not disappear anywhere on its own. > +c:call('make_ref', {1, small_timeout}) > +_ = test_run:switch('storage_2_a') > +assert(lref.count == 1) > +_ = test_run:switch('storage_1_a') > + > +-- Use works. > +c:call('use_ref', {1}) > +_ = test_run:switch('storage_2_a') > +assert(lref.count == 1) > +_ = test_run:switch('storage_1_a') > + > +-- Del works. > +c:call('del_ref', {1}) > +_ = test_run:switch('storage_2_a') > +assert(lref.count == 0) > +_ = test_run:switch('storage_1_a') > + > +-- Expiration works. Try to add a second ref when the first one is expired - the > +-- first is collected and a subsequent use and del won't work. > +c:call('make_ref', {1, small_timeout}) > +_ = test_run:switch('storage_2_a') > +assert(lref.count == 1) > +_ = test_run:switch('storage_1_a') > + > +fiber.sleep(small_timeout) > +c:call('make_ref', {2, small_timeout}) > +ok, err = c:call('use_ref', {1}) > +assert(ok == nil and err.message) > +ok, err = c:call('del_ref', {1}) > +assert(ok == nil and err.message) > +_ = test_run:switch('storage_2_a') > +assert(lref.count == 1) > +_ = test_run:switch('storage_1_a') > + > +-- > +-- Session disconnect removes its refs. > +-- > +c:call('make_ref', {3, big_timeout}) > +c:close() > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return lref.count == 0 end) > + > +_ = test_run:switch("default") > +test_run:drop_cluster(REPLICASET_2) > +test_run:drop_cluster(REPLICASET_1) > diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua > new file mode 100755 > index 0000000..d987a63 > --- /dev/null > +++ b/test/unit-tap/ref.test.lua > @@ -0,0 +1,202 @@ > +#!/usr/bin/env tarantool > + > +local tap = require('tap') > +local test = tap.test('cfg') > +local fiber = require('fiber') > +local lregistry = require('vshard.registry') > +local lref = require('vshard.storage.ref') > + > +local big_timeout = 1000000 > +local small_timeout = 0.000001 > +local sid = 0 > +local sid2 = 1 > +local sid3 = 2 > + > +-- > +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented > +-- for map-reduce functionality to pin all buckets on all storages in the > +-- cluster to execute consistent map-reduce calls on all cluster data. > +-- > + > +-- > +-- Refs used storage API to get bucket space state and wait on its changes. But > +-- not important for these unit tests. 
> +-- > +local function bucket_are_all_rw() > + return true > +end > + > +lregistry.storage = { > + bucket_are_all_rw = bucket_are_all_rw, > +} > + > +-- > +-- Min heap fill and empty. > +-- > +local function test_ref_basic(test) > + test:plan(15) > + > + local rid = 0 > + local ok, err > + -- > + -- Basic ref/unref. > + -- > + ok, err = lref.add(rid, sid, big_timeout) > + test:ok(ok and not err, '+1 ref') > + test:is(lref.count, 1, 'accounted') > + ok, err = lref.use(rid, sid) > + test:ok(ok and not err, 'use the ref') > + test:is(lref.count, 1, 'but still accounted') > + ok, err = lref.del(rid, sid) > + test:ok(ok and not err, '-1 ref') > + test:is(lref.count, 0, 'accounted') > + > + -- > + -- Bad ref ID. > + -- > + rid = 1 > + ok, err = lref.use(rid, sid) > + test:ok(not ok and err, 'invalid RID at use') > + ok, err = lref.del(rid, sid) > + test:ok(not ok and err, 'invalid RID at del') > + > + -- > + -- Bad session ID. > + -- > + lref.kill(sid) > + rid = 0 > + ok, err = lref.use(rid, sid) > + test:ok(not ok and err, 'invalid SID at use') > + ok, err = lref.del(rid, sid) > + test:ok(not ok and err, 'invalid SID at del') > + > + -- > + -- Duplicate ID. > + -- > + ok, err = lref.add(rid, sid, big_timeout) > + test:ok(ok and not err, 'add ref') > + ok, err = lref.add(rid, sid, big_timeout) > + test:ok(not ok and err, 'duplicate ref') > + test:is(lref.count, 1, 'did not affect count') > + test:ok(lref.use(rid, sid) and lref.del(rid, sid), 'del old ref') > + test:is(lref.count, 0, 'accounted') > +end > + > +local function test_ref_incremental_gc(test) > + test:plan(20) > + > + -- > + -- Ref addition expires 2 old refs. > + -- > + local ok, err > + for i = 0, 2 do > + assert(lref.add(i, sid, small_timeout)) > + end > + fiber.sleep(small_timeout) > + test:is(lref.count, 3, 'expired refs are still here') > + test:ok(lref.add(3, sid, 0), 'add new ref') > + -- 3 + 1 new - 2 old = 2. > + test:is(lref.count, 2, 'it collected 2 old refs') > + test:ok(lref.add(4, sid, 0), 'add new ref') > + -- 2 + 1 new - 2 old = 1. > + test:is(lref.count, 2, 'it collected 2 old refs') > + test:ok(lref.del(4, sid), 'del the latest manually') > + > + -- > + -- Incremental GC works fine if only one ref was GCed. > + -- > + test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout') > + test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout') > + fiber.sleep(small_timeout) > + test:ok(lref.add(2, sid, 0), 'add ref with 0 timeout') > + test:is(lref.count, 2, 'collected 1 old ref, 1 is kept') > + test:ok(lref.del(2, sid), 'del newest ref, it was not collected') > + test:ok(lref.del(1, sid), 'del ref with big timeout') > + test:ok(lref.count, 0, 'all is deleted') > + > + -- > + -- GC works fine when only one ref was left and it was expired. > + -- > + test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout') > + test:is(lref.count, 1, '1 ref total') > + fiber.sleep(small_timeout) > + test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout') > + test:is(lref.count, 1, 'collected the old one') > + lref.gc() > + test:is(lref.count, 1, 'still 1 - timeout was big') > + test:ok(lref.del(1, sid), 'delete it') > + test:is(lref.count, 0, 'no refs') > +end > + > +local function test_ref_gc(test) > + test:plan(7) > + > + -- > + -- Generic GC works fine with multiple sessions. 
> + -- > + assert(lref.add(0, sid, big_timeout)) > + assert(lref.add(1, sid, small_timeout)) > + assert(lref.add(0, sid3, small_timeout)) > + assert(lref.add(0, sid2, small_timeout)) > + assert(lref.add(1, sid2, big_timeout)) > + assert(lref.add(1, sid3, big_timeout)) > + test:is(lref.count, 6, 'add 6 refs total') > + fiber.sleep(small_timeout) > + lref.gc() > + test:is(lref.count, 3, '3 collected') > + test:ok(lref.del(0, sid), 'del first') > + test:ok(lref.del(1, sid2), 'del second') > + test:ok(lref.del(1, sid3), 'del third') > + test:is(lref.count, 0, '3 deleted') > + lref.gc() > + test:is(lref.count, 0, 'gc on empty refs did not break anything') > +end > + > +local function test_ref_use(test) > + test:plan(7) > + > + -- > + -- Ref use updates the session heap. > + -- > + assert(lref.add(0, sid, small_timeout)) > + assert(lref.add(0, sid2, big_timeout)) > + test:ok(lref.count, 2, 'add 2 refs') > + test:ok(lref.use(0, sid), 'use one with small timeout') > + lref.gc() > + test:is(lref.count, 2, 'still 2 refs') > + fiber.sleep(small_timeout) > + test:is(lref.count, 2, 'still 2 refs after sleep') > + test:ok(lref.del(0, sid, 'del first')) > + test:ok(lref.del(0, sid2, 'del second')) > + test:is(lref.count, 0, 'now all is deleted') > +end > + > +local function test_ref_del(test) > + test:plan(7) > + > + -- > + -- Ref del updates the session heap. > + -- > + assert(lref.add(0, sid, small_timeout)) > + assert(lref.add(0, sid2, big_timeout)) > + test:is(lref.count, 2, 'add 2 refs') > + test:ok(lref.del(0, sid), 'del with small timeout') > + lref.gc() > + test:is(lref.count, 1, '1 ref remains') > + fiber.sleep(small_timeout) > + test:is(lref.count, 1, '1 ref remains after sleep') > + lref.gc() > + test:is(lref.count, 1, '1 ref remains after sleep and gc') > + test:ok(lref.del(0, sid2), 'del with big timeout') > + test:is(lref.count, 0, 'now all is deleted') > +end > + > +test:plan(5) > + > +test:test('basic', test_ref_basic) > +test:test('incremental gc', test_ref_incremental_gc) > +test:test('gc', test_ref_gc) > +test:test('use', test_ref_use) > +test:test('del', test_ref_del) > + > +os.exit(test:check() and 0 or 1) > diff --git a/vshard/consts.lua b/vshard/consts.lua > index cf3f422..0ffe0e2 100644 > --- a/vshard/consts.lua > +++ b/vshard/consts.lua > @@ -48,4 +48,5 @@ return { > DISCOVERY_TIMEOUT = 10, > > TIMEOUT_INFINITY = 500 * 365 * 86400, > + DEADLINE_INFINITY = math.huge, > } > diff --git a/vshard/error.lua b/vshard/error.lua > index a6f46a9..b02bfe9 100644 > --- a/vshard/error.lua > +++ b/vshard/error.lua > @@ -130,6 +130,25 @@ local error_message_template = { > name = 'TOO_MANY_RECEIVING', > msg = 'Too many receiving buckets at once, please, throttle' > }, > + [26] = { > + name = 'STORAGE_IS_REFERENCED', > + msg = 'Storage is referenced' > + }, > + [27] = { > + name = 'STORAGE_REF_ADD', > + msg = 'Can not add a storage ref: %s', > + args = {'reason'}, > + }, > + [28] = { > + name = 'STORAGE_REF_USE', > + msg = 'Can not use a storage ref: %s', > + args = {'reason'}, > + }, > + [29] = { > + name = 'STORAGE_REF_DEL', > + msg = 'Can not delete a storage ref: %s', > + args = {'reason'}, > + }, > } > > -- > diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt > index 3f4ed43..7c1e97d 100644 > --- a/vshard/storage/CMakeLists.txt > +++ b/vshard/storage/CMakeLists.txt > @@ -1,2 +1,2 @@ > -install(FILES init.lua reload_evolution.lua > +install(FILES init.lua reload_evolution.lua ref.lua > DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage) > diff --git 
a/vshard/storage/init.lua b/vshard/storage/init.lua
> index c3ed236..2957f48 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -17,6 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then
>          'vshard.replicaset', 'vshard.util',
>          'vshard.storage.reload_evolution',
>          'vshard.lua_gc', 'vshard.rlist', 'vshard.registry',
> +        'vshard.heap', 'vshard.storage.ref',
>      }
>      for _, module in pairs(vshard_modules) do
>          package.loaded[module] = nil
> @@ -30,6 +31,7 @@ local lreplicaset = require('vshard.replicaset')
>  local util = require('vshard.util')
>  local lua_gc = require('vshard.lua_gc')
>  local lregistry = require('vshard.registry')
> +local lref = require('vshard.storage.ref')
>  local reload_evolution = require('vshard.storage.reload_evolution')
>  local fiber_cond_wait = util.fiber_cond_wait
>  local bucket_ref_new
> @@ -1140,6 +1142,9 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
>          return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg,
>                                    from)
>      end
> +    if lref.count > 0 then
> +        return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> +    end

You will remove this part in the next patch. Do you really need it
here, or did you add it just for the tests?

>      if is_this_replicaset_locked() then
>          return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
>      end
> @@ -1441,6 +1446,9 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
>
>      local _bucket = box.space._bucket
>      local bucket = _bucket:get({bucket_id})
> +    if lref.count > 0 then
> +        return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> +    end

Ditto.

>      if is_this_replicaset_locked() then
>          return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
>      end
> @@ -2528,6 +2536,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
>          box.space._bucket:on_replace(nil, M.bucket_on_replace)
>          M.bucket_on_replace = nil
>      end
> +    lref.cfg()
>      if is_master then
>          box.space._bucket:on_replace(bucket_generation_increment)
>          M.bucket_on_replace = bucket_generation_increment
> diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
> new file mode 100644
> index 0000000..7589cb9
> --- /dev/null
> +++ b/vshard/storage/ref.lua
> @@ -0,0 +1,371 @@
> +--
> +-- The 'ref' module helps to ensure that all buckets on the storage stay
> +-- writable while there is at least one ref on the storage. Having the
> +-- storage referenced allows executing any kind of request on all the
> +-- visible data in all spaces in locally stored buckets. This is useful
> +-- when tons of buckets need to be accessed at once, especially when the
> +-- exact bucket IDs are not known.
> +--
> +-- Refs have deadlines, so that the storage does not freeze, unable to
> +-- move buckets until restart, in case a ref is not deleted due to an
> +-- error in the user's code or a disconnect.
> +--
> +-- The disconnects and restarts mean the refs can't be global. Otherwise
> +-- any kind of global counter, UUID and so on, even paired with an id
> +-- from a client, could clash between clients on their reconnects or
> +-- storage restarts. Unless they established a TCP-like session, which
> +-- would be too complicated.
> +--
> +-- Instead, the refs are spread over the existing box sessions. This
> +-- allows binding the refs of each client to its TCP connection without
> +-- caring about how to make them unique across all sessions, how not to
> +-- mix the refs up on restart, and how to drop the refs when a client
> +-- disconnects.
> +-- > + > +local MODULE_INTERNALS = '__module_vshard_storage_ref' > +-- Update when change behaviour of anything in the file, to be able to reload. > +local MODULE_VERSION = 1 > + > +local lfiber = require('fiber') > +local lheap = require('vshard.heap') > +local lerror = require('vshard.error') > +local lconsts = require('vshard.consts') > +local lregistry = require('vshard.registry') > +local fiber_clock = lfiber.clock > +local fiber_yield = lfiber.yield > +local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY > +local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE > + > +-- > +-- Binary heap sort. Object with the closest deadline should be on top. > +-- > +local function heap_min_deadline_cmp(ref1, ref2) > + return ref1.deadline < ref2.deadline > +end > + > +local M = rawget(_G, MODULE_INTERNALS) > +if not M then > + M = { > + module_version = MODULE_VERSION, > + -- Total number of references in all sessions. > + count = 0, > + -- Heap of session objects. Each session has refs sorted by their > + -- deadline. The sessions themselves are also sorted by deadlines. > + -- Session deadline is defined as the closest deadline of all its refs. > + -- Or infinity in case there are no refs in it. > + session_heap = lheap.new(heap_min_deadline_cmp), > + -- Map of session objects. This is used to get session object by its ID. > + session_map = {}, > + -- On session disconnect trigger to kill the dead sessions. It is saved > + -- here for the sake of future reload to be able to delete the old > + -- on disconnect function before setting a new one. > + on_disconnect = nil, > + } > +else > + -- No reload so far. This is a first version. Return as is. > + return M > +end > + > +local function ref_session_new(sid) > + -- Session object does store its internal hot attributes in a table. Because > + -- it would mean access to any session attribute would cost at least one > + -- table indexing operation. Instead, all internal fields are stored as > + -- upvalues referenced by the methods defined as closures. > + -- > + -- This means session creation may not very suitable for jitting, but it is > + -- very rare and attempts to optimize the most common case. > + -- > + -- Still the public functions take 'self' object to make it look normally. > + -- They even use it a bit. > + > + -- Ref map to get ref object by its ID. > + local ref_map = {} > + -- Ref heap sorted by their deadlines. > + local ref_heap = lheap.new(heap_min_deadline_cmp) > + -- Total number of refs of the session. Is used to drop the session without > + -- fullscan of the ref map. Heap size can't be used because not all refs are > + -- stored here. See more on that below. > + local count = 0 > + -- Cache global session storages as upvalues to save on M indexing. > + local global_heap = M.session_heap > + local global_map = M.session_map > + > + local function ref_session_discount(self, del_count) > + local new_count = M.count - del_count > + assert(new_count >= 0) > + M.count = new_count > + > + new_count = count - del_count > + assert(new_count >= 0) > + count = new_count > + end > + > + local function ref_session_update_deadline(self) > + local ref = ref_heap:top() > + if not ref then > + self.deadline = DEADLINE_INFINITY > + global_heap:update(self) > + else > + local deadline = ref.deadline > + if deadline ~= self.deadline then > + self.deadline = deadline > + global_heap:update(self) > + end > + end > + end > + > + -- > + -- Garbage collect at most 2 expired refs. 
The idea is that there is no a > + -- dedicated fiber for expired refs collection. It would be too expensive to > + -- wakeup a fiber on each added or removed or updated ref. > + -- > + -- Instead, ref GC is mostly incremental and works by the principle "remove > + -- more than add". On each new ref added, two old refs try to expire. This > + -- way refs don't stack infinitely, and the expired refs are eventually > + -- removed. Because removal is faster than addition: -2 for each +1. > + -- > + local function ref_session_gc_step(self, now) > + -- This is inlined 2 iterations of the more general GC procedure. The > + -- latter is not called in order to save on not having a loop, > + -- additional branches and variables. > + if self.deadline > now then > + return > + end > + local top = ref_heap:top() > + ref_heap:remove_top() > + ref_map[top.id] = nil > + top = ref_heap:top() > + if not top then > + self.deadline = DEADLINE_INFINITY > + global_heap:update(self) > + ref_session_discount(self, 1) > + return > + end > + local deadline = top.deadline > + if deadline >= now then > + self.deadline = deadline > + global_heap:update(self) > + ref_session_discount(self, 1) > + return > + end > + ref_heap:remove_top() > + ref_map[top.id] = nil > + top = ref_heap:top() > + if not top then > + self.deadline = DEADLINE_INFINITY > + else > + self.deadline = top.deadline > + end > + global_heap:update(self) > + ref_session_discount(self, 2) > + end > + > + -- > + -- GC expired refs until they end or the limit on the number of iterations > + -- is exhausted. The limit is supposed to prevent too long GC which would > + -- occupy TX thread unfairly. > + -- > + -- Returns false if nothing to GC, or number of iterations left from the > + -- limit. The caller is supposed to yield when 0 is returned, and retry GC > + -- until it returns false. > + -- The function itself does not yield, because it is used from a more > + -- generic function GCing all sessions. It would not ever yield if all > + -- sessions would have less than limit refs, even if total ref count would > + -- be much bigger. > + -- > + -- Besides, the session might be killed during general GC. There must not be > + -- any yields in session methods so as not to introduce a support of dead > + -- sessions. > + -- > + local function ref_session_gc(self, limit, now) > + if self.deadline >= now then > + return false > + end > + local top = ref_heap:top() > + local del = 1 > + local rest = 0 > + local deadline > + repeat > + ref_heap:remove_top() > + ref_map[top.id] = nil > + top = ref_heap:top() > + if not top then > + self.deadline = DEADLINE_INFINITY > + rest = limit - del > + break > + end > + deadline = top.deadline > + if deadline >= now then > + self.deadline = deadline > + rest = limit - del > + break > + end > + del = del + 1 > + until del >= limit > + ref_session_discount(self, del) > + global_heap:update(self) > + return rest > + end > + > + local function ref_session_add(self, rid, deadline, now) > + if ref_map[rid] then > + return nil, lerror.vshard(lerror.code.STORAGE_REF_ADD, > + 'duplicate ref') > + end > + local ref = { > + deadline = deadline, > + id = rid, > + -- Used by the heap. > + index = -1, > + } > + ref_session_gc_step(self, now) > + ref_map[rid] = ref > + ref_heap:push(ref) > + if deadline < self.deadline then > + self.deadline = deadline > + global_heap:update(self) > + end > + count = count + 1 > + M.count = M.count + 1 > + return true > + end > + > + -- > + -- Ref use means it can't be expired until deleted explicitly. 
Should be > + -- done when the request affecting the whole storage starts. After use it is > + -- important to call del afterwards - GC won't delete it automatically now. > + -- Unless the entire session is killed. > + -- > + local function ref_session_use(self, rid) > + local ref = ref_map[rid] > + if not ref then > + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no ref') > + end > + ref_heap:remove(ref) > + ref_session_update_deadline(self) > + return true > + end > + > + local function ref_session_del(self, rid) > + local ref = ref_map[rid] > + if not ref then > + return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no ref') > + end > + ref_heap:remove_try(ref) > + ref_map[rid] = nil > + ref_session_update_deadline(self) > + ref_session_discount(self, 1) > + return true > + end > + > + local function ref_session_kill(self) > + global_map[sid] = nil > + global_heap:remove(self) > + ref_session_discount(self, count) > + end > + > + -- Don't use __index. It is useless since all sessions use closures as > + -- methods. Also it is probably slower because on each method call would > + -- need to get the metatable, get __index, find the method here. While now > + -- it is only an index operation on the session object. > + local session = { > + deadline = DEADLINE_INFINITY, > + -- Used by the heap. > + index = -1, > + -- Methods. > + del = ref_session_del, > + gc = ref_session_gc, > + add = ref_session_add, > + use = ref_session_use, > + kill = ref_session_kill, > + } > + global_map[sid] = session > + global_heap:push(session) > + return session > +end > + > +local function ref_gc() > + local session_heap = M.session_heap > + local session = session_heap:top() > + if not session then > + return > + end > + local limit = LUA_CHUNK_SIZE > + local now = fiber_clock() > + repeat > + limit = session:gc(limit, now) > + if not limit then > + return > + end > + if limit == 0 then > + fiber_yield() > + limit = LUA_CHUNK_SIZE > + now = fiber_clock() > + end > + session = session_heap:top() > + until not session > +end > + > +local function ref_add(rid, sid, timeout) > + local now = fiber_clock() > + local deadline = now + timeout > + local ok, err, session > + local storage = lregistry.storage > + while not storage.bucket_are_all_rw() do > + ok, err = storage.bucket_generation_wait(timeout) > + if not ok then > + return nil, err > + end > + now = fiber_clock() > + timeout = deadline - now > + end > + session = M.session_map[sid] > + if not session then > + session = ref_session_new(sid) > + end > + return session:add(rid, deadline, now) > +end > + > +local function ref_use(rid, sid) > + local session = M.session_map[sid] > + if not session then > + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no session') > + end > + return session:use(rid) > +end > + > +local function ref_del(rid, sid) > + local session = M.session_map[sid] > + if not session then > + return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no session') > + end > + return session:del(rid) > +end > + > +local function ref_kill_session(sid) > + local session = M.session_map[sid] > + if session then > + session:kill() > + end > +end > + > +local function ref_on_session_disconnect() > + ref_kill_session(box.session.id()) > +end > + > +local function ref_cfg() > + if M.on_disconnect then > + pcall(box.session.on_disconnect, nil, M.on_disconnect) > + end > + box.session.on_disconnect(ref_on_session_disconnect) > + M.on_disconnect = ref_on_session_disconnect > +end > + > +M.del = ref_del > +M.gc = ref_gc > +M.add = 
ref_add > +M.use = ref_use > +M.cfg = ref_cfg > +M.kill = ref_kill_session > +lregistry.storage_ref = M > + > +return M
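
One more thought before I finish: the incremental GC principle in
ref.lua ("collect up to 2 expired refs per 1 added") is neat - no
background fiber needed. To convince myself it really drains the
garbage, a toy model (plain Lua arithmetic, not the real heap):

    -- Suppose 10 refs expired while the client was away and then the
    -- session starts adding refs again. Each add() collects up to 2
    -- expired refs, so the leftovers shrink on every add:
    local expired = 10
    for _ = 1, 5 do
        expired = math.max(0, expired - 2) -- GC step done by add().
    end
    assert(expired == 0) -- 5 adds collected all 10 expired refs.

Plus there is the explicit gc() for the case when nobody adds
anything. LGTM for that part.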