From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 0B77A30D84 for ; Sat, 8 Jun 2019 07:06:08 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id kxGcSD4NRN5p for ; Sat, 8 Jun 2019 07:06:07 -0400 (EDT) Received: from smtp47.i.mail.ru (smtp47.i.mail.ru [94.100.177.107]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 539A330D49 for ; Sat, 8 Jun 2019 07:06:07 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update From: Vladislav Shpilevoy References: <66bb16b1-ddc6-c98b-4c8c-5688bf473755@tarantool.org> Message-ID: <5330faec-d09e-02ff-83ff-d30ec8f34b4b@tarantool.org> Date: Sat, 8 Jun 2019 13:06:09 +0200 MIME-Version: 1.0 In-Reply-To: <66bb16b1-ddc6-c98b-4c8c-5688bf473755@tarantool.org> Content-Type: text/plain; charset=utf-8 Content-Language: en-US Content-Transfer-Encoding: 7bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Sorry, freelists have problems with word 'subscribe' be at the beginning of a line. So I sent the email manually and moved 'subscribe' on the previous line. On 08/06/2019 14:04, Vladislav Shpilevoy wrote: > From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001 > Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy@tarantool.org> > In-Reply-To: > References: > From: Vladislav Shpilevoy > Date: Sat, 1 Jun 2019 20:40:21 +0200 > Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events > > SWIM as a monitoring module is hard to use without an ability to subscribe > on events. Otherwise a user should have polled a SWIM > member table for updates - it would be too inefficient. > > This commit exposes an ability to hang Lua triggers. > > Closes #4250 > > @TarantoolBot document > Title: SWIM: swim:on_member_event > > Now a user can hang triggers on member table updates. There is a > function for that, which can be used in one of several ways: > ```Lua > swim:on_member_event(new_trigger[, ctx]) > ``` > Add a new trigger on member table update. The function > `new_trigger` will be called on each new member appearance, an > existing member drop, and update. It should take 3 arguments: > first is an updated SWIM member, second is an events object, > third is `ctx` passed as is. > > Events object has methods to help a user to determine what event > has happened. > ```Lua > local function on_event(member, events, ctx) > if events:is_new() then > ... > elseif events:is_drop() then > ... > end > > if events:is_update() then > if events:is_new_status() then > ... > elseif events:is_new_uri() then > ... > elseif events:is_new_incarnation() then > ... > elseif events:is_new_payload() then > ... > end > end > end > > s:on_member_event(on_event, ctx) > ``` > Note, that multiple events can happen simultaneously. A user > should be ready to that. Additionally, 'new' and 'drop' never > happen together. But they can happen with 'update', easily. > Especially if there are lots of events, and triggers work too > slow. Then a member can be added and updated after a while, but > still does not reach a trigger. > > A remarkable case is 'new' + 'new payload'. This case does not > correlate with triggers speed. The thing is that payload absence > and payload of size 0 are not the same. And sometimes is happens, > that a member is added without a payload. For example, a ping > was received - pings do not carry payload. In such a case the > missed payload is received later eventually. If that matters for > a user's application, it should be ready to that: 'new' does not > mean, that the member already has a payload, and payload size > says nothing about its presence or absence. > > Second usage case: > ```Lua > swim:on_member_event(nil, old_trigger) > ``` > Drop an old trigger. > > Third usage case: > ```Lua > swim:on_member_event(new_trigger, old_trigger[, ctx]) > ``` > Replace an existing trigger in-place, with keeping its position > in the trigger list. > > Fourth usage case: > ```Lua > swim:on_member_event() > ``` > Get a list of triggers. > > When drop or replace a trigger, a user should be attentive - the > following code does not work: > ```Lua > tr = function() ... end > -- Add a trigger. > s:on_member_event(tr) > ... > -- Drop a trigger. > s:on_member_event(nil, tr) > ``` > The last line, if be precise. This is because SWIM wraps user > triggers with an internal closure for parameters preprocessing. > To drop a trigger a user should save result of the first > on_member_event() call. > > This code works: > ```Lua > tr = function() ... end > -- Add a trigger. > tr_id = s:on_member_event(tr) > ... > -- Drop a trigger. > s:on_member_event(nil, tr_id) > ``` > > The triggers are executed one by one in a separate fiber. And > they can yield. These two facts mean that if one trigger sleeps > too long - other triggers wait. It does not block SWIM from doing > its routine operations, but block other triggers. > > The last point to remember is that if a member was added and > dropped before its appearance has reached a trigger, then such > a member does not fire triggers at all. A user will not notice > that easy rider member. > --- > src/lua/swim.c | 27 ++++- > src/lua/swim.lua | 90 ++++++++++++++ > test/swim/swim.result | 263 ++++++++++++++++++++++++++++++++++++++++ > test/swim/swim.test.lua | 85 +++++++++++++ > 4 files changed, 464 insertions(+), 1 deletion(-) > > diff --git a/src/lua/swim.c b/src/lua/swim.c > index 3b9e229be..c3a0a9911 100644 > --- a/src/lua/swim.c > +++ b/src/lua/swim.c > @@ -33,8 +33,31 @@ > #include "diag.h" > #include "lua/utils.h" > > +static uint32_t ctid_swim_member_ptr; > static uint32_t ctid_swim_ptr; > > +/** Push member event context into a Lua stack. */ > +static int > +lua_swim_member_event_push(struct lua_State *L, void *event) > +{ > + struct swim_on_member_event_ctx *ctx = > + (struct swim_on_member_event_ctx *) event; > + *(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) = > + ctx->member; > + lua_pushinteger(L, ctx->events); > + return 2; > +} > + > +/** Set or/and delete a trigger on a SWIM member event. */ > +static int > +lua_swim_on_member_event(struct lua_State *L) > +{ > + uint32_t ctypeid; > + struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid); > + return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s), > + lua_swim_member_event_push, NULL); > +} > + > /** > * Create a new SWIM instance. SWIM is not created via FFI, > * because this operation yields. > @@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L) > void > tarantool_lua_swim_init(struct lua_State *L) > { > - luaL_cdef(L, "struct swim;"); > + luaL_cdef(L, "struct swim_member; struct swim;"); > + ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *"); > ctid_swim_ptr = luaL_ctypeid(L, "struct swim *"); > > static const struct luaL_Reg lua_swim_internal_methods [] = { > {"swim_new", lua_swim_new}, > {"swim_delete", lua_swim_delete}, > + {"swim_on_member_event", lua_swim_on_member_event}, > {NULL, NULL} > }; > luaL_register_module(L, "swim", lua_swim_internal_methods); > diff --git a/src/lua/swim.lua b/src/lua/swim.lua > index 527299284..a7d5caab3 100644 > --- a/src/lua/swim.lua > +++ b/src/lua/swim.lua > @@ -24,6 +24,16 @@ ffi.cdef[[ > MEMBER_LEFT, > }; > > + enum swim_ev_mask { > + SWIM_EV_NEW = 0b00000001, > + SWIM_EV_NEW_STATUS = 0b00000010, > + SWIM_EV_NEW_URI = 0b00000100, > + SWIM_EV_NEW_INCARNATION = 0b00001000, > + SWIM_EV_NEW_PAYLOAD = 0b00010000, > + SWIM_EV_UPDATE = 0b00011110, > + SWIM_EV_DROP = 0b00100000, > + }; > + > bool > swim_is_configured(const struct swim *swim); > > @@ -689,6 +699,84 @@ local function swim_pairs(s) > return swim_pairs_next, {swim = s, iterator = iterator}, nil > end > > +local swim_member_event_index = { > + is_new = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0 > + end, > + is_drop = function(self) > + return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0 > + end, > + is_update = function(self) > + return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0 > + end, > + is_new_status = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0 > + end, > + is_new_uri = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0 > + end, > + is_new_incarnation = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 > + end, > + is_new_payload = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0 > + end, > +} > + > +local swim_member_event_mt = { > + __index = swim_member_event_index, > + __serialize = function(self) > + local res = {} > + for k, v in pairs(swim_member_event_index) do > + v = v(self) > + if v then > + res[k] = v > + end > + end > + return res > + end, > +} > + > +-- > +-- Create a closure function for preprocessing raw SWIM member > +-- event trigger parameters. > +-- @param s SWIM instance. > +-- @param callback User functions to call. > +-- @param ctx An optional parameter for @a callback passed as is. > +-- @return A function to set as a trigger. > +-- > +local function swim_on_member_event_new(s, callback, ctx) > + return function(member_ptr, event_mask) > + local m = swim_wrap_member(s, member_ptr) > + local event = setmetatable({event_mask}, swim_member_event_mt) > + return callback(m, event, ctx) > + end > +end > + > +-- > +-- Add or/and delete a trigger on member event. Possible usages: > +-- > +-- * on_member_event(new[, ctx]) - add a new trigger. It should > +-- accept 3 arguments: an updated member, an events object, an > +-- optional @a ctx parameter passed as is. > +-- > +-- * on_member_event(new, old[, ctx]) - add a new trigger @a new > +-- if not nil, in place of @a old trigger. > +-- > +-- * on_member_event() - get a list of triggers. > +-- > +local function swim_on_member_event(s, new, old, ctx) > + local ptr = swim_check_instance(s, 'swim:on_member_event') > + if type(old) ~= 'function' then > + ctx = old > + old = nil > + end > + if new ~= nil then > + new = swim_on_member_event_new(s, new, ctx) > + end > + return internal.swim_on_member_event(ptr, new, old) > +end > + > -- > -- Normal metatable of a configured SWIM instance. > -- > @@ -708,6 +796,7 @@ local swim_mt = { > set_payload = swim_set_payload, > set_codec = swim_set_codec, > pairs = swim_pairs, > + on_member_event = swim_on_member_event, > }, > __serialize = swim_serialize > } > @@ -800,6 +889,7 @@ local swim_not_configured_mt = { > delete = swim_delete, > is_configured = swim_is_configured, > set_codec = swim_set_codec, > + on_member_event = swim_on_member_event, > }, > __serialize = swim_serialize > } > diff --git a/test/swim/swim.result b/test/swim/swim.result > index 436d4e579..0196837c8 100644 > --- a/test/swim/swim.result > +++ b/test/swim/swim.result > @@ -1216,6 +1216,269 @@ s1:delete() > s2:delete() > --- > ... > +-- > +-- gh-4250: allow to set triggers on a new member appearance, old > +-- member drop, member update. > +-- > +s1 = swim.new() > +--- > +... > +s1.on_member_event() > +--- > +- error: 'builtin/swim.lua:: swim:on_member_event: first argument is not a SWIM > + instance' > +... > +m_list = {} > +--- > +... > +e_list = {} > +--- > +... > +ctx_list = {} > +--- > +... > +f = nil > +--- > +... > +f_need_sleep = false > +--- > +... > +_ = test_run:cmd("setopt delimiter ';'") > +--- > +... > +t_save_event = function(m, e, ctx) > + table.insert(m_list, m) > + table.insert(e_list, e) > + table.insert(ctx_list, ctx) > +end; > +--- > +... > +t_yield = function(m, e, ctx) > + f = fiber.self() > + t_save_event(m, e, ctx) > + while f_need_sleep do fiber.sleep(10000) end > +end; > +--- > +... > +_ = test_run:cmd("setopt delimiter ''"); > +--- > +... > +t_save_event_id = s1:on_member_event(t_save_event, 'ctx') > +--- > +... > +-- Not equal, because SWIM wraps user triggers with a closure for > +-- context preprocessing. > +t_save_event_id ~= t_save_event > +--- > +- true > +... > +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01} > +--- > +- true > +... > +while #m_list < 1 do fiber.sleep(0) end > +--- > +... > +m_list > +--- > +- - uri: 127.0.0.1: > + status: alive > + incarnation: 1 > + uuid: 00000000-0000-1000-8000-000000000001 > + payload_size: 0 > +... > +e_list > +--- > +- - is_new_payload: true > + is_new_uri: true > + is_new: true > + is_update: true > +... > +ctx_list > +--- > +- - ctx > +... > +m_list = {} e_list = {} ctx_list = {} > +--- > +... > +t_yield_id = s1:on_member_event(t_yield, 'ctx2') > +--- > +... > +f_need_sleep = true > +--- > +... > +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01}) > +--- > +... > +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()}) > +--- > +- true > +... > +while s1:size() ~= 2 do fiber.sleep(0.01) end > +--- > +... > +-- Only first trigger worked. Second is waiting, because first > +-- sleeps. > +m_list > +--- > +- - uri: 127.0.0.1: > + status: alive > + incarnation: 1 > + uuid: 00000000-0000-1000-8000-000000000002 > + payload_size: 0 > +... > +e_list > +--- > +- - is_new_payload: true > + is_new: true > + is_update: true > +... > +ctx_list > +--- > +- - ctx2 > +... > +m_list = {} e_list = {} ctx_list = {} > +--- > +... > +-- But it does not prevent normal SWIM operation. > +s1:set_payload('payload') > +--- > +- true > +... > +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end > +--- > +... > +s2:member_by_uuid(s1:self():uuid()):payload() > +--- > +- payload > +... > +f_need_sleep = false > +--- > +... > +fiber.wakeup(f) > +--- > +... > +while #m_list ~= 3 do fiber.sleep(0.01) end > +--- > +... > +m_list > +--- > +- - uri: 127.0.0.1: > + status: alive > + incarnation: 1 > + uuid: 00000000-0000-1000-8000-000000000002 > + payload_size: 0 > + - uri: 127.0.0.1: > + status: alive > + incarnation: 2 > + uuid: 00000000-0000-1000-8000-000000000001 > + payload_size: 8 > + - uri: 127.0.0.1: > + status: alive > + incarnation: 2 > + uuid: 00000000-0000-1000-8000-000000000001 > + payload_size: 8 > +... > +e_list > +--- > +- - is_new_payload: true > + is_new: true > + is_update: true > + - is_new_payload: true > + is_update: true > + is_new_incarnation: true > + - is_new_payload: true > + is_update: true > + is_new_incarnation: true > +... > +ctx_list > +--- > +- - ctx > + - ctx2 > + - ctx > +... > +m_list = {} e_list = {} ctx_list = {} > +--- > +... > +#s1:on_member_event() > +--- > +- 2 > +... > +s1:on_member_event(nil, t_yield_id) > +--- > +... > +s2:quit() > +--- > +... > +while s1:size() ~= 1 do fiber.sleep(0.01) end > +--- > +... > +-- Process event. > +fiber.sleep(0) > +--- > +... > +-- Two events - status update to 'left', and 'drop'. > +m_list > +--- > +- - uri: 127.0.0.1: > + status: left > + incarnation: 1 > + uuid: 00000000-0000-1000-8000-000000000002 > + payload_size: 0 > + - uri: 127.0.0.1: > + status: left > + incarnation: 1 > + uuid: 00000000-0000-1000-8000-000000000002 > + payload_size: 0 > +... > +e_list > +--- > +- - is_new_status: true > + is_update: true > + - is_drop: true > +... > +ctx_list > +--- > +- - ctx > + - ctx > +... > +m = m_list[1] > +--- > +... > +-- Cached member table works even when a member is deleted. > +m_list[1] == m_list[2] > +--- > +- true > +... > +m_list = {} e_list = {} ctx_list = {} > +--- > +... > +s1:on_member_event(nil, t_save_event_id) > +--- > +... > +s1:add_member({uuid = m:uuid(), uri = m:uri()}) > +--- > +- true > +... > +fiber.sleep(0) > +--- > +... > +-- No events - all the triggers are dropped. > +m_list > +--- > +- [] > +... > +e_list > +--- > +- [] > +... > +ctx_list > +--- > +- [] > +... > +s1:delete() > +--- > +... > test_run:cmd("clear filter") > --- > - true > diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua > index a3eac9b46..458e349e0 100644 > --- a/test/swim/swim.test.lua > +++ b/test/swim/swim.test.lua > @@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid()) > s1:delete() > s2:delete() > > +-- > +-- gh-4250: allow to set triggers on a new member appearance, old > +-- member drop, member update. > +-- > +s1 = swim.new() > +s1.on_member_event() > + > +m_list = {} > +e_list = {} > +ctx_list = {} > +f = nil > +f_need_sleep = false > +_ = test_run:cmd("setopt delimiter ';'") > +t_save_event = function(m, e, ctx) > + table.insert(m_list, m) > + table.insert(e_list, e) > + table.insert(ctx_list, ctx) > +end; > +t_yield = function(m, e, ctx) > + f = fiber.self() > + t_save_event(m, e, ctx) > + while f_need_sleep do fiber.sleep(10000) end > +end; > +_ = test_run:cmd("setopt delimiter ''"); > +t_save_event_id = s1:on_member_event(t_save_event, 'ctx') > +-- Not equal, because SWIM wraps user triggers with a closure for > +-- context preprocessing. > +t_save_event_id ~= t_save_event > + > +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01} > +while #m_list < 1 do fiber.sleep(0) end > +m_list > +e_list > +ctx_list > +m_list = {} e_list = {} ctx_list = {} > + > +t_yield_id = s1:on_member_event(t_yield, 'ctx2') > +f_need_sleep = true > +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01}) > +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()}) > +while s1:size() ~= 2 do fiber.sleep(0.01) end > +-- Only first trigger worked. Second is waiting, because first > +-- sleeps. > +m_list > +e_list > +ctx_list > +m_list = {} e_list = {} ctx_list = {} > +-- But it does not prevent normal SWIM operation. > +s1:set_payload('payload') > +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end > +s2:member_by_uuid(s1:self():uuid()):payload() > + > +f_need_sleep = false > +fiber.wakeup(f) > +while #m_list ~= 3 do fiber.sleep(0.01) end > +m_list > +e_list > +ctx_list > +m_list = {} e_list = {} ctx_list = {} > +#s1:on_member_event() > + > +s1:on_member_event(nil, t_yield_id) > +s2:quit() > +while s1:size() ~= 1 do fiber.sleep(0.01) end > +-- Process event. > +fiber.sleep(0) > +-- Two events - status update to 'left', and 'drop'. > +m_list > +e_list > +ctx_list > +m = m_list[1] > +-- Cached member table works even when a member is deleted. > +m_list[1] == m_list[2] > +m_list = {} e_list = {} ctx_list = {} > + > +s1:on_member_event(nil, t_save_event_id) > +s1:add_member({uuid = m:uuid(), uri = m:uri()}) > +fiber.sleep(0) > +-- No events - all the triggers are dropped. > +m_list > +e_list > +ctx_list > + > +s1:delete() > + > test_run:cmd("clear filter") >