[Tarantool-patches] [PATCH vshard 09/11] ref: introduce vshard.storage.ref module
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sun Mar 21 21:49:07 MSK 2021
After another self-review I found a bug here - a disconnected session was deleted
even if it had running requests, because on_disconnect triggers are called at the
moment of disconnect, not when the session is deleted. My bad.
This must not happen, because there still might be a read-write request changing
the data, and it needs the refs to stay alive for the data to be consistent.
It was also possible to keep the session alive while it still has unused refs,
but I decided that it would be safer to delete it when the used count becomes 0.
This should help in cases when users set too big timeouts but never get to the
"map" stage, which would otherwise prevent the sessions from being deleted and
block the rebalancing for too long.
It still might happen, but at least we can tell such users to reconnect/restart
their routers instead of restarting the storage.
I updated the patch. Below is the incremental diff and a full diff of the patch
in the end.
====================
diff --git a/test/storage/ref.result b/test/storage/ref.result
index d5f4166..c115d99 100644
--- a/test/storage/ref.result
+++ b/test/storage/ref.result
@@ -371,18 +371,66 @@ _ = test_run:switch('storage_1_a')
| ...
--
--- Session disconnect removes its refs.
+-- Session disconnect keeps the refs, but the session is deleted when
+-- used ref count becomes 0. Unused refs don't prevent session deletion.
--
-c:call('make_ref', {3, big_timeout})
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+keep_long_ref = true
+ | ---
+ | ...
+function long_ref_request(rid) \
+ local sid = box.session.id() \
+ assert(lref.add(rid, sid, big_timeout)) \
+ assert(lref.use(rid, sid)) \
+ while keep_long_ref do \
+ fiber.sleep(small_timeout) \
+ end \
+ assert(lref.del(rid, sid)) \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+_ = c:call('long_ref_request', {3}, {is_async = true})
+ | ---
+ | ...
+c:call('make_ref', {4, big_timeout})
| ---
| - true
| ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 2 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
c:close()
| ---
| ...
+
_ = test_run:switch('storage_2_a')
| ---
| ...
+-- Still 2 refs.
+assert(lref.count == 2)
+ | ---
+ | - true
+ | ...
+-- The long request ends and the session must be deleted - that was the last
+-- used ref.
+keep_long_ref = false
+ | ---
+ | ...
test_run:wait_cond(function() return lref.count == 0 end)
| ---
| - true
diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
index b34a294..5b57ea4 100644
--- a/test/storage/ref.test.lua
+++ b/test/storage/ref.test.lua
@@ -154,11 +154,37 @@ assert(lref.count == 1)
_ = test_run:switch('storage_1_a')
--
--- Session disconnect removes its refs.
+-- Session disconnect keeps the refs, but the session is deleted when
+-- used ref count becomes 0. Unused refs don't prevent session deletion.
--
-c:call('make_ref', {3, big_timeout})
+_ = test_run:switch('storage_2_a')
+keep_long_ref = true
+function long_ref_request(rid) \
+ local sid = box.session.id() \
+ assert(lref.add(rid, sid, big_timeout)) \
+ assert(lref.use(rid, sid)) \
+ while keep_long_ref do \
+ fiber.sleep(small_timeout) \
+ end \
+ assert(lref.del(rid, sid)) \
+end
+
+_ = test_run:switch('storage_1_a')
+_ = c:call('long_ref_request', {3}, {is_async = true})
+c:call('make_ref', {4, big_timeout})
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 2 end)
+
+_ = test_run:switch('storage_1_a')
c:close()
+
_ = test_run:switch('storage_2_a')
+-- Still 2 refs.
+assert(lref.count == 2)
+-- The long request ends and the session must be deleted - that was the last
+-- used ref.
+keep_long_ref = false
test_run:wait_cond(function() return lref.count == 0 end)
_ = test_run:switch("default")
diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua
index 99ef69f..fdd0477 100755
--- a/test/unit-tap/ref.test.lua
+++ b/test/unit-tap/ref.test.lua
@@ -190,13 +190,39 @@ local function test_ref_del(test)
test:is(lref.count, 0, 'now all is deleted')
end
+local function test_ref_dead_session(test)
+ test:plan(4)
+
+ --
+ -- Session after disconnect still might have running requests. It must
+ -- be kept alive with its refs until the requests are done.
+ --
+ assert(lref.add(0, sid, small_timeout))
+ assert(lref.use(0, sid))
+ lref.kill(sid)
+ test:ok(lref.del(0, sid))
+
+ --
+ -- The dead session is kept only while the used requests are running. It is
+ -- deleted when use count becomes 0 even if there were unused refs.
+ --
+ assert(lref.add(0, sid, big_timeout))
+ assert(lref.add(1, sid, big_timeout))
+ assert(lref.use(0, sid))
+ lref.kill(sid)
+ test:is(lref.count, 2, '2 refs in a dead session')
+ test:ok(lref.del(0, sid), 'delete the used ref')
+ test:is(lref.count, 0, '0 refs - the unused ref was deleted with session')
+end
+
local test = tap.test('ref')
-test:plan(5)
+test:plan(6)
test:test('basic', test_ref_basic)
test:test('incremental gc', test_ref_incremental_gc)
test:test('gc', test_ref_gc)
test:test('use', test_ref_use)
test:test('del', test_ref_del)
+test:test('dead session use', test_ref_dead_session)
os.exit(test:check() and 0 or 1)
diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
index 620913e..a024d8e 100644
--- a/vshard/storage/ref.lua
+++ b/vshard/storage/ref.lua
@@ -81,10 +81,17 @@ local function ref_session_new(sid)
local ref_map = {}
-- Ref heap sorted by their deadlines.
local ref_heap = lheap.new(heap_min_deadline_cmp)
- -- Total number of refs of the session. Is used to drop the session without
- -- fullscan of the ref map. Heap size can't be used because not all refs are
- -- stored here. See more on that below.
- local ref_count = 0
+ -- Total number of refs of the session. Is used to drop the session when
+ -- it is disconnected and has no refs anymore. Heap size can't be used
+ -- because not all refs are stored here.
+ local ref_count_total = 0
+ -- Number of refs in use. They are included into the total count. The used
+ -- refs are accounted explicitly in order to detect when a disconnected
+ -- session has no used refs anymore and can be deleted.
+ local ref_count_use = 0
+ -- When the session becomes disconnected, it must be deleted from the global
+ -- heap when all its used refs are gone.
+ local is_disconnected = false
-- Cache global session storages as upvalues to save on M indexing.
local global_heap = M.session_heap
local global_map = M.session_map
@@ -94,9 +101,18 @@ local function ref_session_new(sid)
assert(new_count >= 0)
M.count = new_count
- new_count = ref_count - del_count
+ new_count = ref_count_total - del_count
assert(new_count >= 0)
- ref_count = new_count
+ ref_count_total = new_count
+ end
+
+ local function ref_session_delete_if_not_used(self)
+ if not is_disconnected or ref_count_use > 0 then
+ return
+ end
+ ref_session_discount(self, ref_count_total)
+ global_map[sid] = nil
+ global_heap:remove(self)
end
local function ref_session_update_deadline(self)
@@ -224,7 +240,7 @@ local function ref_session_new(sid)
self.deadline = deadline
global_heap:update(self)
end
- ref_count = ref_count + 1
+ ref_count_total = ref_count_total + 1
M.count = M.count + 1
return true
end
@@ -242,6 +258,7 @@ local function ref_session_new(sid)
end
ref_heap:remove(ref)
ref_session_update_deadline(self)
+ ref_count_use = ref_count_use + 1
return true
end
@@ -250,17 +267,24 @@ local function ref_session_new(sid)
if not ref then
return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no ref')
end
- ref_heap:remove_try(ref)
ref_map[rid] = nil
- ref_session_update_deadline(self)
- ref_session_discount(self, 1)
+ if ref.index == -1 then
+ ref_session_update_deadline(self)
+ ref_session_discount(self, 1)
+ ref_count_use = ref_count_use - 1
+ ref_session_delete_if_not_used(self)
+ else
+ ref_heap:remove(ref)
+ ref_session_update_deadline(self)
+ ref_session_discount(self, 1)
+ end
return true
end
local function ref_session_kill(self)
- global_map[sid] = nil
- global_heap:remove(self)
- ref_session_discount(self, ref_count)
+ assert(not is_disconnected)
+ is_disconnected = true
+ ref_session_delete_if_not_used(self)
end
-- Don't use __index. It is useless since all sessions use closures as
====================
Here is the full patch:
====================
diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
index 9d30a04..c4a0cdd 100644
--- a/test/reload_evolution/storage.result
+++ b/test/reload_evolution/storage.result
@@ -227,6 +227,72 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
---
- 1500
...
+--
+-- Ensure storage refs are enabled and work from the scratch via reload.
+--
+lref = require('vshard.storage.ref')
+---
+...
+vshard.storage.rebalancer_disable()
+---
+...
+big_timeout = 1000000
+---
+...
+timeout = 0.01
+---
+...
+lref.add(0, 0, big_timeout)
+---
+- true
+...
+status_index = box.space._bucket.index.status
+---
+...
+bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id
+---
+...
+ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
+ {timeout = timeout})
+---
+...
+assert(not ok and err.message)
+---
+- Storage is referenced
+...
+lref.del(0, 0)
+---
+- true
+...
+vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
+ {timeout = big_timeout})
+---
+- true
+...
+wait_bucket_is_collected(bucket_id_to_move)
+---
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+vshard.storage.rebalancer_disable()
+---
+...
+big_timeout = 1000000
+---
+...
+bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1]
+---
+...
+vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \
+ {timeout = big_timeout})
+---
+- true
+...
+wait_bucket_is_collected(bucket_id_to_move)
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/reload_evolution/storage.test.lua b/test/reload_evolution/storage.test.lua
index 639553e..c351ada 100644
--- a/test/reload_evolution/storage.test.lua
+++ b/test/reload_evolution/storage.test.lua
@@ -83,6 +83,34 @@ box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
test_run:switch('storage_1_a')
box.space._bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
+--
+-- Ensure storage refs are enabled and work from the scratch via reload.
+--
+lref = require('vshard.storage.ref')
+vshard.storage.rebalancer_disable()
+
+big_timeout = 1000000
+timeout = 0.01
+lref.add(0, 0, big_timeout)
+status_index = box.space._bucket.index.status
+bucket_id_to_move = status_index:min({vshard.consts.BUCKET.ACTIVE}).id
+ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
+ {timeout = timeout})
+assert(not ok and err.message)
+lref.del(0, 0)
+vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2], \
+ {timeout = big_timeout})
+wait_bucket_is_collected(bucket_id_to_move)
+
+test_run:switch('storage_2_a')
+vshard.storage.rebalancer_disable()
+
+big_timeout = 1000000
+bucket_id_to_move = test_run:eval('storage_1_a', 'return bucket_id_to_move')[1]
+vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[1], \
+ {timeout = big_timeout})
+wait_bucket_is_collected(bucket_id_to_move)
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/test/storage/ref.result b/test/storage/ref.result
new file mode 100644
index 0000000..c115d99
--- /dev/null
+++ b/test/storage/ref.result
@@ -0,0 +1,447 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+netbox = require('net.box')
+ | ---
+ | ...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+ | ---
+ | ...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+ | ---
+ | ...
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_2, 'storage')
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+ | ---
+ | ...
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+ | ---
+ | ...
+
+--
+-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
+-- for map-reduce functionality to pin all buckets on all storages in the
+-- cluster to execute consistent map-reduce calls on all cluster data.
+--
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+vshard.storage.rebalancer_disable()
+ | ---
+ | ...
+vshard.storage.bucket_force_create(1, 1500)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.rebalancer_disable()
+ | ---
+ | ...
+vshard.storage.bucket_force_create(1501, 1500)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+
+--
+-- Bucket moves are not allowed under a ref.
+--
+util = require('util')
+ | ---
+ | ...
+sid = 0
+ | ---
+ | ...
+rid = 0
+ | ---
+ | ...
+big_timeout = 1000000
+ | ---
+ | ...
+small_timeout = 0.001
+ | ---
+ | ...
+lref.add(rid, sid, big_timeout)
+ | ---
+ | - true
+ | ...
+-- Send fails.
+ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
+ {timeout = big_timeout})
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Storage is referenced
+ | ...
+lref.use(rid, sid)
+ | ---
+ | - true
+ | ...
+-- Still fails - use only makes ref undead until it is deleted explicitly.
+ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
+ {timeout = big_timeout})
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Storage is referenced
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+-- Receive (from another replicaset) also fails.
+big_timeout = 1000000
+ | ---
+ | ...
+ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \
+ {timeout = big_timeout})
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Storage is referenced
+ | ...
+
+--
+-- After unref all the bucket moves are allowed again.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+lref.del(rid, sid)
+ | ---
+ | - true
+ | ...
+
+vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout})
+ | ---
+ | - true
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
+ | ---
+ | - true
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+
+--
+-- While bucket move is in progress, ref won't work.
+--
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \
+ {timeout = big_timeout})
+ | ---
+ | ...
+ok, err = lref.add(rid, sid, small_timeout)
+ | ---
+ | ...
+assert(not ok and err.message)
+ | ---
+ | - Timeout exceeded
+ | ...
+-- Ref will wait if timeout is big enough.
+ok, err = nil
+ | ---
+ | ...
+_ = fiber.create(function() \
+ ok, err = lref.add(rid, sid, big_timeout) \
+end)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+test_run:wait_cond(function() return ok or err end)
+ | ---
+ | - true
+ | ...
+lref.use(rid, sid)
+ | ---
+ | - true
+ | ...
+lref.del(rid, sid)
+ | ---
+ | - true
+ | ...
+assert(ok and not err)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
+ | ---
+ | - true
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+
+--
+-- Refs are bound to sessions.
+--
+box.schema.user.grant('storage', 'super')
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+small_timeout = 0.001
+ | ---
+ | ...
+function make_ref(rid, timeout) \
+ return lref.add(rid, box.session.id(), timeout) \
+end
+ | ---
+ | ...
+function use_ref(rid) \
+ return lref.use(rid, box.session.id()) \
+end
+ | ---
+ | ...
+function del_ref(rid) \
+ return lref.del(rid, box.session.id()) \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+netbox = require('net.box')
+ | ---
+ | ...
+remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1]
+ | ---
+ | ...
+c = netbox.connect(remote_uri)
+ | ---
+ | ...
+
+-- Ref is added and does not disappear anywhere on its own.
+c:call('make_ref', {1, small_timeout})
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+assert(lref.count == 1)
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+-- Use works.
+c:call('use_ref', {1})
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+assert(lref.count == 1)
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+-- Del works.
+c:call('del_ref', {1})
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+-- Expiration works. Try to add a second ref when the first one is expired - the
+-- first is collected and a subsequent use and del won't work.
+c:call('make_ref', {1, small_timeout})
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+assert(lref.count == 1)
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+fiber.sleep(small_timeout)
+ | ---
+ | ...
+c:call('make_ref', {2, small_timeout})
+ | ---
+ | - true
+ | ...
+ok, err = c:call('use_ref', {1})
+ | ---
+ | ...
+assert(ok == nil and err.message)
+ | ---
+ | - 'Can not use a storage ref: no ref'
+ | ...
+ok, err = c:call('del_ref', {1})
+ | ---
+ | ...
+assert(ok == nil and err.message)
+ | ---
+ | - 'Can not delete a storage ref: no ref'
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+assert(lref.count == 1)
+ | ---
+ | - true
+ | ...
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+--
+-- Session disconnect keeps the refs, but the session is deleted when
+-- used ref count becomes 0. Unused refs don't prevent session deletion.
+--
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+keep_long_ref = true
+ | ---
+ | ...
+function long_ref_request(rid) \
+ local sid = box.session.id() \
+ assert(lref.add(rid, sid, big_timeout)) \
+ assert(lref.use(rid, sid)) \
+ while keep_long_ref do \
+ fiber.sleep(small_timeout) \
+ end \
+ assert(lref.del(rid, sid)) \
+end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+_ = c:call('long_ref_request', {3}, {is_async = true})
+ | ---
+ | ...
+c:call('make_ref', {4, big_timeout})
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 2 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+c:close()
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+-- Still 2 refs.
+assert(lref.count == 2)
+ | ---
+ | - true
+ | ...
+-- The long request ends and the session must be deleted - that was the last
+-- used ref.
+keep_long_ref = false
+ | ---
+ | ...
+test_run:wait_cond(function() return lref.count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_2)
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_1)
+ | ---
+ | ...
diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
new file mode 100644
index 0000000..5b57ea4
--- /dev/null
+++ b/test/storage/ref.test.lua
@@ -0,0 +1,192 @@
+test_run = require('test_run').new()
+netbox = require('net.box')
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+
+--
+-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
+-- for map-reduce functionality to pin all buckets on all storages in the
+-- cluster to execute consistent map-reduce calls on all cluster data.
+--
+
+_ = test_run:switch('storage_1_a')
+vshard.storage.rebalancer_disable()
+vshard.storage.bucket_force_create(1, 1500)
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.rebalancer_disable()
+vshard.storage.bucket_force_create(1501, 1500)
+
+_ = test_run:switch('storage_1_a')
+lref = require('vshard.storage.ref')
+
+--
+-- Bucket moves are not allowed under a ref.
+--
+util = require('util')
+sid = 0
+rid = 0
+big_timeout = 1000000
+small_timeout = 0.001
+lref.add(rid, sid, big_timeout)
+-- Send fails.
+ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
+ {timeout = big_timeout})
+assert(not ok and err.message)
+lref.use(rid, sid)
+-- Still fails - use only makes ref undead until it is deleted explicitly.
+ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
+ {timeout = big_timeout})
+assert(not ok and err.message)
+
+_ = test_run:switch('storage_2_a')
+-- Receive (from another replicaset) also fails.
+big_timeout = 1000000
+ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \
+ {timeout = big_timeout})
+assert(not ok and err.message)
+
+--
+-- After unref all the bucket moves are allowed again.
+--
+_ = test_run:switch('storage_1_a')
+lref.del(rid, sid)
+
+vshard.storage.bucket_send(1, util.replicasets[2], {timeout = big_timeout})
+wait_bucket_is_collected(1)
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
+wait_bucket_is_collected(1)
+
+--
+-- While bucket move is in progress, ref won't work.
+--
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+
+_ = test_run:switch('storage_1_a')
+fiber = require('fiber')
+_ = fiber.create(vshard.storage.bucket_send, 1, util.replicasets[2], \
+ {timeout = big_timeout})
+ok, err = lref.add(rid, sid, small_timeout)
+assert(not ok and err.message)
+-- Ref will wait if timeout is big enough.
+ok, err = nil
+_ = fiber.create(function() \
+ ok, err = lref.add(rid, sid, big_timeout) \
+end)
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+
+_ = test_run:switch('storage_1_a')
+wait_bucket_is_collected(1)
+test_run:wait_cond(function() return ok or err end)
+lref.use(rid, sid)
+lref.del(rid, sid)
+assert(ok and not err)
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.bucket_send(1, util.replicasets[1], {timeout = big_timeout})
+wait_bucket_is_collected(1)
+
+--
+-- Refs are bound to sessions.
+--
+box.schema.user.grant('storage', 'super')
+lref = require('vshard.storage.ref')
+small_timeout = 0.001
+function make_ref(rid, timeout) \
+ return lref.add(rid, box.session.id(), timeout) \
+end
+function use_ref(rid) \
+ return lref.use(rid, box.session.id()) \
+end
+function del_ref(rid) \
+ return lref.del(rid, box.session.id()) \
+end
+
+_ = test_run:switch('storage_1_a')
+netbox = require('net.box')
+remote_uri = test_run:eval('storage_2_a', 'return box.cfg.listen')[1]
+c = netbox.connect(remote_uri)
+
+-- Ref is added and does not disappear anywhere on its own.
+c:call('make_ref', {1, small_timeout})
+_ = test_run:switch('storage_2_a')
+assert(lref.count == 1)
+_ = test_run:switch('storage_1_a')
+
+-- Use works.
+c:call('use_ref', {1})
+_ = test_run:switch('storage_2_a')
+assert(lref.count == 1)
+_ = test_run:switch('storage_1_a')
+
+-- Del works.
+c:call('del_ref', {1})
+_ = test_run:switch('storage_2_a')
+assert(lref.count == 0)
+_ = test_run:switch('storage_1_a')
+
+-- Expiration works. Try to add a second ref when the first one is expired - the
+-- first is collected and a subsequent use and del won't work.
+c:call('make_ref', {1, small_timeout})
+_ = test_run:switch('storage_2_a')
+assert(lref.count == 1)
+_ = test_run:switch('storage_1_a')
+
+fiber.sleep(small_timeout)
+c:call('make_ref', {2, small_timeout})
+ok, err = c:call('use_ref', {1})
+assert(ok == nil and err.message)
+ok, err = c:call('del_ref', {1})
+assert(ok == nil and err.message)
+_ = test_run:switch('storage_2_a')
+assert(lref.count == 1)
+_ = test_run:switch('storage_1_a')
+
+--
+-- Session disconnect keeps the refs, but the session is deleted when
+-- used ref count becomes 0. Unused refs don't prevent session deletion.
+--
+_ = test_run:switch('storage_2_a')
+keep_long_ref = true
+function long_ref_request(rid) \
+ local sid = box.session.id() \
+ assert(lref.add(rid, sid, big_timeout)) \
+ assert(lref.use(rid, sid)) \
+ while keep_long_ref do \
+ fiber.sleep(small_timeout) \
+ end \
+ assert(lref.del(rid, sid)) \
+end
+
+_ = test_run:switch('storage_1_a')
+_ = c:call('long_ref_request', {3}, {is_async = true})
+c:call('make_ref', {4, big_timeout})
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return lref.count == 2 end)
+
+_ = test_run:switch('storage_1_a')
+c:close()
+
+_ = test_run:switch('storage_2_a')
+-- Still 2 refs.
+assert(lref.count == 2)
+-- The long request ends and the session must be deleted - that was the last
+-- used ref.
+keep_long_ref = false
+test_run:wait_cond(function() return lref.count == 0 end)
+
+_ = test_run:switch("default")
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua
new file mode 100755
index 0000000..fdd0477
--- /dev/null
+++ b/test/unit-tap/ref.test.lua
@@ -0,0 +1,228 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local fiber = require('fiber')
+local lregistry = require('vshard.registry')
+local lref = require('vshard.storage.ref')
+
+local big_timeout = 1000000
+local small_timeout = 0.000001
+local sid = 0
+local sid2 = 1
+local sid3 = 2
+
+--
+-- gh-147: refs allow to pin all the buckets on the storage at once. Is invented
+-- for map-reduce functionality to pin all buckets on all storages in the
+-- cluster to execute consistent map-reduce calls on all cluster data.
+--
+
+--
+-- Refs use storage API to get bucket space state and wait on its changes. But
+-- not important for these unit tests.
+--
+local function bucket_are_all_rw()
+ return true
+end
+
+lregistry.storage = {
+ bucket_are_all_rw = bucket_are_all_rw,
+}
+
+local function test_ref_basic(test)
+ test:plan(15)
+
+ local rid = 0
+ local ok, err
+ --
+ -- Basic ref/unref.
+ --
+ ok, err = lref.add(rid, sid, big_timeout)
+ test:ok(ok and not err, '+1 ref')
+ test:is(lref.count, 1, 'accounted')
+ ok, err = lref.use(rid, sid)
+ test:ok(ok and not err, 'use the ref')
+ test:is(lref.count, 1, 'but still accounted')
+ ok, err = lref.del(rid, sid)
+ test:ok(ok and not err, '-1 ref')
+ test:is(lref.count, 0, 'accounted')
+
+ --
+ -- Bad ref ID.
+ --
+ rid = 1
+ ok, err = lref.use(rid, sid)
+ test:ok(not ok and err, 'invalid RID at use')
+ ok, err = lref.del(rid, sid)
+ test:ok(not ok and err, 'invalid RID at del')
+
+ --
+ -- Bad session ID.
+ --
+ lref.kill(sid)
+ rid = 0
+ ok, err = lref.use(rid, sid)
+ test:ok(not ok and err, 'invalid SID at use')
+ ok, err = lref.del(rid, sid)
+ test:ok(not ok and err, 'invalid SID at del')
+
+ --
+ -- Duplicate ID.
+ --
+ ok, err = lref.add(rid, sid, big_timeout)
+ test:ok(ok and not err, 'add ref')
+ ok, err = lref.add(rid, sid, big_timeout)
+ test:ok(not ok and err, 'duplicate ref')
+ test:is(lref.count, 1, 'did not affect count')
+ test:ok(lref.use(rid, sid) and lref.del(rid, sid), 'del old ref')
+ test:is(lref.count, 0, 'accounted')
+end
+
+local function test_ref_incremental_gc(test)
+ test:plan(20)
+
+ --
+ -- Ref addition expires 2 old refs.
+ --
+ local ok, err
+ for i = 0, 2 do
+ assert(lref.add(i, sid, small_timeout))
+ end
+ fiber.sleep(small_timeout)
+ test:is(lref.count, 3, 'expired refs are still here')
+ test:ok(lref.add(3, sid, 0), 'add new ref')
+ -- 3 + 1 new - 2 old = 2.
+ test:is(lref.count, 2, 'it collected 2 old refs')
+ -- Sleep again so the just created ref with 0 timeout becomes older than the
+ -- deadline.
+ fiber.sleep(small_timeout)
+ test:ok(lref.add(4, sid, 0), 'add new ref')
+ -- 2 + 1 new - 2 old = 1.
+ test:is(lref.count, 1, 'it collected 2 old refs')
+ test:ok(lref.del(4, sid), 'del the latest manually')
+
+ --
+ -- Incremental GC works fine if only one ref was GCed.
+ --
+ test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout')
+ test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout')
+ fiber.sleep(small_timeout)
+ test:ok(lref.add(2, sid, 0), 'add ref with 0 timeout')
+ test:is(lref.count, 2, 'collected 1 old ref, 1 is kept')
+ test:ok(lref.del(2, sid), 'del newest ref, it was not collected')
+ test:ok(lref.del(1, sid), 'del ref with big timeout')
+ test:is(lref.count, 0, 'all is deleted')
+
+ --
+ -- GC works fine when only one ref was left and it was expired.
+ --
+ test:ok(lref.add(0, sid, small_timeout), 'add ref with small timeout')
+ test:is(lref.count, 1, '1 ref total')
+ fiber.sleep(small_timeout)
+ test:ok(lref.add(1, sid, big_timeout), 'add ref with big timeout')
+ test:is(lref.count, 1, 'collected the old one')
+ lref.gc()
+ test:is(lref.count, 1, 'still 1 - timeout was big')
+ test:ok(lref.del(1, sid), 'delete it')
+ test:is(lref.count, 0, 'no refs')
+end
+
+local function test_ref_gc(test)
+ test:plan(7)
+
+ --
+ -- Generic GC works fine with multiple sessions.
+ --
+ assert(lref.add(0, sid, big_timeout))
+ assert(lref.add(1, sid, small_timeout))
+ assert(lref.add(0, sid3, small_timeout))
+ assert(lref.add(0, sid2, small_timeout))
+ assert(lref.add(1, sid2, big_timeout))
+ assert(lref.add(1, sid3, big_timeout))
+ test:is(lref.count, 6, 'add 6 refs total')
+ fiber.sleep(small_timeout)
+ lref.gc()
+ test:is(lref.count, 3, '3 collected')
+ test:ok(lref.del(0, sid), 'del first')
+ test:ok(lref.del(1, sid2), 'del second')
+ test:ok(lref.del(1, sid3), 'del third')
+ test:is(lref.count, 0, '3 deleted')
+ lref.gc()
+ test:is(lref.count, 0, 'gc on empty refs did not break anything')
+end
+
+local function test_ref_use(test)
+ test:plan(7)
+
+ --
+ -- Ref use updates the session heap.
+ --
+ assert(lref.add(0, sid, small_timeout))
+ assert(lref.add(0, sid2, big_timeout))
+ test:is(lref.count, 2, 'add 2 refs')
+ test:ok(lref.use(0, sid), 'use one with small timeout')
+ lref.gc()
+ test:is(lref.count, 2, 'still 2 refs')
+ fiber.sleep(small_timeout)
+ test:is(lref.count, 2, 'still 2 refs after sleep')
+ test:ok(lref.del(0, sid), 'del first')
+ test:ok(lref.del(0, sid2), 'del second')
+ test:is(lref.count, 0, 'now all is deleted')
+end
+
+local function test_ref_del(test)
+ test:plan(7)
+
+ --
+ -- Ref del updates the session heap.
+ --
+ assert(lref.add(0, sid, small_timeout))
+ assert(lref.add(0, sid2, big_timeout))
+ test:is(lref.count, 2, 'add 2 refs')
+ test:ok(lref.del(0, sid), 'del with small timeout')
+ lref.gc()
+ test:is(lref.count, 1, '1 ref remains')
+ fiber.sleep(small_timeout)
+ test:is(lref.count, 1, '1 ref remains after sleep')
+ lref.gc()
+ test:is(lref.count, 1, '1 ref remains after sleep and gc')
+ test:ok(lref.del(0, sid2), 'del with big timeout')
+ test:is(lref.count, 0, 'now all is deleted')
+end
+
+local function test_ref_dead_session(test)
+ test:plan(4)
+
+ --
+ -- Session after disconnect still might have running requests. It must
+ -- be kept alive with its refs until the requests are done.
+ --
+ assert(lref.add(0, sid, small_timeout))
+ assert(lref.use(0, sid))
+ lref.kill(sid)
+ test:ok(lref.del(0, sid))
+
+ --
+ -- The dead session is kept only while the used requests are running. It is
+ -- deleted when use count becomes 0 even if there were unused refs.
+ --
+ assert(lref.add(0, sid, big_timeout))
+ assert(lref.add(1, sid, big_timeout))
+ assert(lref.use(0, sid))
+ lref.kill(sid)
+ test:is(lref.count, 2, '2 refs in a dead session')
+ test:ok(lref.del(0, sid), 'delete the used ref')
+ test:is(lref.count, 0, '0 refs - the unused ref was deleted with session')
+end
+
+local test = tap.test('ref')
+test:plan(6)
+
+test:test('basic', test_ref_basic)
+test:test('incremental gc', test_ref_incremental_gc)
+test:test('gc', test_ref_gc)
+test:test('use', test_ref_use)
+test:test('del', test_ref_del)
+test:test('dead session use', test_ref_dead_session)
+
+os.exit(test:check() and 0 or 1)
diff --git a/vshard/consts.lua b/vshard/consts.lua
index cf3f422..0ffe0e2 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -48,4 +48,5 @@ return {
DISCOVERY_TIMEOUT = 10,
TIMEOUT_INFINITY = 500 * 365 * 86400,
+ DEADLINE_INFINITY = math.huge,
}
diff --git a/vshard/error.lua b/vshard/error.lua
index a6f46a9..b02bfe9 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -130,6 +130,25 @@ local error_message_template = {
name = 'TOO_MANY_RECEIVING',
msg = 'Too many receiving buckets at once, please, throttle'
},
+ [26] = {
+ name = 'STORAGE_IS_REFERENCED',
+ msg = 'Storage is referenced'
+ },
+ [27] = {
+ name = 'STORAGE_REF_ADD',
+ msg = 'Can not add a storage ref: %s',
+ args = {'reason'},
+ },
+ [28] = {
+ name = 'STORAGE_REF_USE',
+ msg = 'Can not use a storage ref: %s',
+ args = {'reason'},
+ },
+ [29] = {
+ name = 'STORAGE_REF_DEL',
+ msg = 'Can not delete a storage ref: %s',
+ args = {'reason'},
+ },
}
--
diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt
index 3f4ed43..7c1e97d 100644
--- a/vshard/storage/CMakeLists.txt
+++ b/vshard/storage/CMakeLists.txt
@@ -1,2 +1,2 @@
-install(FILES init.lua reload_evolution.lua
+install(FILES init.lua reload_evolution.lua ref.lua
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index de05531..d023583 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -17,6 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then
'vshard.replicaset', 'vshard.util',
'vshard.storage.reload_evolution',
'vshard.lua_gc', 'vshard.rlist', 'vshard.registry',
+ 'vshard.heap', 'vshard.storage.ref',
}
for _, module in pairs(vshard_modules) do
package.loaded[module] = nil
@@ -30,6 +31,7 @@ local lreplicaset = require('vshard.replicaset')
local util = require('vshard.util')
local lua_gc = require('vshard.lua_gc')
local lregistry = require('vshard.registry')
+local lref = require('vshard.storage.ref')
local reload_evolution = require('vshard.storage.reload_evolution')
local fiber_cond_wait = util.fiber_cond_wait
local bucket_ref_new
@@ -1140,6 +1142,9 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg,
from)
end
+ if lref.count > 0 then
+ return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
+ end
if is_this_replicaset_locked() then
return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
end
@@ -1441,6 +1446,9 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
local _bucket = box.space._bucket
local bucket = _bucket:get({bucket_id})
+ if lref.count > 0 then
+ return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
+ end
if is_this_replicaset_locked() then
return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
end
@@ -2528,6 +2536,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
box.space._bucket:on_replace(nil, M.bucket_on_replace)
M.bucket_on_replace = nil
end
+ lref.cfg()
if is_master then
box.space._bucket:on_replace(bucket_generation_increment)
M.bucket_on_replace = bucket_generation_increment
diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
new file mode 100644
index 0000000..a024d8e
--- /dev/null
+++ b/vshard/storage/ref.lua
@@ -0,0 +1,395 @@
+--
+-- 'Ref' module helps to ensure that all buckets on the storage stay writable
+-- while there is at least one ref on the storage.
+-- Having the storage referenced allows executing any kind of request on all
+-- the visible data in all spaces in locally stored buckets. This is useful
+-- when one needs to access tons of buckets at once, especially when exact
+-- bucket IDs are not known.
+--
+-- Refs have deadlines, so that the storage does not freeze, unable to move
+-- buckets until restart, in case a ref is not deleted due to an error in the
+-- user's code or a disconnect.
+--
+-- The disconnects and restarts mean the refs can't be global. Otherwise any
+-- kinds of global counters, uuids and so on, even paired with any ids from a
+-- client could clash between clients on their reconnects or storage restarts.
+-- Unless they establish a TCP-like session, which would be too complicated.
+--
+-- Instead, the refs are spread over the existing box sessions. This allows to
+-- bind refs of each client to its TCP connection and not care about how to make
+-- them unique across all sessions, how not to mess the refs on restart, and how
+-- to drop the refs when a client disconnects.
+--
+
+local MODULE_INTERNALS = '__module_vshard_storage_ref'
+-- Update when change behaviour of anything in the file, to be able to reload.
+local MODULE_VERSION = 1
+
+local lfiber = require('fiber')
+local lheap = require('vshard.heap')
+local lerror = require('vshard.error')
+local lconsts = require('vshard.consts')
+local lregistry = require('vshard.registry')
+local fiber_clock = lfiber.clock
+local fiber_yield = lfiber.yield
+local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY
+local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE
+
+--
+-- Comparator for the binary heaps of this module. Returns true when the first
+-- object has a strictly closer deadline than the second one, so the object
+-- with the closest deadline ends up on top of the heap.
+--
+local function heap_min_deadline_cmp(ref1, ref2)
+    local deadline1, deadline2 = ref1.deadline, ref2.deadline
+    return deadline1 < deadline2
+end
+
+local M = rawget(_G, MODULE_INTERNALS)
+if M then
+    -- No reload so far. This is a first version. Return as is.
+    return M
+end
+-- Nothing is found in _G under the module's key - build the state from
+-- scratch.
+M = {
+    module_version = MODULE_VERSION,
+    -- Total number of references in all sessions.
+    count = 0,
+    -- Heap of session objects. Each session has refs sorted by their
+    -- deadline. The sessions themselves are also sorted by deadlines.
+    -- Session deadline is defined as the closest deadline of all its refs.
+    -- Or infinity in case there are no refs in it.
+    session_heap = lheap.new(heap_min_deadline_cmp),
+    -- Map of session objects. This is used to get session object by its ID.
+    session_map = {},
+    -- On session disconnect trigger to kill the dead sessions. It is saved
+    -- here for the sake of future reload to be able to delete the old
+    -- on disconnect function before setting a new one.
+    on_disconnect = nil,
+}
+
+local function ref_session_new(sid)
+ -- Session object does not store its internal hot attributes in a table.
+ -- Because it would mean access to any session attribute would cost at least
+ -- one table indexing operation. Instead, all internal fields are stored as
+ -- upvalues referenced by the methods defined as closures.
+ --
+ -- This means session creation may not be very suitable for jitting, but it
+ -- is very rare, and the design attempts to optimize the most common case.
+ --
+ -- Still the public functions take 'self' object to make it look normally.
+ -- They even use it a bit.
+
+ -- Ref map to get ref object by its ID.
+ local ref_map = {}
+ -- Ref heap sorted by their deadlines.
+ local ref_heap = lheap.new(heap_min_deadline_cmp)
+ -- Total number of refs of the session. It is used to discount the global
+ -- ref counter when the disconnected session is dropped. Heap size can't be
+ -- used because not all refs are stored there - the used ones are removed
+ -- from the heap.
+ local ref_count_total = 0
+ -- Number of refs in use. They are included into the total count. The used
+ -- refs are accounted explicitly in order to detect when a disconnected
+ -- session has no used refs anymore and can be deleted.
+ local ref_count_use = 0
+ -- When the session becomes disconnected, it must be deleted from the global
+ -- heap when all its used refs are gone.
+ local is_disconnected = false
+ -- Cache global session storages as upvalues to save on M indexing.
+ local global_heap = M.session_heap
+ local global_map = M.session_map
+
+    --
+    -- Subtract the given number of deleted refs from both the global ref
+    -- counter and the total ref counter of this session. Neither of them is
+    -- allowed to become negative.
+    --
+    local function ref_session_discount(self, del_count)
+        local global_left = M.count - del_count
+        assert(global_left >= 0)
+        M.count = global_left
+
+        local session_left = ref_count_total - del_count
+        assert(session_left >= 0)
+        ref_count_total = session_left
+    end
+
+    --
+    -- Drop the session if it is disconnected and has no used refs. All the
+    -- refs it still owns are discounted at once, and the session is removed
+    -- from the global map and from the global heap. Otherwise do nothing.
+    --
+    local function ref_session_delete_if_not_used(self)
+        if is_disconnected and ref_count_use <= 0 then
+            ref_session_discount(self, ref_count_total)
+            global_map[sid] = nil
+            global_heap:remove(self)
+        end
+    end
+
+    --
+    -- Make the session deadline equal to the deadline of the top ref in the
+    -- session's heap, or to infinity when the heap is empty. The position in
+    -- the global heap is refreshed unless the top ref's deadline already
+    -- matches the session's one.
+    --
+    local function ref_session_update_deadline(self)
+        local closest = ref_heap:top()
+        if closest then
+            local deadline = closest.deadline
+            if deadline == self.deadline then
+                -- Nothing has changed, no need to touch the global heap.
+                return
+            end
+            self.deadline = deadline
+        else
+            self.deadline = DEADLINE_INFINITY
+        end
+        global_heap:update(self)
+    end
+
+ --
+ -- Garbage collect at most 2 expired refs. The idea is that there is no
+ -- dedicated fiber for expired refs collection. It would be too expensive to
+ -- wake up a fiber on each added or removed or updated ref.
+ --
+ -- Instead, ref GC is mostly incremental and works by the principle "remove
+ -- more than add". On each new ref added, two old refs try to expire. This
+ -- way refs don't stack infinitely, and the expired refs are eventually
+ -- removed. Because removal is faster than addition: -2 for each +1.
+ --
+ -- Takes the session object and the current time to compare the deadlines
+ -- with. Returns nothing.
+ --
+ -- NOTE(review): the first check below treats 'deadline == now' as expired
+ -- ('>'), while the check of the second ref here and the check in the
+ -- general session GC treat it as not expired ('>='). Confirm whether the
+ -- asymmetry is intended.
+ --
+ local function ref_session_gc_step(self, now)
+ -- This is inlined 2 iterations of the more general GC procedure. The
+ -- latter is not called in order to save on not having a loop,
+ -- additional branches and variables.
+ if self.deadline > now then
+ return
+ end
+ -- Assumes the session deadline always mirrors the top of the ref heap
+ -- (infinity for an empty heap), so the top exists here.
+ local top = ref_heap:top()
+ ref_heap:remove_top()
+ ref_map[top.id] = nil
+ top = ref_heap:top()
+ if not top then
+ -- The only unused ref was removed - the heap is empty now.
+ self.deadline = DEADLINE_INFINITY
+ global_heap:update(self)
+ ref_session_discount(self, 1)
+ return
+ end
+ local deadline = top.deadline
+ if deadline >= now then
+ -- The next ref is not expired yet. Stop after one removal.
+ self.deadline = deadline
+ global_heap:update(self)
+ ref_session_discount(self, 1)
+ return
+ end
+ -- Second inlined iteration: the next ref is expired too.
+ ref_heap:remove_top()
+ ref_map[top.id] = nil
+ top = ref_heap:top()
+ if not top then
+ self.deadline = DEADLINE_INFINITY
+ else
+ self.deadline = top.deadline
+ end
+ global_heap:update(self)
+ ref_session_discount(self, 2)
+ end
+
+    --
+    -- GC expired refs until they end or the limit on the number of iterations
+    -- is exhausted. The limit is supposed to prevent too long GC which would
+    -- occupy TX thread unfairly.
+    --
+    -- Takes the session object, the max number of refs to remove, and the
+    -- current time. A ref is expired when its deadline is less than the time.
+    --
+    -- Returns nil if nothing to GC, or number of iterations left from the
+    -- limit. The caller is supposed to yield when 0 is returned, and retry GC
+    -- until it returns nil.
+    -- The function itself does not yield, because it is used from a more
+    -- generic function GCing all sessions. It would not ever yield if all
+    -- sessions would have less than limit refs, even if total ref count would
+    -- be much bigger.
+    --
+    -- Besides, the session might be killed during general GC. There must not be
+    -- any yields in session methods so as not to introduce a support of dead
+    -- sessions.
+    --
+    local function ref_session_gc(self, limit, now)
+        if self.deadline >= now then
+            return nil
+        end
+        local top = ref_heap:top()
+        local del = 0
+        repeat
+            ref_heap:remove_top()
+            ref_map[top.id] = nil
+            -- A ref is counted only after it is really removed. Counting the
+            -- next expired ref in advance would discount one ref too many
+            -- when the limit is hit, while that ref would still stay in the
+            -- heap and would be discounted again by the next GC.
+            del = del + 1
+            top = ref_heap:top()
+        until not top or top.deadline >= now or del >= limit
+        -- Keep the session deadline equal to the new top of the ref heap, even
+        -- when the GC stopped because of the limit. Otherwise the session
+        -- would stay in the global heap with a deadline of an already deleted
+        -- ref.
+        if top then
+            self.deadline = top.deadline
+        else
+            self.deadline = DEADLINE_INFINITY
+        end
+        ref_session_discount(self, del)
+        global_heap:update(self)
+        return limit - del
+    end
+
+    --
+    -- Add a new ref with the given ID and deadline to the session. Before
+    -- that one step of the incremental GC is done using the given current
+    -- time.
+    --
+    -- Returns true on success, or nil and an error if a ref with such ID
+    -- already exists in the session.
+    --
+    local function ref_session_add(self, rid, deadline, now)
+        if ref_map[rid] then
+            return nil, lerror.vshard(lerror.code.STORAGE_REF_ADD,
+                                      'duplicate ref')
+        end
+        -- Expire old refs first, so the new ref can't be affected by the GC
+        -- step.
+        ref_session_gc_step(self, now)
+        local ref = {
+            id = rid,
+            deadline = deadline,
+            -- Used by the heap.
+            index = -1,
+        }
+        ref_map[rid] = ref
+        ref_heap:push(ref)
+        -- The new ref might become the closest one to expire.
+        if self.deadline > deadline then
+            self.deadline = deadline
+            global_heap:update(self)
+        end
+        M.count = M.count + 1
+        ref_count_total = ref_count_total + 1
+        return true
+    end
+
+    --
+    -- Ref use means it can't be expired until deleted explicitly. Should be
+    -- done when the request affecting the whole storage starts. After use it is
+    -- important to call del afterwards - GC won't delete it automatically now.
+    -- Unless the entire session is killed.
+    --
+    -- Returns true on success, or nil and an error if the ref is not found.
+    --
+    -- NOTE(review): a repeated use of the same ref removes it from the heap
+    -- and increments the use counter again. Confirm the heap tolerates
+    -- removal of an object which is not stored in it, or that callers never
+    -- use a ref twice.
+    --
+    local function ref_session_use(self, rid)
+        local ref = ref_map[rid]
+        if ref then
+            -- The ref leaves the heap, so GC does not see it anymore. It
+            -- stays in the map to be found by del.
+            ref_heap:remove(ref)
+            ref_session_update_deadline(self)
+            ref_count_use = ref_count_use + 1
+            return true
+        end
+        return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no ref')
+    end
+
+    --
+    -- Delete the ref with the given ID regardless of whether it is used or
+    -- not. Deletion of a used ref might also drop the whole session, if it is
+    -- disconnected and that was its last used ref.
+    --
+    -- Returns true on success, or nil and an error if the ref is not found.
+    --
+    local function ref_session_del(self, rid)
+        local ref = ref_map[rid]
+        if not ref then
+            return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no ref')
+        end
+        ref_map[rid] = nil
+        -- Index -1 is treated as 'not in the heap', which is the case for the
+        -- used refs. Presumably the heap resets the index on removal.
+        local is_used = ref.index == -1
+        if not is_used then
+            ref_heap:remove(ref)
+        end
+        ref_session_update_deadline(self)
+        ref_session_discount(self, 1)
+        if is_used then
+            ref_count_use = ref_count_use - 1
+            ref_session_delete_if_not_used(self)
+        end
+        return true
+    end
+
+    --
+    -- Mark the session as disconnected. It is deleted right away if it has no
+    -- used refs. Otherwise the deletion is attempted again when a used ref is
+    -- deleted. A session must not be killed twice.
+    --
+    local function ref_session_kill(self)
+        assert(is_disconnected == false)
+        is_disconnected = true
+        ref_session_delete_if_not_used(self)
+    end
+
+ -- Don't use __index. It is useless since all sessions use closures as
+ -- methods. Also it is probably slower because on each method call would
+ -- need to get the metatable, get __index, find the method here. While now
+ -- it is only an index operation on the session object.
+ local session = {
+ deadline = DEADLINE_INFINITY,
+ -- Used by the heap.
+ index = -1,
+ -- Methods.
+ del = ref_session_del,
+ gc = ref_session_gc,
+ add = ref_session_add,
+ use = ref_session_use,
+ kill = ref_session_kill,
+ }
+ global_map[sid] = session
+ global_heap:push(session)
+ return session
+end
+
+--
+-- Collect expired refs in all sessions. The sessions are taken from the top of
+-- the session heap one by one. The work is done in chunks of LUA_CHUNK_SIZE
+-- removals with a yield between the chunks. The function returns when the
+-- top session reports there is nothing to collect, or when no sessions are
+-- left.
+--
+local function ref_gc()
+    local heap = M.session_heap
+    local session = heap:top()
+    if not session then
+        return
+    end
+    local budget, now = LUA_CHUNK_SIZE, fiber_clock()
+    while session do
+        budget = session:gc(budget, now)
+        if not budget then
+            -- The session with the closest deadline has nothing to GC.
+            return
+        end
+        if budget == 0 then
+            -- The chunk is exhausted. Let other fibers work, and refresh the
+            -- time, because the yield could take a while.
+            fiber_yield()
+            budget, now = LUA_CHUNK_SIZE, fiber_clock()
+        end
+        session = heap:top()
+    end
+end
+
+--
+-- Add a ref with the given ID to the session with the given ID. The ref
+-- expires in 'timeout' seconds unless it is used or deleted. The ref is added
+-- only when the storage reports all its buckets as writable. Waiting for that
+-- is bounded by the same timeout. The session object is created if it does
+-- not exist yet.
+--
+-- Returns true on success, or nil and an error.
+--
+local function ref_add(rid, sid, timeout)
+    local now = fiber_clock()
+    local deadline = now + timeout
+    local storage = lregistry.storage
+    while not storage.bucket_are_all_rw() do
+        local ok, err = storage.bucket_generation_wait(timeout)
+        if not ok then
+            return nil, err
+        end
+        -- Spend on the next wait only what is left until the deadline.
+        now = fiber_clock()
+        timeout = deadline - now
+    end
+    local session = M.session_map[sid] or ref_session_new(sid)
+    return session:add(rid, deadline, now)
+end
+
+--
+-- Start using the ref with the given ID in the session with the given ID.
+-- Returns what the session's method returns, or nil and an error if there is
+-- no such session.
+--
+local function ref_use(rid, sid)
+    local session = M.session_map[sid]
+    if session then
+        return session:use(rid)
+    end
+    return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no session')
+end
+
+--
+-- Delete the ref with the given ID from the session with the given ID.
+-- Returns what the session's method returns, or nil and an error if there is
+-- no such session.
+--
+local function ref_del(rid, sid)
+    local session = M.session_map[sid]
+    if session then
+        return session:del(rid)
+    end
+    return nil, lerror.vshard(lerror.code.STORAGE_REF_DEL, 'no session')
+end
+
+--
+-- Kill the session with the given ID. Nothing happens if the session has no
+-- object in the session map.
+--
+local function ref_kill_session(sid)
+    local session = M.session_map[sid]
+    if not session then
+        return
+    end
+    session:kill()
+end
+
+--
+-- On disconnect trigger. Kills the ref session of the current box session.
+--
+local function ref_on_session_disconnect()
+    local sid = box.session.id()
+    ref_kill_session(sid)
+end
+
+--
+-- Install the on disconnect trigger. A trigger saved by a previous call is
+-- removed first. A failure of the removal is ignored.
+--
+local function ref_cfg()
+    local old_trigger = M.on_disconnect
+    if old_trigger then
+        pcall(box.session.on_disconnect, nil, old_trigger)
+    end
+    box.session.on_disconnect(ref_on_session_disconnect)
+    M.on_disconnect = ref_on_session_disconnect
+end
+
+-- Public API of the module.
+M.del = ref_del
+M.gc = ref_gc
+M.add = ref_add
+M.use = ref_use
+M.cfg = ref_cfg
+M.kill = ref_kill_session
+lregistry.storage_ref = M
+-- Publish the module state in _G. It is looked up there at the beginning of
+-- the file. Without it a reload of the module would not find the state and
+-- would create a new one, losing the existing refs and the saved on
+-- disconnect trigger. It is done in the end so that the published object
+-- always has all the methods.
+rawset(_G, MODULE_INTERNALS, M)
+
+return M
More information about the Tarantool-patches
mailing list