[Tarantool-patches] [PATCH vshard 07/11] storage: introduce bucket_generation_wait()

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Feb 23 03:15:45 MSK 2021


In the future map-reduce code it will be needed to be able to wait
until all buckets on the storage enter writable state. If they are
not writable, the code should wait efficiently, without polling.

The patch adds a function bucket_generation_wait() which is
registered in registry.storage.

It helps to wait until state of any bucket is changed. The caller
code, if wants to wait for all buckets to enter writable state,
should wait on the generation and re-check the requested condition
until it matches or timeout happens.

Part of #147
---
 test/storage/storage.result   | 86 +++++++++++++++++++++++++++++++++++
 test/storage/storage.test.lua | 36 +++++++++++++++
 vshard/storage/init.lua       |  6 +++
 3 files changed, 128 insertions(+)

diff --git a/test/storage/storage.result b/test/storage/storage.result
index edb45be..4730e20 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -722,6 +722,92 @@ assert(vshard.storage.buckets_count() == 10)
 ---
 - true
 ...
+--
+-- Bucket_generation_wait() registry function.
+--
+lstorage = require('vshard.registry').storage
+---
+...
+ok, err = lstorage.bucket_generation_wait(-1)
+---
+...
+assert(not ok and err.message)
+---
+- Timeout exceeded
+...
+ok, err = lstorage.bucket_generation_wait(0)
+---
+...
+assert(not ok and err.message)
+---
+- Timeout exceeded
+...
+small_timeout = 0.000001
+---
+...
+ok, err = lstorage.bucket_generation_wait(small_timeout)
+---
+...
+assert(not ok and err.message)
+---
+- Timeout exceeded
+...
+ok, err = nil
+---
+...
+big_timeout = 1000000
+---
+...
+_ = fiber.create(function()                                                     \
+    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
+end)
+---
+...
+fiber.sleep(small_timeout)
+---
+...
+assert(not ok and not err)
+---
+- true
+...
+vshard.storage.bucket_force_drop(10)
+---
+- true
+...
+test_run:wait_cond(function() return ok or err end)
+---
+- true
+...
+assert(ok)
+---
+- true
+...
+-- Cancel should interrupt the waiting.
+ok, err = nil
+---
+...
+f = fiber.create(function()                                                     \
+    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
+end)
+---
+...
+fiber.sleep(small_timeout)
+---
+...
+assert(not ok and not err)
+---
+- true
+...
+f:cancel()
+---
+...
+_ = test_run:wait_cond(function() return ok or err end)
+---
+...
+assert(not ok and err.message)
+---
+- fiber is cancelled
+...
 _ = test_run:switch("default")
 ---
 ...
diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
index db014ef..86c5e33 100644
--- a/test/storage/storage.test.lua
+++ b/test/storage/storage.test.lua
@@ -205,6 +205,42 @@ assert(vshard.storage.buckets_count() == 5)
 vshard.storage.bucket_force_create(6, 5)
 assert(vshard.storage.buckets_count() == 10)
 
+--
+-- Bucket_generation_wait() registry function.
+--
+lstorage = require('vshard.registry').storage
+ok, err = lstorage.bucket_generation_wait(-1)
+assert(not ok and err.message)
+
+ok, err = lstorage.bucket_generation_wait(0)
+assert(not ok and err.message)
+
+small_timeout = 0.000001
+ok, err = lstorage.bucket_generation_wait(small_timeout)
+assert(not ok and err.message)
+
+ok, err = nil
+big_timeout = 1000000
+_ = fiber.create(function()                                                     \
+    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
+end)
+fiber.sleep(small_timeout)
+assert(not ok and not err)
+vshard.storage.bucket_force_drop(10)
+test_run:wait_cond(function() return ok or err end)
+assert(ok)
+
+-- Cancel should interrupt the waiting.
+ok, err = nil
+f = fiber.create(function()                                                     \
+    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
+end)
+fiber.sleep(small_timeout)
+assert(not ok and not err)
+f:cancel()
+_ = test_run:wait_cond(function() return ok or err end)
+assert(not ok and err.message)
+
 _ = test_run:switch("default")
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index b47665b..ffa48b6 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -31,6 +31,7 @@ local util = require('vshard.util')
 local lua_gc = require('vshard.lua_gc')
 local lregistry = require('vshard.registry')
 local reload_evolution = require('vshard.storage.reload_evolution')
+local fiber_cond_wait = util.fiber_cond_wait
 local bucket_ref_new
 
 local M = rawget(_G, MODULE_INTERNALS)
@@ -229,6 +230,10 @@ local function bucket_generation_increment()
     M.bucket_generation_cond:broadcast()
 end
 
+local function bucket_generation_wait(timeout)
+    return fiber_cond_wait(M.bucket_generation_cond, timeout)
+end
+
 --
 -- Check if this replicaset is locked. It means be invisible for
 -- the rebalancer.
@@ -2783,6 +2788,7 @@ M.schema_upgrade_handlers = schema_upgrade_handlers
 M.schema_version_make = schema_version_make
 M.schema_bootstrap = schema_init_0_1_15_0
 
+M.bucket_generation_wait = bucket_generation_wait
 lregistry.storage = M
 
 return {
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list