From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 2EFD82EBC3 for ; Wed, 15 May 2019 15:36:53 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id teZHbAXglXan for ; Wed, 15 May 2019 15:36:53 -0400 (EDT) Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id BBBAD2EC6F for ; Wed, 15 May 2019 15:36:52 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 06/10] swim: Lua bindings to access individual members Date: Wed, 15 May 2019 22:36:43 +0300 Message-Id: <69fdcfe81ca73d424e6dd528dcef8d0b4d857662.1557948687.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Expose API to search members by UUID, to read their attributes, to set payload. Part of #3234 --- src/lua/swim.lua | 222 ++++++++++++++++++++++++ test/swim/swim.result | 366 +++++++++++++++++++++++++++++++++++++++- test/swim/swim.test.lua | 119 +++++++++++++ 3 files changed, 705 insertions(+), 2 deletions(-) diff --git a/src/lua/swim.lua b/src/lua/swim.lua index c828faceb..e40edda18 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -1,5 +1,7 @@ local ffi = require('ffi') local uuid = require('uuid') +local buffer = require('buffer') +local msgpack = require('msgpack') ffi.cdef[[ struct swim; @@ -100,6 +102,14 @@ ffi.cdef[[ local capi = ffi.C local swim_t = ffi.typeof('struct swim *') +local swim_member_t = ffi.typeof('struct swim_member *') + +local swim_member_status_strs = { + [capi.MEMBER_ALIVE] = 'alive', + [capi.MEMBER_SUSPECTED] = 'suspected', + [capi.MEMBER_DEAD] = 'dead', + [capi.MEMBER_LEFT] = 'left' +} -- -- Check if @a value is something that can be passed as a @@ -212,6 +222,142 @@ local function swim_check_instance(s, func_name) return error(func_name..': first argument is not a SWIM instance') end +-- +-- The same for SWIM member. +-- +local function swim_check_member(m, func_name) + if type(m) == 'table' then + local ptr = m.ptr + if ffi.istype(swim_member_t, ptr) then + return ptr + end + end + return error(func_name..': first argument is not a SWIM member') +end + +-- +-- Member methods. Most of them are one-liners, not much to +-- comment. +-- + +local function swim_member_status(m) + local ptr = swim_check_member(m, 'member:status()') + return swim_member_status_strs[tonumber(capi.swim_member_status(ptr))] +end + +local function swim_member_uri(m) + local ptr = swim_check_member(m, 'member:uri()') + return ffi.string(capi.swim_member_uri(ptr)) +end + +local function swim_member_incarnation(m) + local ptr = swim_check_member(m, 'member:incarnation()') + return capi.swim_member_incarnation(ptr) +end + +local function swim_member_is_dropped(m) + local ptr = swim_check_member(m, 'member:is_dropped()') + return capi.swim_member_is_dropped(ptr) +end + +local function swim_member_payload_raw(ptr) + local int = buffer.scalar.ai + local cdata = capi.swim_member_payload(ptr, int) + return cdata, int[0] +end + +-- +-- Payload can be bigger than KB, and probably it is undesirable +-- to copy it into a Lua string or decode MessagePack into a +-- Lua object. This method is the cheapest way of taking payload. +-- +local function swim_member_payload_cdata(m) + local ptr = swim_check_member(m, 'member:payload_cdata()') + return swim_member_payload_raw(ptr) +end + +-- +-- Cdata requires to keep explicit size, besides not all user +-- methods can be able to work with cdata. This is why it may be +-- needed to take payload as a string - text or binary. +-- +local function swim_member_payload_str(m) + local ptr = swim_check_member(m, 'member:payload_str()') + return ffi.string(swim_member_payload_raw(ptr)) +end + +-- +-- Since this is a Lua module, a user is likely to use Lua objects +-- as a payload - tables, numbers, string etc. And it is natural +-- to expect that member:payload() should return the same object +-- which was passed into swim:set_payload() on another instance. +-- This member method tries to interpret payload as MessagePack, +-- and if fails, returns the payload as a string. +-- +local function swim_member_payload(m) + local ptr = swim_check_member(m, 'member:payload()') + local cdata, size = swim_member_payload_raw(ptr) + if size == 0 then + return '' + end + local ok, res = pcall(msgpack.decode, cdata, size) + if not ok then + return ffi.string(cdata, size) + end + return res +end + +-- +-- Cdata UUID. It is ok to return cdata, because struct tt_uuid +-- type has strong support by 'uuid' Lua module with nice +-- metatable, serialization, string conversions etc. +-- +local function swim_member_uuid(m) + return capi.swim_member_uuid(swim_check_member(m, 'member:uuid()')) +end + +local function swim_member_serialize(m) + local _, size = swim_member_payload_raw(m.ptr) + return { + status = swim_member_status(m), + uuid = swim_member_uuid(m), + uri = swim_member_uri(m), + incarnation = swim_member_incarnation(m), + -- There are many ways to interpret a payload, and it is + -- not a job of a serialization method. Only binary size + -- here is returned to allow a user to detect, whether a + -- payload exists. + payload_size = size, + } +end + +local swim_member_mt = { + __index = { + status = swim_member_status, + uuid = swim_member_uuid, + uri = swim_member_uri, + incarnation = swim_member_incarnation, + payload_cdata = swim_member_payload_cdata, + payload_str = swim_member_payload_str, + payload = swim_member_payload, + is_dropped = swim_member_is_dropped, + }, + __serialize = swim_member_serialize, + __newindex = function(m) + return error('swim_member is a read-only object') + end +} + +-- +-- Wrap a SWIM member into a table with proper metamethods. Also +-- it is going to be used to cache a decoded payload. +-- +local function swim_member_wrap(ptr) + capi.swim_member_ref(ptr) + ffi.gc(ptr, capi.swim_member_unref) + return setmetatable({ptr = ptr}, swim_member_mt) +end + -- -- When a SWIM instance is deleted or has quited, it can't be used -- anymore. This function replaces all methods of a deleted @@ -339,6 +485,78 @@ local function swim_broadcast(s, port) return true end +-- +-- Shortcut to get the self member in O(1) not making a lookup +-- into the member table. +-- +local function swim_self(s) + return swim_member_wrap(capi.swim_self(swim_check_instance(s, 'swim:self'))) +end + +-- +-- Find a member by UUID in the local member table. +-- +local function swim_member_by_uuid(s, uuid) + local func_name = 'swim:member_by_uuid' + local ptr = swim_check_instance(s, func_name) + uuid = swim_check_uuid(uuid, func_name) + local m = capi.swim_member_by_uuid(ptr, uuid) + if m == nil then + return nil + end + return swim_member_wrap(m) +end + +-- +-- Set raw payload without any preprocessing nor encoding. It can +-- be anything, not necessary MessagePack. +-- +local function swim_set_payload_raw(s, payload, payload_size) + local func_name = 'swim:set_payload_raw' + local ptr = swim_check_instance(s, func_name) + if payload_size ~= nil and type(payload_size) ~= 'number' then + return error(func_name..': expected number payload size') + end + if type(payload) == 'cdata' then + if not payload_size then + return error(func_name..': size is mandatory for cdata payload') + end + payload = ffi.cast('const char *', payload) + elseif type(payload) == 'string' then + if not payload_size then + payload_size = payload:len() + elseif payload_size > payload:len() then + return error(func_name..': explicit payload size > string length') + end + else + return error(func_name..': raw payload should be either string or '.. + 'cdata') + end + if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then + return nil, box.error.last() + end + return true +end + +-- +-- Set Lua object as a payload. It is encoded into MessagePack. +-- +local function swim_set_payload(s, payload) + local func_name = 'swim:set_payload' + local ptr = swim_check_instance(s, func_name) + local payload_size = 0 + if payload ~= nil then + local buf = buffer.IBUF_SHARED + buf:reset() + payload_size = msgpack.encode(payload, buf) + payload = buf.rpos + end + if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then + return nil, box.error.last() + end + return true +end + -- -- Normal metatable of a configured SWIM instance. -- @@ -352,6 +570,10 @@ local swim_mt = { add_member = swim_add_member, remove_member = swim_remove_member, broadcast = swim_broadcast, + self = swim_self, + member_by_uuid = swim_member_by_uuid, + set_payload_raw = swim_set_payload_raw, + set_payload = swim_set_payload, }, __serialize = swim_serialize } diff --git a/test/swim/swim.result b/test/swim/swim.result index 0ab3dafe0..4cf5c7f90 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -5,6 +5,16 @@ test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:: '") --- - true ... +test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:'") +--- +- true +... +msgpack = require('msgpack') +--- +... +ffi = require('ffi') +--- +... -- -- gh-3234: SWIM gossip protocol. -- @@ -104,7 +114,7 @@ s:size() s.cfg --- - uuid: 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1:0 + uri: 127.0.0.1: ... s.cfg.gc_mode = 'off' --- @@ -119,7 +129,7 @@ s.cfg --- - gc_mode: off uuid: 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1:0 + uri: 127.0.0.1: ... s.cfg.gc_mode --- @@ -296,6 +306,18 @@ s1:cfg({uuid = uuid(3)}) --- - true ... +s1:self():uuid() +--- +- 00000000-0000-1000-8000-000000000003 +... +s1:member_by_uuid(uuid(1)) +--- +- uri: 127.0.0.1: + status: left + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 0 +... -- Can't remove self. s1:remove_member(uuid(3)) --- @@ -366,6 +388,346 @@ s1:delete() s2:delete() --- ... +-- +-- Member API. +-- +s1 = swim.new({uuid = uuid(1), uri = uri()}) +--- +... +s = s1:self() +--- +... +s +--- +- uri: 127.0.0.1: + status: alive + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 0 +... +s:status() +--- +- alive +... +s:uuid() +--- +- 00000000-0000-1000-8000-000000000001 +... +s:uri() +--- +- 127.0.0.1: +... +s:incarnation() +--- +- 1 +... +s:payload_cdata() +--- +- 'cdata: NULL' +- 0 +... +s:payload_str() +--- +- +... +s:payload() +--- +- +... +s:is_dropped() +--- +- false +... +s.unknown_index +--- +- null +... +s.status() +--- +- error: 'builtin/swim.lua:: member:status(): first argument is not a SWIM member' +... +s.uuid() +--- +- error: 'builtin/swim.lua:: member:uuid(): first argument is not a SWIM member' +... +s.uri() +--- +- error: 'builtin/swim.lua:: member:uri(): first argument is not a SWIM member' +... +s.incarnation() +--- +- error: 'builtin/swim.lua:: member:incarnation(): first argument is not a SWIM + member' +... +s.payload_cdata() +--- +- error: 'builtin/swim.lua:: member:payload_cdata(): first argument is not a SWIM + member' +... +s.payload_str() +--- +- error: 'builtin/swim.lua:: member:payload_str(): first argument is not a SWIM + member' +... +s.payload() +--- +- error: 'builtin/swim.lua:: member:payload(): first argument is not a SWIM member' +... +s.is_dropped() +--- +- error: 'builtin/swim.lua:: member:is_dropped(): first argument is not a SWIM + member' +... +s1:member_by_uuid(uuid(1)) ~= nil +--- +- true +... +s1:member_by_uuid(50) +--- +- error: 'builtin/swim.lua:: swim:member_by_uuid: expected string UUID' +... +s1:member_by_uuid(uuid(2)) +--- +- null +... +s1:quit() +--- +... +s:status() +--- +- left +... +s:is_dropped() +--- +- true +... +-- +-- Payload. +-- +s = swim.new({uuid = uuid(1), uri = uri()}) +--- +... +s.set_payload() +--- +- error: 'builtin/swim.lua:: swim:set_payload: first argument is not a SWIM instance' +... +s.set_payload_raw() +--- +- error: 'builtin/swim.lua:: swim:set_payload_raw: first argument is not a SWIM + instance' +... +self = s:self() +--- +... +s:set_payload() +--- +- true +... +self:payload() +--- +- +... +s:set_payload({a = 100}) +--- +- true +... +self:payload() +--- +- {'a': 100} +... +s:set_payload(100) +--- +- true +... +self:payload() +--- +- 100 +... +s:set_payload(false) +--- +- true +... +self:payload() +--- +- false +... +p = self:payload_str() +--- +... +p +--- +- !!binary wg== +... +(msgpack.decode(p)) +--- +- false +... +p, size = self:payload_cdata() +--- +... +type(p) +--- +- cdata +... +size +--- +- 1 +... +(msgpack.decode(p, size)) +--- +- false +... +s:set_payload(string.rep('a', 1500)) +--- +- null +- Payload should be <= 1200 and >= 0 +... +self:payload() +--- +- false +... +s:set_payload() +--- +- true +... +self:payload() +--- +- +... +-- Raw payload setting can be used when MessagePack is not needed, +-- or already encoded. +s:set_payload_raw(nil, '123') +--- +- error: 'swim:set_payload_raw: expected number payload size' +... +s:set_payload_raw('123', -1) +--- +- null +- Payload should be <= 1200 and >= 0 +... +size = 10 +--- +... +cdata = ffi.new('int[?]', size) +--- +... +for i = 0, size - 1 do cdata[i] = i end +--- +... +bsize = ffi.sizeof('int') * size +--- +... +s:set_payload_raw(cdata) +--- +- error: 'swim:set_payload_raw: size is mandatory for cdata payload' +... +s:set_payload_raw('str', 4) +--- +- error: 'swim:set_payload_raw: explicit payload size > string length' +... +s:set_payload_raw(true) +--- +- error: 'swim:set_payload_raw: raw payload should be either string or cdata' +... +s:set_payload_raw(cdata, bsize) +--- +- true +... +self:payload_str():len() == bsize +--- +- true +... +self_cdata, self_bsize = self:payload_cdata() +--- +... +self_bsize == bsize +--- +- true +... +self_cdata = ffi.cast('int *', self_cdata) +--- +... +for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end +--- +... +s:set_payload_raw('raw str') +--- +- true +... +self:payload_str() +--- +- raw str +... +s:set_payload_raw('raw str', 3) +--- +- true +... +self:payload_str() +--- +- raw +... +s:delete() +--- +... +self:is_dropped() +--- +- true +... +-- +-- Check payload dissemination. +-- +s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}) +--- +... +s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01}) +--- +... +s1:add_member({uuid = uuid(2), uri = listen_port}) +--- +- true +... +while s2:size() ~= 2 do fiber.sleep(0.01) end +--- +... +s1_view = s2:member_by_uuid(uuid(1)) +--- +... +s1_view:payload() +--- +- +... +s1_view:incarnation() +--- +- 1 +... +s1:set_payload('payload') +--- +- true +... +while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end +--- +... +s1_view:incarnation() +--- +- 2 +... +s1:set_payload('payload2') +--- +- true +... +while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end +--- +... +s1_view:incarnation() +--- +- 3 +... +s1:delete() +--- +... +s2:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index 1e55a828a..8e7b426fe 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -1,5 +1,8 @@ test_run = require('test_run').new() test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:: '") +test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:'") +msgpack = require('msgpack') +ffi = require('ffi') -- -- gh-3234: SWIM gossip protocol. -- @@ -99,6 +102,8 @@ s1:add_member({uri = listen_uri, uuid = uuid(2)}) s1:size() s1:cfg({uuid = uuid(3)}) +s1:self():uuid() +s1:member_by_uuid(uuid(1)) -- Can't remove self. s1:remove_member(uuid(3)) -- Not existing. @@ -125,4 +130,118 @@ s2:size() s1:delete() s2:delete() +-- +-- Member API. +-- +s1 = swim.new({uuid = uuid(1), uri = uri()}) +s = s1:self() +s +s:status() +s:uuid() +s:uri() +s:incarnation() +s:payload_cdata() +s:payload_str() +s:payload() +s:is_dropped() +s.unknown_index + +s.status() +s.uuid() +s.uri() +s.incarnation() +s.payload_cdata() +s.payload_str() +s.payload() +s.is_dropped() + +s1:member_by_uuid(uuid(1)) ~= nil +s1:member_by_uuid(50) +s1:member_by_uuid(uuid(2)) + +s1:quit() +s:status() +s:is_dropped() + +-- +-- Payload. +-- +s = swim.new({uuid = uuid(1), uri = uri()}) +s.set_payload() +s.set_payload_raw() + +self = s:self() +s:set_payload() +self:payload() +s:set_payload({a = 100}) +self:payload() +s:set_payload(100) +self:payload() +s:set_payload(false) +self:payload() + +p = self:payload_str() +p +(msgpack.decode(p)) + +p, size = self:payload_cdata() +type(p) +size +(msgpack.decode(p, size)) + +s:set_payload(string.rep('a', 1500)) +self:payload() +s:set_payload() +self:payload() + +-- Raw payload setting can be used when MessagePack is not needed, +-- or already encoded. +s:set_payload_raw(nil, '123') +s:set_payload_raw('123', -1) + +size = 10 +cdata = ffi.new('int[?]', size) +for i = 0, size - 1 do cdata[i] = i end +bsize = ffi.sizeof('int') * size +s:set_payload_raw(cdata) +s:set_payload_raw('str', 4) +s:set_payload_raw(true) + +s:set_payload_raw(cdata, bsize) +self:payload_str():len() == bsize +self_cdata, self_bsize = self:payload_cdata() +self_bsize == bsize +self_cdata = ffi.cast('int *', self_cdata) +for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end + +s:set_payload_raw('raw str') +self:payload_str() +s:set_payload_raw('raw str', 3) +self:payload_str() + +s:delete() +self:is_dropped() + +-- +-- Check payload dissemination. +-- +s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}) +s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01}) +s1:add_member({uuid = uuid(2), uri = listen_port}) +while s2:size() ~= 2 do fiber.sleep(0.01) end +s1_view = s2:member_by_uuid(uuid(1)) +s1_view:payload() +s1_view:incarnation() + +s1:set_payload('payload') +while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end +s1_view:incarnation() + +s1:set_payload('payload2') +while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end +s1_view:incarnation() + +s1:delete() +s2:delete() + test_run:cmd("clear filter") -- 2.20.1 (Apple Git-117)