[Tarantool-patches] [PATCH 2/9] Use fiber.clock() instead of .time() everywhere

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 10 02:46:08 MSK 2021


Fiber.time() returns real time. It is affected by time corrections
in the system, and can be not monotonic.

The patch makes everything in vshard use fiber.clock() instead of
fiber.time(). Also fiber.clock function is saved as an upvalue for
all functions in all modules using it. This makes the code a bit
shorter and saves 1 indexing of 'fiber' table.

The main reason - in the future map-reduce feature the current
time will be used quite often. In some places it probably will be
the slowest action (given how slow FFI can be when not compiled by
JIT).

Needed for #147
---
 test/failover/failover.result   |  4 ++--
 test/failover/failover.test.lua |  4 ++--
 vshard/replicaset.lua           | 13 +++++++------
 vshard/router/init.lua          | 16 ++++++++--------
 vshard/storage/init.lua         | 16 ++++++++--------
 5 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/test/failover/failover.result b/test/failover/failover.result
index 452694c..bae57fa 100644
--- a/test/failover/failover.result
+++ b/test/failover/failover.result
@@ -261,13 +261,13 @@ test_run:cmd('start server box_1_d')
 ---
 - true
 ...
-ts1 = fiber.time()
+ts1 = fiber.clock()
 ---
 ...
 while rs1.replica.name ~= 'box_1_d' do fiber.sleep(0.1) end
 ---
 ...
-ts2 = fiber.time()
+ts2 = fiber.clock()
 ---
 ...
 ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
diff --git a/test/failover/failover.test.lua b/test/failover/failover.test.lua
index 13c517b..a969e0e 100644
--- a/test/failover/failover.test.lua
+++ b/test/failover/failover.test.lua
@@ -109,9 +109,9 @@ test_run:switch('router_1')
 -- Revive the best replica. A router must reconnect to it in
 -- FAILOVER_UP_TIMEOUT seconds.
 test_run:cmd('start server box_1_d')
-ts1 = fiber.time()
+ts1 = fiber.clock()
 while rs1.replica.name ~= 'box_1_d' do fiber.sleep(0.1) end
-ts2 = fiber.time()
+ts2 = fiber.clock()
 ts2 - ts1 < vshard.consts.FAILOVER_UP_TIMEOUT
 test_run:grep_log('router_1', 'New replica box_1_d%(storage%@')
 
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index b13d05e..a74c0f8 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -54,6 +54,7 @@ local luri = require('uri')
 local luuid = require('uuid')
 local ffi = require('ffi')
 local util = require('vshard.util')
+local clock = fiber.clock
 local gsc = util.generate_self_checker
 
 --
@@ -88,7 +89,7 @@ local function netbox_on_connect(conn)
         -- biggest priority. Really, it is not neccessary to
         -- increase replica connection priority, if the current
         -- one already has the biggest priority. (See failover_f).
-        rs.replica_up_ts = fiber.time()
+        rs.replica_up_ts = clock()
     end
 end
 
@@ -100,7 +101,7 @@ local function netbox_on_disconnect(conn)
     assert(conn.replica)
     -- Replica is down - remember this time to decrease replica
     -- priority after FAILOVER_DOWN_TIMEOUT seconds.
-    conn.replica.down_ts = fiber.time()
+    conn.replica.down_ts = clock()
 end
 
 --
@@ -174,7 +175,7 @@ local function replicaset_up_replica_priority(replicaset)
     local old_replica = replicaset.replica
     if old_replica == replicaset.priority_list[1] and
        old_replica:is_connected() then
-        replicaset.replica_up_ts = fiber.time()
+        replicaset.replica_up_ts = clock()
         return
     end
     for _, replica in pairs(replicaset.priority_list) do
@@ -403,7 +404,7 @@ local function replicaset_template_multicallro(prefer_replica, balance)
             net_status, err = pcall(box.error, box.error.TIMEOUT)
             return nil, lerror.make(err)
         end
-        local end_time = fiber.time() + timeout
+        local end_time = clock() + timeout
         while not net_status and timeout > 0 do
             replica, err = pick_next_replica(replicaset)
             if not replica then
@@ -412,7 +413,7 @@ local function replicaset_template_multicallro(prefer_replica, balance)
             opts.timeout = timeout
             net_status, storage_status, retval, err =
                 replica_call(replica, func, args, opts)
-            timeout = end_time - fiber.time()
+            timeout = end_time - clock()
             if not net_status and not storage_status and
                not can_retry_after_error(retval) then
                 -- There is no sense to retry LuaJit errors, such as
@@ -680,7 +681,7 @@ local function buildall(sharding_cfg)
     else
         zone_weights = {}
     end
