[Tarantool-patches] [PATCH vshard 07/11] storage: introduce bucket_generation_wait()

Oleg Babin olegrok at tarantool.org
Wed Feb 24 13:27:51 MSK 2021


Thanks for your patch. LGTM.

On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> In the future map-reduce code it will be needed to be able to wait
> until all buckets on the storage enter writable state. If they are
> not writable, the code should wait efficiently, without polling.
>
> The patch adds a function bucket_generation_wait() which is
> registered in registry.storage.
>
> It helps to wait until state of any bucket is changed. The caller
> code, if wants to wait for all buckets to enter writable state,
> should wait on the generation and re-check the requested condition
> until it matches or timeout happens.
>
> Part of #147
> ---
>   test/storage/storage.result   | 86 +++++++++++++++++++++++++++++++++++
>   test/storage/storage.test.lua | 36 +++++++++++++++
>   vshard/storage/init.lua       |  6 +++
>   3 files changed, 128 insertions(+)
>
> diff --git a/test/storage/storage.result b/test/storage/storage.result
> index edb45be..4730e20 100644
> --- a/test/storage/storage.result
> +++ b/test/storage/storage.result
> @@ -722,6 +722,92 @@ assert(vshard.storage.buckets_count() == 10)
>   ---
>   - true
>   ...
> +--
> +-- Bucket_generation_wait() registry function.
> +--
> +lstorage = require('vshard.registry').storage
> +---
> +...
> +ok, err = lstorage.bucket_generation_wait(-1)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +ok, err = lstorage.bucket_generation_wait(0)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +small_timeout = 0.000001
> +---
> +...
> +ok, err = lstorage.bucket_generation_wait(small_timeout)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +ok, err = nil
> +---
> +...
> +big_timeout = 1000000
> +---
> +...
> +_ = fiber.create(function()                                                     \
> +    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
> +end)
> +---
> +...
> +fiber.sleep(small_timeout)
> +---
> +...
> +assert(not ok and not err)
> +---
> +- true
> +...
> +vshard.storage.bucket_force_drop(10)
> +---
> +- true
> +...
> +test_run:wait_cond(function() return ok or err end)
> +---
> +- true
> +...
> +assert(ok)
> +---
> +- true
> +...
> +-- Cancel should interrupt the waiting.
> +ok, err = nil
> +---
> +...
> +f = fiber.create(function()                                                     \
> +    ok, err = lstorage.bucket_generation_wait(big_timeout)                      \
> +end)
> +---
> +...
> +fiber.sleep(small_timeout)
> +---
> +...
> +assert(not ok and not err)
> +---
> +- true
> +...
> +f:cancel()
> +---
> +...
> +_ = test_run:wait_cond(function() return ok or err end)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- fiber is cancelled
> +...
>   _ = test_run:switch("default") --- ... diff --git a/test/storage/storage.test.lua 
> b/test/storage/storage.test.lua index db014ef..86c5e33 100644 --- 
> a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@ 
> -205,6 +205,42 @@ assert(vshard.storage.buckets_count() == 5) 
> vshard.storage.bucket_force_create(6, 5) 
> assert(vshard.storage.buckets_count() == 10) +-- +-- 
> Bucket_generation_wait() registry function. +-- +lstorage = 
> require('vshard.registry').storage +ok, err = 
> lstorage.bucket_generation_wait(-1) +assert(not ok and err.message) + 
> +ok, err = lstorage.bucket_generation_wait(0) +assert(not ok and 
> err.message) + +small_timeout = 0.000001 +ok, err = 
> lstorage.bucket_generation_wait(small_timeout) +assert(not ok and 
> err.message) + +ok, err = nil +big_timeout = 1000000 +_ = 
> fiber.create(function() \ + ok, err = 
> lstorage.bucket_generation_wait(big_timeout) \ +end) 
> +fiber.sleep(small_timeout) +assert(not ok and not err) 
> +vshard.storage.bucket_force_drop(10) +test_run:wait_cond(function() 
> return ok or err end) +assert(ok) + +-- Cancel should interrupt the 
> waiting. +ok, err = nil +f = fiber.create(function() \ + ok, err = 
> lstorage.bucket_generation_wait(big_timeout) \ +end) 
> +fiber.sleep(small_timeout) +assert(not ok and not err) +f:cancel() +_ 
> = test_run:wait_cond(function() return ok or err end) +assert(not ok 
> and err.message) + _ = test_run:switch("default")
>   test_run:drop_cluster(REPLICASET_2)
>   test_run:drop_cluster(REPLICASET_1)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index b47665b..ffa48b6 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -31,6 +31,7 @@ local util = require('vshard.util')
>   local lua_gc = require('vshard.lua_gc')
>   local lregistry = require('vshard.registry')
>   local reload_evolution = require('vshard.storage.reload_evolution')
> +local fiber_cond_wait = util.fiber_cond_wait
>   local bucket_ref_new
>   
>   local M = rawget(_G, MODULE_INTERNALS)
> @@ -229,6 +230,10 @@ local function bucket_generation_increment()
>       M.bucket_generation_cond:broadcast()
>   end
>   
> +local function bucket_generation_wait(timeout)
> +    return fiber_cond_wait(M.bucket_generation_cond, timeout)
> +end
> +
>   --
>   -- Check if this replicaset is locked. It means be invisible for
>   -- the rebalancer.
> @@ -2783,6 +2788,7 @@ M.schema_upgrade_handlers = schema_upgrade_handlers
>   M.schema_version_make = schema_version_make
>   M.schema_bootstrap = schema_init_0_1_15_0
>   
> +M.bucket_generation_wait = bucket_generation_wait
>   lregistry.storage = M
>   
>   return {


More information about the Tarantool-patches mailing list