[tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Jun 8 14:06:09 MSK 2019


Sorry, freelists has problems when the word 'subscribe' is at
the beginning of a line. So I sent the email manually and moved
'subscribe' to the previous line.

On 08/06/2019 14:04, Vladislav Shpilevoy wrote:
> From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001
> Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy at tarantool.org>
> In-Reply-To: <cover.1559989748.git.v.shpilevoy at tarantool.org>
> References: <cover.1559989748.git.v.shpilevoy at tarantool.org>
> From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> Date: Sat, 1 Jun 2019 20:40:21 +0200
> Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events
> 
> SWIM as a monitoring module is hard to use without an ability to subscribe
> to events. Without it, a user would have to poll the SWIM
> member table for updates, which is too inefficient.
> 
> This commit exposes an ability to set Lua triggers.
> 
> Closes #4250
> 
> @TarantoolBot document
> Title: SWIM: swim:on_member_event
> 
> Now a user can set triggers on member table updates. There is a
> function for that, which can be used in one of several ways:
> ```Lua
> swim:on_member_event(new_trigger[, ctx])
> ```
> Add a new trigger on member table update. The function
> `new_trigger` will be called on each new member appearance,
> existing member drop, and member update. It should take 3
> arguments: the first is the updated SWIM member, the second is
> an events object, the third is `ctx` passed as is.
> 
> The events object has methods that help a user determine which
> events have happened.
> ```Lua
> local function on_event(member, events, ctx)
>     if events:is_new() then
>         ...
>     elseif events:is_drop() then
>         ...
>     end
> 
>     if events:is_update() then
>         -- Flags are independent: several can be set at once.
>         if events:is_new_status() then
>             ...
>         end
>         if events:is_new_uri() then
>             ...
>         end
>         if events:is_new_incarnation() then
>             ...
>         end
>         if events:is_new_payload() then
>             ...
>         end
>     end
> end
> 
> s:on_member_event(on_event, ctx)
> ```
> Note that multiple events can happen simultaneously, and a user
> should be ready for that. 'new' and 'drop' never happen
> together, but either of them can easily come with 'update',
> especially if there are lots of events and triggers work too
> slowly. Then a member can be added and updated before the 'new'
> event reaches a trigger, and the trigger sees both at once.
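> 
> To illustrate why flags combine, here is a minimal sketch that
> decodes a raw event mask. The values mirror `enum swim_ev_mask`
> from this patch; the `describe` helper and the `ORDER` table are
> hypothetical names used only for this example, and the `bit`
> module is assumed to be available (it is built into LuaJIT).
> ```Lua
> local bit = require('bit')
> 
> -- Values copied from enum swim_ev_mask in this patch.
> local EV = {
>     NEW             = 0x01,
>     NEW_STATUS      = 0x02,
>     NEW_URI         = 0x04,
>     NEW_INCARNATION = 0x08,
>     NEW_PAYLOAD     = 0x10,
>     UPDATE          = 0x1e, -- Any of the four NEW_* bits above.
>     DROP            = 0x20,
> }
> local ORDER = {'NEW', 'NEW_STATUS', 'NEW_URI',
>                'NEW_INCARNATION', 'NEW_PAYLOAD', 'UPDATE', 'DROP'}
> 
> -- Hypothetical helper: names of all flags set in a mask.
> local function describe(mask)
>     local names = {}
>     for _, name in ipairs(ORDER) do
>         if bit.band(mask, EV[name]) ~= 0 then
>             table.insert(names, name)
>         end
>     end
>     return table.concat(names, ', ')
> end
> 
> -- A member added together with its payload.
> print(describe(bit.bor(EV.NEW, EV.NEW_PAYLOAD)))
> -- prints "NEW, NEW_PAYLOAD, UPDATE"
> ```
> 'update' is reported here because SWIM_EV_UPDATE covers all the
> 'new status/uri/incarnation/payload' bits.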
> 
> A remarkable case is 'new' + 'new payload'. This case does not
> depend on trigger speed. The thing is that payload absence and
> a payload of size 0 are not the same. Sometimes it happens that
> a member is added without a payload. For example, a ping was
> received, and pings do not carry payload. In such a case the
> missing payload is eventually received later. If that matters
> for a user's application, it should be ready for that: 'new'
> does not mean that the member already has a payload, and payload
> size says nothing about its presence or absence.
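> 
> One possible way to handle this is to track payload arrival by
> the dedicated flag only. This is a sketch, not a prescribed
> pattern: the `seen_payload` table is a hypothetical piece of
> application state, and the only member method it relies on is
> `uuid()`.
> ```Lua
> -- Hypothetical state: UUID string -> true once a payload came.
> local seen_payload = {}
> 
> local function on_event(member, events)
>     local id = tostring(member:uuid())
>     if events:is_drop() then
>         seen_payload[id] = nil
>         return
>     end
>     -- 'new' alone says nothing about the payload, so react
>     -- only to the payload flag, whether it comes together
>     -- with 'new' or in a later update.
>     if events:is_new_payload() then
>         seen_payload[id] = true
>     end
> end
> 
> -- Registration on a SWIM instance 's' would look like:
> -- s:on_member_event(on_event)
> ```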
> 
> Second usage case:
> ```Lua
> swim:on_member_event(nil, old_trigger)
> ```
> Drop an old trigger.
> 
> Third usage case:
> ```Lua
> swim:on_member_event(new_trigger, old_trigger[, ctx])
> ```
> Replace an existing trigger in place, keeping its position in
> the trigger list.
> 
> Fourth usage case:
> ```Lua
> swim:on_member_event()
> ```
> Get a list of triggers.
> 
> When dropping or replacing a trigger, a user should be careful:
> the following code does not work:
> ```Lua
> tr = function() ... end
> -- Add a trigger.
> s:on_member_event(tr)
> ...
> -- Drop a trigger.
> s:on_member_event(nil, tr)
> ```
> To be precise, the last line does not work. This is because SWIM
> wraps user triggers with an internal closure for parameter
> preprocessing. To drop a trigger, a user should save the result
> of the first on_member_event() call.
> 
> This code works:
> ```Lua
> tr = function() ... end
> -- Add a trigger.
> tr_id = s:on_member_event(tr)
> ...
> -- Drop a trigger.
> s:on_member_event(nil, tr_id)
> ```
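> 
> If saving ids by hand is inconvenient, a small wrapper can
> remember them. The following is only a sketch: the name
> `trigger_registry` and its `add`/`drop` functions are
> hypothetical and not part of this patch. It relies only on
> on_member_event() returning the id when a trigger is added.
> ```Lua
> -- Hypothetical helper: maps each user function to the id
> -- returned by on_member_event() for SWIM instance 's'.
> local function trigger_registry(s)
>     local ids = {}
>     local reg = {}
> 
>     function reg.add(fn, ctx)
>         ids[fn] = s:on_member_event(fn, ctx)
>     end
> 
>     -- Returns true if the trigger was known and dropped.
>     function reg.drop(fn)
>         local id = ids[fn]
>         if id == nil then
>             return false
>         end
>         s:on_member_event(nil, id)
>         ids[fn] = nil
>         return true
>     end
> 
>     return reg
> end
> ```
> With it, `reg.add(tr)` followed by `reg.drop(tr)` removes the
> trigger by the original function.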
> 
> The triggers are executed one by one in a separate fiber, and
> they can yield. These two facts mean that if one trigger sleeps
> too long, the other triggers wait. It does not block SWIM from
> doing its routine operations, but it blocks other triggers.
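> 
> One way to keep a slow consumer from delaying other triggers is
> to make the trigger only enqueue the event and return. This is
> a sketch under assumptions: `make_forwarding_trigger` is a
> hypothetical name, and `queue` is any object with a
> `put(item, timeout)` method. A fiber.channel has such a method,
> but using one here is this example's choice, not something the
> patch prescribes.
> ```Lua
> -- Hypothetical helper: builds a trigger that hands events
> -- over to 'queue' instead of processing them in place.
> local function make_forwarding_trigger(queue)
>     return function(member, events, ctx)
>         -- Copy what is needed and return at once, so the
>         -- next trigger in the list is not delayed. Timeout
>         -- 0 means: do not wait if the queue is full (the
>         -- event is then lost in this sketch).
>         queue:put({
>             uuid = tostring(member:uuid()),
>             is_new = events:is_new(),
>             is_drop = events:is_drop(),
>             is_update = events:is_update(),
>             ctx = ctx,
>         }, 0)
>     end
> end
> ```
> A separate worker fiber would then take items from the queue
> and do the slow work at its own pace.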
> 
> The last point to remember is that if a member was added and
> dropped before its appearance reached a trigger, then such a
> member does not fire triggers at all. A user will not notice
> such a short-lived member.
> ---
>  src/lua/swim.c          |  27 ++++-
>  src/lua/swim.lua        |  90 ++++++++++++++
>  test/swim/swim.result   | 263 ++++++++++++++++++++++++++++++++++++++++
>  test/swim/swim.test.lua |  85 +++++++++++++
>  4 files changed, 464 insertions(+), 1 deletion(-)
> 
> diff --git a/src/lua/swim.c b/src/lua/swim.c
> index 3b9e229be..c3a0a9911 100644
> --- a/src/lua/swim.c
> +++ b/src/lua/swim.c
> @@ -33,8 +33,31 @@
>  #include "diag.h"
>  #include "lua/utils.h"
>  
> +static uint32_t ctid_swim_member_ptr;
>  static uint32_t ctid_swim_ptr;
>  
> +/** Push member event context into a Lua stack. */
> +static int
> +lua_swim_member_event_push(struct lua_State *L, void *event)
> +{
> +	struct swim_on_member_event_ctx *ctx =
> +		(struct swim_on_member_event_ctx *) event;
> +	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
> +		ctx->member;
> +	lua_pushinteger(L, ctx->events);
> +	return 2;
> +}
> +
> +/** Set or/and delete a trigger on a SWIM member event. */
> +static int
> +lua_swim_on_member_event(struct lua_State *L)
> +{
> +	uint32_t ctypeid;
> +	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
> +	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
> +				  lua_swim_member_event_push, NULL);
> +}
> +
>  /**
>   * Create a new SWIM instance. SWIM is not created via FFI,
>   * because this operation yields.
> @@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
>  void
>  tarantool_lua_swim_init(struct lua_State *L)
>  {
> -	luaL_cdef(L, "struct swim;");
> +	luaL_cdef(L, "struct swim_member; struct swim;");
> +	ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
>  	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
>  
>  	static const struct luaL_Reg lua_swim_internal_methods [] = {
>  		{"swim_new", lua_swim_new},
>  		{"swim_delete", lua_swim_delete},
> +		{"swim_on_member_event", lua_swim_on_member_event},
>  		{NULL, NULL}
>  	};
>  	luaL_register_module(L, "swim", lua_swim_internal_methods);
> diff --git a/src/lua/swim.lua b/src/lua/swim.lua
> index 527299284..a7d5caab3 100644
> --- a/src/lua/swim.lua
> +++ b/src/lua/swim.lua
> @@ -24,6 +24,16 @@ ffi.cdef[[
>          MEMBER_LEFT,
>      };
>  
> +    enum swim_ev_mask {
> +        SWIM_EV_NEW             = 0b00000001,
> +        SWIM_EV_NEW_STATUS      = 0b00000010,
> +        SWIM_EV_NEW_URI         = 0b00000100,
> +        SWIM_EV_NEW_INCARNATION = 0b00001000,
> +        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
> +        SWIM_EV_UPDATE          = 0b00011110,
> +        SWIM_EV_DROP            = 0b00100000,
> +    };
> +
>      bool
>      swim_is_configured(const struct swim *swim);
>  
> @@ -689,6 +699,84 @@ local function swim_pairs(s)
>      return swim_pairs_next, {swim = s, iterator = iterator}, nil
>  end
>  
> +local swim_member_event_index = {
> +    is_new = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
> +    end,
> +    is_drop = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
> +    end,
> +    is_update = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
> +    end,
> +    is_new_status = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
> +    end,
> +    is_new_uri = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
> +    end,
> +    is_new_incarnation = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
> +    end,
> +    is_new_payload = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
> +    end,
> +}
> +
> +local swim_member_event_mt = {
> +    __index = swim_member_event_index,
> +    __serialize = function(self)
> +        local res = {}
> +        for k, v in pairs(swim_member_event_index) do
> +            v = v(self)
> +            if v then
> +                res[k] = v
> +            end
> +        end
> +        return res
> +    end,
> +}
> +
> +--
> +-- Create a closure function for preprocessing raw SWIM member
> +-- event trigger parameters.
> +-- @param s SWIM instance.
> +-- @param callback User functions to call.
> +-- @param ctx An optional parameter for @a callback passed as is.
> +-- @return A function to set as a trigger.
> +--
> +local function swim_on_member_event_new(s, callback, ctx)
> +    return function(member_ptr, event_mask)
> +        local m = swim_wrap_member(s, member_ptr)
> +        local event = setmetatable({event_mask}, swim_member_event_mt)
> +        return callback(m, event, ctx)
> +    end
> +end
> +
> +--
> +-- Add or/and delete a trigger on member event. Possible usages:
> +--
> +-- * on_member_event(new[, ctx]) - add a new trigger. It should
> +--   accept 3 arguments: an updated member, an events object, an
> +--   optional @a ctx parameter passed as is.
> +--
> +-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
> +--   if not nil, in place of @a old trigger.
> +--
> +-- * on_member_event() - get a list of triggers.
> +--
> +local function swim_on_member_event(s, new, old, ctx)
> +    local ptr = swim_check_instance(s, 'swim:on_member_event')
> +    if type(old) ~= 'function' then
> +        ctx = old
> +        old = nil
> +    end
> +    if new ~= nil then
> +        new = swim_on_member_event_new(s, new, ctx)
> +    end
> +    return internal.swim_on_member_event(ptr, new, old)
> +end
> +
>  --
>  -- Normal metatable of a configured SWIM instance.
>  --
> @@ -708,6 +796,7 @@ local swim_mt = {
>          set_payload = swim_set_payload,
>          set_codec = swim_set_codec,
>          pairs = swim_pairs,
> +        on_member_event = swim_on_member_event,
>      },
>      __serialize = swim_serialize
>  }
> @@ -800,6 +889,7 @@ local swim_not_configured_mt = {
>          delete = swim_delete,
>          is_configured = swim_is_configured,
>          set_codec = swim_set_codec,
> +        on_member_event = swim_on_member_event,
>      },
>      __serialize = swim_serialize
>  }
> diff --git a/test/swim/swim.result b/test/swim/swim.result
> index 436d4e579..0196837c8 100644
> --- a/test/swim/swim.result
> +++ b/test/swim/swim.result
> @@ -1216,6 +1216,269 @@ s1:delete()
>  s2:delete()
>  ---
>  ...
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +---
> +...
> +s1.on_member_event()
> +---
> +- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
> +    instance'
> +...
> +m_list = {}
> +---
> +...
> +e_list = {}
> +---
> +...
> +ctx_list = {}
> +---
> +...
> +f = nil
> +---
> +...
> +f_need_sleep = false
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ';'")
> +---
> +...
> +t_save_event = function(m, e, ctx)
> +    table.insert(m_list, m)
> +    table.insert(e_list, e)
> +    table.insert(ctx_list, ctx)
> +end;
> +---
> +...
> +t_yield = function(m, e, ctx)
> +    f = fiber.self()
> +    t_save_event(m, e, ctx)
> +    while f_need_sleep do fiber.sleep(10000) end
> +end;
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ''");
> +---
> +...
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +---
> +...
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +---
> +- true
> +...
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +---
> +- true
> +...
> +while #m_list < 1 do fiber.sleep(0) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new_uri: true
> +    is_new: true
> +    is_update: true
> +...
> +ctx_list
> +---
> +- - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +---
> +...
> +f_need_sleep = true
> +---
> +...
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +---
> +...
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +---
> +- true
> +...
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +---
> +...
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new: true
> +    is_update: true
> +...
> +ctx_list
> +---
> +- - ctx2
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +---
> +- true
> +...
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +---
> +...
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +---
> +- payload
> +...
> +f_need_sleep = false
> +---
> +...
> +fiber.wakeup(f)
> +---
> +...
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +  - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 2
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 8
> +  - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 2
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 8
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new: true
> +    is_update: true
> +  - is_new_payload: true
> +    is_update: true
> +    is_new_incarnation: true
> +  - is_new_payload: true
> +    is_update: true
> +    is_new_incarnation: true
> +...
> +ctx_list
> +---
> +- - ctx
> +  - ctx2
> +  - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +#s1:on_member_event()
> +---
> +- 2
> +...
> +s1:on_member_event(nil, t_yield_id)
> +---
> +...
> +s2:quit()
> +---
> +...
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Process event.
> +fiber.sleep(0)
> +---
> +...
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: left
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +  - uri: 127.0.0.1:<port>
> +    status: left
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_status: true
> +    is_update: true
> +  - is_drop: true
> +...
> +ctx_list
> +---
> +- - ctx
> +  - ctx
> +...
> +m = m_list[1]
> +---
> +...
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +---
> +- true
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +s1:on_member_event(nil, t_save_event_id)
> +---
> +...
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +---
> +- true
> +...
> +fiber.sleep(0)
> +---
> +...
> +-- No events - all the triggers are dropped.
> +m_list
> +---
> +- []
> +...
> +e_list
> +---
> +- []
> +...
> +ctx_list
> +---
> +- []
> +...
> +s1:delete()
> +---
> +...
>  test_run:cmd("clear filter")
>  ---
>  - true
> diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
> index a3eac9b46..458e349e0 100644
> --- a/test/swim/swim.test.lua
> +++ b/test/swim/swim.test.lua
> @@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
>  s1:delete()
>  s2:delete()
>  
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +s1.on_member_event()
> +
> +m_list = {}
> +e_list = {}
> +ctx_list = {}
> +f = nil
> +f_need_sleep = false
> +_ = test_run:cmd("setopt delimiter ';'")
> +t_save_event = function(m, e, ctx)
> +    table.insert(m_list, m)
> +    table.insert(e_list, e)
> +    table.insert(ctx_list, ctx)
> +end;
> +t_yield = function(m, e, ctx)
> +    f = fiber.self()
> +    t_save_event(m, e, ctx)
> +    while f_need_sleep do fiber.sleep(10000) end
> +end;
> +_ = test_run:cmd("setopt delimiter ''");
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +while #m_list < 1 do fiber.sleep(0) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +f_need_sleep = true
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +
> +f_need_sleep = false
> +fiber.wakeup(f)
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +#s1:on_member_event()
> +
> +s1:on_member_event(nil, t_yield_id)
> +s2:quit()
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +-- Process event.
> +fiber.sleep(0)
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +e_list
> +ctx_list
> +m = m_list[1]
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +m_list = {} e_list = {} ctx_list = {}
> +
> +s1:on_member_event(nil, t_save_event_id)
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +fiber.sleep(0)
> +-- No events - all the triggers are dropped.
> +m_list
> +e_list
> +ctx_list
> +
> +s1:delete()
> +
>  test_run:cmd("clear filter")
> 



