[Tarantool-patches] [PATCH vshard 09/11] ref: introduce vshard.storage.ref module
Oleg Babin
olegrok at tarantool.org
Wed Feb 24 13:28:02 MSK 2021
Thanks for your patch. This is a brief review - I hope to take another look
at this patch later.
Please consider the question below.
On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> 'vshard.storage.ref' module helps to ensure that all buckets on
> the storage stay writable while there is at least one ref on the
> storage. Having storage referenced allows to execute any kinds of
> requests on all the visible data in all spaces in locally stored
> buckets.
>
> This is useful when need to access tons of buckets at once,
> especially when exact bucket IDs are not known.
>
> Refs have deadlines. So as the storage wouldn't freeze not being
> able to move buckets until restart in case a ref is not deleted
> due to an error in user's code or disconnect.
>
> The disconnects and restarts mean the refs can't be global.
> Otherwise any kinds of global counters, uuids and so on, even
> paired with any ids from a client could clash between clients on
> their reconnects or storage restarts. Unless they establish a
> TCP-like session, which would be too complicated.
>
> Instead, the refs are spread over the existing box sessions. This
> allows to bind refs of each client to its TCP connection and not
> care about how to make them unique, how not to mess the refs on
> restart, and how to drop the refs when a client disconnects.
>
> Vshard.storage.ref does not depend on internals of the main file
> (storage/init.lua), so it is implemented as a separate module to
> keep it simple and isolated. It uses the storage via the registry
> only to get a couple of functions from its API.
>
> In addition, having it in a module simplifies the tests.
>
> The API is not public so far, and is going to be privately used by
> the future map-reduce API.
>
> Part of #147
> ---
> test/reload_evolution/storage.result | 66 ++++
> test/reload_evolution/storage.test.lua | 28 ++
> test/storage/ref.result | 399 +++++++++++++++++++++++++
> test/storage/ref.test.lua | 166 ++++++++++
> test/unit-tap/ref.test.lua | 202 +++++++++++++
> vshard/consts.lua | 1 +
> vshard/error.lua | 19 ++
> vshard/storage/CMakeLists.txt | 2 +-
> vshard/storage/init.lua | 9 +
> vshard/storage/ref.lua | 371 +++++++++++++++++++++++
> 10 files changed, 1262 insertions(+), 1 deletion(-)
> create mode 100644 test/storage/ref.result
> create mode 100644 test/storage/ref.test.lua
> create mode 100755 test/unit-tap/ref.test.lua
> create mode 100644 vshard/storage/ref.lua
>
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index 9d30a04..c4a0cdd 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -227,6 +227,72 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
> ---
> - 1500
> ...
> +--
> +-- Ensure storage refs are enabled and work from the scratch via reload.
> +--
> +lref = require('vshard.storage.ref')
> +---
> +...
> +vshard.storage.rebalancer_disable()
> +---
> +...
> +big_timeout = 1000000
> +---
> +...
> +timeout = 0.01
> +---
> +...
> +lref.add(0, 0, big_timeout)
> +---
> +- true
> +...
> +status_index = box.space._bucket.index.status
> +---
> +...
> +bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id
> +---
> +...
> +ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
> + {timeout = timeout})
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Storage is referenced
> +...
> +lref.del(0, 0)
> +---
> +- true
> +...
> +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
> + {timeout = big_timeout})
> +---
> +- true
> +...
> +wait_bucket_is_collected(bucket_id_to_move)
> +---
> +...
> +test_run:switch('storage_2_a')
> +---
> +- true
> +...
> +vshard.storage.rebalancer_disable()
> +---
> +...
> +big_timeout = 1000000
> +---
> +...
> +bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1]
> +---
> +...
> +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \
> + {timeout = big_timeout})
> +---
> +- true
> +...
> +wait_bucket_is_collected(bucket_id_to_move)
> +---
> +...
> test_run:switch('default')
> ---
> - true
> diff --git a/test/reload_evolution/storage.test.lua b/test/reload_evolution/storage.test.lua
> index 639553e..c351ada 100644
> --- a/test/reload_evolution/storage.test.lua
> +++ b/test/reload_evolution/storage.test.lua
> @@ -83,6 +83,34 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
> test_run:switch('storage_1_a')
> box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
>
> +--
> +-- Ensure storage refs are enabled and work from the scratch via reload.
> +--
> +lref = require('vshard.storage.ref')
> +vshard.storage.rebalancer_disable()
> +
> +big_timeout = 1000000
> +timeout = 0.01
> +lref.add(0, 0, big_timeout)
> +status_index = box.space._bucket.index.status
> +bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id
> +ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
> + {timeout = timeout})
> +assert(not ok and err.message)
> +lref.del(0, 0)
> +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
> + {timeout = big_timeout})
> +wait_bucket_is_collected(bucket_id_to_move)
> +
> +test_run:switch('storage_2_a')
> +vshard.storage.rebalancer_disable()
> +
> +big_timeout = 1000000
> +bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1]
> +vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \
> + {timeout = big_timeout})
> +wait_bucket_is_collected(bucket_id_to_move)
> +
> test_run:switch('default')
> test_run:drop_cluster(REPLICASET_2)
> test_run:drop_cluster(REPLICASET_1)
> diff --git a/test/storage/ref.result b/test/storage/ref.result
> new file mode 100644
> index 0000000..d5f4166
> --- /dev/null
> +++ b/test/storage/ref.result
> @@ -0,0 +1,399 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +netbox = require('net.box')
> + | ---
> + | ...
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> + | ---
> + | ...
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> + | ---
> + | ...
> +
> +test_run:create_cluster(REPLICASET_1, 'storage')
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_2, 'storage')
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> + | ---
> + | ...
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> + | ---
> + | ...
> +
> +--
> +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
> +-- for map-reduce functionality to pin all buckets on all storages in the
> +-- cluster to execute consistent map-reduce calls on all cluster data.
> +--
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +vshard.storage.rebalancer_disable()
> + | ---
> + | ...
> +vshard.storage.bucket_force_create(1, 1500)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.rebalancer_disable()
> + | ---
> + | ...
> +vshard.storage.bucket_force_create(1501, 1500)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +
> +--
> +-- Bucket moves are not allowed under a ref.
> +--
> +util = require('util')
> + | ---
> + | ...
> +sid = 0
> + | ---
> + | ...
> +rid = 0
> + | ---
> + | ...
> +big_timeout = 1000000
> + | ---
> + | ...
> +small_timeout = 0.001
> + | ---
> + | ...
> +lref.add(rid, sid, big_timeout)
> + | ---
> + | - true
> + | ...
> +-- Send fails.
> +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> + {timeout = big_timeout})
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Storage is referenced
> + | ...
> +lref.use(rid, sid)
> + | ---
> + | - true
> + | ...
> +-- Still fails - use only makes ref undead until it is deleted explicitly.
> +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> + {timeout = big_timeout})
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Storage is referenced
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +-- Receive (from another replicaset) also fails.
> +big_timeout = 1000000
> + | ---
> + | ...
> +ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \
> + {timeout = big_timeout})
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Storage is referenced
> + | ...
> +
> +--
> +-- After unref all the bucket moves are allowed again.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +lref.del(rid, sid)
> + | ---
> + | - true
> + | ...
> +
> +vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout})
> + | ---
> + | - true
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
> + | ---
> + | - true
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +
> +--
> +-- While bucket move is in progress, ref won't work.
> +--
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +fiber = require('fiber')
> + | ---
> + | ...
> +_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \
> + {timeout = big_timeout})
> + | ---
> + | ...
> +ok, err = lref.add(rid, sid, small_timeout)
> + | ---
> + | ...
> +assert(not ok and err.message)
> + | ---
> + | - Timeout exceeded
> + | ...
> +-- Ref will wait if timeout is big enough.
> +ok, err = nil
> + | ---
> + | ...
> +_ = fiber.create(function() \
> + ok, err = lref.add(rid, sid, big_timeout) \
> +end)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +test_run:wait_cond(function() return ok or err end)
> + | ---
> + | - true
> + | ...
> +lref.use(rid, sid)
> + | ---
> + | - true
> + | ...
> +lref.del(rid, sid)
> + | ---
> + | - true
> + | ...
> +assert(ok and not err)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
> + | ---
> + | - true
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +
> +--
> +-- Refs are bound to sessions.
> +--
> +box.schema.user.grant('storage', 'super')
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +small_timeout = 0.001
> + | ---
> + | ...
> +function make_ref(rid, timeout) \
> + return lref.add(rid, box.session.id(), timeout) \
> +end
> + | ---
> + | ...
> +function use_ref(rid) \
> + return lref.use(rid, box.session.id()) \
> +end
> + | ---
> + | ...
> +function del_ref(rid) \
> + return lref.del(rid, box.session.id()) \
> +end
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +netbox = require('net.box')
> + | ---
> + | ...
> +remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1]
> + | ---
> + | ...
> +c = netbox.connect(remote_uri)
> + | ---
> + | ...
> +
> +-- Ref is added and does not disappear anywhere on its own.
> +c:call('make_ref', {1, small_timeout})
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +assert(lref.count == 1)
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +-- Use works.
> +c:call('use_ref', {1})
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +assert(lref.count == 1)
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +-- Del works.
> +c:call('del_ref', {1})
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +-- Expiration works. Try to add a second ref when the first one is expired - the
> +-- first is collected and a subsequent use and del won't work.
> +c:call('make_ref', {1, small_timeout})
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +assert(lref.count == 1)
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +fiber.sleep(small_timeout)
> + | ---
> + | ...
> +c:call('make_ref', {2, small_timeout})
> + | ---
> + | - true
> + | ...
> +ok, err = c:call('use_ref', {1})
> + | ---
> + | ...
> +assert(ok == nil and err.message)
> + | ---
> + | - 'Can not use a storage ref: no ref'
> + | ...
> +ok, err = c:call('del_ref', {1})
> + | ---
> + | ...
> +assert(ok == nil and err.message)
> + | ---
> + | - 'Can not delete a storage ref: no ref'
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +assert(lref.count == 1)
> + | ---
> + | - true
> + | ...
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +--
> +-- Session disconnect removes its refs.
> +--
> +c:call('make_ref', {3, big_timeout})
> + | ---
> + | - true
> + | ...
> +c:close()
> + | ---
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lref.count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch("default")
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_2)
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_1)
> + | ---
> + | ...
> diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
> new file mode 100644
> index 0000000..b34a294
> --- /dev/null
> +++ b/test/storage/ref.test.lua
> @@ -0,0 +1,166 @@
> +test_run = require('test_run').new()
> +netbox = require('net.box')
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> +
> +test_run:create_cluster(REPLICASET_1, 'storage')
> +test_run:create_cluster(REPLICASET_2, 'storage')
> +util = require('util')
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> +
> +--
> +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
> +-- for map-reduce functionality to pin all buckets on all storages in the
> +-- cluster to execute consistent map-reduce calls on all cluster data.
> +--
> +
> +_ = test_run:switch('storage_1_a')
> +vshard.storage.rebalancer_disable()
> +vshard.storage.bucket_force_create(1, 1500)
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.rebalancer_disable()
> +vshard.storage.bucket_force_create(1501, 1500)
> +
> +_ = test_run:switch('storage_1_a')
> +lref = require('vshard.storage.ref')
> +
> +--
> +-- Bucket moves are not allowed under a ref.
> +--
> +util = require('util')
> +sid = 0
> +rid = 0
> +big_timeout = 1000000
> +small_timeout = 0.001
> +lref.add(rid, sid, big_timeout)
> +-- Send fails.
> +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> + {timeout = big_timeout})
> +assert(not ok and err.message)
> +lref.use(rid, sid)
> +-- Still fails - use only makes ref undead until it is deleted explicitly.
> +ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> + {timeout = big_timeout})
> +assert(not ok and err.message)
> +
> +_ = test_run:switch('storage_2_a')
> +-- Receive (from another replicaset) also fails.
> +big_timeout = 1000000
> +ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \
> + {timeout = big_timeout})
> +assert(not ok and err.message)
> +
> +--
> +-- After unref all the bucket moves are allowed again.
> +--
> +_ = test_run:switch('storage_1_a')
> +lref.del(rid, sid)
> +
> +vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout})
> +wait_bucket_is_collected(1)
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
> +wait_bucket_is_collected(1)
> +
> +--
> +-- While bucket move is in progress, ref won't work.
> +--
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
> +
> +_ = test_run:switch('storage_1_a')
> +fiber = require('fiber')
> +_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \
> + {timeout = big_timeout})
> +ok, err = lref.add(rid, sid, small_timeout)
> +assert(not ok and err.message)
> +-- Ref will wait if timeout is big enough.
> +ok, err = nil
> +_ = fiber.create(function() \
> + ok, err = lref.add(rid, sid, big_timeout) \
> +end)
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
> +
> +_ = test_run:switch('storage_1_a')
> +wait_bucket_is_collected(1)
> +test_run:wait_cond(function() return ok or err end)
> +lref.use(rid, sid)
> +lref.del(rid, sid)
> +assert(ok and not err)
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
> +wait_bucket_is_collected(1)
> +
> +--
> +-- Refs are bound to sessions.
> +--
> +box.schema.user.grant('storage', 'super')
> +lref = require('vshard.storage.ref')
> +small_timeout = 0.001
> +function make_ref(rid, timeout) \
> + return lref.add(rid, box.session.id(), timeout) \
> +end
> +function use_ref(rid) \
> + return lref.use(rid, box.session.id()) \
> +end
> +function del_ref(rid) \
> + return lref.del(rid, box.session.id()) \
> +end
> +
> +_ = test_run:switch('storage_1_a')
> +netbox = require('net.box')
> +remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1]
> +c = netbox.connect(remote_uri)
> +
> +-- Ref is added and does not disappear anywhere on its own.
> +c:call('make_ref', {1, small_timeout})
> +_ = test_run:switch('storage_2_a')
> +assert(lref.count == 1)
> +_ = test_run:switch('storage_1_a')
> +
> +-- Use works.
> +c:call('use_ref', {1})
> +_ = test_run:switch('storage_2_a')
> +assert(lref.count == 1)
> +_ = test_run:switch('storage_1_a')
> +
> +-- Del works.
> +c:call('del_ref', {1})
> +_ = test_run:switch('storage_2_a')
> +assert(lref.count == 0)
> +_ = test_run:switch('storage_1_a')
> +
> +-- Expiration works. Try to add a second ref when the first one is expired - the
> +-- first is collected and a subsequent use and del won't work.
> +c:call('make_ref', {1, small_timeout})
> +_ = test_run:switch('storage_2_a')
> +assert(lref.count == 1)
> +_ = test_run:switch('storage_1_a')
> +
> +fiber.sleep(small_timeout)
> +c:call('make_ref', {2, small_timeout})
> +ok, err = c:call('use_ref', {1})
> +assert(ok == nil and err.message)
> +ok, err = c:call('del_ref', {1})
> +assert(ok == nil and err.message)
> +_ = test_run:switch('storage_2_a')
> +assert(lref.count == 1)
> +_ = test_run:switch('storage_1_a')
> +
> +--
> +-- Session disconnect removes its refs.
> +--
> +c:call('make_ref', {3, big_timeout})
> +c:close()
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return lref.count == 0 end)
> +
> +_ = test_run:switch("default")
> +test_run:drop_cluster(REPLICASET_2)
> +test_run:drop_cluster(REPLICASET_1)
> diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua
> new file mode 100755
> index 0000000..d987a63
> --- /dev/null
> +++ b/test/unit-tap/ref.test.lua
> @@ -0,0 +1,202 @@
> +#!/usr/bin/env tarantool
> +
> +local tap = require('tap')
> +local test = tap.test('cfg')
> +local fiber = require('fiber')
> +local lregistry = require('vshard.registry')
> +local lref = require('vshard.storage.ref')
> +
> +local big_timeout = 1000000
> +local small_timeout = 0.000001
> +local sid = 0
> +local sid2 = 1
> +local sid3 = 2
> +
> +--
> +-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
> +-- for map-reduce functionality to pin all buckets on all storages in the
> +-- cluster to execute consistent map-reduce calls on all cluster data.
> +--
> +
> +--
> +-- Refs used storage API to get bucket space state and wait on its changes. But
> +-- not important for these unit tests.
> +--
> +local function bucket_are_all_rw()
> + return true
> +end
> +
> +lregistry.storage = {
> + bucket_are_all_rw = bucket_are_all_rw,
> +}
> +
> +--
> +-- Min heap fill and empty.
> +--
> +local function test_ref_basic(test)
> + test:plan(15)
> +
> + local rid = 0
> + local ok, err
> + --
> + -- Basic ref/unref.
> + --
> + ok, err = lref.add(rid, sid, big_timeout)
> + test:ok(ok and not err, '+1 ref')
> + test:is(lref.count, 1, 'accounted')
> + ok, err = lref.use(rid, sid)
> + test:ok(ok and not err, 'use the ref')
> + test:is(lref.count, 1, 'but still accounted')
> + ok, err = lref.del(rid, sid)
> + test:ok(ok and not err, '-1 ref')
> + test:is(lref.count, 0, 'accounted')
> +
> + --
> + -- Bad ref ID.
> + --
> + rid = 1
> + ok, err = lref.use(rid, sid)
> + test:ok(not ok and err, 'invalid RID at use')
> + ok, err = lref.del(rid, sid)
> + test:ok(not ok and err, 'invalid RID at del')
> +
> + --
> + -- Bad session ID.
> + --
> + lref.kill(sid)
> + rid = 0
> + ok, err = lref.use(rid, sid)
> + test:ok(not ok and err, 'invalid SID at use')
> + ok, err = lref.del(rid, sid)
> + test:ok(not ok and err, 'invalid SID at del')
> +
> + --
> + -- Duplicate ID.
> + --
> + ok, err = lref.add(rid, sid, big_timeout)
> + test:ok(ok and not err, 'add ref')
> + ok, err = lref.add(rid, sid, big_timeout)
> + test:ok(not ok and err, 'duplicate ref')
> + test:is(lref.count, 1, 'did not affect count')
> + test:ok(lref.use(rid, sid) and lref.del(rid, sid), 'del old ref')
> + test:is(lref.count, 0, 'accounted')
> +end
> +
> +local function test_ref_incremental_gc(test)
> + test:plan(20)
> +
> + --
> + -- Ref addition expires 2 old refs.
> + --
> + local ok, err
> + for i = 0, 2 do
> + assert(lref.add(i, sid, small_timeout))
> + end
> + fiber.sleep(small_timeout)
> + test:is(lref.count, 3, 'expired refs are still here')
> + test:ok(lref.add(3, sid, 0), 'add new ref')
> + -- 3 + 1 new - 2 old = 2.
> + test:is(lref.count, 2, 'it collected 2 old refs')
> + test:ok(lref.add(4, sid, 0), 'add new ref')
> + -- 2 + 1 new - 2 old = 1.
> + test:is(lref.count, 2, 'it collected 2 old refs')
> + test:ok(lref.del(4, sid), 'del the latest manually')
> +
> + --
> + -- Incremental GC works fine if only one ref was GCed.
> + --
> + test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout')
> + test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout')
> + fiber.sleep(small_timeout)
> + test:ok(lref.add(2, sid, 0), 'add ref with 0 timeout')
> + test:is(lref.count, 2, 'collected 1 old ref, 1 is kept')
> + test:ok(lref.del(2, sid), 'del newest ref, it was not collected')
> + test:ok(lref.del(1, sid), 'del ref with big timeout')
> + test:ok(lref.count, 0, 'all is deleted')
> +
> + --
> + -- GC works fine when only one ref was left and it was expired.
> + --
> + test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout')
> + test:is(lref.count, 1, '1 ref total')
> + fiber.sleep(small_timeout)
> + test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout')
> + test:is(lref.count, 1, 'collected the old one')
> + lref.gc()
> + test:is(lref.count, 1, 'still 1 - timeout was big')
> + test:ok(lref.del(1, sid), 'delete it')
> + test:is(lref.count, 0, 'no refs')
> +end
> +
> +local function test_ref_gc(test)
> + test:plan(7)
> +
> + --
> + -- Generic GC works fine with multiple sessions.
> + --
> + assert(lref.add(0, sid, big_timeout))
> + assert(lref.add(1, sid, small_timeout))
> + assert(lref.add(0, sid3, small_timeout))
> + assert(lref.add(0, sid2, small_timeout))
> + assert(lref.add(1, sid2, big_timeout))
> + assert(lref.add(1, sid3, big_timeout))
> + test:is(lref.count, 6, 'add 6 refs total')
> + fiber.sleep(small_timeout)
> + lref.gc()
> + test:is(lref.count, 3, '3 collected')
> + test:ok(lref.del(0, sid), 'del first')
> + test:ok(lref.del(1, sid2), 'del second')
> + test:ok(lref.del(1, sid3), 'del third')
> + test:is(lref.count, 0, '3 deleted')
> + lref.gc()
> + test:is(lref.count, 0, 'gc on empty refs did not break anything')
> +end
> +
> +local function test_ref_use(test)
> + test:plan(7)
> +
> + --
> + -- Ref use updates the session heap.
> + --
> + assert(lref.add(0, sid, small_timeout))
> + assert(lref.add(0, sid2, big_timeout))
> + test:ok(lref.count, 2, 'add 2 refs')
> + test:ok(lref.use(0, sid), 'use one with small timeout')
> + lref.gc()
> + test:is(lref.count, 2, 'still 2 refs')
> + fiber.sleep(small_timeout)
> + test:is(lref.count, 2, 'still 2 refs after sleep')
> + test:ok(lref.del(0, sid, 'del first'))
> + test:ok(lref.del(0, sid2, 'del second'))
> + test:is(lref.count, 0, 'now all is deleted')
> +end
> +
> +local function test_ref_del(test)
> + test:plan(7)
> +
> + --
> + -- Ref del updates the session heap.
> + --
> + assert(lref.add(0, sid, small_timeout))
> + assert(lref.add(0, sid2, big_timeout))
> + test:is(lref.count, 2, 'add 2 refs')
> + test:ok(lref.del(0, sid), 'del with small timeout')
> + lref.gc()
> + test:is(lref.count, 1, '1 ref remains')
> + fiber.sleep(small_timeout)
> + test:is(lref.count, 1, '1 ref remains after sleep')
> + lref.gc()
> + test:is(lref.count, 1, '1 ref remains after sleep and gc')
> + test:ok(lref.del(0, sid2), 'del with big timeout')
> + test:is(lref.count, 0, 'now all is deleted')
> +end
> +
> +test:plan(5)
> +
> +test:test('basic', test_ref_basic)
> +test:test('incremental gc', test_ref_incremental_gc)
> +test:test('gc', test_ref_gc)
> +test:test('use', test_ref_use)
> +test:test('del', test_ref_del)
> +
> +os.exit(test:check() and 0 or 1)
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index cf3f422..0ffe0e2 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -48,4 +48,5 @@ return {
> DISCOVERY_TIMEOUT = 10,
>
> TIMEOUT_INFINITY = 500 * 365 * 86400,
> + DEADLINE_INFINITY = math.huge,
> }
> diff --git a/vshard/error.lua b/vshard/error.lua
> index a6f46a9..b02bfe9 100644
> --- a/vshard/error.lua
> +++ b/vshard/error.lua
> @@ -130,6 +130,25 @@ local error_message_template = {
> name = 'TOO_MANY_RECEIVING',
> msg = 'Too many receiving buckets at once, please, throttle'
> },
> + [26] = {
> + name = 'STORAGE_IS_REFERENCED',
> + msg = 'Storage is referenced'
> + },
> + [27] = {
> + name = 'STORAGE_REF_ADD',
> + msg = 'Can not add a storage ref: %s',
> + args = {'reason'},
> + },
> + [28] = {
> + name = 'STORAGE_REF_USE',
> + msg = 'Can not use a storage ref: %s',
> + args = {'reason'},
> + },
> + [29] = {
> + name = 'STORAGE_REF_DEL',
> + msg = 'Can not delete a storage ref: %s',
> + args = {'reason'},
> + },
> }
>
> --
> diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt
> index 3f4ed43..7c1e97d 100644
> --- a/vshard/storage/CMakeLists.txt
> +++ b/vshard/storage/CMakeLists.txt
> @@ -1,2 +1,2 @@
> -install(FILES init.lua reload_evolution.lua
> +install(FILES init.lua reload_evolution.lua ref.lua
> DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index c3ed236..2957f48 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -17,6 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then
> 'vshard.replicaset', 'vshard.util',
> 'vshard.storage.reload_evolution',
> 'vshard.lua_gc', 'vshard.rlist', 'vshard.registry',
> + 'vshard.heap', 'vshard.storage.ref',
> }
> for _, module in pairs(vshard_modules) do
> package.loaded[module] = nil
> @@ -30,6 +31,7 @@ local lreplicaset = require('vshard.replicaset')
> local util = require('vshard.util')
> local lua_gc = require('vshard.lua_gc')
> local lregistry = require('vshard.registry')
> +local lref = require('vshard.storage.ref')
> local reload_evolution = require('vshard.storage.reload_evolution')
> local fiber_cond_wait = util.fiber_cond_wait
> local bucket_ref_new
> @@ -1140,6 +1142,9 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
> return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg,
> from)
> end
> + if lref.count > 0 then
> + return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> + end
You will remove this part in the next patch. Do you really need it? Or
did you add it just for the tests?
> if is_this_replicaset_locked() then
> return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
> end
> @@ -1441,6 +1446,9 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
>
> local _bucket = box.space._bucket
> local bucket = _bucket:get({bucket_id})
> + if lref.count > 0 then
> + return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> + end
Ditto.
> if is_this_replicaset_locked() then
> return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
> end
> @@ -2528,6 +2536,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
> box.space._bucket:on_replace(nil, M.bucket_on_replace)
> M.bucket_on_replace = nil
> end
> + lref.cfg()
> if is_master then
> box.space._bucket:on_replace(bucket_generation_increment)
> M.bucket_on_replace = bucket_generation_increment
> diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
> new file mode 100644
> index 0000000..7589cb9
> --- /dev/null
> +++ b/vshard/storage/ref.lua
> @@ -0,0 +1,371 @@
> +--
> +-- 'Ref' module helps to ensure that all buckets on the storage stay writable
> +-- while there is at least one ref on the storage.
> +-- Having storage referenced allows to execute any kinds of requests on all the
> +-- visible data in all spaces in locally stored buckets. This is useful when
> +-- need to access tons of buckets at once, especially when exact bucket IDs are
> +-- not known.
> +--
> +-- Refs have deadlines. So as the storage wouldn't freeze not being able to move
> +-- buckets until restart in case a ref is not deleted due to an error in user's
> +-- code or disconnect.
> +--
> +-- The disconnects and restarts mean the refs can't be global. Otherwise any
> +-- kinds of global counters, uuids and so on, even paired with any ids from a
> +-- client could clash between clients on their reconnects or storage restarts.
> +-- Unless they establish a TCP-like session, which would be too complicated.
> +--
> +-- Instead, the refs are spread over the existing box sessions. This allows to
> +-- bind refs of each client to its TCP connection and not care about how to make
> +-- them unique across all sessions, how not to mess the refs on restart, and how
> +-- to drop the refs when a client disconnects.
> +--
> +
> +local MODULE_INTERNALS = '__module_vshard_storage_ref'
> +-- Update when change behaviour of anything in the file, to be able to reload.
> +local MODULE_VERSION = 1
> +
> +local lfiber = require('fiber')
> +local lheap = require('vshard.heap')
> +local lerror = require('vshard.error')
> +local lconsts = require('vshard.consts')
> +local lregistry = require('vshard.registry')
> +local fiber_clock = lfiber.clock
> +local fiber_yield = lfiber.yield
> +local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY
> +local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE
> +
> +--
> +-- Binary heap sort. Object with the closest deadline should be on top.
> +--
> +local function heap_min_deadline_cmp(ref1, ref2)
> + return ref1.deadline < ref2.deadline
> +end
> +
> +-- Pick up the state of a previous incarnation of the module, if this is a
> +-- reload - it registers itself in _G under MODULE_INTERNALS.
> +local M = rawget(_G, MODULE_INTERNALS)
> +if not M then
> + M = {
> + module_version = MODULE_VERSION,
> + -- Total number of references in all sessions.
> + count = 0,
> + -- Heap of session objects. Each session has refs sorted by their
> + -- deadline. The sessions themselves are also sorted by deadlines.
> + -- Session deadline is defined as the closest deadline of all its refs.
> + -- Or infinity in case there are no refs in it.
> + session_heap = lheap.new(heap_min_deadline_cmp),
> + -- Map of session objects. This is used to get session object by its ID.
> + session_map = {},
> + -- On session disconnect trigger to kill the dead sessions. It is saved
> + -- here for the sake of future reload to be able to delete the old
> + -- on disconnect function before setting a new one.
> + on_disconnect = nil,
> + }
> +else
> + -- The module is being reloaded. There is only one version so far, so
> + -- there is nothing to migrate - return the old state as is.
> + return M
> +end
> +
> +local function ref_session_new(sid)
> + -- Session object does not store its internal hot attributes in a table,
> + -- because that would mean access to any session attribute would cost at
> + -- least one table indexing operation. Instead, all internal fields are
> + -- stored as upvalues referenced by the methods defined as closures.
> + --
> + -- This means session creation may not be very suitable for jitting, but
> + -- it is very rare and this attempts to optimize the most common case.
> + --
> + -- Still the public functions take 'self' object to make them look like
> + -- normal methods. They even use it a bit.
> +
> + -- Ref map to get ref object by its ID.
> + local ref_map = {}
> + -- Ref heap sorted by their deadlines.
> + local ref_heap = lheap.new(heap_min_deadline_cmp)
> + -- Total number of refs of the session. Is used to drop the session without
> + -- fullscan of the ref map. Heap size can't be used because not all refs are
> + -- stored here. See more on that below.
> + local count = 0
> + -- Cache global session storages as upvalues to save on M indexing.
> + local global_heap = M.session_heap
> + local global_map = M.session_map
> +
> + -- Subtract del_count refs from both the session-local and the global
> + -- ref counters. 'self' is unused - the counters are upvalues.
> + local function ref_session_discount(self, del_count)
> + local new_count = M.count - del_count
> + assert(new_count >= 0)
> + M.count = new_count
> +
> + new_count = count - del_count
> + assert(new_count >= 0)
> + count = new_count
> + end
> +
> + -- Recalculate the session deadline as the closest deadline among its
> + -- heaped refs, or infinity when there are none, and sift the session in
> + -- the global heap when the deadline has changed.
> + local function ref_session_update_deadline(self)
> + local ref = ref_heap:top()
> + if not ref then
> + self.deadline = DEADLINE_INFINITY
> + global_heap:update(self)
> + else
> + local deadline = ref.deadline
> + if deadline ~= self.deadline then
> + self.deadline = deadline
> + global_heap:update(self)
> + end
> + end
> + end
> +
> + --
> + -- Garbage collect at most 2 expired refs. The idea is that there is no a
> + -- dedicated fiber for expired refs collection. It would be too expensive to
> + -- wakeup a fiber on each added or removed or updated ref.
> + --
> + -- Instead, ref GC is mostly incremental and works by the principle "remove
> + -- more than add". On each new ref added, two old refs try to expire. This
> + -- way refs don't stack infinitely, and the expired refs are eventually
> + -- removed. Because removal is faster than addition: -2 for each +1.
> + --
> + local function ref_session_gc_step(self, now)
> + -- This is inlined 2 iterations of the more general GC procedure. The
> + -- latter is not called in order to save on not having a loop,
> + -- additional branches and variables.
> + -- NOTE(review): here a ref with deadline == now counts as expired
> + -- ('>'), while ref_session_gc() below treats it as alive ('>=').
> + -- Confirm the boundary is intended to differ.
> + if self.deadline > now then
> + return
> + end
> + local top = ref_heap:top()
> + ref_heap:remove_top()
> + ref_map[top.id] = nil
> + top = ref_heap:top()
> + if not top then
> + self.deadline = DEADLINE_INFINITY
> + global_heap:update(self)
> + ref_session_discount(self, 1)
> + return
> + end
> + local deadline = top.deadline
> + if deadline >= now then
> + self.deadline = deadline
> + global_heap:update(self)
> + ref_session_discount(self, 1)
> + return
> + end
> + ref_heap:remove_top()
> + ref_map[top.id] = nil
> + top = ref_heap:top()
> + if not top then
> + self.deadline = DEADLINE_INFINITY
> + else
> + self.deadline = top.deadline
> + end
> + global_heap:update(self)
> + ref_session_discount(self, 2)
> + end
> +
> + --
> + -- GC expired refs until they end or the limit on the number of iterations
> + -- is exhausted. The limit is supposed to prevent too long GC which would
> + -- occupy TX thread unfairly.
> + --
> + -- Returns false if nothing to GC, or number of iterations left from the
> + -- limit. The caller is supposed to yield when 0 is returned, and retry GC
> + -- until it returns false.
> + -- The function itself does not yield, because it is used from a more
> + -- generic function GCing all sessions. It would not ever yield if all
> + -- sessions would have less than limit refs, even if total ref count would
> + -- be much bigger.
> + --
> + -- Besides, the session might be killed during general GC. There must not be
> + -- any yields in session methods so as not to introduce a support of dead
> + -- sessions.
> + --
> + local function ref_session_gc(self, limit, now)
> + if self.deadline >= now then
> + return false
> + end
> + local top = ref_heap:top()
> + local del = 1
> + local rest = 0
> + local deadline
> + repeat
> + ref_heap:remove_top()
> + ref_map[top.id] = nil
> + top = ref_heap:top()
> + if not top then
> + self.deadline = DEADLINE_INFINITY
> + rest = limit - del
> + break
> + end
> + deadline = top.deadline
> + if deadline >= now then
> + self.deadline = deadline
> + rest = limit - del
> + break
> + end
> + del = del + 1
> + until del >= limit
> + -- When the loop ends by exhausting the limit, rest stays 0 - the
> + -- caller is expected to yield and retry.
> + ref_session_discount(self, del)
> + global_heap:update(self)
> + return rest
> + end
> +
> + -- Register a new ref with ID 'rid' expiring at 'deadline'. Fails on a
> + -- duplicate ID. 'now' is passed in to piggyback an incremental GC step
> + -- on each addition.
> + local function ref_session_add(self, rid, deadline, now)
> + if ref_map[rid] then
> + return nil, lerror.vshard(lerror.code.STORAGE_REF_ADD,
> + 'duplicate ref')
> + end
> + local ref = {
> + deadline = deadline,
> + id = rid,
> + -- Used by the heap.
> + index = -1,
> + }
> + ref_session_gc_step(self, now)
> + ref_map[rid] = ref
> + ref_heap:push(ref)
> + if deadline < self.deadline then
> + self.deadline = deadline
> + global_heap:update(self)
> + end
> + count = count + 1
> + M.count = M.count + 1
> + return true
> + end
> +
> + --
> + -- Ref use means it can't be expired until deleted explicitly. Should be
> + -- done when the request affecting the whole storage starts. After use it is
> + -- important to call del afterwards - GC won't delete it automatically now.
> + -- Unless the entire session is killed.
> + --
> + local function ref_session_use(self, rid)
> + local ref = ref_map[rid]
> + if not ref then
> + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no ref')
> + end
> + -- The ref leaves the heap but stays in ref_map: it can't expire
> + -- anymore. This is also why heap size != count.
> + ref_heap:remove(ref)
> + ref_session_update_deadline(self)
> + return true
> + end
> +
> + -- Delete a ref by its ID, whether it is used or not.
> + local function ref_session_del(self, rid)
> + local ref = ref_map[rid]
> + if not ref then
> + return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no ref')
> + end
> + -- remove_try() - a used ref is not in the heap anymore.
> + ref_heap:remove_try(ref)
> + ref_map[rid] = nil
> + ref_session_update_deadline(self)
> + ref_session_discount(self, 1)
> + return true
> + end
> +
> + -- Drop the session with all its refs at once. Used, in particular, when
> + -- the client disconnects.
> + local function ref_session_kill(self)
> + global_map[sid] = nil
> + global_heap:remove(self)
> + ref_session_discount(self, count)
> + end
> +
> + -- Don't use __index. It is useless since all sessions use closures as
> + -- methods. Also it is probably slower because on each method call would
> + -- need to get the metatable, get __index, find the method here. While now
> + -- it is only an index operation on the session object.
> + local session = {
> + deadline = DEADLINE_INFINITY,
> + -- Used by the heap.
> + index = -1,
> + -- Methods.
> + del = ref_session_del,
> + gc = ref_session_gc,
> + add = ref_session_add,
> + use = ref_session_use,
> + kill = ref_session_kill,
> + }
> + global_map[sid] = session
> + global_heap:push(session)
> + return session
> +end
> +
> +--
> +-- Delete all expired refs in all sessions. The sessions are taken from the
> +-- top of the deadline-ordered heap, so the walk can stop at the first
> +-- session which has nothing to collect. Yields after every LUA_CHUNK_SIZE
> +-- deletions so as not to occupy the TX thread for too long.
> +--
> +local function ref_gc()
> + local session_heap = M.session_heap
> + local session = session_heap:top()
> + if not session then
> + return
> + end
> + local limit = LUA_CHUNK_SIZE
> + local now = fiber_clock()
> + repeat
> + limit = session:gc(limit, now)
> + if not limit then
> + -- The closest session has nothing expired - none of the others
> + -- has either.
> + return
> + end
> + if limit == 0 then
> + -- Chunk exhausted - yield and refresh the clock, since time
> + -- passed during the yield.
> + fiber_yield()
> + limit = LUA_CHUNK_SIZE
> + now = fiber_clock()
> + end
> + session = session_heap:top()
> + until not session
> +end
> +
> +--
> +-- Create a new ref with ID 'rid' inside session 'sid'. Waits until all
> +-- buckets on the storage are writable, retrying on each bucket generation
> +-- change, but not longer than 'timeout' seconds. Note the ref deadline is
> +-- fixed at call time: time spent waiting for the buckets reduces the
> +-- remaining lifetime of the ref. The session is created lazily on first
> +-- ref.
> +--
> +local function ref_add(rid, sid, timeout)
> + local now = fiber_clock()
> + local deadline = now + timeout
> + local ok, err, session
> + local storage = lregistry.storage
> + while not storage.bucket_are_all_rw() do
> + ok, err = storage.bucket_generation_wait(timeout)
> + if not ok then
> + return nil, err
> + end
> + now = fiber_clock()
> + timeout = deadline - now
> + end
> + session = M.session_map[sid]
> + if not session then
> + session = ref_session_new(sid)
> + end
> + return session:add(rid, deadline, now)
> +end
> +
> +--
> +-- Mark the ref 'rid' of session 'sid' as used, so it can't expire until
> +-- deleted explicitly via ref_del() or the session is killed.
> +--
> +local function ref_use(rid, sid)
> + local session = M.session_map[sid]
> + if not session then
> + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no session')
> + end
> + return session:use(rid)
> +end
> +
> +--
> +-- Delete the ref 'rid' of session 'sid', whether it is used or not.
> +--
> +local function ref_del(rid, sid)
> + local session = M.session_map[sid]
> + if not session then
> + return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no session')
> + end
> + return session:del(rid)
> +end
> +
> +--
> +-- Drop the whole session 'sid' with all its refs, if such a session
> +-- exists. A no-op otherwise.
> +--
> +local function ref_kill_session(sid)
> + local session = M.session_map[sid]
> + if session then
> + session:kill()
> + end
> +end
> +
> +-- box.session trigger: drop all refs of a client when its connection is
> +-- closed.
> +local function ref_on_session_disconnect()
> + ref_kill_session(box.session.id())
> +end
> +
> +--
> +-- Install the on disconnect trigger, first removing the one left from a
> +-- previous version of the module after reload. The removal is wrapped into
> +-- pcall() as best effort - presumably the old trigger may already be gone;
> +-- TODO(review): confirm when that can happen.
> +--
> +local function ref_cfg()
> + if M.on_disconnect then
> + pcall(box.session.on_disconnect, nil, M.on_disconnect)
> + end
> + box.session.on_disconnect(ref_on_session_disconnect)
> + M.on_disconnect = ref_on_session_disconnect
> +end
> +
> +-- Public API of the module. It is also registered in vshard.registry so
> +-- the storage can reach it without a circular require.
> +M.del = ref_del
> +M.gc = ref_gc
> +M.add = ref_add
> +M.use = ref_use
> +M.cfg = ref_cfg
> +M.kill = ref_kill_session
> +lregistry.storage_ref = M
> +
> +return M
More information about the Tarantool-patches
mailing list