-    local curr_ts = fiber.time()
+    local curr_ts = clock()
     for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do
         local new_replicaset = setmetatable({
             replicas = {},
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index ba1f863..a530c29 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,6 +1,7 @@
 local log = require('log')
 local lfiber = require('fiber')
 local table_new = require('table.new')
+local clock = lfiber.clock
 
 local MODULE_INTERNALS = '__module_vshard_router'
 -- Reload requirements, in case this module is reloaded manually.
@@ -527,7 +528,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
     end
     local timeout = opts.timeout or consts.CALL_TIMEOUT_MIN
     local replicaset, err
-    local tend = lfiber.time() + timeout
+    local tend = clock() + timeout
     if bucket_id > router.total_bucket_count or bucket_id <= 0 then
         error('Bucket is unreachable: bucket id is out of range')
     end
@@ -551,7 +552,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
         replicaset, err = bucket_resolve(router, bucket_id)
         if replicaset then
 ::replicaset_is_found::
-            opts.timeout = tend - lfiber.time()
+            opts.timeout = tend - clock()
             local storage_call_status, call_status, call_error =
                 replicaset[call](replicaset, 'vshard.storage.call',
                                  {bucket_id, mode, func, args}, opts)
@@ -583,7 +584,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
                         -- if reconfiguration had been started,
                         -- and while is not executed on router,
                         -- but already is executed on storages.
-                        while lfiber.time() <= tend do
+                        while clock() <= tend do
                             lfiber.sleep(0.05)
                             replicaset = router.replicasets[err.destination]
                             if replicaset then
@@ -598,7 +599,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
                         -- case of broken cluster, when a bucket
                         -- is sent on two replicasets to each
                         -- other.
-                        if replicaset and lfiber.time() <= tend then
+                        if replicaset and clock() <= tend then
                             goto replicaset_is_found
                         end
                     end
@@ -623,7 +624,7 @@ local function router_call_impl(router, bucket_id, mode, prefer_replica,
             end
         end
         lfiber.yield()
-    until lfiber.time() > tend
+    until clock() > tend
     if err then
         return nil, err
     else
@@ -749,7 +750,7 @@ end
 -- connections must be updated.
 --
 local function failover_collect_to_update(router)
-    local ts = lfiber.time()
+    local ts = clock()
     local uuid_to_update = {}
     for uuid, rs in pairs(router.replicasets) do
         if failover_need_down_priority(rs, ts) or
@@ -772,7 +773,7 @@ local function failover_step(router)
     if #uuid_to_update == 0 then
         return false
     end
-    local curr_ts = lfiber.time()
+    local curr_ts = clock()
     local replica_is_changed = false
     for _, uuid in pairs(uuid_to_update) do
         local rs = router.replicasets[uuid]
@@ -1230,7 +1231,6 @@ local function router_sync(router, timeout)
         timeout = router.sync_timeout
     end
     local arg = {timeout}
-    local clock = lfiber.clock
     local deadline = timeout and (clock() + timeout)
     local opts = {timeout = timeout}
     for rs_uuid, replicaset in pairs(router.replicasets) do
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 1b48bf1..c7335fc 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -5,6 +5,7 @@ local netbox = require('net.box') -- for net.box:self()
 local trigger = require('internal.trigger')
 local ffi = require('ffi')
 local yaml_encode = require('yaml').encode
+local clock = lfiber.clock
 
 local MODULE_INTERNALS = '__module_vshard_storage'
 -- Reload requirements, in case this module is reloaded manually.
@@ -695,7 +696,7 @@ local function sync(timeout)
     log.debug("Synchronizing replicaset...")
     timeout = timeout or M.sync_timeout
     local vclock = box.info.vclock
-    local tstart = lfiber.time()
+    local tstart = clock()
     repeat
         local done = true
         for _, replica in ipairs(box.info.replication) do
@@ -711,7 +712,7 @@ local function sync(timeout)
             return true
         end
         lfiber.sleep(0.001)
-    until not (lfiber.time() <= tstart + timeout)
+    until not (clock() <= tstart + timeout)
     log.warn("Timed out during synchronizing replicaset")
     local ok, err = pcall(box.error, box.error.TIMEOUT)
     return nil, lerror.make(err)
@@ -1280,10 +1281,9 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
     ref.rw_lock = true
     exception_guard.ref = ref
     exception_guard.drop_rw_lock = true
-    local deadline = lfiber.clock() + (opts and opts.timeout or 10)
+    local deadline = clock() + (opts and opts.timeout or 10)
     while ref.rw ~= 0 do
-        if not M.bucket_rw_lock_is_ready_cond:wait(deadline -
-                                                   lfiber.clock()) then
+        if not M.bucket_rw_lock_is_ready_cond:wait(deadline - clock()) then
             status, err = pcall(box.error, box.error.TIMEOUT)
             return nil, lerror.make(err)
         end
@@ -1579,7 +1579,7 @@ function gc_bucket_f()
     -- specified time interval the buckets are deleted both from
     -- this array and from _bucket space.
     local buckets_for_redirect = {}
-    local buckets_for_redirect_ts = lfiber.time()
+    local buckets_for_redirect_ts = clock()
     -- Empty sent buckets, updated after each step, and when
     -- buckets_for_redirect is deleted, it gets empty_sent_buckets
     -- for next deletion.
@@ -1614,7 +1614,7 @@ function gc_bucket_f()
             end
         end
 
-        if lfiber.time() - buckets_for_redirect_ts >=
+        if clock() - buckets_for_redirect_ts >=
            consts.BUCKET_SENT_GARBAGE_DELAY then
             status, err = gc_bucket_drop(buckets_for_redirect,
                                          consts.BUCKET.SENT)
@@ -1629,7 +1629,7 @@ function gc_bucket_f()
             else
                 buckets_for_redirect = empty_sent_buckets or {}
                 empty_sent_buckets = nil
-                buckets_for_redirect_ts = lfiber.time()
+                buckets_for_redirect_ts = clock()
             end
         end
 ::continue::
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list