[Tarantool-patches] [PATCH vshard 07/11] storage: introduce bucket_generation_wait()
Oleg Babin
olegrok at tarantool.org
Wed Feb 24 13:27:51 MSK 2021
Thanks for your patch. LGTM.
On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> In the future map-reduce code it will be needed to be able to wait
> until all buckets on the storage enter writable state. If they are
> not writable, the code should wait efficiently, without polling.
>
> The patch adds a function bucket_generation_wait() which is
> registered in registry.storage.
>
> It helps to wait until state of any bucket is changed. The caller
> code, if wants to wait for all buckets to enter writable state,
> should wait on the generation and re-check the requested condition
> until it matches or timeout happens.
>
> Part of #147
> ---
> test/storage/storage.result | 86 +++++++++++++++++++++++++++++++++++
> test/storage/storage.test.lua | 36 +++++++++++++++
> vshard/storage/init.lua | 6 +++
> 3 files changed, 128 insertions(+)
>
> diff --git a/test/storage/storage.result b/test/storage/storage.result
> index edb45be..4730e20 100644
> --- a/test/storage/storage.result
> +++ b/test/storage/storage.result
> @@ -722,6 +722,92 @@ assert(vshard.storage.buckets_count() == 10)
> ---
> - true
> ...
> +--
> +-- Bucket_generation_wait() registry function.
> +--
> +lstorage = require('vshard.registry').storage
> +---
> +...
> +ok, err = lstorage.bucket_generation_wait(-1)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +ok, err = lstorage.bucket_generation_wait(0)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +small_timeout = 0.000001
> +---
> +...
> +ok, err = lstorage.bucket_generation_wait(small_timeout)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- Timeout exceeded
> +...
> +ok, err = nil
> +---
> +...
> +big_timeout = 1000000
> +---
> +...
> +_ = fiber.create(function() \
> + ok, err = lstorage.bucket_generation_wait(big_timeout) \
> +end)
> +---
> +...
> +fiber.sleep(small_timeout)
> +---
> +...
> +assert(not ok and not err)
> +---
> +- true
> +...
> +vshard.storage.bucket_force_drop(10)
> +---
> +- true
> +...
> +test_run:wait_cond(function() return ok or err end)
> +---
> +- true
> +...
> +assert(ok)
> +---
> +- true
> +...
> +-- Cancel should interrupt the waiting.
> +ok, err = nil
> +---
> +...
> +f = fiber.create(function() \
> + ok, err = lstorage.bucket_generation_wait(big_timeout) \
> +end)
> +---
> +...
> +fiber.sleep(small_timeout)
> +---
> +...
> +assert(not ok and not err)
> +---
> +- true
> +...
> +f:cancel()
> +---
> +...
> +_ = test_run:wait_cond(function() return ok or err end)
> +---
> +...
> +assert(not ok and err.message)
> +---
> +- fiber is cancelled
> +...
> _ = test_run:switch("default") --- ... diff --git a/test/storage/storage.test.lua
> b/test/storage/storage.test.lua index db014ef..86c5e33 100644 ---
> a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@
> -205,6 +205,42 @@ assert(vshard.storage.buckets_count() == 5)
> vshard.storage.bucket_force_create(6, 5)
> assert(vshard.storage.buckets_count() == 10) +-- +--
> Bucket_generation_wait() registry function. +-- +lstorage =
> require('vshard.registry').storage +ok, err =
> lstorage.bucket_generation_wait(-1) +assert(not ok and err.message) +
> +ok, err = lstorage.bucket_generation_wait(0) +assert(not ok and
> err.message) + +small_timeout = 0.000001 +ok, err =
> lstorage.bucket_generation_wait(small_timeout) +assert(not ok and
> err.message) + +ok, err = nil +big_timeout = 1000000 +_ =
> fiber.create(function() \ + ok, err =
> lstorage.bucket_generation_wait(big_timeout) \ +end)
> +fiber.sleep(small_timeout) +assert(not ok and not err)
> +vshard.storage.bucket_force_drop(10) +test_run:wait_cond(function()
> return ok or err end) +assert(ok) + +-- Cancel should interrupt the
> waiting. +ok, err = nil +f = fiber.create(function() \ + ok, err =
> lstorage.bucket_generation_wait(big_timeout) \ +end)
> +fiber.sleep(small_timeout) +assert(not ok and not err) +f:cancel() +_
> = test_run:wait_cond(function() return ok or err end) +assert(not ok
> and err.message) + _ = test_run:switch("default")
> test_run:drop_cluster(REPLICASET_2)
> test_run:drop_cluster(REPLICASET_1)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index b47665b..ffa48b6 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -31,6 +31,7 @@ local util = require('vshard.util')
> local lua_gc = require('vshard.lua_gc')
> local lregistry = require('vshard.registry')
> local reload_evolution = require('vshard.storage.reload_evolution')
> +local fiber_cond_wait = util.fiber_cond_wait
> local bucket_ref_new
>
> local M = rawget(_G, MODULE_INTERNALS)
> @@ -229,6 +230,10 @@ local function bucket_generation_increment()
> M.bucket_generation_cond:broadcast()
> end
>
> +local function bucket_generation_wait(timeout)
> + return fiber_cond_wait(M.bucket_generation_cond, timeout)
> +end
> +
> --
> -- Check if this replicaset is locked. It means be invisible for
> -- the rebalancer.
> @@ -2783,6 +2788,7 @@ M.schema_upgrade_handlers = schema_upgrade_handlers
> M.schema_version_make = schema_version_make
> M.schema_bootstrap = schema_init_0_1_15_0
>
> +M.bucket_generation_wait = bucket_generation_wait
> lregistry.storage = M
>
> return {
More information about the Tarantool-patches
mailing list