[tarantool-patches] [PATCH 06/10] swim: Lua bindings to access individual members

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed May 15 22:36:43 MSK 2019


Expose API to search members by UUID, to read their attributes,
to set payload.

Part of #3234
---
 src/lua/swim.lua        | 222 ++++++++++++++++++++++++
 test/swim/swim.result   | 366 +++++++++++++++++++++++++++++++++++++++-
 test/swim/swim.test.lua | 119 +++++++++++++
 3 files changed, 705 insertions(+), 2 deletions(-)

diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index c828faceb..e40edda18 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -1,5 +1,7 @@
 local ffi = require('ffi')
 local uuid = require('uuid')
+local buffer = require('buffer')
+local msgpack = require('msgpack')
 
 ffi.cdef[[
     struct swim;
@@ -100,6 +102,14 @@ ffi.cdef[[
 local capi = ffi.C
 
 local swim_t = ffi.typeof('struct swim *')
+local swim_member_t = ffi.typeof('struct swim_member *')
+
+local swim_member_status_strs = {
+    [capi.MEMBER_ALIVE] = 'alive',
+    [capi.MEMBER_SUSPECTED] = 'suspected',
+    [capi.MEMBER_DEAD] = 'dead',
+    [capi.MEMBER_LEFT] = 'left'
+}
 
 --
 -- Check if @a value is something that can be passed as a
@@ -212,6 +222,142 @@ local function swim_check_instance(s, func_name)
     return error(func_name..': first argument is not a SWIM instance')
 end
 
+--
+-- The same for SWIM member.
+--
+local function swim_check_member(m, func_name)
+    if type(m) == 'table' then
+        local ptr = m.ptr
+        if ffi.istype(swim_member_t, ptr) then
+            return ptr
+        end
+    end
+    return error(func_name..': first argument is not a SWIM member')
+end
+
+--
+-- Member methods. Most of them are one-liners, not much to
+-- comment.
+--
+
+local function swim_member_status(m)
+    local ptr = swim_check_member(m, 'member:status()')
+    return swim_member_status_strs[tonumber(capi.swim_member_status(ptr))]
+end
+
+local function swim_member_uri(m)
+    local ptr = swim_check_member(m, 'member:uri()')
+    return ffi.string(capi.swim_member_uri(ptr))
+end
+
+local function swim_member_incarnation(m)
+    local ptr = swim_check_member(m, 'member:incarnation()')
+    return capi.swim_member_incarnation(ptr)
+end
+
+local function swim_member_is_dropped(m)
+    local ptr = swim_check_member(m, 'member:is_dropped()')
+    return capi.swim_member_is_dropped(ptr)
+end
+
+local function swim_member_payload_raw(ptr)
+    local int = buffer.scalar.ai
+    local cdata = capi.swim_member_payload(ptr, int)
+    return cdata, int[0]
+end
+
+--
+-- Payload can be bigger than a kilobyte, so it may be undesirable
+-- to copy it into a Lua string or to decode MessagePack into a
+-- Lua object. This method is the cheapest way of taking payload.
+--
+local function swim_member_payload_cdata(m)
+    local ptr = swim_check_member(m, 'member:payload_cdata()')
+    return swim_member_payload_raw(ptr)
+end
+
+--
+-- Cdata requires keeping an explicit size, and not all user code
+-- is able to work with cdata. This is why it may be needed to
+-- take the payload as a string - text or binary.
+--
+local function swim_member_payload_str(m)
+    local ptr = swim_check_member(m, 'member:payload_str()')
+    return ffi.string(swim_member_payload_raw(ptr))
+end
+
+--
+-- Since this is a Lua module, a user is likely to use Lua objects
+-- as a payload - tables, numbers, strings etc. And it is natural
+-- to expect member:payload() to return the same object that was
+-- passed into swim:set_payload() on another instance. Payload is
+-- decoded as MessagePack; if that fails or leaves trailing bytes
+-- (raw 'abc' begins with a valid integer 97), a string is returned.
+--
+local function swim_member_payload(m)
+    local ptr = swim_check_member(m, 'member:payload()')
+    local cdata, size = swim_member_payload_raw(ptr)
+    if size == 0 then
+        return ''
+    end
+    local ok, res, pos = pcall(msgpack.decode, cdata, size)
+    if not ok or pos ~= cdata + size then
+        return ffi.string(cdata, size)
+    end
+    return res
+end
+
+--
+-- Cdata UUID. It is ok to return cdata, because the struct
+-- tt_uuid type is well supported by the 'uuid' Lua module: nice
+-- metatable, serialization, string conversions etc.
+--
+local function swim_member_uuid(m)
+    return capi.swim_member_uuid(swim_check_member(m, 'member:uuid()'))
+end
+
+local function swim_member_serialize(m)
+    local _, size = swim_member_payload_raw(m.ptr)
+    return {
+        status = swim_member_status(m),
+        uuid = swim_member_uuid(m),
+        uri = swim_member_uri(m),
+        incarnation = swim_member_incarnation(m),
+        -- There are many ways to interpret a payload, and it is
+        -- not the job of a serialization method. Only the binary
+        -- size is returned here, to let a user detect whether a
+        -- payload exists.
+        payload_size = size,
+    }
+end
+
+local swim_member_mt = {
+    __index = {
+        status = swim_member_status,
+        uuid = swim_member_uuid,
+        uri = swim_member_uri,
+        incarnation = swim_member_incarnation,
+        payload_cdata = swim_member_payload_cdata,
+        payload_str = swim_member_payload_str,
+        payload = swim_member_payload,
+        is_dropped = swim_member_is_dropped,
+    },
+    __serialize = swim_member_serialize,
+    __newindex = function(m)
+        return error('swim_member is a read-only object')
+    end
+}
+
+--
+-- Wrap a SWIM member into a table with proper metamethods. Also
+-- it is going to be used to cache a decoded payload.
+--
+local function swim_member_wrap(ptr)
+    capi.swim_member_ref(ptr)
+    ffi.gc(ptr, capi.swim_member_unref)
+    return setmetatable({ptr = ptr}, swim_member_mt)
+end
+
 --
 -- When a SWIM instance is deleted or has quited, it can't be used
 -- anymore. This function replaces all methods of a deleted
@@ -339,6 +485,78 @@ local function swim_broadcast(s, port)
     return true
 end
 
+--
+-- Shortcut to get the self member in O(1), without making a
+-- lookup into the member table.
+--
+local function swim_self(s)
+    return swim_member_wrap(capi.swim_self(swim_check_instance(s, 'swim:self')))
+end
+
+--
+-- Find a member by UUID in the local member table.
+--
+local function swim_member_by_uuid(s, uuid)
+    local func_name = 'swim:member_by_uuid'
+    local ptr = swim_check_instance(s, func_name)
+    uuid = swim_check_uuid(uuid, func_name)
+    local m = capi.swim_member_by_uuid(ptr, uuid)
+    if m == nil then
+        return nil
+    end
+    return swim_member_wrap(m)
+end
+
+--
+-- Set raw payload without any preprocessing or encoding. It can
+-- be anything, not necessarily MessagePack.
+--
+local function swim_set_payload_raw(s, payload, payload_size)
+    local func_name = 'swim:set_payload_raw'
+    local ptr = swim_check_instance(s, func_name)
+    if payload_size ~= nil and type(payload_size) ~= 'number' then
+        return error(func_name..': expected number payload size')
+    end
+    if type(payload) == 'cdata' then
+        if not payload_size then
+            return error(func_name..': size is mandatory for cdata payload')
+        end
+        payload = ffi.cast('const char *', payload)
+    elseif type(payload) == 'string' then
+        if not payload_size then
+            payload_size = payload:len()
+        elseif payload_size > payload:len() then
+            return error(func_name..': explicit payload size > string length')
+        end
+    else
+        return error(func_name..': raw payload should be either string or '..
+                     'cdata')
+    end
+    if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then
+        return nil, box.error.last()
+    end
+    return true
+end
+
+--
+-- Set Lua object as a payload. It is encoded into MessagePack.
+--
+local function swim_set_payload(s, payload)
+    local func_name = 'swim:set_payload'
+    local ptr = swim_check_instance(s, func_name)
+    local payload_size = 0
+    if payload ~= nil then
+        local buf = buffer.IBUF_SHARED
+        buf:reset()
+        payload_size = msgpack.encode(payload, buf)
+        payload = buf.rpos
+    end
+    if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then
+        return nil, box.error.last()
+    end
+    return true
+end
+
 --
 -- Normal metatable of a configured SWIM instance.
 --
@@ -352,6 +570,10 @@ local swim_mt = {
         add_member = swim_add_member,
         remove_member = swim_remove_member,
         broadcast = swim_broadcast,
+        self = swim_self,
+        member_by_uuid = swim_member_by_uuid,
+        set_payload_raw = swim_set_payload_raw,
+        set_payload = swim_set_payload,
     },
     __serialize = swim_serialize
 }
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 0ab3dafe0..4cf5c7f90 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -5,6 +5,16 @@ test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '")
 ---
 - true
 ...
+test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'")
+---
+- true
+...
+msgpack = require('msgpack')
+---
+...
+ffi = require('ffi')
+---
+...
 --
 -- gh-3234: SWIM gossip protocol.
 --
@@ -104,7 +114,7 @@ s:size()
 s.cfg
 ---
 - uuid: 00000000-0000-1000-8000-000000000001
-  uri: 127.0.0.1:0
+  uri: 127.0.0.1:<port>
 ...
 s.cfg.gc_mode = 'off'
 ---
@@ -119,7 +129,7 @@ s.cfg
 ---
 - gc_mode: off
   uuid: 00000000-0000-1000-8000-000000000001
-  uri: 127.0.0.1:0
+  uri: 127.0.0.1:<port>
 ...
 s.cfg.gc_mode
 ---
@@ -296,6 +306,18 @@ s1:cfg({uuid = uuid(3)})
 ---
 - true
 ...
+s1:self():uuid()
+---
+- 00000000-0000-1000-8000-000000000003
+...
+s1:member_by_uuid(uuid(1))
+---
+- uri: 127.0.0.1:<port>
+  status: left
+  incarnation: 1
+  uuid: 00000000-0000-1000-8000-000000000001
+  payload_size: 0
+...
 -- Can't remove self.
 s1:remove_member(uuid(3))
 ---
@@ -366,6 +388,346 @@ s1:delete()
 s2:delete()
 ---
 ...
+--
+-- Member API.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri()})
+---
+...
+s = s1:self()
+---
+...
+s
+---
+- uri: 127.0.0.1:<port>
+  status: alive
+  incarnation: 1
+  uuid: 00000000-0000-1000-8000-000000000001
+  payload_size: 0
+...
+s:status()
+---
+- alive
+...
+s:uuid()
+---
+- 00000000-0000-1000-8000-000000000001
+...
+s:uri()
+---
+- 127.0.0.1:<port>
+...
+s:incarnation()
+---
+- 1
+...
+s:payload_cdata()
+---
+- 'cdata<const char *>: NULL'
+- 0
+...
+s:payload_str()
+---
+- 
+...
+s:payload()
+---
+- 
+...
+s:is_dropped()
+---
+- false
+...
+s.unknown_index
+---
+- null
+...
+s.status()
+---
+- error: 'builtin/swim.lua:<line>: member:status(): first argument is not a SWIM member'
+...
+s.uuid()
+---
+- error: 'builtin/swim.lua:<line>: member:uuid(): first argument is not a SWIM member'
+...
+s.uri()
+---
+- error: 'builtin/swim.lua:<line>: member:uri(): first argument is not a SWIM member'
+...
+s.incarnation()
+---
+- error: 'builtin/swim.lua:<line>: member:incarnation(): first argument is not a SWIM
+    member'
+...
+s.payload_cdata()
+---
+- error: 'builtin/swim.lua:<line>: member:payload_cdata(): first argument is not a SWIM
+    member'
+...
+s.payload_str()
+---
+- error: 'builtin/swim.lua:<line>: member:payload_str(): first argument is not a SWIM
+    member'
+...
+s.payload()
+---
+- error: 'builtin/swim.lua:<line>: member:payload(): first argument is not a SWIM member'
+...
+s.is_dropped()
+---
+- error: 'builtin/swim.lua:<line>: member:is_dropped(): first argument is not a SWIM
+    member'
+...
+s1:member_by_uuid(uuid(1)) ~= nil
+---
+- true
+...
+s1:member_by_uuid(50)
+---
+- error: 'builtin/swim.lua:<line>: swim:member_by_uuid: expected string UUID'
+...
+s1:member_by_uuid(uuid(2))
+---
+- null
+...
+s1:quit()
+---
+...
+s:status()
+---
+- left
+...
+s:is_dropped()
+---
+- true
+...
+--
+-- Payload.
+--
+s = swim.new({uuid = uuid(1), uri = uri()})
+---
+...
+s.set_payload()
+---
+- error: 'builtin/swim.lua:<line>: swim:set_payload: first argument is not a SWIM instance'
+...
+s.set_payload_raw()
+---
+- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: first argument is not a SWIM
+    instance'
+...
+self = s:self()
+---
+...
+s:set_payload()
+---
+- true
+...
+self:payload()
+---
+- 
+...
+s:set_payload({a = 100})
+---
+- true
+...
+self:payload()
+---
+- {'a': 100}
+...
+s:set_payload(100)
+---
+- true
+...
+self:payload()
+---
+- 100
+...
+s:set_payload(false)
+---
+- true
+...
+self:payload()
+---
+- false
+...
+p = self:payload_str()
+---
+...
+p
+---
+- !!binary wg==
+...
+(msgpack.decode(p))
+---
+- false
+...
+p, size = self:payload_cdata()
+---
+...
+type(p)
+---
+- cdata
+...
+size
+---
+- 1
+...
+(msgpack.decode(p, size))
+---
+- false
+...
+s:set_payload(string.rep('a', 1500))
+---
+- null
+- Payload should be <= 1200 and >= 0
+...
+self:payload()
+---
+- false
+...
+s:set_payload()
+---
+- true
+...
+self:payload()
+---
+- 
+...
+-- Raw payload setting can be used when MessagePack is not needed,
+-- or already encoded.
+s:set_payload_raw(nil, '123')
+---
+- error: 'swim:set_payload_raw: expected number payload size'
+...
+s:set_payload_raw('123', -1)
+---
+- null
+- Payload should be <= 1200 and >= 0
+...
+size = 10
+---
+...
+cdata = ffi.new('int[?]', size)
+---
+...
+for i = 0, size - 1 do cdata[i] = i end
+---
+...
+bsize = ffi.sizeof('int') * size
+---
+...
+s:set_payload_raw(cdata)
+---
+- error: 'swim:set_payload_raw: size is mandatory for cdata payload'
+...
+s:set_payload_raw('str', 4)
+---
+- error: 'swim:set_payload_raw: explicit payload size > string length'
+...
+s:set_payload_raw(true)
+---
+- error: 'swim:set_payload_raw: raw payload should be either string or cdata'
+...
+s:set_payload_raw(cdata, bsize)
+---
+- true
+...
+self:payload_str():len() == bsize
+---
+- true
+...
+self_cdata, self_bsize = self:payload_cdata()
+---
+...
+self_bsize == bsize
+---
+- true
+...
+self_cdata = ffi.cast('int *', self_cdata)
+---
+...
+for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end
+---
+...
+s:set_payload_raw('raw str')
+---
+- true
+...
+self:payload_str()
+---
+- raw str
+...
+s:set_payload_raw('raw str', 3)
+---
+- true
+...
+self:payload_str()
+---
+- raw
+...
+s:delete()
+---
+...
+self:is_dropped()
+---
+- true
+...
+--
+-- Check payload dissemination.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
+---
+...
+s1:add_member({uuid = uuid(2), uri = listen_port})
+---
+- true
+...
+while s2:size() ~= 2 do fiber.sleep(0.01) end
+---
+...
+s1_view = s2:member_by_uuid(uuid(1))
+---
+...
+s1_view:payload()
+---
+- 
+...
+s1_view:incarnation()
+---
+- 1
+...
+s1:set_payload('payload')
+---
+- true
+...
+while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end
+---
+...
+s1_view:incarnation()
+---
+- 2
+...
+s1:set_payload('payload2')
+---
+- true
+...
+while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end
+---
+...
+s1_view:incarnation()
+---
+- 3
+...
+s1:delete()
+---
+...
+s2:delete()
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index 1e55a828a..8e7b426fe 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -1,5 +1,8 @@
 test_run = require('test_run').new()
 test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '")
+test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'")
+msgpack = require('msgpack')
+ffi = require('ffi')
 --
 -- gh-3234: SWIM gossip protocol.
 --
@@ -99,6 +102,8 @@ s1:add_member({uri = listen_uri, uuid = uuid(2)})
 s1:size()
 
 s1:cfg({uuid = uuid(3)})
+s1:self():uuid()
+s1:member_by_uuid(uuid(1))
 -- Can't remove self.
 s1:remove_member(uuid(3))
 -- Not existing.
@@ -125,4 +130,118 @@ s2:size()
 s1:delete()
 s2:delete()
 
+--
+-- Member API.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri()})
+s = s1:self()
+s
+s:status()
+s:uuid()
+s:uri()
+s:incarnation()
+s:payload_cdata()
+s:payload_str()
+s:payload()
+s:is_dropped()
+s.unknown_index
+
+s.status()
+s.uuid()
+s.uri()
+s.incarnation()
+s.payload_cdata()
+s.payload_str()
+s.payload()
+s.is_dropped()
+
+s1:member_by_uuid(uuid(1)) ~= nil
+s1:member_by_uuid(50)
+s1:member_by_uuid(uuid(2))
+
+s1:quit()
+s:status()
+s:is_dropped()
+
+--
+-- Payload.
+--
+s = swim.new({uuid = uuid(1), uri = uri()})
+s.set_payload()
+s.set_payload_raw()
+
+self = s:self()
+s:set_payload()
+self:payload()
+s:set_payload({a = 100})
+self:payload()
+s:set_payload(100)
+self:payload()
+s:set_payload(false)
+self:payload()
+
+p = self:payload_str()
+p
+(msgpack.decode(p))
+
+p, size = self:payload_cdata()
+type(p)
+size
+(msgpack.decode(p, size))
+
+s:set_payload(string.rep('a', 1500))
+self:payload()
+s:set_payload()
+self:payload()
+
+-- Raw payload setting can be used when MessagePack is not needed,
+-- or already encoded.
+s:set_payload_raw(nil, '123')
+s:set_payload_raw('123', -1)
+
+size = 10
+cdata = ffi.new('int[?]', size)
+for i = 0, size - 1 do cdata[i] = i end
+bsize = ffi.sizeof('int') * size
+s:set_payload_raw(cdata)
+s:set_payload_raw('str', 4)
+s:set_payload_raw(true)
+
+s:set_payload_raw(cdata, bsize)
+self:payload_str():len() == bsize
+self_cdata, self_bsize = self:payload_cdata()
+self_bsize == bsize
+self_cdata = ffi.cast('int *', self_cdata)
+for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end
+
+s:set_payload_raw('raw str')
+self:payload_str()
+s:set_payload_raw('raw str', 3)
+self:payload_str()
+
+s:delete()
+self:is_dropped()
+
+--
+-- Check payload dissemination.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
+s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
+s1:add_member({uuid = uuid(2), uri = listen_port})
+while s2:size() ~= 2 do fiber.sleep(0.01) end
+s1_view = s2:member_by_uuid(uuid(1))
+s1_view:payload()
+s1_view:incarnation()
+
+s1:set_payload('payload')
+while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end
+s1_view:incarnation()
+
+s1:set_payload('payload2')
+while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end
+s1_view:incarnation()
+
+s1:delete()
+s2:delete()
+
 test_run:cmd("clear filter")
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list