From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 88B0B71814; Tue, 23 Feb 2021 03:20:28 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 88B0B71814 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1614039628; bh=XdLOumsTDmNrbRtOvugl4YeEsXJn+YDtyQG1zDlsbS4=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To: From; b=uuPOvzaNXBHMGsW2j9+ZtAHtV5MVSEiCaIwutSrnQ8uM4sSkwJ5PTjDJtSUwFv0hg Kv3uSaw0Offzvu0Tn7SvTpyhjzND+NV0WkegK9OY9aO+OIasAX+wDsfOrAIBlB9XjO yMJKgxxT5xSxly92pjAxy0fPH6FqgqbkhI86D3Ek= Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 376CB718AF for ; Tue, 23 Feb 2021 03:16:00 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 376CB718AF Received: by smtpng2.m.smailru.net with esmtpa (envelope-from ) id 1lELMt-0003CR-EK; Tue, 23 Feb 2021 03:15:59 +0300 To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org, yaroslav.dynnikov@tarantool.org Date: Tue, 23 Feb 2021 01:15:45 +0100 Message-Id: X-Mailer: git-send-email 2.24.3 (Apple Git-128) In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD975C3EC174F56692254B0AABE1FB071B229FBFD4D57485274182A05F53808504094856BED174B99D53D7CC4D466FA69439CD6EBA2DD06477FC4F4BFDDE7271C27 X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE75263010198C72082EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F790063750965CC5CDB672DE8638F802B75D45FF5571747095F342E8C7A0BC55FA0FE5FC425FEB8730D384030C9BB0C287CD321813F9EC4BC5BB9190389733CBF5DBD5E913377AFFFEAFD269176DF2183F8FC7C0ECC8AC47CD0EDEFF8941B15DA834481FCF19DD082D7633A0EF3E4896CB9E6436389733CBF5DBD5E9D5E8D9A59859A8B6E5E764EB5D94DBD4CC7F00164DA146DA6F5DAA56C3B73B23C77107234E2CFBA567F23339F89546C55F5C1EE8F4F765FC08F9A42B2210255C75ECD9A6C639B01BBD4B6F7A4D31EC0BC0CAF46E325F83A522CA9DD8327EE4930A3850AC1BE2E73579C543ECCDAE434EC4224003CC836476C0CAF46E325F83A50BF2EBBBDD9D6B0F347543BADC64E7283B503F486389A921A5CC5B56E945C8DA X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8186998911F362727C4C7A0BC55FA0FE5FC425FEB8730D384030C9BB0C287CD321819CA04A15360A768B1881A6453793CE9C32612AADDFBE061C61BE10805914D3804EBA3D8E7E5B87ABF8C51168CD8EBDB587F3D2152687E5CDC48ACC2A39D04F89CDFB48F4795C241BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D349EC559D073CA5B687BE9C822F1587D5909C8CF7917C649BA269E6D8F83028418E4C9FC0162F91CB31D7E09C32AA3244CEDE1DD8BE203BF958D677D40649480BA55E75C8D0ED9F6EEFACE5A9C96DEB163 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2bioj2drqE2xHc+H8T+8CkiA6uw== X-Mailru-Sender: 689FA8AB762F73936BC43F508A0638222511874AB447CED5FB0E125837477ACB3841015FED1DE5223CC9A89AB576DD93FB559BB5D741EB963CF37A108A312F5C27E8A8C3839CE0E267EA787935ED9F1B X-Mras: Ok Subject: [Tarantool-patches] [PATCH vshard 07/11] storage: introduce bucket_generation_wait() X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladislav Shpilevoy via Tarantool-patches Reply-To: Vladislav Shpilevoy Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" In the future map-reduce code it will be needed to be able to wait until all buckets on the storage enter writable state. If they are not writable, the code should wait efficiently, without polling. The patch adds a function bucket_generation_wait() which is registered in registry.storage. It helps to wait until state of any bucket is changed. The caller code, if wants to wait for all buckets to enter writable state, should wait on the generation and re-check the requested condition until it matches or timeout happens. Part of #147 --- test/storage/storage.result | 86 +++++++++++++++++++++++++++++++++++ test/storage/storage.test.lua | 36 +++++++++++++++ vshard/storage/init.lua | 6 +++ 3 files changed, 128 insertions(+) diff --git a/test/storage/storage.result b/test/storage/storage.result index edb45be..4730e20 100644 --- a/test/storage/storage.result +++ b/test/storage/storage.result @@ -722,6 +722,92 @@ assert(vshard.storage.buckets_count() == 10) --- - true ... +-- +-- Bucket_generation_wait() registry function. +-- +lstorage = require('vshard.registry').storage +--- +... +ok, err = lstorage.bucket_generation_wait(-1) +--- +... +assert(not ok and err.message) +--- +- Timeout exceeded +... +ok, err = lstorage.bucket_generation_wait(0) +--- +... +assert(not ok and err.message) +--- +- Timeout exceeded +... +small_timeout = 0.000001 +--- +... +ok, err = lstorage.bucket_generation_wait(small_timeout) +--- +... +assert(not ok and err.message) +--- +- Timeout exceeded +... +ok, err = nil +--- +... +big_timeout = 1000000 +--- +... +_ = fiber.create(function() \ + ok, err = lstorage.bucket_generation_wait(big_timeout) \ +end) +--- +... +fiber.sleep(small_timeout) +--- +... +assert(not ok and not err) +--- +- true +... +vshard.storage.bucket_force_drop(10) +--- +- true +... +test_run:wait_cond(function() return ok or err end) +--- +- true +... +assert(ok) +--- +- true +... +-- Cancel should interrupt the waiting. +ok, err = nil +--- +... +f = fiber.create(function() \ + ok, err = lstorage.bucket_generation_wait(big_timeout) \ +end) +--- +... +fiber.sleep(small_timeout) +--- +... +assert(not ok and not err) +--- +- true +... +f:cancel() +--- +... +_ = test_run:wait_cond(function() return ok or err end) +--- +... +assert(not ok and err.message) +--- +- fiber is cancelled +... _ = test_run:switch("default") --- ... diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua index db014ef..86c5e33 100644 --- a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@ -205,6 +205,42 @@ assert(vshard.storage.buckets_count() == 5) vshard.storage.bucket_force_create(6, 5) assert(vshard.storage.buckets_count() == 10) +-- +-- Bucket_generation_wait() registry function. +-- +lstorage = require('vshard.registry').storage +ok, err = lstorage.bucket_generation_wait(-1) +assert(not ok and err.message) + +ok, err = lstorage.bucket_generation_wait(0) +assert(not ok and err.message) + +small_timeout = 0.000001 +ok, err = lstorage.bucket_generation_wait(small_timeout) +assert(not ok and err.message) + +ok, err = nil +big_timeout = 1000000 +_ = fiber.create(function() \ + ok, err = lstorage.bucket_generation_wait(big_timeout) \ +end) +fiber.sleep(small_timeout) +assert(not ok and not err) +vshard.storage.bucket_force_drop(10) +test_run:wait_cond(function() return ok or err end) +assert(ok) + +-- Cancel should interrupt the waiting. +ok, err = nil +f = fiber.create(function() \ + ok, err = lstorage.bucket_generation_wait(big_timeout) \ +end) +fiber.sleep(small_timeout) +assert(not ok and not err) +f:cancel() +_ = test_run:wait_cond(function() return ok or err end) +assert(not ok and err.message) + _ = test_run:switch("default") test_run:drop_cluster(REPLICASET_2) test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index b47665b..ffa48b6 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -31,6 +31,7 @@ local util = require('vshard.util') local lua_gc = require('vshard.lua_gc') local lregistry = require('vshard.registry') local reload_evolution = require('vshard.storage.reload_evolution') +local fiber_cond_wait = util.fiber_cond_wait local bucket_ref_new local M = rawget(_G, MODULE_INTERNALS) @@ -229,6 +230,10 @@ local function bucket_generation_increment() M.bucket_generation_cond:broadcast() end +local function bucket_generation_wait(timeout) + return fiber_cond_wait(M.bucket_generation_cond, timeout) +end + -- -- Check if this replicaset is locked. It means be invisible for -- the rebalancer. @@ -2783,6 +2788,7 @@ M.schema_upgrade_handlers = schema_upgrade_handlers M.schema_version_make = schema_version_make M.schema_bootstrap = schema_init_0_1_15_0 +M.bucket_generation_wait = bucket_generation_wait lregistry.storage = M return { -- 2.24.3 (Apple Git-128)