[tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Jun 8 14:04:48 MSK 2019


From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001
Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy at tarantool.org>
In-Reply-To: <cover.1559989748.git.v.shpilevoy at tarantool.org>
References: <cover.1559989748.git.v.shpilevoy at tarantool.org>
From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date: Sat, 1 Jun 2019 20:40:21 +0200
Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events

SWIM as a monitoring module is hard to use without the ability to
subscribe to events. Otherwise a user has to poll the SWIM member
table for updates, which is too inefficient.

This commit exposes the ability to set Lua triggers on member events.

Closes #4250

@TarantoolBot document
Title: SWIM: swim:on_member_event

Now a user can set triggers on member table updates. This is done
with a single function, which can be called in several ways:
```Lua
swim:on_member_event(new_trigger[, ctx])
```
Add a new trigger on member table updates. The function
`new_trigger` is called each time a new member appears, an
existing member is dropped, or a member is updated. It takes 3
arguments: the updated SWIM member, an events object, and `ctx`
passed as is.

The events object has methods that help a user determine which
events have happened:
```Lua
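local function on_event(member, events, ctx)
    if events:is_new() then
        -- A new member has appeared.
    elseif events:is_drop() then
        -- The member has been dropped from the table.
    end

    if events:is_update() then
        -- Several update flags can be set at once, so they are
        -- checked independently rather than in an elseif chain.
        if events:is_new_status() then
            -- The status has changed.
        end
        if events:is_new_uri() then
            -- The URI has changed.
        end
        if events:is_new_incarnation() then
            -- The incarnation has changed.
        end
        if events:is_new_payload() then
            -- The payload has been updated.
        end
    end
end

-- s is a SWIM instance, ctx is any value to pass to the trigger.
s:on_member_event(on_event, ctx)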
```
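To experiment with event masks outside Tarantool, the events object can be imitated in plain Lua. This is a simplified model, not the module's code: it reuses the flag values of `enum swim_ev_mask` from this patch, but replaces LuaJIT's `bit.band` with a small portable helper so it runs on any Lua 5.1+ interpreter. The names `EV`, `band` and `make_events` are hypothetical. The mask used below, 'new' + 'new URI' + 'new payload', is the combination that the test in this patch reports for a freshly configured instance.

```Lua
-- Flag values copied from enum swim_ev_mask in this patch.
local EV = {
    NEW             = 0x01,
    NEW_STATUS      = 0x02,
    NEW_URI         = 0x04,
    NEW_INCARNATION = 0x08,
    NEW_PAYLOAD     = 0x10,
    UPDATE          = 0x1E, -- NEW_STATUS | NEW_URI | NEW_INCARNATION | NEW_PAYLOAD
    DROP            = 0x20,
}

-- Portable bitwise AND for non-negative integers (hypothetical
-- helper; the real module uses LuaJIT's bit.band).
local function band(a, b)
    local result, bitval = 0, 1
    while a > 0 and b > 0 do
        if a % 2 == 1 and b % 2 == 1 then
            result = result + bitval
        end
        a, b, bitval = math.floor(a / 2), math.floor(b / 2), bitval * 2
    end
    return result
end

-- Hypothetical stand-in for the events object: each method checks
-- whether the corresponding flag(s) are set in the stored mask.
local function make_events(mask)
    local function has(flag)
        return function(self) return band(self[1], flag) ~= 0 end
    end
    return setmetatable({mask}, {__index = {
        is_new = has(EV.NEW),
        is_drop = has(EV.DROP),
        is_update = has(EV.UPDATE),
        is_new_status = has(EV.NEW_STATUS),
        is_new_uri = has(EV.NEW_URI),
        is_new_incarnation = has(EV.NEW_INCARNATION),
        is_new_payload = has(EV.NEW_PAYLOAD),
    }})
end

local events = make_events(EV.NEW + EV.NEW_URI + EV.NEW_PAYLOAD)
-- Prints true, true and false: 'update' is set because
-- NEW_URI and NEW_PAYLOAD are part of the UPDATE mask.
print(events:is_new(), events:is_update(), events:is_drop())
```

Since `is_update()` tests a multi-bit mask, it is true whenever any of the four 'new ...' update flags is set, which is why a single event can answer true to both `is_new()` and `is_update()`.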
Note that multiple events can happen simultaneously, and a user
should be ready for that. 'New' and 'drop' never happen together,
but either of them can easily come together with 'update'. This is
especially likely when there are many events and triggers are too
slow: a member can be added and then updated before its appearance
reaches a trigger, so the trigger sees both events at once.

A notable case is 'new' + 'new payload', which does not depend on
trigger speed. The point is that an absent payload and a payload of
size 0 are not the same thing. Sometimes a member is added without a
payload: for example, when it is learned from a ping, since pings do
not carry a payload. In that case the missing payload is eventually
received later. If that matters for a user's application, the
application should be ready for it: 'new' does not mean that the
member already has a payload, and the payload size says nothing
about whether the payload is present.
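One way for an application to cope with this is to remember members that appeared without a payload and forget them once a 'new payload' event arrives. The sketch below assumes that a member added without a payload fires 'new' without 'new payload'; this text implies it but does not state it outright. The trigger `track_payload` and the table `pending_payload` are hypothetical names, and the trigger is exercised here with hypothetical stand-ins for the member and events objects; only `member:uuid()` and the `is_*` methods come from the module.

```Lua
-- Members that appeared without a payload, keyed by UUID string.
local pending_payload = {}

-- Hypothetical trigger, meant for s:on_member_event(track_payload).
-- Assumption: a member added without a payload (e.g. from a ping)
-- fires 'new' without 'new payload'.
local function track_payload(member, events)
    -- tostring() so that UUID objects work as table keys by value.
    local key = tostring(member:uuid())
    if events:is_drop() or events:is_new_payload() then
        -- Either the member is gone or its payload has arrived.
        pending_payload[key] = nil
    elseif events:is_new() then
        -- A new member whose payload is not known yet.
        pending_payload[key] = true
    end
end

-- Hypothetical stand-ins for a SWIM member and an events object.
local function fake_member(uuid)
    return {uuid = function() return uuid end}
end
local function fake_events(flags)
    local obj = {}
    for _, name in ipairs({'is_new', 'is_drop', 'is_new_payload'}) do
        obj[name] = function() return flags[name] == true end
    end
    return obj
end

local m = fake_member('uuid-1')
track_payload(m, fake_events({is_new = true}))
print(pending_payload['uuid-1']) -- true: payload not received yet
track_payload(m, fake_events({is_new_payload = true}))
print(pending_payload['uuid-1']) -- nil: payload has arrived
```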

Second usage case:
```Lua
swim:on_member_event(nil, old_trigger)
```
Drop an old trigger.

Third usage case:
```Lua
swim:on_member_event(new_trigger, old_trigger[, ctx])
```
Replace an existing trigger in place, keeping its position in the
trigger list.

Fourth usage case:
```Lua
swim:on_member_event()
```
Get a list of triggers.
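The four calling conventions above can be told apart by a simple rule, taken from `swim_on_member_event()` in this patch: a second argument that is not a function is treated as `ctx`. The sketch below is a simplified model of that argument handling; the function name `classify` and its string results are hypothetical, and the actual adding, dropping and replacing is done by the C side of the module.

```Lua
-- Hypothetical model of how swim:on_member_event(new, old, ctx)
-- interprets its arguments. Returns the kind of operation and ctx.
local function classify(new, old, ctx)
    -- As in this patch: a non-function second argument is ctx.
    if type(old) ~= 'function' then
        ctx, old = old, nil
    end
    if new == nil and old == nil then
        return 'list'
    elseif old == nil then
        return 'add', ctx
    elseif new == nil then
        return 'drop'
    else
        return 'replace', ctx
    end
end

local f, g = function() end, function() end
print(classify())               -- list
print(classify(f, 'ctx'))       -- add  ctx
print(classify(nil, g))         -- drop
print(classify(f, g, 'ctx2'))   -- replace  ctx2
```

A consequence of this rule is that a function cannot be passed as `ctx` in the two-argument form: `on_member_event(f, some_function)` would be read as a replacement of `some_function`.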

When dropping or replacing a trigger, a user should be careful: the
following code does not work:
```Lua
tr = function(member, events, ctx) --[[ handle events ]] end
-- Add a trigger.
s:on_member_event(tr)
-- ...
-- Drop a trigger.
s:on_member_event(nil, tr)
```
To be precise, the last line does not work. This is because SWIM
wraps user triggers in an internal closure that preprocesses their
parameters, so the function stored in the trigger list is not `tr`.
To drop a trigger, a user should save the result of the
on_member_event() call that added it.

This code works:
```Lua
tr = function(member, events, ctx) --[[ handle events ]] end
-- Add a trigger.
tr_id = s:on_member_event(tr)
-- ...
-- Drop a trigger.
s:on_member_event(nil, tr_id)
```
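Why the original function cannot be used to drop a trigger can be shown with a toy trigger list that looks triggers up by function identity. This is a simplified model under stated assumptions: `triggers`, `on_member_event` and `drop_trigger` are hypothetical stand-ins, the wrapper mirrors only the ctx-passing part of `swim_on_member_event_new()` from this patch, and returning `false` for a missing trigger is a choice of the model; this text does not say how the real module reports it.

```Lua
-- Hypothetical trigger list; triggers are found by identity.
local triggers = {}

-- Like swim_on_member_event_new() in this patch, the user function
-- is wrapped in a fresh closure, and the closure is what is stored
-- and returned.
local function on_member_event(callback, ctx)
    local wrapper = function(member, events)
        return callback(member, events, ctx)
    end
    table.insert(triggers, wrapper)
    return wrapper
end

-- Removes a trigger by identity; false if it is not in the list.
local function drop_trigger(old)
    for i, t in ipairs(triggers) do
        if t == old then
            table.remove(triggers, i)
            return true
        end
    end
    return false
end

local tr = function(member, events, ctx) end
local tr_id = on_member_event(tr)
print(drop_trigger(tr))    -- false: tr itself is not in the list
print(drop_trigger(tr_id)) -- true: the wrapper is
```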

The triggers are executed one by one in a separate fiber, and they
are allowed to yield. Together these facts mean that if one trigger
sleeps for too long, the other triggers wait. This does not prevent
SWIM from doing its routine work, but it does block the other
triggers.
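The effect of running all triggers sequentially in one fiber can be imitated with plain Lua coroutines. In this simplified model a coroutine stands in for the SWIM trigger fiber and `coroutine.yield()` stands in for a sleeping trigger such as one calling `fiber.sleep()`; the trigger names `slow` and `fast` are hypothetical.

```Lua
local log = {}

-- A trigger that "sleeps": it yields and only continues when the
-- worker is resumed again.
local function slow()
    table.insert(log, 'slow:start')
    coroutine.yield()
    table.insert(log, 'slow:end')
end

local function fast()
    table.insert(log, 'fast')
end

-- One worker runs the triggers one by one, in list order.
local worker = coroutine.create(function()
    for _, trigger in ipairs({slow, fast}) do
        trigger()
    end
end)

coroutine.resume(worker)
-- The caller (standing in for SWIM itself) keeps running here,
-- but 'fast' has not run yet: it waits behind 'slow'.
coroutine.resume(worker)
print(table.concat(log, ', ')) -- slow:start, slow:end, fast
```

If a trigger has to do slow work, one option is to hand that work off to its own fiber with `fiber.create()` from Tarantool's `fiber` module, so that the trigger itself returns quickly and does not delay the others.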

The last point to remember is that if a member was added and then
dropped before its appearance reached a trigger, such a member does
not fire triggers at all. A user will never notice such a short-lived
member.
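Both this behavior and the merging of 'new' with a later 'update' follow naturally from a per-member event queue. The sketch below is a hypothetical model of such a queue, not the module's actual implementation (which this patch does not show): pending events of a member are merged into one set of flags, and a 'drop' that arrives while the member's 'new' is still pending removes the entry altogether. For readability it stores flag names in a table instead of a bitmask; `schedule`, `pop`, `queue` and `pending` are hypothetical names.

```Lua
local queue = {}   -- ordered list of UUIDs with pending events
local pending = {} -- uuid -> set of pending flag names

local function schedule(uuid, flag)
    local flags = pending[uuid]
    if flag == 'drop' and flags ~= nil and flags.new then
        -- Added and dropped before any trigger saw it: report nothing.
        pending[uuid] = nil
        for i, u in ipairs(queue) do
            if u == uuid then
                table.remove(queue, i)
                break
            end
        end
        return
    end
    if flags == nil then
        flags = {}
        pending[uuid] = flags
        table.insert(queue, uuid)
    end
    -- Merge with whatever is already pending for this member.
    flags[flag] = true
end

-- Take the next member and its merged flags, as a trigger sees them.
local function pop()
    local uuid = table.remove(queue, 1)
    if uuid == nil then
        return nil
    end
    local flags = pending[uuid]
    pending[uuid] = nil
    return uuid, flags
end

schedule('a', 'new')
schedule('a', 'new_payload') -- merged with the pending 'new'
schedule('b', 'new')
schedule('b', 'drop')        -- 'b' disappears without a trace
local uuid, flags = pop()
print(uuid, flags.new, flags.new_payload) -- a, true, true
print(pop())                              -- nil
```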
---
 src/lua/swim.c          |  27 ++++-
 src/lua/swim.lua        |  90 ++++++++++++++
 test/swim/swim.result   | 263 ++++++++++++++++++++++++++++++++++++++++
 test/swim/swim.test.lua |  85 +++++++++++++
 4 files changed, 464 insertions(+), 1 deletion(-)

diff --git a/src/lua/swim.c b/src/lua/swim.c
index 3b9e229be..c3a0a9911 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -33,8 +33,31 @@
 #include "diag.h"
 #include "lua/utils.h"
 
+static uint32_t ctid_swim_member_ptr;
 static uint32_t ctid_swim_ptr;
 
+/** Push member event context into a Lua stack. */
+static int
+lua_swim_member_event_push(struct lua_State *L, void *event)
+{
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
+	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
+		ctx->member;
+	lua_pushinteger(L, ctx->events);
+	return 2;
+}
+
+/** Set and/or delete a trigger on a SWIM member event. */
+static int
+lua_swim_on_member_event(struct lua_State *L)
+{
+	uint32_t ctypeid;
+	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
+				  lua_swim_member_event_push, NULL);
+}
+
 /**
  * Create a new SWIM instance. SWIM is not created via FFI,
  * because this operation yields.
@@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
 void
 tarantool_lua_swim_init(struct lua_State *L)
 {
-	luaL_cdef(L, "struct swim;");
+	luaL_cdef(L, "struct swim_member; struct swim;");
+	ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
 	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
 
 	static const struct luaL_Reg lua_swim_internal_methods [] = {
 		{"swim_new", lua_swim_new},
 		{"swim_delete", lua_swim_delete},
+		{"swim_on_member_event", lua_swim_on_member_event},
 		{NULL, NULL}
 	};
 	luaL_register_module(L, "swim", lua_swim_internal_methods);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 527299284..a7d5caab3 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -24,6 +24,16 @@ ffi.cdef[[
         MEMBER_LEFT,
     };
 
+    enum swim_ev_mask {
+        SWIM_EV_NEW             = 0b00000001,
+        SWIM_EV_NEW_STATUS      = 0b00000010,
+        SWIM_EV_NEW_URI         = 0b00000100,
+        SWIM_EV_NEW_INCARNATION = 0b00001000,
+        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+        SWIM_EV_UPDATE          = 0b00011110,
+        SWIM_EV_DROP            = 0b00100000,
+    };
+
     bool
     swim_is_configured(const struct swim *swim);
 
@@ -689,6 +699,84 @@ local function swim_pairs(s)
     return swim_pairs_next, {swim = s, iterator = iterator}, nil
 end
 
+local swim_member_event_index = {
+    is_new = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
+    end,
+    is_drop = function(self)
+        return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
+    end,
+    is_update = function(self)
+        return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
+    end,
+    is_new_status = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
+    end,
+    is_new_uri = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
+    end,
+    is_new_incarnation = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
+    end,
+    is_new_payload = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
+    end,
+}
+
+local swim_member_event_mt = {
+    __index = swim_member_event_index,
+    __serialize = function(self)
+        local res = {}
+        for k, v in pairs(swim_member_event_index) do
+            v = v(self)
+            if v then
+                res[k] = v
+            end
+        end
+        return res
+    end,
+}
+
+--
+-- Create a closure function for preprocessing raw SWIM member
+-- event trigger parameters.
+-- @param s SWIM instance.
+-- @param callback User function to call.
+-- @param ctx An optional parameter for @a callback passed as is.
+-- @return A function to set as a trigger.
+--
+local function swim_on_member_event_new(s, callback, ctx)
+    return function(member_ptr, event_mask)
+        local m = swim_wrap_member(s, member_ptr)
+        local event = setmetatable({event_mask}, swim_member_event_mt)
+        return callback(m, event, ctx)
+    end
+end
+
+--
+-- Add and/or delete a trigger on a member event. Possible usages:
+--
+-- * on_member_event(new[, ctx]) - add a new trigger. It should
+--   accept 3 arguments: an updated member, an events object, an
+--   optional @a ctx parameter passed as is.
+--
+-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
+--   if not nil, in place of @a old trigger.
+--
+-- * on_member_event() - get a list of triggers.
+--
+local function swim_on_member_event(s, new, old, ctx)
+    local ptr = swim_check_instance(s, 'swim:on_member_event')
+    if type(old) ~= 'function' then
+        ctx = old
+        old = nil
+    end
+    if new ~= nil then
+        new = swim_on_member_event_new(s, new, ctx)
+    end
+    return internal.swim_on_member_event(ptr, new, old)
+end
+
 --
 -- Normal metatable of a configured SWIM instance.
 --
@@ -708,6 +796,7 @@ local swim_mt = {
         set_payload = swim_set_payload,
         set_codec = swim_set_codec,
         pairs = swim_pairs,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
@@ -800,6 +889,7 @@ local swim_not_configured_mt = {
         delete = swim_delete,
         is_configured = swim_is_configured,
         set_codec = swim_set_codec,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 436d4e579..0196837c8 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -1216,6 +1216,269 @@ s1:delete()
 s2:delete()
 ---
 ...
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+---
+...
+s1.on_member_event()
+---
+- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
+    instance'
+...
+m_list = {}
+---
+...
+e_list = {}
+---
+...
+ctx_list = {}
+---
+...
+f = nil
+---
+...
+f_need_sleep = false
+---
+...
+_ = test_run:cmd("setopt delimiter ';'")
+---
+...
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+---
+...
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+---
+...
+_ = test_run:cmd("setopt delimiter ''");
+---
+...
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
+---
+...
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+---
+- true
+...
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+---
+- true
+...
+while #m_list < 1 do fiber.sleep(0) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new_uri: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
+---
+...
+f_need_sleep = true
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+---
+...
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+---
+- true
+...
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+---
+...
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx2
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+---
+- true
+...
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+---
+...
+s2:member_by_uuid(s1:self():uuid()):payload()
+---
+- payload
+...
+f_need_sleep = false
+---
+...
+fiber.wakeup(f)
+---
+...
+while #m_list ~= 3 do fiber.sleep(0.01) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+...
+ctx_list
+---
+- - ctx
+  - ctx2
+  - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+#s1:on_member_event()
+---
+- 2
+...
+s1:on_member_event(nil, t_yield_id)
+---
+...
+s2:quit()
+---
+...
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Process event.
+fiber.sleep(0)
+---
+...
+-- Two events - status update to 'left', and 'drop'.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_status: true
+    is_update: true
+  - is_drop: true
+...
+ctx_list
+---
+- - ctx
+  - ctx
+...
+m = m_list[1]
+---
+...
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+---
+- true
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+s1:on_member_event(nil, t_save_event_id)
+---
+...
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+-- No events - all the triggers are dropped.
+m_list
+---
+- []
+...
+e_list
+---
+- []
+...
+ctx_list
+---
+- []
+...
+s1:delete()
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index a3eac9b46..458e349e0 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
 s1:delete()
 s2:delete()
 
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+s1.on_member_event()
+
+m_list = {}
+e_list = {}
+ctx_list = {}
+f = nil
+f_need_sleep = false
+_ = test_run:cmd("setopt delimiter ';'")
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+_ = test_run:cmd("setopt delimiter ''");
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+while #m_list < 1 do fiber.sleep(0) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
+f_need_sleep = true
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+s2:member_by_uuid(s1:self():uuid()):payload()
+
+f_need_sleep = false
+fiber.wakeup(f)
+while #m_list ~= 3 do fiber.sleep(0.01) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+#s1:on_member_event()
+
+s1:on_member_event(nil, t_yield_id)
+s2:quit()
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+-- Process event.
+fiber.sleep(0)
+-- Two events - status update to 'left', and 'drop'.
+m_list
+e_list
+ctx_list
+m = m_list[1]
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+m_list = {} e_list = {} ctx_list = {}
+
+s1:on_member_event(nil, t_save_event_id)
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+fiber.sleep(0)
+-- No events - all the triggers are dropped.
+m_list
+e_list
+ctx_list
+
+s1:delete()
+
 test_run:cmd("clear filter")
-- 
2.20.1 (Apple Git-117)




