From mboxrd@z Thu Jan 1 00:00:00 1970
From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH 5/5] swim: expose Lua triggers on member update
Date: Sun, 2 Jun 2019 02:10:03 +0200
Message-ID: <0c433f21-734a-c755-110a-9b71a7172c99@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

SWIM as a monitoring module is hard to use without the ability to subscribe to events. Without it, a user would have to poll the SWIM member table for updates, which is too inefficient. This commit exposes the ability to set Lua triggers.

Closes #4250

@TarantoolBot document
Title: SWIM: swim:on_member_update

Now a user can set triggers on member table updates. There is a function for that, which can be used in one of several ways:

```Lua
swim:on_member_update(new_trigger[, ctx])
```

Add a new trigger on member table update.
The function `new_trigger` is called on each new member appearance, on an existing member drop, and on a member update. It should take 3 arguments: the first is the updated SWIM member, the second is an events object, the third is `ctx` passed as is. The events object has methods that help a user determine which update has happened.

```Lua
local function on_update(member, events, ctx)
    if events:is_new() then
        ...
    elseif events:is_drop() then
        ...
    end

    if events:is_update() then
        if events:is_new_status() then
            ...
        elseif events:is_new_uri() then
            ...
        elseif events:is_new_incarnation() then
            ...
        elseif events:is_new_payload() then
            ...
        end
    end
end

s:on_member_update(on_update, ctx)
```

Note that multiple events can happen simultaneously, and a user should be ready for that. 'New' and 'drop' never happen together, but either of them can easily happen together with 'update'. This is especially likely when there are lots of updates and triggers work too slowly: a member can be added and, after a while, updated, all before its addition has reached a trigger.

A notable case is 'new' + 'new payload'. This case does not depend on trigger speed. The point is that payload absence and a payload of size 0 are not the same thing, and sometimes a member is added without a payload. For example, a ping was received: pings do not carry a payload. In such a case the missing payload is eventually received later. If that matters for a user's application, it should be ready for that: 'new' does not mean that the member already has a payload, and the payload size says nothing about its presence or absence.

Second usage case:

```Lua
swim:on_member_update(nil, old_trigger)
```

Drop an old trigger.

Third usage case:

```Lua
swim:on_member_update(new_trigger, old_trigger[, ctx])
```

Replace an existing trigger in place, keeping its position in the trigger list.

Fourth usage case:

```Lua
swim:on_member_update()
```

Get a list of triggers.
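Since several events can arrive in one trigger call, a handler may prefer to test each flag with its own `if`. Below is a minimal sketch of that idea in plain Lua. It is not part of the patch: `mock_events` and `describe` are hypothetical helpers, and `mock_events` is only a stand-in that mimics the method names of the events object, so the snippet runs without a SWIM instance.

```Lua
-- Hypothetical stand-in for the events object: a table of flags exposing
-- the same method names as described above. Not the real SWIM object.
local function mock_events(flags)
    local e = {}
    for _, name in ipairs({'is_new', 'is_drop', 'is_update', 'is_new_status',
                           'is_new_uri', 'is_new_incarnation',
                           'is_new_payload'}) do
        e[name] = function() return flags[name] == true end
    end
    return e
end

-- Collect every event kind reported by one trigger call. Each flag is
-- tested independently, so combined events are not lost.
local function describe(events)
    local res = {}
    if events:is_new() then table.insert(res, 'new') end
    if events:is_drop() then table.insert(res, 'drop') end
    if events:is_update() then
        if events:is_new_status() then table.insert(res, 'status') end
        if events:is_new_uri() then table.insert(res, 'uri') end
        if events:is_new_incarnation() then
            table.insert(res, 'incarnation')
        end
        if events:is_new_payload() then table.insert(res, 'payload') end
    end
    return table.concat(res, '+')
end

-- A trigger with the documented signature; ctx is used as a log table.
local function on_update(member, events, ctx)
    table.insert(ctx, describe(events))
end

local log = {}
-- 'new' together with 'new payload'.
on_update(nil, mock_events({is_new = true, is_update = true,
                            is_new_payload = true}), log)
-- Two update kinds in one call.
on_update(nil, mock_events({is_update = true, is_new_incarnation = true,
                            is_new_payload = true}), log)
```

With a real instance the same `on_update` would be registered as `s:on_member_update(on_update, log)`.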
When dropping or replacing a trigger, a user should be careful. The following code does not work:

```Lua
tr = function() ... end
-- Add a trigger.
s:on_member_update(tr)
...
-- Drop a trigger.
s:on_member_update(nil, tr)
```

To be precise, it is the last line that does not work. This is because SWIM wraps user triggers with an internal closure for parameter preprocessing. To drop a trigger, a user should save the result of the first on_member_update() call. This code works:

```Lua
tr = function() ... end
-- Add a trigger.
tr_id = s:on_member_update(tr)
...
-- Drop a trigger.
s:on_member_update(nil, tr_id)
```

The triggers are executed one by one in a separate fiber, and they can yield. These two facts mean that if one trigger sleeps for too long, the other triggers wait. It does not block SWIM from doing its routine operations, but it does block the other triggers.

The last point to remember is that if a member was added and dropped before its appearance has reached a trigger, then such a member does not fire triggers at all. A user will not notice such a short-lived member.
--- src/lua/swim.c | 27 ++++- src/lua/swim.lua | 90 ++++++++++++++ test/swim/swim.result | 263 ++++++++++++++++++++++++++++++++++++++++ test/swim/swim.test.lua | 85 +++++++++++++ 4 files changed, 464 insertions(+), 1 deletion(-) diff --git a/src/lua/swim.c b/src/lua/swim.c index 3b9e229be..17441e58c 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -33,8 +33,31 @@ #include "diag.h" #include "lua/utils.h" +static uint32_t ctid_swim_member_ptr; static uint32_t ctid_swim_ptr; +/** Push member update context into a Lua stack. */ +static int +lua_swim_on_member_update_push(struct lua_State *L, void *event) +{ + struct swim_on_member_update_ctx *ctx = + (struct swim_on_member_update_ctx *) event; + *(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) = + ctx->member; + lua_pushinteger(L, ctx->events); + return 2; +} + +/** Hang or/and delete a trigger on a SWIM member update.
*/ +static int +lua_swim_on_member_update(struct lua_State *L) +{ + uint32_t ctypeid; + struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid); + return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_update(s), + lua_swim_on_member_update_push, NULL); +} + /** * Create a new SWIM instance. SWIM is not created via FFI, * because this operation yields. @@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L) void tarantool_lua_swim_init(struct lua_State *L) { - luaL_cdef(L, "struct swim;"); + luaL_cdef(L, "struct swim_member; struct swim;"); + ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *"); ctid_swim_ptr = luaL_ctypeid(L, "struct swim *"); static const struct luaL_Reg lua_swim_internal_methods [] = { {"swim_new", lua_swim_new}, {"swim_delete", lua_swim_delete}, + {"swim_on_member_update", lua_swim_on_member_update}, {NULL, NULL} }; luaL_register_module(L, "swim", lua_swim_internal_methods); diff --git a/src/lua/swim.lua b/src/lua/swim.lua index 527299284..9b8ff1c29 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -24,6 +24,16 @@ ffi.cdef[[ MEMBER_LEFT, }; + enum swim_ev_mask { + SWIM_EV_NEW = 0b00000001, + SWIM_EV_NEW_STATUS = 0b00000010, + SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_INCARNATION = 0b00001000, + SWIM_EV_NEW_PAYLOAD = 0b00010000, + SWIM_EV_UPDATE = 0b00011110, + SWIM_EV_DROP = 0b00100000, + }; + bool swim_is_configured(const struct swim *swim); @@ -689,6 +699,84 @@ local function swim_pairs(s) return swim_pairs_next, {swim = s, iterator = iterator}, nil end +local swim_on_member_update_index = { + is_new = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0 + end, + is_drop = function(self) + return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0 + end, + is_update = function(self) + return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0 + end, + is_new_status = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0 + end, + is_new_uri = function(self) + return bit.band(self[1], 
capi.SWIM_EV_NEW_URI) ~= 0 + end, + is_new_incarnation = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 + end, + is_new_payload = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0 + end, +} + +local swim_on_member_update_event_mt = { + __index = swim_on_member_update_index, + __serialize = function(self) + local res = {} + for k, v in pairs(swim_on_member_update_index) do + v = v(self) + if v then + res[k] = v + end + end + return res + end, +} + +-- +-- Create a closure function for preprocessing raw SWIM member +-- update trigger parameters. +-- @param s SWIM instance. +-- @param callback User functions to call. +-- @param ctx An optional parameter for @a callback passed as is. +-- @return A function to set as a trigger. +-- +local function swim_on_member_update_new(s, callback, ctx) + return function(member_ptr, event_mask) + local m = swim_wrap_member(s, member_ptr) + local event = setmetatable({event_mask}, swim_on_member_update_event_mt) + return callback(m, event, ctx) + end +end + +-- +-- Add or/and delete a trigger on member update. Possible usages: +-- +-- * on_member_update(new[, ctx]) - add a new trigger. It should +-- accept 3 arguments: an updated member, an events object, an +-- optional @a ctx parameter passed as is. +-- +-- * on_member_update(new, old[, ctx]) - add a new trigger @a new +-- if not nil, in place of @a old trigger. +-- +-- * on_member_update() - get a list of triggers. +-- +local function swim_on_member_update(s, new, old, ctx) + local ptr = swim_check_instance(s, 'swim:on_member_update') + if type(old) ~= 'function' then + ctx = old + old = nil + end + if new ~= nil then + new = swim_on_member_update_new(s, new, ctx) + end + return internal.swim_on_member_update(ptr, new, old) +end + -- -- Normal metatable of a configured SWIM instance. 
-- @@ -708,6 +796,7 @@ local swim_mt = { set_payload = swim_set_payload, set_codec = swim_set_codec, pairs = swim_pairs, + on_member_update = swim_on_member_update, }, __serialize = swim_serialize } @@ -800,6 +889,7 @@ local swim_not_configured_mt = { delete = swim_delete, is_configured = swim_is_configured, set_codec = swim_set_codec, + on_member_update = swim_on_member_update, }, __serialize = swim_serialize } diff --git a/test/swim/swim.result b/test/swim/swim.result index 436d4e579..1de903b04 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -1216,6 +1216,269 @@ s1:delete() s2:delete() --- ... +-- +-- gh-4250: allow to set triggers on a new member appearance, old +-- member drop, member update. +-- +s1 = swim.new() +--- +... +s1.on_member_update() +--- +- error: 'builtin/swim.lua:: swim:on_member_update: first argument is not a SWIM + instance' +... +m_list = {} +--- +... +e_list = {} +--- +... +ctx_list = {} +--- +... +f = nil +--- +... +f_need_sleep = false +--- +... +_ = test_run:cmd("setopt delimiter ';'") +--- +... +t_save_event = function(m, e, ctx) + table.insert(m_list, m) + table.insert(e_list, e) + table.insert(ctx_list, ctx) +end; +--- +... +t_yield = function(m, e, ctx) + f = fiber.self() + t_save_event(m, e, ctx) + while f_need_sleep do fiber.sleep(10000) end +end; +--- +... +_ = test_run:cmd("setopt delimiter ''"); +--- +... +t_save_event_id = s1:on_member_update(t_save_event, 'ctx') +--- +... +-- Not equal, because SWIM wraps user triggers with a closure for +-- context preprocessing. +t_save_event_id ~= t_save_event +--- +- true +... +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01} +--- +- true +... +while #m_list < 1 do fiber.sleep(0) end +--- +... +m_list +--- +- - uri: 127.0.0.1: + status: alive + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 0 +... +e_list +--- +- - is_new_payload: true + is_new_uri: true + is_new: true + is_update: true +... +ctx_list +--- +- - ctx +... 
+m_list = {} e_list = {} ctx_list = {} +--- +... +t_yield_id = s1:on_member_update(t_yield, 'ctx2') +--- +... +f_need_sleep = true +--- +... +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01}) +--- +... +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()}) +--- +- true +... +while s1:size() ~= 2 do fiber.sleep(0.01) end +--- +... +-- Only first trigger worked. Second is waiting, because first +-- sleeps. +m_list +--- +- - uri: 127.0.0.1: + status: alive + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000002 + payload_size: 0 +... +e_list +--- +- - is_new_payload: true + is_new: true + is_update: true +... +ctx_list +--- +- - ctx2 +... +m_list = {} e_list = {} ctx_list = {} +--- +... +-- But it does not prevent normal SWIM operation. +s1:set_payload('payload') +--- +- true +... +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end +--- +... +s2:member_by_uuid(s1:self():uuid()):payload() +--- +- payload +... +f_need_sleep = false +--- +... +fiber.wakeup(f) +--- +... +while #m_list ~= 3 do fiber.sleep(0.01) end +--- +... +m_list +--- +- - uri: 127.0.0.1: + status: alive + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000002 + payload_size: 0 + - uri: 127.0.0.1: + status: alive + incarnation: 2 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 8 + - uri: 127.0.0.1: + status: alive + incarnation: 2 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 8 +... +e_list +--- +- - is_new_payload: true + is_new: true + is_update: true + - is_new_payload: true + is_update: true + is_new_incarnation: true + - is_new_payload: true + is_update: true + is_new_incarnation: true +... +ctx_list +--- +- - ctx + - ctx2 + - ctx +... +m_list = {} e_list = {} ctx_list = {} +--- +... +#s1:on_member_update() +--- +- 2 +... +s1:on_member_update(nil, t_yield_id) +--- +... +s2:quit() +--- +... +while s1:size() ~= 1 do fiber.sleep(0.01) end +--- +... +-- Process update. +fiber.sleep(0) +--- +... 
+-- Two events - status update to 'left', and 'drop'. +m_list +--- +- - uri: 127.0.0.1: + status: left + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000002 + payload_size: 0 + - uri: 127.0.0.1: + status: left + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000002 + payload_size: 0 +... +e_list +--- +- - is_new_status: true + is_update: true + - is_drop: true +... +ctx_list +--- +- - ctx + - ctx +... +m = m_list[1] +--- +... +-- Cached member table works even when a member is deleted. +m_list[1] == m_list[2] +--- +- true +... +m_list = {} e_list = {} ctx_list = {} +--- +... +s1:on_member_update(nil, t_save_event_id) +--- +... +s1:add_member({uuid = m:uuid(), uri = m:uri()}) +--- +- true +... +fiber.sleep(0) +--- +... +-- No updates - all the triggers are dropped. +m_list +--- +- [] +... +e_list +--- +- [] +... +ctx_list +--- +- [] +... +s1:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index a3eac9b46..25eb5a7d0 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid()) s1:delete() s2:delete() +-- +-- gh-4250: allow to set triggers on a new member appearance, old +-- member drop, member update. +-- +s1 = swim.new() +s1.on_member_update() + +m_list = {} +e_list = {} +ctx_list = {} +f = nil +f_need_sleep = false +_ = test_run:cmd("setopt delimiter ';'") +t_save_event = function(m, e, ctx) + table.insert(m_list, m) + table.insert(e_list, e) + table.insert(ctx_list, ctx) +end; +t_yield = function(m, e, ctx) + f = fiber.self() + t_save_event(m, e, ctx) + while f_need_sleep do fiber.sleep(10000) end +end; +_ = test_run:cmd("setopt delimiter ''"); +t_save_event_id = s1:on_member_update(t_save_event, 'ctx') +-- Not equal, because SWIM wraps user triggers with a closure for +-- context preprocessing. 
+t_save_event_id ~= t_save_event + +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01} +while #m_list < 1 do fiber.sleep(0) end +m_list +e_list +ctx_list +m_list = {} e_list = {} ctx_list = {} + +t_yield_id = s1:on_member_update(t_yield, 'ctx2') +f_need_sleep = true +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01}) +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()}) +while s1:size() ~= 2 do fiber.sleep(0.01) end +-- Only first trigger worked. Second is waiting, because first +-- sleeps. +m_list +e_list +ctx_list +m_list = {} e_list = {} ctx_list = {} +-- But it does not prevent normal SWIM operation. +s1:set_payload('payload') +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end +s2:member_by_uuid(s1:self():uuid()):payload() + +f_need_sleep = false +fiber.wakeup(f) +while #m_list ~= 3 do fiber.sleep(0.01) end +m_list +e_list +ctx_list +m_list = {} e_list = {} ctx_list = {} +#s1:on_member_update() + +s1:on_member_update(nil, t_yield_id) +s2:quit() +while s1:size() ~= 1 do fiber.sleep(0.01) end +-- Process update. +fiber.sleep(0) +-- Two events - status update to 'left', and 'drop'. +m_list +e_list +ctx_list +m = m_list[1] +-- Cached member table works even when a member is deleted. +m_list[1] == m_list[2] +m_list = {} e_list = {} ctx_list = {} + +s1:on_member_update(nil, t_save_event_id) +s1:add_member({uuid = m:uuid(), uri = m:uri()}) +fiber.sleep(0) +-- No updates - all the triggers are dropped. +m_list +e_list +ctx_list + +s1:delete() + test_run:cmd("clear filter") -- 2.20.1 (Apple Git-117)