[tarantool-patches] [PATCH 06/10] swim: Lua bindings to access individual members
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed May 15 22:36:43 MSK 2019
Expose API to search members by UUID, to read their attributes,
to set payload.
Part of #3234
---
src/lua/swim.lua | 222 ++++++++++++++++++++++++
test/swim/swim.result | 366 +++++++++++++++++++++++++++++++++++++++-
test/swim/swim.test.lua | 119 +++++++++++++
3 files changed, 705 insertions(+), 2 deletions(-)
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index c828faceb..e40edda18 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -1,5 +1,7 @@
local ffi = require('ffi')
local uuid = require('uuid')
+local buffer = require('buffer')
+local msgpack = require('msgpack')
ffi.cdef[[
struct swim;
@@ -100,6 +102,14 @@ ffi.cdef[[
local capi = ffi.C
local swim_t = ffi.typeof('struct swim *')
+local swim_member_t = ffi.typeof('struct swim_member *')
+
+--
+-- Printable names of member statuses, keyed by the
+-- capi.MEMBER_* constants.
+--
+local swim_member_status_strs = {}
+swim_member_status_strs[capi.MEMBER_ALIVE] = 'alive'
+swim_member_status_strs[capi.MEMBER_SUSPECTED] = 'suspected'
+swim_member_status_strs[capi.MEMBER_DEAD] = 'dead'
+swim_member_status_strs[capi.MEMBER_LEFT] = 'left'
--
-- Check if @a value is something that can be passed as a
@@ -212,6 +222,142 @@ local function swim_check_instance(s, func_name)
return error(func_name..': first argument is not a SWIM instance')
end
+--
+-- Validate that @a m is a wrapped SWIM member: a table whose
+-- 'ptr' field is a 'struct swim_member *' cdata. On success the
+-- pointer is returned, otherwise an error prefixed with
+-- @a func_name is raised.
+--
+local function swim_check_member(m, func_name)
+    local ptr = type(m) == 'table' and m.ptr
+    if ptr and ffi.istype(swim_member_t, ptr) then
+        return ptr
+    end
+    return error(func_name..': first argument is not a SWIM member')
+end
+
+--
+-- Member methods. Most of them are one-liners, not much to
+-- comment.
+--
+
+-- Status of the member as a string taken from
+-- swim_member_status_strs; nil if the numeric status has no
+-- entry in that table.
+local function swim_member_status(m)
+    local ptr = swim_check_member(m, 'member:status()')
+    local status = tonumber(capi.swim_member_status(ptr))
+    return swim_member_status_strs[status]
+end
+
+-- URI of the member, copied into a Lua string.
+local function swim_member_uri(m)
+    local uri = capi.swim_member_uri(swim_check_member(m, 'member:uri()'))
+    return ffi.string(uri)
+end
+
+-- Incarnation of the member, exactly as the C API returns it.
+local function swim_member_incarnation(m)
+    return capi.swim_member_incarnation(
+        swim_check_member(m, 'member:incarnation()'))
+end
+
+-- Result of capi.swim_member_is_dropped() for the member, passed
+-- through unchanged.
+local function swim_member_is_dropped(m)
+    return capi.swim_member_is_dropped(
+        swim_check_member(m, 'member:is_dropped()'))
+end
+
+-- Fetch the payload by a raw member pointer, without argument
+-- checks. Returns the data pointer given by the C API and the
+-- size, which the C function stores into its out-argument.
+local function swim_member_payload_raw(ptr)
+    local size_ptr = buffer.scalar.ai
+    local data = capi.swim_member_payload(ptr, size_ptr)
+    -- The size must be read only after the call has filled it.
+    return data, size_ptr[0]
+end
+
+--
+-- Return the payload as is: a cdata pointer plus a size. Nothing
+-- is copied into a Lua string and no MessagePack is decoded, so
+-- for a payload of kilobytes this is the cheapest way to read
+-- it.
+--
+local function swim_member_payload_cdata(m)
+    return swim_member_payload_raw(
+        swim_check_member(m, 'member:payload_cdata()'))
+end
+
+--
+-- Return the payload as a Lua string, text or binary. Cdata
+-- forces a caller to carry the size around explicitly, and not
+-- every user function is able to work with cdata, so a string
+-- copy is sometimes more convenient.
+--
+local function swim_member_payload_str(m)
+    local ptr = swim_check_member(m, 'member:payload_str()')
+    local data, size = swim_member_payload_raw(ptr)
+    return ffi.string(data, size)
+end
+
+--
+-- Return the payload as a Lua object. In a Lua module a user is
+-- likely to set tables, numbers, strings etc. as a payload, and
+-- it is natural to get the same object back from
+-- member:payload() on another instance. So the payload is
+-- decoded as MessagePack; when decoding raises an error, the
+-- payload is returned as a string. An empty payload is returned
+-- as an empty string.
+--
+local function swim_member_payload(m)
+    local ptr = swim_check_member(m, 'member:payload()')
+    local data, size = swim_member_payload_raw(ptr)
+    if size == 0 then
+        return ''
+    end
+    -- Only the first decoded value is kept.
+    local ok, decoded = pcall(msgpack.decode, data, size)
+    if ok then
+        return decoded
+    end
+    return ffi.string(data, size)
+end
+
+--
+-- UUID of the member as a 'struct tt_uuid' cdata. Cdata is ok
+-- here, because the type is well supported by the 'uuid' Lua
+-- module: metatable, serialization, string conversions etc.
+--
+-- The UUID is copied into a new cdata object instead of handing
+-- out the pointer returned by the C API. The member is kept
+-- referenced only by its Lua wrapper (see swim_member_wrap()),
+-- so a bare pointer could outlive the wrapper, for example in
+-- s:self():uuid(), and then refer to memory of an already
+-- unreferenced member.
+-- NOTE(review): assumes the C function never returns NULL for a
+-- valid member - confirm against the C implementation.
+--
+local function swim_member_uuid(m)
+    local ptr = swim_check_member(m, 'member:uuid()')
+    local uuid_copy = ffi.new('struct tt_uuid')
+    ffi.copy(uuid_copy, capi.swim_member_uuid(ptr),
+             ffi.sizeof('struct tt_uuid'))
+    return uuid_copy
+end
+
+--
+-- __serialize metamethod of a member wrapper: build a plain
+-- table with the member attributes.
+--
+local function swim_member_serialize(m)
+    local _, payload_size = swim_member_payload_raw(m.ptr)
+    local res = {}
+    res.status = swim_member_status(m)
+    res.uuid = swim_member_uuid(m)
+    res.uri = swim_member_uri(m)
+    res.incarnation = swim_member_incarnation(m)
+    -- A payload can be interpreted in many ways, and choosing one
+    -- is not a job of serialization. Only the binary size is
+    -- reported, so that a user can see whether a payload exists.
+    res.payload_size = payload_size
+    return res
+end
+
+--
+-- Metatable of a member wrapper. Methods are looked up via
+-- __index, serialization goes through __serialize, and an
+-- assignment to a field absent from the wrapper raises an error.
+--
+local swim_member_mt = {
+    __newindex = function()
+        return error('swim_member is a read-only object')
+    end,
+    __serialize = swim_member_serialize,
+    __index = {
+        is_dropped = swim_member_is_dropped,
+        payload = swim_member_payload,
+        payload_str = swim_member_payload_str,
+        payload_cdata = swim_member_payload_cdata,
+        incarnation = swim_member_incarnation,
+        uri = swim_member_uri,
+        uuid = swim_member_uuid,
+        status = swim_member_status,
+    },
+}
+
+--
+-- Wrap a SWIM member pointer into a table with the member
+-- metatable. The member is referenced here, and the matching
+-- unref is installed as a finalizer of the pointer cdata. The
+-- table is also going to be used to cache a decoded payload.
+--
+local function swim_member_wrap(ptr)
+    capi.swim_member_ref(ptr)
+    -- ffi.gc() returns the very same cdata object it was given.
+    local member = {ptr = ffi.gc(ptr, capi.swim_member_unref)}
+    return setmetatable(member, swim_member_mt)
+end
+
--
-- When a SWIM instance is deleted or has quited, it can't be used
-- anymore. This function replaces all methods of a deleted
@@ -339,6 +485,78 @@ local function swim_broadcast(s, port)
return true
end
+--
+-- Get the self member directly from the instance, in O(1),
+-- without a lookup in the member table.
+--
+local function swim_self(s)
+    local ptr = swim_check_instance(s, 'swim:self')
+    return swim_member_wrap(capi.swim_self(ptr))
+end
+
+--
+-- Look a member up by UUID in the local member table. Returns a
+-- wrapped member, or nil when the C API finds nothing.
+--
+local function swim_member_by_uuid(s, uuid)
+    local func_name = 'swim:member_by_uuid'
+    local ptr = swim_check_instance(s, func_name)
+    local member = capi.swim_member_by_uuid(
+        ptr, swim_check_uuid(uuid, func_name))
+    if member ~= nil then
+        return swim_member_wrap(member)
+    end
+    return nil
+end
+
+--
+-- Set a raw payload without any preprocessing or encoding. It
+-- can be anything, not necessarily MessagePack.
+--
+-- @a payload is either a string or cdata. For a string
+-- @a payload_size is optional and must not exceed the string
+-- length; for cdata it is mandatory. Argument mistakes raise an
+-- error; a failure of the C call is returned as nil plus the
+-- last box error.
+--
+local function swim_set_payload_raw(s, payload, payload_size)
+    local func_name = 'swim:set_payload_raw'
+    local ptr = swim_check_instance(s, func_name)
+    if payload_size ~= nil and type(payload_size) ~= 'number' then
+        return error(func_name..': expected number payload size')
+    end
+    local payload_type = type(payload)
+    if payload_type == 'string' then
+        local len = payload:len()
+        if not payload_size then
+            payload_size = len
+        elseif payload_size > len then
+            return error(func_name..': explicit payload size > string length')
+        end
+    elseif payload_type == 'cdata' then
+        -- Size of arbitrary cdata can't be deduced here.
+        if not payload_size then
+            return error(func_name..': size is mandatory for cdata payload')
+        end
+        payload = ffi.cast('const char *', payload)
+    else
+        return error(func_name..': raw payload should be either string or '..
+                     'cdata')
+    end
+    if capi.swim_set_payload(ptr, payload, payload_size) == 0 then
+        return true
+    end
+    return nil, box.error.last()
+end
+
+--
+-- Set a Lua object as a payload. The object is encoded into
+-- MessagePack in the shared ibuf. A nil payload is passed to the
+-- C API as is, with zero size. Returns true on success,
+-- otherwise nil plus the last box error.
+--
+local function swim_set_payload(s, payload)
+    local func_name = 'swim:set_payload'
+    local ptr = swim_check_instance(s, func_name)
+    local data, size = nil, 0
+    if payload ~= nil then
+        local ibuf = buffer.IBUF_SHARED
+        ibuf:reset()
+        size = msgpack.encode(payload, ibuf)
+        data = ibuf.rpos
+    end
+    if capi.swim_set_payload(ptr, data, size) == 0 then
+        return true
+    end
+    return nil, box.error.last()
+end
+
--
-- Normal metatable of a configured SWIM instance.
--
@@ -352,6 +570,10 @@ local swim_mt = {
add_member = swim_add_member,
remove_member = swim_remove_member,
broadcast = swim_broadcast,
+ self = swim_self,
+ member_by_uuid = swim_member_by_uuid,
+ set_payload_raw = swim_set_payload_raw,
+ set_payload = swim_set_payload,
},
__serialize = swim_serialize
}
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 0ab3dafe0..4cf5c7f90 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -5,6 +5,16 @@ test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '")
---
- true
...
+test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'")
+---
+- true
+...
+msgpack = require('msgpack')
+---
+...
+ffi = require('ffi')
+---
+...
--
-- gh-3234: SWIM gossip protocol.
--
@@ -104,7 +114,7 @@ s:size()
s.cfg
---
- uuid: 00000000-0000-1000-8000-000000000001
- uri: 127.0.0.1:0
+ uri: 127.0.0.1:<port>
...
s.cfg.gc_mode = 'off'
---
@@ -119,7 +129,7 @@ s.cfg
---
- gc_mode: off
uuid: 00000000-0000-1000-8000-000000000001
- uri: 127.0.0.1:0
+ uri: 127.0.0.1:<port>
...
s.cfg.gc_mode
---
@@ -296,6 +306,18 @@ s1:cfg({uuid = uuid(3)})
---
- true
...
+s1:self():uuid()
+---
+- 00000000-0000-1000-8000-000000000003
+...
+s1:member_by_uuid(uuid(1))
+---
+- uri: 127.0.0.1:<port>
+ status: left
+ incarnation: 1
+ uuid: 00000000-0000-1000-8000-000000000001
+ payload_size: 0
+...
-- Can't remove self.
s1:remove_member(uuid(3))
---
@@ -366,6 +388,346 @@ s1:delete()
s2:delete()
---
...
+--
+-- Member API.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri()})
+---
+...
+s = s1:self()
+---
+...
+s
+---
+- uri: 127.0.0.1:<port>
+ status: alive
+ incarnation: 1
+ uuid: 00000000-0000-1000-8000-000000000001
+ payload_size: 0
+...
+s:status()
+---
+- alive
+...
+s:uuid()
+---
+- 00000000-0000-1000-8000-000000000001
+...
+s:uri()
+---
+- 127.0.0.1:<port>
+...
+s:incarnation()
+---
+- 1
+...
+s:payload_cdata()
+---
+- 'cdata<const char *>: NULL'
+- 0
+...
+s:payload_str()
+---
+-
+...
+s:payload()
+---
+-
+...
+s:is_dropped()
+---
+- false
+...
+s.unknown_index
+---
+- null
+...
+s.status()
+---
+- error: 'builtin/swim.lua:<line>: member:status(): first argument is not a SWIM member'
+...
+s.uuid()
+---
+- error: 'builtin/swim.lua:<line>: member:uuid(): first argument is not a SWIM member'
+...
+s.uri()
+---
+- error: 'builtin/swim.lua:<line>: member:uri(): first argument is not a SWIM member'
+...
+s.incarnation()
+---
+- error: 'builtin/swim.lua:<line>: member:incarnation(): first argument is not a SWIM
+ member'
+...
+s.payload_cdata()
+---
+- error: 'builtin/swim.lua:<line>: member:payload_cdata(): first argument is not a SWIM
+ member'
+...
+s.payload_str()
+---
+- error: 'builtin/swim.lua:<line>: member:payload_str(): first argument is not a SWIM
+ member'
+...
+s.payload()
+---
+- error: 'builtin/swim.lua:<line>: member:payload(): first argument is not a SWIM member'
+...
+s.is_dropped()
+---
+- error: 'builtin/swim.lua:<line>: member:is_dropped(): first argument is not a SWIM
+ member'
+...
+s1:member_by_uuid(uuid(1)) ~= nil
+---
+- true
+...
+s1:member_by_uuid(50)
+---
+- error: 'builtin/swim.lua:<line>: swim:member_by_uuid: expected string UUID'
+...
+s1:member_by_uuid(uuid(2))
+---
+- null
+...
+s1:quit()
+---
+...
+s:status()
+---
+- left
+...
+s:is_dropped()
+---
+- true
+...
+--
+-- Payload.
+--
+s = swim.new({uuid = uuid(1), uri = uri()})
+---
+...
+s.set_payload()
+---
+- error: 'builtin/swim.lua:<line>: swim:set_payload: first argument is not a SWIM instance'
+...
+s.set_payload_raw()
+---
+- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: first argument is not a SWIM
+ instance'
+...
+self = s:self()
+---
+...
+s:set_payload()
+---
+- true
+...
+self:payload()
+---
+-
+...
+s:set_payload({a = 100})
+---
+- true
+...
+self:payload()
+---
+- {'a': 100}
+...
+s:set_payload(100)
+---
+- true
+...
+self:payload()
+---
+- 100
+...
+s:set_payload(false)
+---
+- true
+...
+self:payload()
+---
+- false
+...
+p = self:payload_str()
+---
+...
+p
+---
+- !!binary wg==
+...
+(msgpack.decode(p))
+---
+- false
+...
+p, size = self:payload_cdata()
+---
+...
+type(p)
+---
+- cdata
+...
+size
+---
+- 1
+...
+(msgpack.decode(p, size))
+---
+- false
+...
+s:set_payload(string.rep('a', 1500))
+---
+- null
+- Payload should be <= 1200 and >= 0
+...
+self:payload()
+---
+- false
+...
+s:set_payload()
+---
+- true
+...
+self:payload()
+---
+-
+...
+-- Raw payload setting can be used when MessagePack is not needed,
+-- or already encoded.
+s:set_payload_raw(nil, '123')
+---
+- error: 'swim:set_payload_raw: expected number payload size'
+...
+s:set_payload_raw('123', -1)
+---
+- null
+- Payload should be <= 1200 and >= 0
+...
+size = 10
+---
+...
+cdata = ffi.new('int[?]', size)
+---
+...
+for i = 0, size - 1 do cdata[i] = i end
+---
+...
+bsize = ffi.sizeof('int') * size
+---
+...
+s:set_payload_raw(cdata)
+---
+- error: 'swim:set_payload_raw: size is mandatory for cdata payload'
+...
+s:set_payload_raw('str', 4)
+---
+- error: 'swim:set_payload_raw: explicit payload size > string length'
+...
+s:set_payload_raw(true)
+---
+- error: 'swim:set_payload_raw: raw payload should be either string or cdata'
+...
+s:set_payload_raw(cdata, bsize)
+---
+- true
+...
+self:payload_str():len() == bsize
+---
+- true
+...
+self_cdata, self_bsize = self:payload_cdata()
+---
+...
+self_bsize == bsize
+---
+- true
+...
+self_cdata = ffi.cast('int *', self_cdata)
+---
+...
+for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end
+---
+...
+s:set_payload_raw('raw str')
+---
+- true
+...
+self:payload_str()
+---
+- raw str
+...
+s:set_payload_raw('raw str', 3)
+---
+- true
+...
+self:payload_str()
+---
+- raw
+...
+s:delete()
+---
+...
+self:is_dropped()
+---
+- true
+...
+--
+-- Check payload dissemination.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
+---
+...
+s1:add_member({uuid = uuid(2), uri = listen_port})
+---
+- true
+...
+while s2:size() ~= 2 do fiber.sleep(0.01) end
+---
+...
+s1_view = s2:member_by_uuid(uuid(1))
+---
+...
+s1_view:payload()
+---
+-
+...
+s1_view:incarnation()
+---
+- 1
+...
+s1:set_payload('payload')
+---
+- true
+...
+while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end
+---
+...
+s1_view:incarnation()
+---
+- 2
+...
+s1:set_payload('payload2')
+---
+- true
+...
+while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end
+---
+...
+s1_view:incarnation()
+---
+- 3
+...
+s1:delete()
+---
+...
+s2:delete()
+---
+...
test_run:cmd("clear filter")
---
- true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index 1e55a828a..8e7b426fe 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -1,5 +1,8 @@
test_run = require('test_run').new()
test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '")
+test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'")
+msgpack = require('msgpack')
+ffi = require('ffi')
--
-- gh-3234: SWIM gossip protocol.
--
@@ -99,6 +102,8 @@ s1:add_member({uri = listen_uri, uuid = uuid(2)})
s1:size()
s1:cfg({uuid = uuid(3)})
+s1:self():uuid()
+s1:member_by_uuid(uuid(1))
-- Can't remove self.
s1:remove_member(uuid(3))
-- Not existing.
@@ -125,4 +130,118 @@ s2:size()
s1:delete()
s2:delete()
+--
+-- Member API.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri()})
+s = s1:self()
+s
+s:status()
+s:uuid()
+s:uri()
+s:incarnation()
+s:payload_cdata()
+s:payload_str()
+s:payload()
+s:is_dropped()
+s.unknown_index
+
+s.status()
+s.uuid()
+s.uri()
+s.incarnation()
+s.payload_cdata()
+s.payload_str()
+s.payload()
+s.is_dropped()
+
+s1:member_by_uuid(uuid(1)) ~= nil
+s1:member_by_uuid(50)
+s1:member_by_uuid(uuid(2))
+
+s1:quit()
+s:status()
+s:is_dropped()
+
+--
+-- Payload.
+--
+s = swim.new({uuid = uuid(1), uri = uri()})
+s.set_payload()
+s.set_payload_raw()
+
+self = s:self()
+s:set_payload()
+self:payload()
+s:set_payload({a = 100})
+self:payload()
+s:set_payload(100)
+self:payload()
+s:set_payload(false)
+self:payload()
+
+p = self:payload_str()
+p
+(msgpack.decode(p))
+
+p, size = self:payload_cdata()
+type(p)
+size
+(msgpack.decode(p, size))
+
+s:set_payload(string.rep('a', 1500))
+self:payload()
+s:set_payload()
+self:payload()
+
+-- Raw payload setting can be used when MessagePack is not needed,
+-- or already encoded.
+s:set_payload_raw(nil, '123')
+s:set_payload_raw('123', -1)
+
+size = 10
+cdata = ffi.new('int[?]', size)
+for i = 0, size - 1 do cdata[i] = i end
+bsize = ffi.sizeof('int') * size
+s:set_payload_raw(cdata)
+s:set_payload_raw('str', 4)
+s:set_payload_raw(true)
+
+s:set_payload_raw(cdata, bsize)
+self:payload_str():len() == bsize
+self_cdata, self_bsize = self:payload_cdata()
+self_bsize == bsize
+self_cdata = ffi.cast('int *', self_cdata)
+for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end
+
+s:set_payload_raw('raw str')
+self:payload_str()
+s:set_payload_raw('raw str', 3)
+self:payload_str()
+
+s:delete()
+self:is_dropped()
+
+--
+-- Check payload dissemination.
+--
+s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01})
+s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01})
+s1:add_member({uuid = uuid(2), uri = listen_port})
+while s2:size() ~= 2 do fiber.sleep(0.01) end
+s1_view = s2:member_by_uuid(uuid(1))
+s1_view:payload()
+s1_view:incarnation()
+
+s1:set_payload('payload')
+while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end
+s1_view:incarnation()
+
+s1:set_payload('payload2')
+while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end
+s1_view:incarnation()
+
+s1:delete()
+s2:delete()
+
test_run:cmd("clear filter")
--
2.20.1 (Apple Git-117)
More information about the Tarantool-patches
mailing list