[tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Jun 8 14:06:09 MSK 2019
Sorry, freelists has a problem with the word 'subscribe' at the
beginning of a line. So I sent the email manually and moved the
word 'subscribe' to the previous line.
On 08/06/2019 14:04, Vladislav Shpilevoy wrote:
> From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001
> Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy at tarantool.org>
> In-Reply-To: <cover.1559989748.git.v.shpilevoy at tarantool.org>
> References: <cover.1559989748.git.v.shpilevoy at tarantool.org>
> From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> Date: Sat, 1 Jun 2019 20:40:21 +0200
> Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events
>
> SWIM as a monitoring module is hard to use without the ability to subscribe
> to events. Otherwise, a user would have to poll the SWIM member
> table for updates, which would be too inefficient.
>
> This commit exposes the ability to set Lua triggers on member events.
>
> Closes #4250
>
> @TarantoolBot document
> Title: SWIM: swim:on_member_event
>
> Now a user can set triggers on member table updates. This is done
> with one function, which can be used in several ways:
> ```Lua
> swim:on_member_event(new_trigger[, ctx])
> ```
> Add a new trigger on member table updates. The function
> `new_trigger` is called whenever a new member appears, an
> existing member is dropped, or a member is updated. It should take
> 3 arguments: the updated SWIM member, an events object, and
> `ctx` passed as is.
>
> The events object has methods that help a user determine which
> events have happened.
> ```Lua
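> local function on_event(member, events, ctx)
>     if events:is_new() then
>         -- A new member has appeared.
>     elseif events:is_drop() then
>         -- A member has been dropped.
>     end
>
>     -- Several update flags can be set at once, so check each
>     -- of them separately.
>     if events:is_update() then
>         if events:is_new_status() then
>             -- The status has changed.
>         end
>         if events:is_new_uri() then
>             -- The URI has changed.
>         end
>         if events:is_new_incarnation() then
>             -- The incarnation has changed.
>         end
>         if events:is_new_payload() then
>             -- The payload has changed.
>         end
>     end
> end
>
> s:on_member_event(on_event, ctx)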
> ```
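A minimal C sketch of what each `events:is_*()` method checks, assuming the `enum swim_ev_mask` values declared in swim.lua by this patch. The helper `swim_ev_has` is hypothetical; the Lua methods perform the same test with `bit.band(self[1], flag) ~= 0`. `SWIM_EV_UPDATE` is the union of the four `NEW_*` bits, which is why `is_update()` is true whenever any of them is set.

```c
#include <stdbool.h>

/* Event bits, copied from enum swim_ev_mask in this patch. */
enum swim_ev_mask {
	SWIM_EV_NEW             = 0x01,
	SWIM_EV_NEW_STATUS      = 0x02,
	SWIM_EV_NEW_URI         = 0x04,
	SWIM_EV_NEW_INCARNATION = 0x08,
	SWIM_EV_NEW_PAYLOAD     = 0x10,
	/* Union of the four NEW_* bits: any of them means "update". */
	SWIM_EV_UPDATE          = 0x1e,
	SWIM_EV_DROP            = 0x20,
};

_Static_assert(SWIM_EV_UPDATE == (SWIM_EV_NEW_STATUS | SWIM_EV_NEW_URI |
				  SWIM_EV_NEW_INCARNATION |
				  SWIM_EV_NEW_PAYLOAD),
	       "update must be the union of the new_* bits");

/*
 * Hypothetical helper: the C counterpart of the
 * bit.band(self[1], flag) ~= 0 check behind every events:is_*()
 * method.
 */
static inline bool
swim_ev_has(unsigned mask, unsigned flag)
{
	return (mask & flag) != 0;
}
```

For example, the first event in the test of this patch (`is_new`, `is_new_uri`, `is_new_payload`) corresponds to `SWIM_EV_NEW | SWIM_EV_NEW_URI | SWIM_EV_NEW_PAYLOAD`, for which `swim_ev_has(mask, SWIM_EV_UPDATE)` is true and `swim_ev_has(mask, SWIM_EV_DROP)` is false.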
> Note that multiple events can happen simultaneously, and a user
> should be ready for that. 'new' and 'drop' never happen together,
> but either of them can easily happen together with 'update'. This
> is especially likely when there are lots of events and triggers
> are too slow: a member can be added and then updated a while
> later, still before the 'new' event reaches a trigger.
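A hypothetical C model of why 'new' and 'update' can arrive together: if events for a member that is still waiting for triggers are merged with a bitwise OR, a slow trigger sees one combined mask. The `pending_event` struct and its functions are illustrative assumptions, not the actual SWIM internals; only the bit values come from this patch.

```c
/* Subset of enum swim_ev_mask from this patch. */
enum swim_ev_mask {
	SWIM_EV_NEW             = 0x01,
	SWIM_EV_NEW_INCARNATION = 0x08,
	SWIM_EV_UPDATE          = 0x1e,
};

/* Hypothetical per-member state while it waits for triggers. */
struct pending_event {
	unsigned mask;
};

/* A new event arrives before triggers have run: merge it in. */
static void
pending_event_add(struct pending_event *ev, unsigned mask)
{
	ev->mask |= mask;
}

/* Triggers run: hand them the merged mask and reset the state. */
static unsigned
pending_event_take(struct pending_event *ev)
{
	unsigned mask = ev->mask;
	ev->mask = 0;
	return mask;
}
```

In this model, a member that is added and then gets a new incarnation before the trigger fiber reaches it is reported once, with both `is_new()` and `is_new_incarnation()` true.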
>
> A notable case is 'new' + 'new payload'. It does not depend on
> trigger speed. The thing is that payload absence and a payload of
> size 0 are not the same, and sometimes a member is added without a
> payload. For example, when a ping is received: pings do not carry
> a payload. In such a case the missing payload is eventually
> received later. If that matters for a user's application, the
> application should be ready for it: 'new' does not mean that the
> member already has a payload, and the payload size says nothing
> about whether a payload is present.
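A hypothetical C illustration of why the payload size alone cannot distinguish an absent payload from an empty one: both have size 0, so a separate flag is needed. The `member_payload` struct and its functions are assumptions for illustration only, not SWIM's real member layout.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical payload state of a member. "No payload received
 * yet" and "an empty payload" both have size 0, so presence is
 * tracked by an explicit flag.
 */
struct member_payload {
	bool is_set;
	size_t size;
	const char *data;
};

/* A member learned from a ping: pings carry no payload. */
static struct member_payload
payload_from_ping(void)
{
	struct member_payload p = {false, 0, NULL};
	return p;
}

/* A payload arrives later; it may legitimately be empty. */
static void
payload_set(struct member_payload *p, const char *data, size_t size)
{
	p->is_set = true;
	p->size = size;
	p->data = data;
}
```

A member created from a ping and a member whose payload is an empty string both report size 0; only `is_set` tells them apart.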
>
> Second usage case:
> ```Lua
> swim:on_member_event(nil, old_trigger)
> ```
> Drop an old trigger.
>
> Third usage case:
> ```Lua
> swim:on_member_event(new_trigger, old_trigger[, ctx])
> ```
> Replace an existing trigger in place, keeping its position
> in the trigger list.
>
> Fourth usage case:
> ```Lua
> swim:on_member_event()
> ```
> Get a list of triggers.
>
> When dropping or replacing a trigger, a user should be careful:
> the following code does not work:
> ```Lua
> tr = function(member, events, ctx) end
> -- Add a trigger.
> s:on_member_event(tr)
> -- ...
> -- Drop a trigger.
> s:on_member_event(nil, tr)
> ```
> To be precise, it is the last line that does not work. This is
> because SWIM wraps user triggers in an internal closure for
> parameter preprocessing. To drop a trigger, a user should save the
> result of the first on_member_event() call.
>
> This code works:
> ```Lua
> tr = function(member, events, ctx) end
> -- Add a trigger.
> tr_id = s:on_member_event(tr)
> -- ...
> -- Drop a trigger.
> s:on_member_event(nil, tr_id)
> ```
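The same pitfall, sketched in C under stated assumptions: registration wraps the user callback and `ctx` into a new object, as the Lua closure from `swim_on_member_event_new()` does, and only that object identifies the trigger afterwards. The `trigger` struct and the `trigger_add`/`trigger_drop` functions are hypothetical and much simpler than Tarantool's real trigger lists.

```c
#include <stdbool.h>
#include <stdlib.h>

typedef void (*member_cb)(void *member, unsigned events, void *ctx);

/* Hypothetical wrapper binding a user callback and its ctx. */
struct trigger {
	member_cb cb;
	void *ctx;
	struct trigger *next;
};

/* Adds a trigger and returns its handle, the only valid drop key. */
static struct trigger *
trigger_add(struct trigger **list, member_cb cb, void *ctx)
{
	struct trigger *t = malloc(sizeof(*t));
	if (t == NULL)
		return NULL;
	t->cb = cb;
	t->ctx = ctx;
	t->next = *list;
	*list = t;
	return t;
}

/* Drops a trigger by handle; returns false if it is not found. */
static bool
trigger_drop(struct trigger **list, const struct trigger *handle)
{
	for (struct trigger **p = list; *p != NULL; p = &(*p)->next) {
		if (*p == handle) {
			struct trigger *t = *p;
			*p = t->next;
			free(t);
			return true;
		}
	}
	return false;
}
```

Passing anything other than the handle returned by `trigger_add()` to `trigger_drop()` finds nothing, which mirrors why `s:on_member_event(nil, tr)` does not work while `s:on_member_event(nil, tr_id)` does.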
>
> The triggers are executed one by one in a separate fiber, and
> they can yield. Together, these two facts mean that if one trigger
> sleeps too long, the other triggers wait. This does not prevent
> SWIM from doing its routine operations, but it does block other
> triggers.
>
> The last point to remember: if a member was added and then
> dropped before its appearance reached a trigger, such a member
> does not fire triggers at all. A user will not notice such a
> short-lived member.
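A hypothetical C sketch of the rule described above: when a 'drop' arrives for a member whose 'new' has not been delivered yet, the pending event is discarded instead of merged, so triggers never see that member. The function name and the queue model are assumptions, not the real SWIM code; only the bit values come from this patch.

```c
#include <stdbool.h>

/* Subset of enum swim_ev_mask from this patch. */
enum swim_ev_mask {
	SWIM_EV_NEW  = 0x01,
	SWIM_EV_DROP = 0x20,
};

/*
 * Hypothetical merge rule for a member still waiting for
 * triggers. Returns false when the member should leave the queue
 * without firing any trigger.
 */
static bool
pending_event_merge(unsigned *pending, unsigned mask)
{
	if ((*pending & SWIM_EV_NEW) != 0 && (mask & SWIM_EV_DROP) != 0) {
		/* Added and dropped before triggers saw it. */
		*pending = 0;
		return false;
	}
	*pending |= mask;
	return true;
}
```

Under this rule 'new' and 'drop' can never end up in the same mask, which matches the guarantee stated earlier in this message.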
> ---
> src/lua/swim.c | 27 ++++-
> src/lua/swim.lua | 90 ++++++++++++++
> test/swim/swim.result | 263 ++++++++++++++++++++++++++++++++++++++++
> test/swim/swim.test.lua | 85 +++++++++++++
> 4 files changed, 464 insertions(+), 1 deletion(-)
>
> diff --git a/src/lua/swim.c b/src/lua/swim.c
> index 3b9e229be..c3a0a9911 100644
> --- a/src/lua/swim.c
> +++ b/src/lua/swim.c
> @@ -33,8 +33,31 @@
> #include "diag.h"
> #include "lua/utils.h"
>
> +static uint32_t ctid_swim_member_ptr;
> static uint32_t ctid_swim_ptr;
>
> +/** Push member event context into a Lua stack. */
> +static int
> +lua_swim_member_event_push(struct lua_State *L, void *event)
> +{
> + struct swim_on_member_event_ctx *ctx =
> + (struct swim_on_member_event_ctx *) event;
> + *(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
> + ctx->member;
> + lua_pushinteger(L, ctx->events);
> + return 2;
> +}
> +
> +/** Set or/and delete a trigger on a SWIM member event. */
> +static int
> +lua_swim_on_member_event(struct lua_State *L)
> +{
> + uint32_t ctypeid;
> + struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
> + return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
> + lua_swim_member_event_push, NULL);
> +}
> +
> /**
> * Create a new SWIM instance. SWIM is not created via FFI,
> * because this operation yields.
> @@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
> void
> tarantool_lua_swim_init(struct lua_State *L)
> {
> - luaL_cdef(L, "struct swim;");
> + luaL_cdef(L, "struct swim_member; struct swim;");
> + ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
> ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
>
> static const struct luaL_Reg lua_swim_internal_methods [] = {
> {"swim_new", lua_swim_new},
> {"swim_delete", lua_swim_delete},
> + {"swim_on_member_event", lua_swim_on_member_event},
> {NULL, NULL}
> };
> luaL_register_module(L, "swim", lua_swim_internal_methods);
> diff --git a/src/lua/swim.lua b/src/lua/swim.lua
> index 527299284..a7d5caab3 100644
> --- a/src/lua/swim.lua
> +++ b/src/lua/swim.lua
> @@ -24,6 +24,16 @@ ffi.cdef[[
> MEMBER_LEFT,
> };
>
> + enum swim_ev_mask {
> + SWIM_EV_NEW = 0b00000001,
> + SWIM_EV_NEW_STATUS = 0b00000010,
> + SWIM_EV_NEW_URI = 0b00000100,
> + SWIM_EV_NEW_INCARNATION = 0b00001000,
> + SWIM_EV_NEW_PAYLOAD = 0b00010000,
> + SWIM_EV_UPDATE = 0b00011110,
> + SWIM_EV_DROP = 0b00100000,
> + };
> +
> bool
> swim_is_configured(const struct swim *swim);
>
> @@ -689,6 +699,84 @@ local function swim_pairs(s)
> return swim_pairs_next, {swim = s, iterator = iterator}, nil
> end
>
> +local swim_member_event_index = {
> + is_new = function(self)
> + return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
> + end,
> + is_drop = function(self)
> + return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
> + end,
> + is_update = function(self)
> + return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
> + end,
> + is_new_status = function(self)
> + return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
> + end,
> + is_new_uri = function(self)
> + return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
> + end,
> + is_new_incarnation = function(self)
> + return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
> + end,
> + is_new_payload = function(self)
> + return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
> + end,
> +}
> +
> +local swim_member_event_mt = {
> + __index = swim_member_event_index,
> + __serialize = function(self)
> + local res = {}
> + for k, v in pairs(swim_member_event_index) do
> + v = v(self)
> + if v then
> + res[k] = v
> + end
> + end
> + return res
> + end,
> +}
> +
> +--
> +-- Create a closure function for preprocessing raw SWIM member
> +-- event trigger parameters.
> +-- @param s SWIM instance.
> +-- @param callback User functions to call.
> +-- @param ctx An optional parameter for @a callback passed as is.
> +-- @return A function to set as a trigger.
> +--
> +local function swim_on_member_event_new(s, callback, ctx)
> + return function(member_ptr, event_mask)
> + local m = swim_wrap_member(s, member_ptr)
> + local event = setmetatable({event_mask}, swim_member_event_mt)
> + return callback(m, event, ctx)
> + end
> +end
> +
> +--
> +-- Add or/and delete a trigger on member event. Possible usages:
> +--
> +-- * on_member_event(new[, ctx]) - add a new trigger. It should
> +-- accept 3 arguments: an updated member, an events object, an
> +-- optional @a ctx parameter passed as is.
> +--
> +-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
> +-- if not nil, in place of @a old trigger.
> +--
> +-- * on_member_event() - get a list of triggers.
> +--
> +local function swim_on_member_event(s, new, old, ctx)
> + local ptr = swim_check_instance(s, 'swim:on_member_event')
> + if type(old) ~= 'function' then
> + ctx = old
> + old = nil
> + end
> + if new ~= nil then
> + new = swim_on_member_event_new(s, new, ctx)
> + end
> + return internal.swim_on_member_event(ptr, new, old)
> +end
> +
> --
> -- Normal metatable of a configured SWIM instance.
> --
> @@ -708,6 +796,7 @@ local swim_mt = {
> set_payload = swim_set_payload,
> set_codec = swim_set_codec,
> pairs = swim_pairs,
> + on_member_event = swim_on_member_event,
> },
> __serialize = swim_serialize
> }
> @@ -800,6 +889,7 @@ local swim_not_configured_mt = {
> delete = swim_delete,
> is_configured = swim_is_configured,
> set_codec = swim_set_codec,
> + on_member_event = swim_on_member_event,
> },
> __serialize = swim_serialize
> }
> diff --git a/test/swim/swim.result b/test/swim/swim.result
> index 436d4e579..0196837c8 100644
> --- a/test/swim/swim.result
> +++ b/test/swim/swim.result
> @@ -1216,6 +1216,269 @@ s1:delete()
> s2:delete()
> ---
> ...
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +---
> +...
> +s1.on_member_event()
> +---
> +- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
> + instance'
> +...
> +m_list = {}
> +---
> +...
> +e_list = {}
> +---
> +...
> +ctx_list = {}
> +---
> +...
> +f = nil
> +---
> +...
> +f_need_sleep = false
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ';'")
> +---
> +...
> +t_save_event = function(m, e, ctx)
> + table.insert(m_list, m)
> + table.insert(e_list, e)
> + table.insert(ctx_list, ctx)
> +end;
> +---
> +...
> +t_yield = function(m, e, ctx)
> + f = fiber.self()
> + t_save_event(m, e, ctx)
> + while f_need_sleep do fiber.sleep(10000) end
> +end;
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ''");
> +---
> +...
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +---
> +...
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +---
> +- true
> +...
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +---
> +- true
> +...
> +while #m_list < 1 do fiber.sleep(0) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> + status: alive
> + incarnation: 1
> + uuid: 00000000-0000-1000-8000-000000000001
> + payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> + is_new_uri: true
> + is_new: true
> + is_update: true
> +...
> +ctx_list
> +---
> +- - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +---
> +...
> +f_need_sleep = true
> +---
> +...
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +---
> +...
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +---
> +- true
> +...
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +---
> +...
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> + status: alive
> + incarnation: 1
> + uuid: 00000000-0000-1000-8000-000000000002
> + payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> + is_new: true
> + is_update: true
> +...
> +ctx_list
> +---
> +- - ctx2
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +---
> +- true
> +...
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +---
> +...
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +---
> +- payload
> +...
> +f_need_sleep = false
> +---
> +...
> +fiber.wakeup(f)
> +---
> +...
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> + status: alive
> + incarnation: 1
> + uuid: 00000000-0000-1000-8000-000000000002
> + payload_size: 0
> + - uri: 127.0.0.1:<port>
> + status: alive
> + incarnation: 2
> + uuid: 00000000-0000-1000-8000-000000000001
> + payload_size: 8
> + - uri: 127.0.0.1:<port>
> + status: alive
> + incarnation: 2
> + uuid: 00000000-0000-1000-8000-000000000001
> + payload_size: 8
> +...
> +e_list
> +---
> +- - is_new_payload: true
> + is_new: true
> + is_update: true
> + - is_new_payload: true
> + is_update: true
> + is_new_incarnation: true
> + - is_new_payload: true
> + is_update: true
> + is_new_incarnation: true
> +...
> +ctx_list
> +---
> +- - ctx
> + - ctx2
> + - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +#s1:on_member_event()
> +---
> +- 2
> +...
> +s1:on_member_event(nil, t_yield_id)
> +---
> +...
> +s2:quit()
> +---
> +...
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Process event.
> +fiber.sleep(0)
> +---
> +...
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> + status: left
> + incarnation: 1
> + uuid: 00000000-0000-1000-8000-000000000002
> + payload_size: 0
> + - uri: 127.0.0.1:<port>
> + status: left
> + incarnation: 1
> + uuid: 00000000-0000-1000-8000-000000000002
> + payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_status: true
> + is_update: true
> + - is_drop: true
> +...
> +ctx_list
> +---
> +- - ctx
> + - ctx
> +...
> +m = m_list[1]
> +---
> +...
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +---
> +- true
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +s1:on_member_event(nil, t_save_event_id)
> +---
> +...
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +---
> +- true
> +...
> +fiber.sleep(0)
> +---
> +...
> +-- No events - all the triggers are dropped.
> +m_list
> +---
> +- []
> +...
> +e_list
> +---
> +- []
> +...
> +ctx_list
> +---
> +- []
> +...
> +s1:delete()
> +---
> +...
> test_run:cmd("clear filter")
> ---
> - true
> diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
> index a3eac9b46..458e349e0 100644
> --- a/test/swim/swim.test.lua
> +++ b/test/swim/swim.test.lua
> @@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
> s1:delete()
> s2:delete()
>
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +s1.on_member_event()
> +
> +m_list = {}
> +e_list = {}
> +ctx_list = {}
> +f = nil
> +f_need_sleep = false
> +_ = test_run:cmd("setopt delimiter ';'")
> +t_save_event = function(m, e, ctx)
> + table.insert(m_list, m)
> + table.insert(e_list, e)
> + table.insert(ctx_list, ctx)
> +end;
> +t_yield = function(m, e, ctx)
> + f = fiber.self()
> + t_save_event(m, e, ctx)
> + while f_need_sleep do fiber.sleep(10000) end
> +end;
> +_ = test_run:cmd("setopt delimiter ''");
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +while #m_list < 1 do fiber.sleep(0) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +f_need_sleep = true
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +
> +f_need_sleep = false
> +fiber.wakeup(f)
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +#s1:on_member_event()
> +
> +s1:on_member_event(nil, t_yield_id)
> +s2:quit()
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +-- Process event.
> +fiber.sleep(0)
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +e_list
> +ctx_list
> +m = m_list[1]
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +m_list = {} e_list = {} ctx_list = {}
> +
> +s1:on_member_event(nil, t_save_event_id)
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +fiber.sleep(0)
> +-- No events - all the triggers are dropped.
> +m_list
> +e_list
> +ctx_list
> +
> +s1:delete()
> +
> test_run:cmd("clear filter")
>