[tarantool-patches] [PATCH V3] Prohibit router/storage configuration from different fibers

Evgeniy Zaikin evgen4rave at gmail.com
Tue Sep 11 13:38:31 MSK 2018


Subject: [PATCH V3] Prohibit router/storage configuration from different fibers
Fixes: #140
---
Ticket:https://github.com/tarantool/vshard/issues/140
Branch:https://github.com/tarantool/vshard/tree/rave4life/gh-140-prohibit-multiple-cfg


On Thu, 30 Aug 2018 at 03:56, Vladislav Shpilevoy
<v.shpilevoy at tarantool.org> wrote:
>
> Hi! Thanks for the fixes. See 9 comments below.
>
> 1. Please, put your fixes inlined right after my
> remarks in a response email. It is hard sometimes
> so track which of my remarks are not fixed without
> this.
>

OK, see my patch below in this email.

> 2. Write all your questions about a patch into
> tarantool-patches. I paste here your question
> from a private email:
>
> > Hi! I have a question about writing test module. The test should fail
> > if patch (or any part of this patch) is absent. But it is difficult to
> > make it possible without any architectural distortion (e.g. without
> > making nested function calls, without redundant constructions and code
> > injection, without any overkills in working code). There are no
> > questions If no test needed, but...
> > So which way is correct:
> > 1. to make one more nested (redundant) call in router/storage code;
> > 2. to inject additional counters in 'vshard.util' module, witch
> > contains implementation of my path ?
> > Current PATCH v2, witch was already sent, implements 2nd way (test
> > fails if M.lock:put(true) / M.lock:get() is absent or disabled).
> > https://www.freelists.org/post/tarantool-patches/PATCH-v2-Prohibit-routerstorage-configuration-from-different-fibers
> >
> > Thank you for reviewing my patch.
>
> Answer is the following: neither of those things. Use M.errinj
> table for such error injections, as I said in the previous review.
> Look at its current content and how is it used to make the similar
> test for concurrent cfgs. Or dig into errinj.h in Tarantool to
> find which of existing ones can be used.
>

Fixed. M.errinj.ERRINJ_MULTIPLE_CFG was introduced.

> On 21/08/2018 06:27, Евгений Заикин wrote:
> > Adds prohibition of simultaneous router/storage configuration from
> > multiple fibers.
> > Fixes: #140
> >
> > ---
> > Ticket:https://github.com/tarantool/vshard/issues/140
> > Branch:https://github.com/tarantool/vshard/tree/rave4life/gh-140-prohibit-multiple-cfg
> >
> > Changes in v2:
> >    - separated test for router and storage
> >    - locker wrapper function moved to vshard.utils
> >   test/router/multi_fiber_cfg.result    | 38 +++++++++++++++++++++++++
> >   test/router/multi_fiber_cfg.test.lua  | 12 ++++++++
> >   test/storage/multi_fiber_cfg.result   | 41 +++++++++++++++++++++++++++
> >   test/storage/multi_fiber_cfg.test.lua | 13 +++++++++
> >   vshard/router/init.lua                |  6 ++--
> >   vshard/storage/init.lua               |  4 +--
> >   vshard/util.lua                       | 17 +++++++++++
> >   7 files changed, 126 insertions(+), 5 deletions(-)
> >   create mode 100644 test/router/multi_fiber_cfg.result
> >   create mode 100644 test/router/multi_fiber_cfg.test.lua
> >   create mode 100644 test/storage/multi_fiber_cfg.result
> >   create mode 100644 test/storage/multi_fiber_cfg.test.lua
> >
> > diff --git a/test/router/multi_fiber_cfg.result
> > b/test/router/multi_fiber_cfg.result
> > new file mode 100644
> > index 0000000..42a4055
> > --- /dev/null
> > +++ b/test/router/multi_fiber_cfg.result
> > @@ -0,0 +1,38 @@
> > +test_run = require('test_run').new()
> > +---
> > +...
> > +cfg = require('localcfg')
> > +---
> > +...
> > +cfg.memtx_memory = nil
>
> 3. Why do you modify cfg?
> The same question for storage test.
>

