Tarantool development patches archive
From: Alex Khatskevich <avkhatskevich@tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>,
	tarantool-patches@freelists.org
Subject: [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber
Date: Wed, 27 Jun 2018 13:44:10 +0300
Message-ID: <51114d53-806d-5c65-3424-7a965a434a98@tarantool.org>
In-Reply-To: <527ad322-e091-dcf6-ad74-5de5a4fcc6ac@tarantool.org>



On 27.06.2018 12:54, Vladislav Shpilevoy wrote:
> Hello. Thanks for the fixes! See 4 comments below.
>
> On 26/06/2018 15:50, Alex Khatskevich wrote:
>> Sorry, I forgot to put the new patch here
>>
>> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
>> Author: AKhatskevich <avkhatskevich@tarantool.org>
>> Date:   Thu Jun 14 14:03:07 2018 +0300
>>
>>      Reload reloadable fiber
>>
>>      Fixed a problem:
>>      The `reloadable_fiber_f` was running an infinite while loop and
>>      preventing the whole module from being reloaded.
>>
>>      This behavior is fixed by calling new version of 
>> `reloadable_fiber_f` in
>>      a return statement instead of the where loop.  Note: calling a 
>> function
>
> 1. Again 'where loop'.
>
Sorry
>>      in a return statement doesn't increase a stack size.
>>
>>      Extra changes:
>>       * transfer write "started" message responsibility from caller to
>>         reloadable fiber
>>
>>      Closes #116
>>
>> diff --git a/test/unit/util.result b/test/unit/util.result
>> new file mode 100644
>> index 0000000..dbfcce9
>> --- /dev/null
>> +++ b/test/unit/util.result
>> @@ -0,0 +1,72 @@
>> +test_run = require('test_run').new()
>> +---
>> +...
>> +fiber = require('fiber')
>> +---
>> +...
>> +log = require('log')
>> +---
>> +...
>> +test_util = require('util')
>> +---
>> +...
>> +util = require('vshard.util')
>> +---
>> +...
>> +test_run:cmd("setopt delimiter ';'")
>> +---
>> +- true
>> +...
>> +fake_M = {
>> +    reloadable_func = nil,
>> +    module_version = 1,
>> +};
>> +---
>> +...
>> +test_run:cmd("setopt delimiter ''");
>> +---
>> +- true
>> +...
>> +-- Check autoreload on function change during failure.
>> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
>> +---
>> +...
>> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
>> +---
>> +...
>> +fib:cancel()
>> +---
>> +...
>> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed')
>
> 2. 'Has been' changed?
I have changed it, but there are two notes:
1. Both are correct:
https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed
2. vshard uses both options:
    a. "Configuration has changed, restart"
    b. "Rebalancer location has changed"
>
>> +---
>> +- 'Worker_name: reloadable function reloadable_function has changed'
>> +...
>> diff --git a/vshard/util.lua b/vshard/util.lua
>> index bb71318..920f53e 100644
>> --- a/vshard/util.lua
>> +++ b/vshard/util.lua
>> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
>> +local function reloadable_fiber_f(module, func_name, worker_name)
>> +    log.info('%s has been started', worker_name)
>> +    local func = module[func_name]
>> +    local ok, err = pcall(func, module.module_version)
>> +    if not ok then
>> +        log.error('%s has been failed: %s', worker_name, err)
>> +        if func ~= module[func_name] then
>> +            log.warn('%s: reloadable function %s has changed',
>> +                        worker_name, func_name)
>
> 3. Why are this check and warning here, after an error?
Changed:
1. warn -> error
2. Comment added:
         -- There is a chance that error was raised during reload
         -- (or caused by reload). Perform reload in case function
         -- has been changed.
>
>>           end
>>       end
>> +    -- yield serves two purposes:
>> +    --  * makes this fiber cancellable
>> +    --  * prevents 100% cpu consumption
>> +    fiber.yield()
>> +    log.info('%s is reloaded, restarting', worker_name)
>
> 4. Why do you print 'is reloaded' even if it has just thrown an error
> and the function has not actually changed?

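Fixed with a goto: when the function fails but has not been changed, it
is retried without printing 'is reloaded'.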
>
>> +    -- luajit drops this frame if next function is called in
>> +    -- return statement.
>> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>>   end
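
The two ideas discussed above (retrying via goto while the function is
unchanged, and restarting through a call in a return statement) can be
sketched without Tarantool. This is only an illustration under
assumptions: `latest`, `count_down` and `run_until_ok` are hypothetical
names that do not exist in vshard, and the `goto` statement requires
LuaJIT or Lua 5.2+.

```lua
-- Hypothetical stand-in for the table `M` that the patch keeps in _G.
local latest = {}

local function count_down(n, acc)
    if n == 0 then
        return acc
    end
    -- Proper tail call: the current frame is replaced, so a very deep
    -- "recursion" through the published version does not grow the stack.
    return latest.count_down(n - 1, acc + 1)
end
latest.count_down = count_down

-- Retry the same function while it fails and has not been replaced
-- in the module table. max_attempts exists only to bound the sketch.
local function run_until_ok(module, func_name, max_attempts)
    local func = module[func_name]
    local attempts = 0
::retry::
    attempts = attempts + 1
    local ok = pcall(func)
    if not ok and func == module[func_name] and attempts < max_attempts then
        goto retry
    end
    return ok, attempts
end

-- Usage: a worker that fails twice and then succeeds.
local calls = 0
local mod = {
    work = function()
        calls = calls + 1
        if calls < 3 then
            error('boom')
        end
    end,
}
print(run_until_ok(mod, 'work', 10))
print(count_down(1000000, 0))
```

Unlike the real `reloadable_fiber_f`, the sketch does not yield between
attempts, so it is not suitable as a fiber body as written.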

Full diff:

commit d098bf0930935b029fbca3f4fd78211dd5022fb1
Author: AKhatskevich <avkhatskevich@tarantool.org>
Date:   Thu Jun 14 14:03:07 2018 +0300

     Reload reloadable fiber

     Fixed a problem:
     The `reloadable_fiber_f` was running an infinite while loop and
     preventing the whole module from being reloaded.

     This behavior is fixed by calling the new version of
     `reloadable_fiber_f` in a return statement instead of the while
     loop. Note: calling a function in a return statement doesn't
     increase the stack size.

     Extra changes:
      * transfer write "started" message responsibility from caller to
        reloadable fiber

     Closes #116

diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 22100fe..6e3aa81 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
  fiber = require('fiber')
  ---
  ...
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
  ---
  ...
  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index 3327f30..922357a 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -197,7 +197,7 @@ switch_rs1_master()
  vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)

  fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end

  --
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..47f3c2e 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
  check_reloaded()
  ---
  ...
-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
  ---
  ...
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
  ---
  ...
  check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..af2939d 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()

  check_reloaded()

-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end

  check_reloaded()

diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..531d984 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
  check_reloaded()
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
  ---
  ...
  check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..64c3a60 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()

  check_reloaded()

-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end

  check_reloaded()

diff --git a/test/unit/util.result b/test/unit/util.result
new file mode 100644
index 0000000..a7b176d
--- /dev/null
+++ b/test/unit/util.result
@@ -0,0 +1,72 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+test_util = require('util')
+---
+...
+util = require('vshard.util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
+---
+- 'Worker_name: reloadable function reloadable_function has been changed'
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started')
+---
+- Worker_name has been started
+...
+log.info(string.rep('a', 1000))
+---
+...
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started', 1000)
+---
+- Worker_name has been started
+...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
new file mode 100644
index 0000000..9359378
--- /dev/null
+++ b/test/unit/util.test.lua
@@ -0,0 +1,29 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+log = require('log')
+test_util = require('util')
+util = require('vshard.util')
+
+test_run:cmd("setopt delimiter ';'")
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+test_run:cmd("setopt delimiter ''");
+
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'Worker_name has been started')
+log.info(string.rep('a', 1000))
+
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..9e18b27 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -491,11 +491,9 @@ local function router_cfg(cfg)
      end
      lreplicaset.wait_masters_connect(new_replicasets)
      if M.failover_fiber == nil then
-        log.info('Start failover fiber')
          lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
      end
      if M.discovery_fiber == nil then
-        log.info('Start discovery fiber')
          lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
      end
      -- Destroy connections, not used in a new configuration.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 57076e1..300751f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -692,10 +692,8 @@ local function local_on_master_enable()
      M.collect_bucket_garbage_fiber =
          lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
                        'Garbage collector')
-    log.info("GC is started")
      M.recovery_fiber =
          lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
-    log.info('Recovery is started')
      -- TODO: check current status
      log.info("Took on replicaset master role")
  end
@@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)

      if min_master == this_replica then
          if not M.rebalancer_fiber then
-            log.info('Run rebalancer')
              M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
                                                 'rebalancer_f', 'Rebalancer')
          else
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..ce79930 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -2,6 +2,19 @@
  local log = require('log')
  local fiber = require('fiber')

+local MODULE_INTERNALS = '__module_vshard_util'
+local M = rawget(_G, MODULE_INTERNALS)
+if not M then
+    --
+    -- The module is loaded for the first time.
+    --
+    M = {
+        -- Latest versions of functions.
+        reloadable_fiber_f = nil,
+    }
+    rawset(_G, MODULE_INTERNALS, M)
+end
+
  --
  -- Extract parts of a tuple.
  -- @param tuple Tuple to extract a key from.
@@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts)
  end

  --
--- Wrapper to run @a func in infinite loop and restart it on the
--- module reload. This function CAN NOT BE AUTORELOADED. To update
--- it you must manualy stop all fibers, run by this function, do
--- reload, and then restart all stopped fibers. This can be done,
--- for example, by calling vshard.storage/router.cfg() again with
--- the same config as earlier.
+-- Wrapper to run a func in infinite loop and restart it on
+-- errors and module reload.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
  --
--- @param func Reloadable function to run. It must accept current
---        module version as an argument, and interrupt itself,
---        when it is changed.
--- @param worker_name Name of the function. Usual infinite fiber
---        represents a background subsystem, which has a name. For
---        example: "Garbage Collector", "Recovery", "Discovery",
---        "Rebalancer".
--- @param M Module which can reload.
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+--        module.
+-- @param worker_name Name of the reloadable background subsystem.
+--        For example: "Garbage Collector", "Recovery", "Discovery",
+--        "Rebalancer". Used only for an activity logging.
  --
-local function reloadable_fiber_f(M, func_name, worker_name)
-    while true do
-        local ok, err = pcall(M[func_name], M.module_version)
-        if not ok then
-            log.error('%s has been failed: %s', worker_name, err)
-            fiber.yield()
-        else
-            log.info('%s has been reloaded', worker_name)
-            fiber.yield()
+local function reloadable_fiber_f(module, func_name, worker_name)
+    log.info('%s has been started', worker_name)
+    local func = module[func_name]
+::reload_loop::
+    local ok, err = pcall(func, module.module_version)
+    -- yield serves two purposes:
+    --  * makes this fiber cancellable
+    --  * prevents 100% cpu consumption
+    fiber.yield()
+    if not ok then
+        log.error('%s has been failed: %s', worker_name, err)
+        if func == module[func_name] then
+            goto reload_loop
          end
+        -- There is a chance that error was raised during reload
+        -- (or caused by reload). Perform reload in case function
+        -- has been changed.
+        log.error('%s: reloadable function %s has been changed',
+                  worker_name, func_name)
      end
+    log.info('%s is reloaded, restarting', worker_name)
+    -- luajit drops this frame if next function is called in
+    -- return statement.
+    return M.reloadable_fiber_f(module, func_name, worker_name)
  end

  --
@@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func)
      end
  end

+-- Update latest versions of function
+M.reloadable_fiber_f = reloadable_fiber_f
+
  return {
      tuple_extract_key = tuple_extract_key,
      reloadable_fiber_f = reloadable_fiber_f,
      generate_self_checker = generate_self_checker,
+    internal = M,
  }
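
For readers unfamiliar with the `rawget(_G, MODULE_INTERNALS)` idiom in
the vshard/util.lua hunk above, here is a minimal sketch of how a table
stored in `_G` lets old code reach the newest function after a reload.
The names `__example_module_internals`, `load_module` and `greet` are
hypothetical; a reload is simulated by calling `load_module` twice
rather than by reloading a real file.

```lua
-- Hypothetical global key; the patch uses '__module_vshard_util'.
local MODULE_INTERNALS = '__example_module_internals'

-- Stands in for the module source file: each call simulates one
-- (re)load of that file.
local function load_module(version)
    local M = rawget(_G, MODULE_INTERNALS)
    if not M then
        -- First load: create the table that survives reloads.
        M = { greet = nil }
        rawset(_G, MODULE_INTERNALS, M)
    end
    local function greet()
        return 'hello from version ' .. version
    end
    -- Publish the newest implementation for already running code.
    M.greet = greet
    return { greet = greet, internal = M }
end

local v1 = load_module(1)
local v2 = load_module(2)

-- Both loads share one internals table, so code that kept a reference
-- to the first load still reaches the second implementation through it.
print(v1.internal == v2.internal)
print(v1.greet())
print(v1.internal.greet())
```

This is why the patched `reloadable_fiber_f` restarts through
`M.reloadable_fiber_f` rather than through its own local name.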

Thread overview: 11+ messages
2018-06-14 11:42 [tarantool-patches] " AKhatskevich
2018-06-14 11:58 ` AKhatskevich
2018-06-21 12:04 ` [tarantool-patches] " Vladislav Shpilevoy
2018-06-26 12:43   ` Alex Khatskevich
2018-06-26 12:50     ` Alex Khatskevich
2018-06-27  9:54       ` Vladislav Shpilevoy
2018-06-27 10:44         ` Alex Khatskevich [this message]
2018-06-27 11:34           ` Vladislav Shpilevoy
2018-06-27 11:45             ` Alex Khatskevich
2018-06-27 11:53               ` Vladislav Shpilevoy
2018-06-27 13:11                 ` Vladislav Shpilevoy