See 'localcfg.lua': memtx_memory = 100 * 1024 * 1024.
Cause memtx_memory always set to larger size in previous tests, an error occured
while vshard.router/vshard.storage applies this config:
cannot decrease memory size at runtime.
Removed obvious nullifying by reinitialization 'cfg' entries on-a-fly.
Should I kill all instances (or full cluster clleanup) before this test?
Prefered not to do it making tests run as fast as possible.

> > +---
> > +...
> > +vshard = require('vshard')
>
> 4. As I said in the previous review, please,
> put this test into router/router.test.lua. It
> is not worth a separate file.

OK, fixed.

>
> The same remark for storage test.
>
> > +---
> > +...
> > +fiber = require('fiber')
> > +---
> > +...
> > +vshard.router.cfg(cfg)
> > +---
> > +...
> > +c = fiber.channel(2)
> > +---
> > +...
> > +f = function() fiber.create(function() local status, err =
> > pcall(vshard.router.cfg, cfg) c:put(status and true or false) end) end
> > +---
> > +...
> > +f()
> > +---
> > +...
> > +f()
> > +---
> > +...
> > +c:get()
> > +---
> > +- true
> > +...
> > +c:get()
> > +---
> > +- true
> > +...
> > diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> > index 60ceafa..6b7a2c5 100644
> > --- a/vshard/router/init.lua
> > +++ b/vshard/router/init.lua
> > @@ -860,7 +860,7 @@ end
> >
> >   local router_mt = {
> >       __index = {
> > -        cfg = function(router, cfg) return router_cfg(router, cfg, false) end,
> > +        cfg = function(router, cfg) return
> > util.locker_func(router_cfg, router, cfg, false) end,
>
> 5. Out of 80 symbols per line.

Fixed.

>
> >           info = router_info;
> >           buckets_info = router_buckets_info;
> >           call = router_call;
> > diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> > index f9b8000..83f7343 100644
> > --- a/vshard/storage/init.lua
> > +++ b/vshard/storage/init.lua
> > @@ -1871,7 +1871,7 @@ if not rawget(_G, MODULE_INTERNALS) then
> >       rawset(_G, MODULE_INTERNALS, M)
> >   else
> >       reload_evolution.upgrade(M)
> > -    storage_cfg(M.current_cfg, M.this_replica.uuid, true)
> > + util.locker_func(storage_cfg, M.current_cfg, M.this_replica.uuid, true)
>
> 6. Do not use tabs in Lua files for indentation.
>

OK, tabs are replaced by spaces.

> >       M.module_version = M.module_version + 1
> >   end
> >
> > @@ -1908,7 +1908,7 @@ return {
> >       rebalancing_is_in_progress = rebalancing_is_in_progress,
> >       recovery_wakeup = recovery_wakeup,
> >       call = storage_call,
> > -    cfg = function(cfg, uuid) return storage_cfg(cfg, uuid, false) end,
> > +    cfg = function(cfg, uuid) return util.locker_func(storage_cfg,
> > cfg, uuid, false) end,
>
> 7. Out of 80.
>

Fixed.

> >       info = storage_info,
> >       buckets_info = storage_buckets_info,
> >       buckets_count = storage_buckets_count,
> > diff --git a/vshard/util.lua b/vshard/util.lua
> > index 3afaa61..d52598b 100644
> > --- a/vshard/util.lua
> > +++ b/vshard/util.lua
> > @@ -134,10 +136,25 @@ local function async_task(delay, task, ...)
> >       fiber.create(sync_task, delay, task, ...)
> >   end
> >
> > +local function locker_func(f, ...)
> > +    M.lock:put(true)
> > +    if M.cfg_counter > 0 then
> > +        error('Error: cfg_counter > 0')
> > +    end
> > +    M.cfg_counter = M.cfg_counter + 1
> > +    local status, err = pcall(f, ...)
> > +    M.cfg_counter = M.cfg_counter - 1
> > +    M.lock:get()
> > +    if not status then
> > +        error(err)
> > +    end
> > +end
>
> 8. Since you started working on this patch, multiple
> routers per process feature was pushed, so M now can
> not be used in router for the counter and the lock.
> Please, rebase. Now each router should have its own
> non-global lock.
>

Rebased, moved again to router and storage. Latch implementation
now is different for router and storage, because multiple routers
was pushed.

> 9. Please, write a comment to this function like it
> is done for other ones.
>

OK, short comment is written. But since latch wrapper function is
local per lua file, there is no need to write full description.

> > +
> >   return {
> >       tuple_extract_key = tuple_extract_key,
> >       reloadable_fiber_create = reloadable_fiber_create,
> >       generate_self_checker = generate_self_checker,
> >       async_task = async_task,
> > +    locker_func = locker_func,
> >       internal = M,
> >   }
> >

Please find below the fixed patch.

Changes in v3:
  - Separated test for router and storage (rebased to existing tests)
  - Locker wrapper function moved back to router/init.lua and storage/init.lua
  - Error injection added
  - Tabs removed

---
 test/router/router.result     | 39 ++++++++++++++++++++++++++++++++
 test/router/router.test.lua   | 14 ++++++++++++
 test/storage/storage.result   | 42 +++++++++++++++++++++++++++++++++++
 test/storage/storage.test.lua | 15 +++++++++++++
 vshard/router/init.lua        | 35 ++++++++++++++++++++++++++---
 vshard/storage/init.lua       | 32 ++++++++++++++++++++++++--
 6 files changed, 172 insertions(+), 5 deletions(-)

diff --git a/test/router/router.result b/test/router/router.result
index 000307d..ada8309 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1140,6 +1140,45 @@ test_run:drop_cluster(REPLICASET_2)
 while test_run:grep_log('router_1', 'disconnected from ') == nil do
fiber.sleep(0.1) end
 ---
 ...
+-- gh-140: prohibit vshard.router/storage.cfg from multiple fibers.
+cfg = {sharding = require('localcfg').sharding}
+---
+...
+vshard = require('vshard')
+---
+...
+fiber = require('fiber')
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+vshard.router.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+---
+...
+c = fiber.channel(2)
+---
+...
+f = function() fiber.create(function() local status, err =
pcall(vshard.router.cfg, cfg) c:put(status and true or false) end) end
+---
+...
+f()
+---
+...
+f()
+---
+...
+c:get()
+---
+- true
+...
+c:get()
+---
+- true
+...
+vshard.router.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+---
+...
 _ = test_run:cmd("stop server router_1")
 ---
 ...
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 9f32d3e..fab476e 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -440,6 +440,20 @@ test_run:drop_cluster(REPLICASET_2)
 -- gh-24: log all connnect/disconnect events.
 while test_run:grep_log('router_1', 'disconnected from ') == nil do
fiber.sleep(0.1) end

+-- gh-140: prohibit vshard.router/storage.cfg from multiple fibers.
+cfg = {sharding = require('localcfg').sharding}
+vshard = require('vshard')
+fiber = require('fiber')
+vshard.router.cfg(cfg)
+vshard.router.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+c = fiber.channel(2)
+f = function() fiber.create(function() local status, err =
pcall(vshard.router.cfg, cfg) c:put(status and true or false) end) end
+f()
+f()
+c:get()
+c:get()
+vshard.router.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+
 _ = test_run:cmd("stop server router_1")
 _ = test_run:cmd("cleanup server router_1")
 test_run:drop_cluster(REPLICASET_1)
diff --git a/test/storage/storage.result b/test/storage/storage.result
index e3e9e7f..0228445 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -688,6 +688,48 @@ rs:callro('echo', {'some_data'})
 - null
 - null
 ...
+-- gh-140: prohibit vshard.router/storage.cfg from multiple fibers.
+cfg = {sharding = {}}
+---
+...
+cfg.sharding[box.info.cluster.uuid] = {replicas = {}}
+---
+...
+cfg.sharding[box.info.cluster.uuid].replicas[box.info.uuid] = {uri =
'storage:storage at 127.0.0.1:3309', name = 'test_storage', master =
true}
+---
+...
+vshard = require('vshard')
+---
+...
+fiber = require('fiber')
+---
+...
+vshard.storage.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+---
+...
+c = fiber.channel(2)
+---
+...
+f = function() fiber.create(function() local status, err =
pcall(vshard.storage.cfg, cfg, box.info.uuid) c:put(status and true or
false) end) end
+---
+...
+f()
+---
+...
+f()
+---
+...
+c:get()
+---
+- true
+...
+c:get()
+---
+- true
+...
+vshard.storage.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+---
+...
 _ = test_run:switch("default")
 ---
 ...
diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
index c07730b..6ceaa52 100644
--- a/test/storage/storage.test.lua
+++ b/test/storage/storage.test.lua
@@ -182,6 +182,21 @@ util.has_same_fields(old_internal, vshard.storage.internal)
 _, rs = next(vshard.storage.internal.replicasets)
 rs:callro('echo', {'some_data'})

+-- gh-140: prohibit vshard.router/storage.cfg from multiple fibers.
+cfg = {sharding = {}}
+cfg.sharding[box.info.cluster.uuid] = {replicas = {}}
+cfg.sharding[box.info.cluster.uuid].replicas[box.info.uuid] = {uri =
'storage:storage at 127.0.0.1:3309', name = 'test_storage', master =
true}
+vshard = require('vshard')
+fiber = require('fiber')
+vshard.storage.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+c = fiber.channel(2)
+f = function() fiber.create(function() local status, err =
pcall(vshard.storage.cfg, cfg, box.info.uuid) c:put(status and true or
false) end) end
+f()
+f()
+c:get()
+c:get()
+vshard.storage.internal.errinj.ERRINJ_MULTIPLE_CFG = false
+
 _ = test_run:switch("default")
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 7573801..446b1a0 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -28,6 +28,7 @@ if not M then
         ---------------- Common module attributes ----------------
         errinj = {
             ERRINJ_CFG = false,
+            ERRINJ_MULTIPLE_CFG = false,
             ERRINJ_FAILOVER_CHANGE_CFG = false,
             ERRINJ_RELOAD = false,
             ERRINJ_LONG_DISCOVERY = false,
@@ -53,6 +54,10 @@ local ROUTER_TEMPLATE = {
         name = nil,
         -- The last passed configuration.
         current_cfg = nil,
+        -- Fiber channel as the locker to prevent multiple config.
+        cfg_lock = nil,
+        -- Configuration counter to control and verify locker.
+        cfg_counter = 0,
         -- Time to outdate old objects on reload.
         connection_outdate_delay = nil,
         -- Bucket map cache.
@@ -594,6 +599,29 @@ local function router_cfg(router, cfg, is_reload)
     end
 end

+--------------------------------------------------------------------------------
+-- Configuration wrapper
+--------------------------------------------------------------------------------
+
+local function cfg_wrapper(router, ...)
+    local enable_lock = not M.errinj.ERRINJ_MULTIPLE_CFG
+    if enable_lock then
+        router.cfg_lock:put(true)
+    end
+    if router.cfg_counter > 0 then
+        error('Error: cfg_counter > 0')
+    end
+    router.cfg_counter = router.cfg_counter + 1
+    local status, err = pcall(router_cfg, router, ...)
+    router.cfg_counter = router.cfg_counter - 1
+    if enable_lock then
+        router.cfg_lock:get()
+    end
+    if not status then
+        error(err)
+    end
+end
+
 --------------------------------------------------------------------------------
 -- Bootstrap
 --------------------------------------------------------------------------------
@@ -862,7 +890,7 @@ end

 local router_mt = {
     __index = {
-        cfg = function(router, cfg) return router_cfg(router, cfg, false) end,
+        cfg = function(router, cfg) return cfg_wrapper(router, cfg, false) end,
         info = router_info;
         buckets_info = router_buckets_info;
         call = router_call;
@@ -920,6 +948,7 @@ local function router_new(name, cfg)
     local router = table.deepcopy(ROUTER_TEMPLATE)
     setmetatable(router, router_mt)
     router.name = name
+    router.cfg_lock = lfiber.channel(1)
     M.routers[name] = router
     local ok, err = pcall(router_cfg, router, cfg)
     if not ok then
@@ -936,7 +965,7 @@ end
 local function legacy_cfg(cfg)
     if M.static_router then
         -- Reconfigure.
-        router_cfg(M.static_router, cfg, false)
+        cfg_wrapper(M.static_router, cfg, false)
     else
         -- Create new static instance.
         local router, err = router_new(STATIC_ROUTER_NAME, cfg)
@@ -961,7 +990,7 @@ if not rawget(_G, MODULE_INTERNALS) then
     rawset(_G, MODULE_INTERNALS, M)
 else
     for _, router in pairs(M.routers) do
-        router_cfg(router, router.current_cfg, true)
+        cfg_wrapper(router, router.current_cfg, true)
         setmetatable(router, router_mt)
     end
     if M.static_router then
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index dc059ba..7deb67b 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -48,6 +48,10 @@ if not M then
         ---------------- Common module attributes ----------------
         -- The last passed configuration.
         current_cfg = nil,
+        -- Fiber channel as the locker to prevent multiple config.
+        cfg_lock = lfiber.channel(1),
+        -- Configuration counter to control and verify locker.
+        cfg_counter = 0,
         --
         -- All known replicasets used for bucket re-balancing.
         -- See format in replicaset.lua.
@@ -66,6 +70,7 @@ if not M then
         total_bucket_count = 0,
         errinj = {
             ERRINJ_CFG = false,
+            ERRINJ_MULTIPLE_CFG = false,
             ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
             ERRINJ_RELOAD = false,
             ERRINJ_CFG_DELAY = false,
@@ -1843,6 +1848,29 @@ local function storage_cfg(cfg,
this_replica_uuid, is_reload)
     collectgarbage()
 end

+--------------------------------------------------------------------------------
+-- Configuration wrapper
+--------------------------------------------------------------------------------
+
+local function storage_cfg_wrapper(...)
+    local enable_lock = not M.errinj.ERRINJ_MULTIPLE_CFG
+    if enable_lock then
+        M.cfg_lock:put(true)
+    end
+    if M.cfg_counter > 0 then
+       error('Error: cfg_counter > 0')
+    end
+    M.cfg_counter = M.cfg_counter + 1
+    local status, err = pcall(storage_cfg, ...)
+    M.cfg_counter = M.cfg_counter - 1
+    if enable_lock then
+        M.cfg_lock:get()
+    end
+    if not status then
+        error(err)
+    end
+end
+
 --------------------------------------------------------------------------------
 -- Monitoring
 --------------------------------------------------------------------------------
@@ -2060,7 +2088,7 @@ if not rawget(_G, MODULE_INTERNALS) then
     rawset(_G, MODULE_INTERNALS, M)
 else
     reload_evolution.upgrade(M)
-    storage_cfg(M.current_cfg, M.this_replica.uuid, true)
+    storage_cfg_wrapper(M.current_cfg, M.this_replica.uuid, true)
     M.module_version = M.module_version + 1
 end

@@ -2103,7 +2131,7 @@ return {
     rebalancing_is_in_progress = rebalancing_is_in_progress,
     recovery_wakeup = recovery_wakeup,
     call = storage_call,
-    cfg = function(cfg, uuid) return storage_cfg(cfg, uuid, false) end,
+    cfg = function(cfg, uuid) return storage_cfg_wrapper(cfg, uuid, false) end,
     info = storage_info,
     buckets_info = storage_buckets_info,
     buckets_count = storage_buckets_count,
-- 
2.17.1.windows.1




More information about the Tarantool-patches mailing list